001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.resequencer;
018    
019    import java.util.Timer;
020    
021    import org.apache.camel.util.concurrent.ThreadHelper;
022    
023    /**
024     * Resequences elements based on a given {@link SequenceElementComparator}.
025     * This resequencer is designed for resequencing element streams. Stream-based
026     * resequencing has the advantage that the number of elements to be resequenced
027     * need not be known in advance. Resequenced elements are delivered via a
028     * {@link SequenceSender}.
029     * <p>
030     * The resequencer's behaviour for a given comparator is controlled by the
031     * <code>timeout</code> property. This is the timeout (in milliseconds) for a
032     * given element managed by this resequencer. An out-of-sequence element can
033     * only be marked as <i>ready-for-delivery</i> if it either times out or if it
034     * has an immediate predecessor (in that case it is in-sequence). If an
035     * immediate predecessor of a waiting element arrives the timeout task for the
036     * waiting element will be cancelled (which marks it as <i>ready-for-delivery</i>).
037     * <p>
038     * If the maximum out-of-sequence time difference between elements within a
039     * stream is known, the <code>timeout</code> value should be set to this
040     * value. In this case it is guaranteed that all elements of a stream will be
041     * delivered in sequence via the {@link SequenceSender}. The lower the
042     * <code>timeout</code> value is compared to the out-of-sequence time
043     * difference between elements within a stream the higher the probability is for
044     * out-of-sequence elements delivered by this resequencer. Delivery of elements
045     * must be explicitly triggered by applications using the {@link #deliver()} or
046     * {@link #deliverNext()} methods. Only elements that are <i>ready-for-delivery</i>
047     * are delivered by these methods. The longer an application waits to trigger a
048     * delivery the more elements may become <i>ready-for-delivery</i>.
049     * <p>
050     * The resequencer remembers the last-delivered element. If an element arrives
051     * which is the immediate successor of the last-delivered element it is
052     * <i>ready-for-delivery</i> immediately. After delivery the last-delivered
053     * element is adjusted accordingly. If the last-delivered element is
054     * <code>null</code> i.e. the resequencer was newly created the first arriving
055     * element needs <code>timeout</code> milliseconds in any case for becoming
056     * <i>ready-for-delivery</i>.
057     * <p>
058     *
059     * @version 
060     */
061    public class ResequencerEngine<E> {
062    
063        /**
064         * The element that most recently hash been delivered or <code>null</code>
065         * if no element has been delivered yet.
066         */
067        private Element<E> lastDelivered;
068    
069        /**
070         * Minimum amount of time to wait for out-of-sequence elements.
071         */
072        private long timeout;
073    
074        /**
075         * A sequence of elements for sorting purposes.
076         */
077        private Sequence<Element<E>> sequence;
078    
079        /**
080         * A timer for scheduling timeout notifications.
081         */
082        private Timer timer;
083    
084        /**
085         * A strategy for sending sequence elements.
086         */
087        private SequenceSender<E> sequenceSender;
088    
089        /**
090         * Creates a new resequencer instance with a default timeout of 2000
091         * milliseconds.
092         *
093         * @param comparator a sequence element comparator.
094         */
095        public ResequencerEngine(SequenceElementComparator<E> comparator) {
096            this.sequence = createSequence(comparator);
097            this.timeout = 2000L;
098            this.lastDelivered = null;
099        }
100    
101        public void start() {
102            timer = new Timer(ThreadHelper.resolveThreadName("Camel Thread ${counter} - ${name}", "Stream Resequencer Timer"), true);
103        }
104    
105        /**
106         * Stops this resequencer (i.e. this resequencer's {@link Timer} instance).
107         */
108        public void stop() {
109            timer.cancel();
110        }
111    
112        /**
113         * Returns the number of elements currently maintained by this resequencer.
114         *
115         * @return the number of elements currently maintained by this resequencer.
116         */
117        public synchronized int size() {
118            return sequence.size();
119        }
120    
121        /**
122         * Returns this resequencer's timeout value.
123         *
124         * @return the timeout in milliseconds.
125         */
126        public long getTimeout() {
127            return timeout;
128        }
129    
130        /**
131         * Sets this sequencer's timeout value.
132         *
133         * @param timeout the timeout in milliseconds.
134         */
135        public void setTimeout(long timeout) {
136            this.timeout = timeout;
137        }
138    
139        /**
140         * Returns the sequence sender.
141         *
142         * @return the sequence sender.
143         */
144        public SequenceSender<E> getSequenceSender() {
145            return sequenceSender;
146        }
147    
148        /**
149         * Sets the sequence sender.
150         *
151         * @param sequenceSender a sequence element sender.
152         */
153        public void setSequenceSender(SequenceSender<E> sequenceSender) {
154            this.sequenceSender = sequenceSender;
155        }
156    
157        /**
158         * Returns the last delivered element.
159         *
160         * @return the last delivered element or <code>null</code> if no delivery
161         *         has been made yet.
162         */
163        E getLastDelivered() {
164            if (lastDelivered == null) {
165                return null;
166            }
167            return lastDelivered.getObject();
168        }
169    
170        /**
171         * Sets the last delivered element. This is for testing purposes only.
172         *
173         * @param o an element.
174         */
175        void setLastDelivered(E o) {
176            lastDelivered = new Element<E>(o);
177        }
178    
179        /**
180         * Inserts the given element into this resequencer. If the element is not
181         * ready for immediate delivery and has no immediate presecessor then it is
182         * scheduled for timing out. After being timed out it is ready for delivery.
183         *
184         * @param o an element.
185         * @throws IllegalArgumentException if the element cannot be used with this resequencer engine
186         */
187        public synchronized void insert(E o) {
188            // wrap object into internal element
189            Element<E> element = new Element<E>(o);
190    
191            // validate the exchange has no problem
192            if (!sequence.comparator().isValid(element)) {
193                throw new IllegalArgumentException("Element cannot be used in comparator: " + sequence.comparator());
194            }
195    
196            // add element to sequence in proper order
197            sequence.add(element);
198    
199            Element<E> successor = sequence.successor(element);
200    
201            // check if there is an immediate successor and cancel
202            // timer task (no need to wait any more for timeout)
203            if (successor != null) {
204                successor.cancel();
205            }
206    
207            // start delivery if current element is successor of last delivered element
208            if (successorOfLastDelivered(element)) {
209                // nothing to schedule
210            } else if (sequence.predecessor(element) != null) {
211                // nothing to schedule
212            } else {
213                element.schedule(defineTimeout());
214            }
215        }
216    
217        /**
218         * Delivers all elements which are currently ready to deliver.
219         *
220         * @throws Exception thrown by {@link SequenceSender#sendElement(Object)}.
221         *
222         * @see ResequencerEngine#deliverNext() 
223         */
224        public synchronized void deliver() throws Exception {
225            while (deliverNext()) {
226                // do nothing here
227            }
228        }
229    
230        /**
231         * Attempts to deliver a single element from the head of the resequencer
232         * queue (sequence). Only elements which have not been scheduled for timing
233         * out or which already timed out can be delivered. Elements are delivered via
234         * {@link SequenceSender#sendElement(Object)}.
235         *
236         * @return <code>true</code> if the element has been delivered
237         *         <code>false</code> otherwise.
238         *
239         * @throws Exception thrown by {@link SequenceSender#sendElement(Object)}.
240         *
241         */
242        public boolean deliverNext() throws Exception {
243            if (sequence.size() == 0) {
244                return false;
245            }
246            // inspect element with lowest sequence value
247            Element<E> element = sequence.first();
248    
249            // if element is scheduled do not deliver and return
250            if (element.scheduled()) {
251                return false;
252            }
253    
254            // remove deliverable element from sequence
255            sequence.remove(element);
256    
257            // set the delivered element to last delivered element
258            lastDelivered = element;
259    
260            // deliver the sequence element
261            sequenceSender.sendElement(element.getObject());
262    
263            // element has been delivered
264            return true;
265        }
266    
267        /**
268         * Returns <code>true</code> if the given element is the immediate
269         * successor of the last delivered element.
270         *
271         * @param element an element.
272         * @return <code>true</code> if the given element is the immediate
273         *         successor of the last delivered element.
274         */
275        private boolean successorOfLastDelivered(Element<E> element) {
276            if (lastDelivered == null) {
277                return false;
278            }
279            if (sequence.comparator().successor(element, lastDelivered)) {
280                return true;
281            }
282            return false;
283        }
284    
285        /**
286         * Creates a timeout task based on the timeout setting of this resequencer.
287         *
288         * @return a new timeout task.
289         */
290        private Timeout defineTimeout() {
291            return new Timeout(timer, timeout);
292        }
293    
294        private static <E> Sequence<Element<E>> createSequence(SequenceElementComparator<E> comparator) {
295            return new Sequence<Element<E>>(new ElementComparator<E>(comparator));
296        }
297    
298    }