001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.disk.util;
018
019import java.io.DataInput;
020import java.io.DataOutput;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Iterator;
024import java.util.List;
025import java.util.NoSuchElementException;
026
027/**
028 * Keeps track of a added long values. Collapses ranges of numbers using a
029 * Sequence representation. Use to keep track of received message ids to find
030 * out if a message is duplicate or if there are any missing messages.
031 *
032 * @author chirino
033 */
034public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Long> {
035
036    public static class Marshaller implements org.apache.activemq.store.kahadb.disk.util.Marshaller<SequenceSet> {
037
038        public static final Marshaller INSTANCE = new Marshaller();
039
040        public SequenceSet readPayload(DataInput in) throws IOException {
041            SequenceSet value = new SequenceSet();
042            int count = in.readInt();
043            for (int i = 0; i < count; i++) {
044                if( in.readBoolean() ) {
045                    Sequence sequence = new Sequence(in.readLong(), in.readLong());
046                    value.addLast(sequence);
047                } else {
048                    Sequence sequence = new Sequence(in.readLong());
049                    value.addLast(sequence);
050                }
051            }
052            return value;
053        }
054
055        public void writePayload(SequenceSet value, DataOutput out) throws IOException {
056            out.writeInt(value.size());
057            Sequence sequence = value.getHead();
058            while (sequence != null ) {
059                if( sequence.range() > 1 ) {
060                    out.writeBoolean(true);
061                    out.writeLong(sequence.first);
062                    out.writeLong(sequence.last);
063                } else {
064                    out.writeBoolean(false);
065                    out.writeLong(sequence.first);
066                }
067                sequence = sequence.getNext();
068            }
069        }
070
071        public int getFixedSize() {
072            return -1;
073        }
074
075        public SequenceSet deepCopy(SequenceSet value) {
076            SequenceSet rc = new SequenceSet();
077            Sequence sequence = value.getHead();
078            while (sequence != null ) {
079                rc.add(new Sequence(sequence.first, sequence.last));
080                sequence = sequence.getNext();
081            }
082            return rc;
083        }
084
085        public boolean isDeepCopySupported() {
086            return true;
087        }
088    }
089
090    public void add(Sequence value) {
091        // TODO we can probably optimize this a bit
092        for(long i=value.first; i<value.last+1; i++) {
093            add(i);
094        }
095    }
096
097    public void merge(SequenceSet sequenceSet) {
098        Sequence node = sequenceSet.getHead();
099
100        while (node != null) {
101            add(node);
102            node = node.getNext();
103        }
104    }
105
106    public void remove(SequenceSet sequenceSet) {
107        Sequence node = sequenceSet.getHead();
108
109        while (node != null) {
110            remove(node);
111            node = node.getNext();
112        }
113    }
114
115    public void remove(Sequence value) {
116        for(long i=value.first; i<value.last+1; i++) {
117            remove(i);
118        }
119    }
120
121    /**
122     *
123     * @param value
124     *            the value to add to the list
125     * @return false if the value was a duplicate.
126     */
127    public boolean add(long value) {
128
129        if (isEmpty()) {
130            addFirst(new Sequence(value));
131            return true;
132        }
133
134        // check for append
135        Sequence sequence = getTail();
136        if (sequence.isAdjacentToLast(value)) {
137            sequence.last = value;
138            return true;
139        }
140
141        // check if the value is greater than the bigger sequence value and if it's not adjacent to it
142        // in this case, we are sure that the value should be add to the tail of the sequence.
143        if (sequence.isBiggerButNotAdjacentToLast(value)) {
144            addLast(new Sequence(value));
145            return true;
146        }
147
148        sequence = getHead();
149        while (sequence != null) {
150
151            if (sequence.isAdjacentToLast(value)) {
152                // grow the sequence...
153                sequence.last = value;
154                // it might connect us to the next sequence..
155                if (sequence.getNext() != null) {
156                    Sequence next = sequence.getNext();
157                    if (next.isAdjacentToFirst(value)) {
158                        // Yep the sequence connected.. so join them.
159                        sequence.last = next.last;
160                        next.unlink();
161                    }
162                }
163                return true;
164            }
165
166            if (sequence.isAdjacentToFirst(value)) {
167                // grow the sequence...
168                sequence.first = value;
169
170                // it might connect us to the previous
171                if (sequence.getPrevious() != null) {
172                    Sequence prev = sequence.getPrevious();
173                    if (prev.isAdjacentToLast(value)) {
174                        // Yep the sequence connected.. so join them.
175                        sequence.first = prev.first;
176                        prev.unlink();
177                    }
178                }
179                return true;
180            }
181
182            // Did that value land before this sequence?
183            if (value < sequence.first) {
184                // Then insert a new entry before this sequence item.
185                sequence.linkBefore(new Sequence(value));
186                return true;
187            }
188
189            // Did that value land within the sequence? The it's a duplicate.
190            if (sequence.contains(value)) {
191                return false;
192            }
193
194            sequence = sequence.getNext();
195        }
196
197        // Then the value is getting appended to the tail of the sequence.
198        addLast(new Sequence(value));
199        return true;
200    }
201
202    /**
203     * Removes the given value from the Sequence set, splitting a
204     * contained sequence if necessary.
205     *
206     * @param value
207     *          The value that should be removed from the SequenceSet.
208     *
209     * @return true if the value was removed from the set, false if there
210     *         was no sequence in the set that contained the given value.
211     */
212    public boolean remove(long value) {
213        Sequence sequence = getHead();
214        while (sequence != null ) {
215            if(sequence.contains(value)) {
216                if (sequence.range() == 1) {
217                    sequence.unlink();
218                    return true;
219                } else if (sequence.getFirst() == value) {
220                    sequence.setFirst(value+1);
221                    return true;
222                } else if (sequence.getLast() == value) {
223                    sequence.setLast(value-1);
224                    return true;
225                } else {
226                    sequence.linkBefore(new Sequence(sequence.first, value-1));
227                    sequence.linkAfter(new Sequence(value+1, sequence.last));
228                    sequence.unlink();
229                    return true;
230                }
231            }
232
233            sequence = sequence.getNext();
234        }
235
236        return false;
237    }
238
239    /**
240     * Removes and returns the first element from this list.
241     *
242     * @return the first element from this list.
243     * @throws NoSuchElementException if this list is empty.
244     */
245    public long removeFirst() {
246        if (isEmpty()) {
247            throw new NoSuchElementException();
248        }
249
250        Sequence rc = removeFirstSequence(1);
251        return rc.first;
252    }
253
254    /**
255     * Removes and returns the last sequence from this list.
256     *
257     * @return the last sequence from this list or null if the list is empty.
258     */
259    public Sequence removeLastSequence() {
260        if (isEmpty()) {
261            return null;
262        }
263
264        Sequence rc = getTail();
265        rc.unlink();
266        return rc;
267    }
268
269    /**
270     * Removes and returns the first sequence that is count range large.
271     *
272     * @return a sequence that is count range large, or null if no sequence is that large in the list.
273     */
274    public Sequence removeFirstSequence(long count) {
275        if (isEmpty()) {
276            return null;
277        }
278
279        Sequence sequence = getHead();
280        while (sequence != null ) {
281            if (sequence.range() == count ) {
282                sequence.unlink();
283                return sequence;
284            }
285            if (sequence.range() > count ) {
286                Sequence rc = new Sequence(sequence.first, sequence.first+count-1);
287                sequence.first+=count;
288                return rc;
289            }
290            sequence = sequence.getNext();
291        }
292        return null;
293    }
294
295    /**
296     * @return all the id Sequences that are missing from this set that are not
297     *         in between the range provided.
298     */
299    public List<Sequence> getMissing(long first, long last) {
300        ArrayList<Sequence> rc = new ArrayList<Sequence>();
301        if (first > last) {
302            throw new IllegalArgumentException("First cannot be more than last");
303        }
304        if (isEmpty()) {
305            // We are missing all the messages.
306            rc.add(new Sequence(first, last));
307            return rc;
308        }
309
310        Sequence sequence = getHead();
311        while (sequence != null && first <= last) {
312            if (sequence.contains(first)) {
313                first = sequence.last + 1;
314            } else {
315                if (first < sequence.first) {
316                    if (last < sequence.first) {
317                        rc.add(new Sequence(first, last));
318                        return rc;
319                    } else {
320                        rc.add(new Sequence(first, sequence.first - 1));
321                        first = sequence.last + 1;
322                    }
323                }
324            }
325            sequence = sequence.getNext();
326        }
327
328        if (first <= last) {
329            rc.add(new Sequence(first, last));
330        }
331        return rc;
332    }
333
334    /**
335     * @return all the Sequence that are in this list
336     */
337    public List<Sequence> getReceived() {
338        ArrayList<Sequence> rc = new ArrayList<Sequence>(size());
339        Sequence sequence = getHead();
340        while (sequence != null) {
341            rc.add(new Sequence(sequence.first, sequence.last));
342            sequence = sequence.getNext();
343        }
344        return rc;
345    }
346
347    /**
348     * Returns true if the value given is contained within one of the
349     * sequences held in this set.
350     *
351     * @param value
352     *      The value to search for in the set.
353     *
354     * @return true if the value is contained in the set.
355     */
356    public boolean contains(long value) {
357        if (isEmpty()) {
358            return false;
359        }
360
361        Sequence sequence = getHead();
362        while (sequence != null) {
363            if (sequence.contains(value)) {
364                return true;
365            }
366            sequence = sequence.getNext();
367        }
368
369        return false;
370    }
371
372    public boolean contains(int first, int last) {
373        if (isEmpty()) {
374            return false;
375        }
376        Sequence sequence = getHead();
377        while (sequence != null) {
378            if (sequence.first <= first && first <= sequence.last ) {
379                return last <= sequence.last;
380            }
381            sequence = sequence.getNext();
382        }
383        return false;
384    }
385
386    public Sequence get(int value) {
387        if (!isEmpty()) {
388            Sequence sequence = getHead();
389            while (sequence != null) {
390                if (sequence.contains(value)) {
391                    return sequence;
392                }
393                sequence = sequence.getNext();
394            }
395        }
396        return null;
397    }
398
399
400    /**
401     * Computes the size of this Sequence by summing the values of all
402     * the contained sequences.
403     *
404     * @return the total number of values contained in this set if it
405     *         were to be iterated over like an array.
406     */
407    public long rangeSize() {
408        long result = 0;
409        Sequence sequence = getHead();
410        while (sequence != null) {
411            result += sequence.range();
412            sequence = sequence.getNext();
413        }
414        return result;
415    }
416
417    public Iterator<Long> iterator() {
418        return new SequenceIterator();
419    }
420
421    private class SequenceIterator implements Iterator<Long> {
422
423        private Sequence currentEntry;
424        private long lastReturned = -1;
425
426        public SequenceIterator() {
427            currentEntry = getHead();
428            if (currentEntry != null) {
429                lastReturned = currentEntry.first - 1;
430            }
431        }
432
433        public boolean hasNext() {
434            return currentEntry != null;
435        }
436
437        public Long next() {
438            if (currentEntry == null) {
439                throw new NoSuchElementException();
440            }
441
442            if(lastReturned < currentEntry.first) {
443                lastReturned = currentEntry.first;
444                if (currentEntry.range() == 1) {
445                    currentEntry = currentEntry.getNext();
446                }
447            } else {
448                lastReturned++;
449                if (lastReturned == currentEntry.last) {
450                    currentEntry = currentEntry.getNext();
451                }
452            }
453
454            return lastReturned;
455        }
456
457        public void remove() {
458            throw new UnsupportedOperationException();
459        }
460
461    }
462
463}