001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.disk.util; 018 019import java.io.DataInput; 020import java.io.DataOutput; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.Iterator; 024import java.util.List; 025import java.util.NoSuchElementException; 026 027/** 028 * Keeps track of a added long values. Collapses ranges of numbers using a 029 * Sequence representation. Use to keep track of received message ids to find 030 * out if a message is duplicate or if there are any missing messages. 031 * 032 * @author chirino 033 */ 034public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Long> { 035 036 public static class Marshaller implements org.apache.activemq.store.kahadb.disk.util.Marshaller<SequenceSet> { 037 038 public static final Marshaller INSTANCE = new Marshaller(); 039 040 public SequenceSet readPayload(DataInput in) throws IOException { 041 SequenceSet value = new SequenceSet(); 042 int count = in.readInt(); 043 for (int i = 0; i < count; i++) { 044 if( in.readBoolean() ) { 045 Sequence sequence = new Sequence(in.readLong(), in.readLong()); 046 value.addLast(sequence); 047 } else { 048 Sequence sequence = new Sequence(in.readLong()); 049 value.addLast(sequence); 050 } 051 } 052 return value; 053 } 054 055 public void writePayload(SequenceSet value, DataOutput out) throws IOException { 056 out.writeInt(value.size()); 057 Sequence sequence = value.getHead(); 058 while (sequence != null ) { 059 if( sequence.range() > 1 ) { 060 out.writeBoolean(true); 061 out.writeLong(sequence.first); 062 out.writeLong(sequence.last); 063 } else { 064 out.writeBoolean(false); 065 out.writeLong(sequence.first); 066 } 067 sequence = sequence.getNext(); 068 } 069 } 070 071 public int getFixedSize() { 072 return -1; 073 } 074 075 public SequenceSet deepCopy(SequenceSet value) { 076 SequenceSet rc = new SequenceSet(); 077 Sequence sequence = value.getHead(); 078 while (sequence != null ) { 079 rc.add(new Sequence(sequence.first, sequence.last)); 080 sequence = sequence.getNext(); 081 } 082 return rc; 083 } 084 085 public boolean isDeepCopySupported() { 086 return true; 087 } 088 } 089 090 public void add(Sequence value) { 091 // TODO we can probably optimize this a bit 092 for(long i=value.first; i<value.last+1; i++) { 093 add(i); 094 } 095 } 096 097 public void merge(SequenceSet sequenceSet) { 098 Sequence node = sequenceSet.getHead(); 099 100 while (node != null) { 101 add(node); 102 node = node.getNext(); 103 } 104 } 105 106 public void remove(SequenceSet sequenceSet) { 107 Sequence node = sequenceSet.getHead(); 108 109 while (node != null) { 110 remove(node); 111 node = node.getNext(); 112 } 113 } 114 115 public void remove(Sequence value) { 116 for(long i=value.first; i<value.last+1; i++) { 117 remove(i); 118 } 119 } 120 121 /** 122 * 123 * @param value 124 * the value to add to the list 125 * @return false if the value was a duplicate. 126 */ 127 public boolean add(long value) { 128 129 if (isEmpty()) { 130 addFirst(new Sequence(value)); 131 return true; 132 } 133 134 // check for append 135 Sequence sequence = getTail(); 136 if (sequence.isAdjacentToLast(value)) { 137 sequence.last = value; 138 return true; 139 } 140 141 // check if the value is greater than the bigger sequence value and if it's not adjacent to it 142 // in this case, we are sure that the value should be add to the tail of the sequence. 143 if (sequence.isBiggerButNotAdjacentToLast(value)) { 144 addLast(new Sequence(value)); 145 return true; 146 } 147 148 sequence = getHead(); 149 while (sequence != null) { 150 151 if (sequence.isAdjacentToLast(value)) { 152 // grow the sequence... 153 sequence.last = value; 154 // it might connect us to the next sequence.. 155 if (sequence.getNext() != null) { 156 Sequence next = sequence.getNext(); 157 if (next.isAdjacentToFirst(value)) { 158 // Yep the sequence connected.. so join them. 159 sequence.last = next.last; 160 next.unlink(); 161 } 162 } 163 return true; 164 } 165 166 if (sequence.isAdjacentToFirst(value)) { 167 // grow the sequence... 168 sequence.first = value; 169 170 // it might connect us to the previous 171 if (sequence.getPrevious() != null) { 172 Sequence prev = sequence.getPrevious(); 173 if (prev.isAdjacentToLast(value)) { 174 // Yep the sequence connected.. so join them. 175 sequence.first = prev.first; 176 prev.unlink(); 177 } 178 } 179 return true; 180 } 181 182 // Did that value land before this sequence? 183 if (value < sequence.first) { 184 // Then insert a new entry before this sequence item. 185 sequence.linkBefore(new Sequence(value)); 186 return true; 187 } 188 189 // Did that value land within the sequence? The it's a duplicate. 190 if (sequence.contains(value)) { 191 return false; 192 } 193 194 sequence = sequence.getNext(); 195 } 196 197 // Then the value is getting appended to the tail of the sequence. 198 addLast(new Sequence(value)); 199 return true; 200 } 201 202 /** 203 * Removes the given value from the Sequence set, splitting a 204 * contained sequence if necessary. 205 * 206 * @param value 207 * The value that should be removed from the SequenceSet. 208 * 209 * @return true if the value was removed from the set, false if there 210 * was no sequence in the set that contained the given value. 211 */ 212 public boolean remove(long value) { 213 Sequence sequence = getHead(); 214 while (sequence != null ) { 215 if(sequence.contains(value)) { 216 if (sequence.range() == 1) { 217 sequence.unlink(); 218 return true; 219 } else if (sequence.getFirst() == value) { 220 sequence.setFirst(value+1); 221 return true; 222 } else if (sequence.getLast() == value) { 223 sequence.setLast(value-1); 224 return true; 225 } else { 226 sequence.linkBefore(new Sequence(sequence.first, value-1)); 227 sequence.linkAfter(new Sequence(value+1, sequence.last)); 228 sequence.unlink(); 229 return true; 230 } 231 } 232 233 sequence = sequence.getNext(); 234 } 235 236 return false; 237 } 238 239 /** 240 * Removes and returns the first element from this list. 241 * 242 * @return the first element from this list. 243 * @throws NoSuchElementException if this list is empty. 244 */ 245 public long removeFirst() { 246 if (isEmpty()) { 247 throw new NoSuchElementException(); 248 } 249 250 Sequence rc = removeFirstSequence(1); 251 return rc.first; 252 } 253 254 /** 255 * Removes and returns the last sequence from this list. 256 * 257 * @return the last sequence from this list or null if the list is empty. 258 */ 259 public Sequence removeLastSequence() { 260 if (isEmpty()) { 261 return null; 262 } 263 264 Sequence rc = getTail(); 265 rc.unlink(); 266 return rc; 267 } 268 269 /** 270 * Removes and returns the first sequence that is count range large. 271 * 272 * @return a sequence that is count range large, or null if no sequence is that large in the list. 273 */ 274 public Sequence removeFirstSequence(long count) { 275 if (isEmpty()) { 276 return null; 277 } 278 279 Sequence sequence = getHead(); 280 while (sequence != null ) { 281 if (sequence.range() == count ) { 282 sequence.unlink(); 283 return sequence; 284 } 285 if (sequence.range() > count ) { 286 Sequence rc = new Sequence(sequence.first, sequence.first+count-1); 287 sequence.first+=count; 288 return rc; 289 } 290 sequence = sequence.getNext(); 291 } 292 return null; 293 } 294 295 /** 296 * @return all the id Sequences that are missing from this set that are not 297 * in between the range provided. 298 */ 299 public List<Sequence> getMissing(long first, long last) { 300 ArrayList<Sequence> rc = new ArrayList<Sequence>(); 301 if (first > last) { 302 throw new IllegalArgumentException("First cannot be more than last"); 303 } 304 if (isEmpty()) { 305 // We are missing all the messages. 306 rc.add(new Sequence(first, last)); 307 return rc; 308 } 309 310 Sequence sequence = getHead(); 311 while (sequence != null && first <= last) { 312 if (sequence.contains(first)) { 313 first = sequence.last + 1; 314 } else { 315 if (first < sequence.first) { 316 if (last < sequence.first) { 317 rc.add(new Sequence(first, last)); 318 return rc; 319 } else { 320 rc.add(new Sequence(first, sequence.first - 1)); 321 first = sequence.last + 1; 322 } 323 } 324 } 325 sequence = sequence.getNext(); 326 } 327 328 if (first <= last) { 329 rc.add(new Sequence(first, last)); 330 } 331 return rc; 332 } 333 334 /** 335 * @return all the Sequence that are in this list 336 */ 337 public List<Sequence> getReceived() { 338 ArrayList<Sequence> rc = new ArrayList<Sequence>(size()); 339 Sequence sequence = getHead(); 340 while (sequence != null) { 341 rc.add(new Sequence(sequence.first, sequence.last)); 342 sequence = sequence.getNext(); 343 } 344 return rc; 345 } 346 347 /** 348 * Returns true if the value given is contained within one of the 349 * sequences held in this set. 350 * 351 * @param value 352 * The value to search for in the set. 353 * 354 * @return true if the value is contained in the set. 355 */ 356 public boolean contains(long value) { 357 if (isEmpty()) { 358 return false; 359 } 360 361 Sequence sequence = getHead(); 362 while (sequence != null) { 363 if (sequence.contains(value)) { 364 return true; 365 } 366 sequence = sequence.getNext(); 367 } 368 369 return false; 370 } 371 372 public boolean contains(int first, int last) { 373 if (isEmpty()) { 374 return false; 375 } 376 Sequence sequence = getHead(); 377 while (sequence != null) { 378 if (sequence.first <= first && first <= sequence.last ) { 379 return last <= sequence.last; 380 } 381 sequence = sequence.getNext(); 382 } 383 return false; 384 } 385 386 public Sequence get(int value) { 387 if (!isEmpty()) { 388 Sequence sequence = getHead(); 389 while (sequence != null) { 390 if (sequence.contains(value)) { 391 return sequence; 392 } 393 sequence = sequence.getNext(); 394 } 395 } 396 return null; 397 } 398 399 400 /** 401 * Computes the size of this Sequence by summing the values of all 402 * the contained sequences. 403 * 404 * @return the total number of values contained in this set if it 405 * were to be iterated over like an array. 406 */ 407 public long rangeSize() { 408 long result = 0; 409 Sequence sequence = getHead(); 410 while (sequence != null) { 411 result += sequence.range(); 412 sequence = sequence.getNext(); 413 } 414 return result; 415 } 416 417 public Iterator<Long> iterator() { 418 return new SequenceIterator(); 419 } 420 421 private class SequenceIterator implements Iterator<Long> { 422 423 private Sequence currentEntry; 424 private long lastReturned = -1; 425 426 public SequenceIterator() { 427 currentEntry = getHead(); 428 if (currentEntry != null) { 429 lastReturned = currentEntry.first - 1; 430 } 431 } 432 433 public boolean hasNext() { 434 return currentEntry != null; 435 } 436 437 public Long next() { 438 if (currentEntry == null) { 439 throw new NoSuchElementException(); 440 } 441 442 if(lastReturned < currentEntry.first) { 443 lastReturned = currentEntry.first; 444 if (currentEntry.range() == 1) { 445 currentEntry = currentEntry.getNext(); 446 } 447 } else { 448 lastReturned++; 449 if (lastReturned == currentEntry.last) { 450 currentEntry = currentEntry.getNext(); 451 } 452 } 453 454 return lastReturned; 455 } 456 457 public void remove() { 458 throw new UnsupportedOperationException(); 459 } 460 461 } 462 463}