001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor.resequencer; 018 019 import java.util.Timer; 020 021 import org.apache.camel.util.concurrent.ThreadHelper; 022 023 /** 024 * Resequences elements based on a given {@link SequenceElementComparator}. 025 * This resequencer is designed for resequencing element streams. Stream-based 026 * resequencing has the advantage that the number of elements to be resequenced 027 * need not be known in advance. Resequenced elements are delivered via a 028 * {@link SequenceSender}. 029 * <p> 030 * The resequencer's behaviour for a given comparator is controlled by the 031 * <code>timeout</code> property. This is the timeout (in milliseconds) for a 032 * given element managed by this resequencer. An out-of-sequence element can 033 * only be marked as <i>ready-for-delivery</i> if it either times out or if it 034 * has an immediate predecessor (in that case it is in-sequence). If an 035 * immediate predecessor of a waiting element arrives the timeout task for the 036 * waiting element will be cancelled (which marks it as <i>ready-for-delivery</i>). 037 * <p> 038 * If the maximum out-of-sequence time difference between elements within a 039 * stream is known, the <code>timeout</code> value should be set to this 040 * value. In this case it is guaranteed that all elements of a stream will be 041 * delivered in sequence via the {@link SequenceSender}. The lower the 042 * <code>timeout</code> value is compared to the out-of-sequence time 043 * difference between elements within a stream the higher the probability is for 044 * out-of-sequence elements delivered by this resequencer. Delivery of elements 045 * must be explicitly triggered by applications using the {@link #deliver()} or 046 * {@link #deliverNext()} methods. Only elements that are <i>ready-for-delivery</i> 047 * are delivered by these methods. The longer an application waits to trigger a 048 * delivery the more elements may become <i>ready-for-delivery</i>. 049 * <p> 050 * The resequencer remembers the last-delivered element. If an element arrives 051 * which is the immediate successor of the last-delivered element it is 052 * <i>ready-for-delivery</i> immediately. After delivery the last-delivered 053 * element is adjusted accordingly. If the last-delivered element is 054 * <code>null</code> i.e. the resequencer was newly created the first arriving 055 * element needs <code>timeout</code> milliseconds in any case for becoming 056 * <i>ready-for-delivery</i>. 057 * <p> 058 * 059 * @version 060 */ 061 public class ResequencerEngine<E> { 062 063 /** 064 * The element that most recently hash been delivered or <code>null</code> 065 * if no element has been delivered yet. 066 */ 067 private Element<E> lastDelivered; 068 069 /** 070 * Minimum amount of time to wait for out-of-sequence elements. 071 */ 072 private long timeout; 073 074 /** 075 * A sequence of elements for sorting purposes. 076 */ 077 private Sequence<Element<E>> sequence; 078 079 /** 080 * A timer for scheduling timeout notifications. 081 */ 082 private Timer timer; 083 084 /** 085 * A strategy for sending sequence elements. 086 */ 087 private SequenceSender<E> sequenceSender; 088 089 /** 090 * Creates a new resequencer instance with a default timeout of 2000 091 * milliseconds. 092 * 093 * @param comparator a sequence element comparator. 094 */ 095 public ResequencerEngine(SequenceElementComparator<E> comparator) { 096 this.sequence = createSequence(comparator); 097 this.timeout = 2000L; 098 this.lastDelivered = null; 099 } 100 101 public void start() { 102 timer = new Timer(ThreadHelper.resolveThreadName("Camel Thread ${counter} - ${name}", "Stream Resequencer Timer"), true); 103 } 104 105 /** 106 * Stops this resequencer (i.e. this resequencer's {@link Timer} instance). 107 */ 108 public void stop() { 109 timer.cancel(); 110 } 111 112 /** 113 * Returns the number of elements currently maintained by this resequencer. 114 * 115 * @return the number of elements currently maintained by this resequencer. 116 */ 117 public synchronized int size() { 118 return sequence.size(); 119 } 120 121 /** 122 * Returns this resequencer's timeout value. 123 * 124 * @return the timeout in milliseconds. 125 */ 126 public long getTimeout() { 127 return timeout; 128 } 129 130 /** 131 * Sets this sequencer's timeout value. 132 * 133 * @param timeout the timeout in milliseconds. 134 */ 135 public void setTimeout(long timeout) { 136 this.timeout = timeout; 137 } 138 139 /** 140 * Returns the sequence sender. 141 * 142 * @return the sequence sender. 143 */ 144 public SequenceSender<E> getSequenceSender() { 145 return sequenceSender; 146 } 147 148 /** 149 * Sets the sequence sender. 150 * 151 * @param sequenceSender a sequence element sender. 152 */ 153 public void setSequenceSender(SequenceSender<E> sequenceSender) { 154 this.sequenceSender = sequenceSender; 155 } 156 157 /** 158 * Returns the last delivered element. 159 * 160 * @return the last delivered element or <code>null</code> if no delivery 161 * has been made yet. 162 */ 163 E getLastDelivered() { 164 if (lastDelivered == null) { 165 return null; 166 } 167 return lastDelivered.getObject(); 168 } 169 170 /** 171 * Sets the last delivered element. This is for testing purposes only. 172 * 173 * @param o an element. 174 */ 175 void setLastDelivered(E o) { 176 lastDelivered = new Element<E>(o); 177 } 178 179 /** 180 * Inserts the given element into this resequencer. If the element is not 181 * ready for immediate delivery and has no immediate presecessor then it is 182 * scheduled for timing out. After being timed out it is ready for delivery. 183 * 184 * @param o an element. 185 * @throws IllegalArgumentException if the element cannot be used with this resequencer engine 186 */ 187 public synchronized void insert(E o) { 188 // wrap object into internal element 189 Element<E> element = new Element<E>(o); 190 191 // validate the exchange has no problem 192 if (!sequence.comparator().isValid(element)) { 193 throw new IllegalArgumentException("Element cannot be used in comparator: " + sequence.comparator()); 194 } 195 196 // add element to sequence in proper order 197 sequence.add(element); 198 199 Element<E> successor = sequence.successor(element); 200 201 // check if there is an immediate successor and cancel 202 // timer task (no need to wait any more for timeout) 203 if (successor != null) { 204 successor.cancel(); 205 } 206 207 // start delivery if current element is successor of last delivered element 208 if (successorOfLastDelivered(element)) { 209 // nothing to schedule 210 } else if (sequence.predecessor(element) != null) { 211 // nothing to schedule 212 } else { 213 element.schedule(defineTimeout()); 214 } 215 } 216 217 /** 218 * Delivers all elements which are currently ready to deliver. 219 * 220 * @throws Exception thrown by {@link SequenceSender#sendElement(Object)}. 221 * 222 * @see ResequencerEngine#deliverNext() 223 */ 224 public synchronized void deliver() throws Exception { 225 while (deliverNext()) { 226 // do nothing here 227 } 228 } 229 230 /** 231 * Attempts to deliver a single element from the head of the resequencer 232 * queue (sequence). Only elements which have not been scheduled for timing 233 * out or which already timed out can be delivered. Elements are delivered via 234 * {@link SequenceSender#sendElement(Object)}. 235 * 236 * @return <code>true</code> if the element has been delivered 237 * <code>false</code> otherwise. 238 * 239 * @throws Exception thrown by {@link SequenceSender#sendElement(Object)}. 240 * 241 */ 242 public boolean deliverNext() throws Exception { 243 if (sequence.size() == 0) { 244 return false; 245 } 246 // inspect element with lowest sequence value 247 Element<E> element = sequence.first(); 248 249 // if element is scheduled do not deliver and return 250 if (element.scheduled()) { 251 return false; 252 } 253 254 // remove deliverable element from sequence 255 sequence.remove(element); 256 257 // set the delivered element to last delivered element 258 lastDelivered = element; 259 260 // deliver the sequence element 261 sequenceSender.sendElement(element.getObject()); 262 263 // element has been delivered 264 return true; 265 } 266 267 /** 268 * Returns <code>true</code> if the given element is the immediate 269 * successor of the last delivered element. 270 * 271 * @param element an element. 272 * @return <code>true</code> if the given element is the immediate 273 * successor of the last delivered element. 274 */ 275 private boolean successorOfLastDelivered(Element<E> element) { 276 if (lastDelivered == null) { 277 return false; 278 } 279 if (sequence.comparator().successor(element, lastDelivered)) { 280 return true; 281 } 282 return false; 283 } 284 285 /** 286 * Creates a timeout task based on the timeout setting of this resequencer. 287 * 288 * @return a new timeout task. 289 */ 290 private Timeout defineTimeout() { 291 return new Timeout(timer, timeout); 292 } 293 294 private static <E> Sequence<Element<E>> createSequence(SequenceElementComparator<E> comparator) { 295 return new Sequence<Element<E>>(new ElementComparator<E>(comparator)); 296 } 297 298 }