001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.disk.util; 018 019import java.io.DataInput; 020import java.io.DataOutput; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Iterator; 024import java.util.List; 025import java.util.NoSuchElementException; 026 027/** 028 * Keeps track of a added long values. Collapses ranges of numbers using a 029 * Sequence representation. Use to keep track of received message ids to find 030 * out if a message is duplicate or if there are any missing messages. 031 * 032 * @author chirino 033 */ 034public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Long> { 035 036 public static class Marshaller implements org.apache.activemq.store.kahadb.disk.util.Marshaller<SequenceSet> { 037 038 public static final Marshaller INSTANCE = new Marshaller(); 039 040 public SequenceSet readPayload(DataInput in) throws IOException { 041 SequenceSet value = new SequenceSet(); 042 int count = in.readInt(); 043 for (int i = 0; i < count; i++) { 044 if( in.readBoolean() ) { 045 Sequence sequence = new Sequence(in.readLong(), in.readLong()); 046 value.addLast(sequence); 047 } else { 048 Sequence sequence = new Sequence(in.readLong()); 049 value.addLast(sequence); 050 } 051 } 052 return value; 053 } 054 055 public void writePayload(SequenceSet value, DataOutput out) throws IOException { 056 out.writeInt(value.size()); 057 Sequence sequence = value.getHead(); 058 while (sequence != null ) { 059 if( sequence.range() > 1 ) { 060 out.writeBoolean(true); 061 out.writeLong(sequence.first); 062 out.writeLong(sequence.last); 063 } else { 064 out.writeBoolean(false); 065 out.writeLong(sequence.first); 066 } 067 sequence = sequence.getNext(); 068 } 069 } 070 071 public int getFixedSize() { 072 return -1; 073 } 074 075 public SequenceSet deepCopy(SequenceSet value) { 076 SequenceSet rc = new SequenceSet(); 077 Sequence sequence = value.getHead(); 078 while (sequence != null ) { 079 rc.add(new Sequence(sequence.first, sequence.last)); 080 sequence = sequence.getNext(); 081 } 082 return rc; 083 } 084 085 public boolean isDeepCopySupported() { 086 return true; 087 } 088 } 089 090 public void add(Sequence value) { 091 // TODO we can probably optimize this a bit 092 for(long i=value.first; i<value.last+1; i++) { 093 add(i); 094 } 095 } 096 097 /** 098 * 099 * @param value 100 * the value to add to the list 101 * @return false if the value was a duplicate. 102 */ 103 public boolean add(long value) { 104 105 if (isEmpty()) { 106 addFirst(new Sequence(value)); 107 return true; 108 } 109 110 // check for append 111 Sequence sequence = getTail(); 112 if (sequence.isAdjacentToLast(value)) { 113 sequence.last = value; 114 return true; 115 } 116 117 sequence = getHead(); 118 while (sequence != null) { 119 120 if (sequence.isAdjacentToLast(value)) { 121 // grow the sequence... 122 sequence.last = value; 123 // it might connect us to the next sequence.. 124 if (sequence.getNext() != null) { 125 Sequence next = sequence.getNext(); 126 if (next.isAdjacentToFirst(value)) { 127 // Yep the sequence connected.. so join them. 128 sequence.last = next.last; 129 next.unlink(); 130 } 131 } 132 return true; 133 } 134 135 if (sequence.isAdjacentToFirst(value)) { 136 // grow the sequence... 137 sequence.first = value; 138 139 // it might connect us to the previous 140 if (sequence.getPrevious() != null) { 141 Sequence prev = sequence.getPrevious(); 142 if (prev.isAdjacentToLast(value)) { 143 // Yep the sequence connected.. so join them. 144 sequence.first = prev.first; 145 prev.unlink(); 146 } 147 } 148 return true; 149 } 150 151 // Did that value land before this sequence? 152 if (value < sequence.first) { 153 // Then insert a new entry before this sequence item. 154 sequence.linkBefore(new Sequence(value)); 155 return true; 156 } 157 158 // Did that value land within the sequence? The it's a duplicate. 159 if (sequence.contains(value)) { 160 return false; 161 } 162 163 sequence = sequence.getNext(); 164 } 165 166 // Then the value is getting appended to the tail of the sequence. 167 addLast(new Sequence(value)); 168 return true; 169 } 170 171 /** 172 * Removes the given value from the Sequence set, splitting a 173 * contained sequence if necessary. 174 * 175 * @param value 176 * The value that should be removed from the SequenceSet. 177 * 178 * @return true if the value was removed from the set, false if there 179 * was no sequence in the set that contained the given value. 180 */ 181 public boolean remove(long value) { 182 Sequence sequence = getHead(); 183 while (sequence != null ) { 184 if(sequence.contains(value)) { 185 if (sequence.range() == 1) { 186 sequence.unlink(); 187 return true; 188 } else if (sequence.getFirst() == value) { 189 sequence.setFirst(value+1); 190 return true; 191 } else if (sequence.getLast() == value) { 192 sequence.setLast(value-1); 193 return true; 194 } else { 195 sequence.linkBefore(new Sequence(sequence.first, value-1)); 196 sequence.linkAfter(new Sequence(value+1, sequence.last)); 197 sequence.unlink(); 198 return true; 199 } 200 } 201 202 sequence = sequence.getNext(); 203 } 204 205 return false; 206 } 207 208 /** 209 * Removes and returns the first element from this list. 210 * 211 * @return the first element from this list. 212 * @throws NoSuchElementException if this list is empty. 213 */ 214 public long removeFirst() { 215 if (isEmpty()) { 216 throw new NoSuchElementException(); 217 } 218 219 Sequence rc = removeFirstSequence(1); 220 return rc.first; 221 } 222 223 /** 224 * Removes and returns the last sequence from this list. 225 * 226 * @return the last sequence from this list or null if the list is empty. 227 */ 228 public Sequence removeLastSequence() { 229 if (isEmpty()) { 230 return null; 231 } 232 233 Sequence rc = getTail(); 234 rc.unlink(); 235 return rc; 236 } 237 238 /** 239 * Removes and returns the first sequence that is count range large. 240 * 241 * @return a sequence that is count range large, or null if no sequence is that large in the list. 242 */ 243 public Sequence removeFirstSequence(long count) { 244 if (isEmpty()) { 245 return null; 246 } 247 248 Sequence sequence = getHead(); 249 while (sequence != null ) { 250 if (sequence.range() == count ) { 251 sequence.unlink(); 252 return sequence; 253 } 254 if (sequence.range() > count ) { 255 Sequence rc = new Sequence(sequence.first, sequence.first+count-1); 256 sequence.first+=count; 257 return rc; 258 } 259 sequence = sequence.getNext(); 260 } 261 return null; 262 } 263 264 /** 265 * @return all the id Sequences that are missing from this set that are not 266 * in between the range provided. 267 */ 268 public List<Sequence> getMissing(long first, long last) { 269 ArrayList<Sequence> rc = new ArrayList<Sequence>(); 270 if (first > last) { 271 throw new IllegalArgumentException("First cannot be more than last"); 272 } 273 if (isEmpty()) { 274 // We are missing all the messages. 275 rc.add(new Sequence(first, last)); 276 return rc; 277 } 278 279 Sequence sequence = getHead(); 280 while (sequence != null && first <= last) { 281 if (sequence.contains(first)) { 282 first = sequence.last + 1; 283 } else { 284 if (first < sequence.first) { 285 if (last < sequence.first) { 286 rc.add(new Sequence(first, last)); 287 return rc; 288 } else { 289 rc.add(new Sequence(first, sequence.first - 1)); 290 first = sequence.last + 1; 291 } 292 } 293 } 294 sequence = sequence.getNext(); 295 } 296 297 if (first <= last) { 298 rc.add(new Sequence(first, last)); 299 } 300 return rc; 301 } 302 303 /** 304 * @return all the Sequence that are in this list 305 */ 306 public List<Sequence> getReceived() { 307 ArrayList<Sequence> rc = new ArrayList<Sequence>(size()); 308 Sequence sequence = getHead(); 309 while (sequence != null) { 310 rc.add(new Sequence(sequence.first, sequence.last)); 311 sequence = sequence.getNext(); 312 } 313 return rc; 314 } 315 316 /** 317 * Returns true if the value given is contained within one of the 318 * sequences held in this set. 319 * 320 * @param value 321 * The value to search for in the set. 322 * 323 * @return true if the value is contained in the set. 324 */ 325 public boolean contains(long value) { 326 if (isEmpty()) { 327 return false; 328 } 329 330 Sequence sequence = getHead(); 331 while (sequence != null) { 332 if (sequence.contains(value)) { 333 return true; 334 } 335 sequence = sequence.getNext(); 336 } 337 338 return false; 339 } 340 341 public boolean contains(int first, int last) { 342 if (isEmpty()) { 343 return false; 344 } 345 Sequence sequence = getHead(); 346 while (sequence != null) { 347 if (sequence.first <= first && first <= sequence.last ) { 348 return last <= sequence.last; 349 } 350 sequence = sequence.getNext(); 351 } 352 return false; 353 } 354 355 public Sequence get(int value) { 356 if (!isEmpty()) { 357 Sequence sequence = getHead(); 358 while (sequence != null) { 359 if (sequence.contains(value)) { 360 return sequence; 361 } 362 sequence = sequence.getNext(); 363 } 364 } 365 return null; 366 } 367 368 369 /** 370 * Computes the size of this Sequence by summing the values of all 371 * the contained sequences. 372 * 373 * @return the total number of values contained in this set if it 374 * were to be iterated over like an array. 375 */ 376 public long rangeSize() { 377 long result = 0; 378 Sequence sequence = getHead(); 379 while (sequence != null) { 380 result += sequence.range(); 381 sequence = sequence.getNext(); 382 } 383 return result; 384 } 385 386 public Iterator<Long> iterator() { 387 return new SequenceIterator(); 388 } 389 390 private class SequenceIterator implements Iterator<Long> { 391 392 private Sequence currentEntry; 393 private long lastReturned = -1; 394 395 public SequenceIterator() { 396 currentEntry = getHead(); 397 if (currentEntry != null) { 398 lastReturned = currentEntry.first - 1; 399 } 400 } 401 402 public boolean hasNext() { 403 return currentEntry != null; 404 } 405 406 public Long next() { 407 if (currentEntry == null) { 408 throw new NoSuchElementException(); 409 } 410 411 if(lastReturned < currentEntry.first) { 412 lastReturned = currentEntry.first; 413 if (currentEntry.range() == 1) { 414 currentEntry = currentEntry.getNext(); 415 } 416 } else { 417 lastReturned++; 418 if (lastReturned == currentEntry.last) { 419 currentEntry = currentEntry.getNext(); 420 } 421 } 422 423 return lastReturned; 424 } 425 426 public void remove() { 427 throw new UnsupportedOperationException(); 428 } 429 430 } 431 432}