001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.TimeUnit; 022import java.util.concurrent.locks.Condition; 023import java.util.concurrent.locks.Lock; 024import java.util.concurrent.locks.ReentrantLock; 025 026import org.apache.camel.AsyncCallback; 027import org.apache.camel.AsyncProcessor; 028import org.apache.camel.CamelContext; 029import org.apache.camel.CamelExchangeException; 030import org.apache.camel.Exchange; 031import org.apache.camel.Expression; 032import org.apache.camel.Navigate; 033import org.apache.camel.Processor; 034import org.apache.camel.Traceable; 035import org.apache.camel.processor.resequencer.ResequencerEngine; 036import org.apache.camel.processor.resequencer.SequenceElementComparator; 037import org.apache.camel.processor.resequencer.SequenceSender; 038import org.apache.camel.spi.ExceptionHandler; 039import org.apache.camel.spi.IdAware; 040import org.apache.camel.support.LoggingExceptionHandler; 041import org.apache.camel.support.ServiceSupport; 042import org.apache.camel.util.AsyncProcessorHelper; 043import org.apache.camel.util.ObjectHelper; 044import org.apache.camel.util.ServiceHelper; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048/** 049 * A resequencer that re-orders a (continuous) stream of {@link Exchange}s. The 050 * algorithm implemented by {@link ResequencerEngine} is based on the detection 051 * of gaps in a message stream rather than on a fixed batch size. Gap detection 052 * in combination with timeouts removes the constraint of having to know the 053 * number of messages of a sequence (i.e. the batch size) in advance. 054 * <p> 055 * Messages must contain a unique sequence number for which a predecessor and a 056 * successor is known. For example a message with the sequence number 3 has a 057 * predecessor message with the sequence number 2 and a successor message with 058 * the sequence number 4. The message sequence 2,3,5 has a gap because the 059 * successor of 3 is missing. The resequencer therefore has to retain message 5 060 * until message 4 arrives (or a timeout occurs). 061 * <p> 062 * Instances of this class poll for {@link Exchange}s from a given 063 * <code>endpoint</code>. Resequencing work and the delivery of messages to 064 * the next <code>processor</code> is done within the single polling thread. 065 * 066 * @version 067 * 068 * @see ResequencerEngine 069 */ 070public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, AsyncProcessor, Navigate<Processor>, Traceable, IdAware { 071 072 private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L; 073 private static final Logger LOG = LoggerFactory.getLogger(StreamResequencer.class); 074 075 private String id; 076 private final CamelContext camelContext; 077 private final ExceptionHandler exceptionHandler; 078 private final ResequencerEngine<Exchange> engine; 079 private final Processor processor; 080 private final Expression expression; 081 private Delivery delivery; 082 private int capacity; 083 private boolean ignoreInvalidExchanges; 084 085 /** 086 * Creates a new {@link StreamResequencer} instance. 087 * 088 * @param processor next processor that processes re-ordered exchanges. 089 * @param comparator a sequence element comparator for exchanges. 090 */ 091 public StreamResequencer(CamelContext camelContext, Processor processor, SequenceElementComparator<Exchange> comparator, Expression expression) { 092 ObjectHelper.notNull(camelContext, "CamelContext"); 093 this.camelContext = camelContext; 094 this.engine = new ResequencerEngine<Exchange>(comparator); 095 this.engine.setSequenceSender(this); 096 this.processor = processor; 097 this.expression = expression; 098 this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass()); 099 } 100 101 public Expression getExpression() { 102 return expression; 103 } 104 105 /** 106 * Returns this resequencer's exception handler. 107 */ 108 public ExceptionHandler getExceptionHandler() { 109 return exceptionHandler; 110 } 111 112 /** 113 * Returns the next processor. 114 */ 115 public Processor getProcessor() { 116 return processor; 117 } 118 119 /** 120 * Returns this resequencer's capacity. The capacity is the maximum number 121 * of exchanges that can be managed by this resequencer at a given point in 122 * time. If the capacity if reached, polling from the endpoint will be 123 * skipped for <code>timeout</code> milliseconds giving exchanges the 124 * possibility to time out and to be delivered after the waiting period. 125 * 126 * @return this resequencer's capacity. 127 */ 128 public int getCapacity() { 129 return capacity; 130 } 131 132 /** 133 * Returns this resequencer's timeout. This sets the resequencer engine's 134 * timeout via {@link ResequencerEngine#setTimeout(long)}. This value is 135 * also used to define the polling timeout from the endpoint. 136 * 137 * @return this resequencer's timeout. (Processor) 138 * @see ResequencerEngine#setTimeout(long) 139 */ 140 public long getTimeout() { 141 return engine.getTimeout(); 142 } 143 144 public void setCapacity(int capacity) { 145 this.capacity = capacity; 146 } 147 148 public void setTimeout(long timeout) { 149 engine.setTimeout(timeout); 150 } 151 152 public boolean isIgnoreInvalidExchanges() { 153 return ignoreInvalidExchanges; 154 } 155 156 public void setRejectOld(Boolean rejectOld) { 157 engine.setRejectOld(rejectOld); 158 } 159 160 public boolean isRejectOld() { 161 return engine.getRejectOld() != null && engine.getRejectOld(); 162 } 163 164 /** 165 * Sets whether to ignore invalid exchanges which cannot be used by this stream resequencer. 166 * <p/> 167 * Default is <tt>false</tt>, by which an {@link CamelExchangeException} is thrown if the {@link Exchange} 168 * is invalid. 169 */ 170 public void setIgnoreInvalidExchanges(boolean ignoreInvalidExchanges) { 171 this.ignoreInvalidExchanges = ignoreInvalidExchanges; 172 } 173 174 @Override 175 public String toString() { 176 return "StreamResequencer[to: " + processor + "]"; 177 } 178 179 public String getTraceLabel() { 180 return "streamResequence"; 181 } 182 183 public String getId() { 184 return id; 185 } 186 187 public void setId(String id) { 188 this.id = id; 189 } 190 191 @Override 192 protected void doStart() throws Exception { 193 ServiceHelper.startServices(processor); 194 delivery = new Delivery(); 195 engine.start(); 196 delivery.start(); 197 } 198 199 @Override 200 protected void doStop() throws Exception { 201 // let's stop everything in the reverse order 202 // no need to stop the worker thread -- it will stop automatically when this service is stopped 203 engine.stop(); 204 ServiceHelper.stopServices(processor); 205 } 206 207 /** 208 * Sends the <code>exchange</code> to the next <code>processor</code>. 209 * 210 * @param exchange exchange to send. 211 */ 212 public void sendElement(Exchange exchange) throws Exception { 213 processor.process(exchange); 214 } 215 216 public void process(Exchange exchange) throws Exception { 217 AsyncProcessorHelper.process(this, exchange); 218 } 219 220 public boolean process(Exchange exchange, AsyncCallback callback) { 221 while (engine.size() >= capacity) { 222 try { 223 Thread.sleep(getTimeout()); 224 } catch (InterruptedException e) { 225 // we was interrupted so break out 226 exchange.setException(e); 227 callback.done(true); 228 return true; 229 } 230 } 231 232 try { 233 engine.insert(exchange); 234 delivery.request(); 235 } catch (Exception e) { 236 if (isIgnoreInvalidExchanges()) { 237 LOG.debug("Invalid Exchange. This Exchange will be ignored: {}", exchange); 238 } else { 239 exchange.setException(new CamelExchangeException("Error processing Exchange in StreamResequencer", exchange, e)); 240 } 241 } 242 243 callback.done(true); 244 return true; 245 } 246 247 public boolean hasNext() { 248 return processor != null; 249 } 250 251 public List<Processor> next() { 252 if (!hasNext()) { 253 return null; 254 } 255 List<Processor> answer = new ArrayList<Processor>(1); 256 answer.add(processor); 257 return answer; 258 } 259 260 class Delivery extends Thread { 261 262 private Lock deliveryRequestLock = new ReentrantLock(); 263 private Condition deliveryRequestCondition = deliveryRequestLock.newCondition(); 264 265 Delivery() { 266 super(camelContext.getExecutorServiceManager().resolveThreadName("Resequencer Delivery")); 267 } 268 269 @Override 270 public void run() { 271 while (isRunAllowed()) { 272 try { 273 deliveryRequestLock.lock(); 274 try { 275 deliveryRequestCondition.await(DELIVERY_ATTEMPT_INTERVAL, TimeUnit.MILLISECONDS); 276 } finally { 277 deliveryRequestLock.unlock(); 278 } 279 } catch (InterruptedException e) { 280 break; 281 } 282 try { 283 engine.deliver(); 284 } catch (Throwable t) { 285 // a fail safe to handle all exceptions being thrown 286 getExceptionHandler().handleException(t); 287 } 288 } 289 } 290 291 public void cancel() { 292 interrupt(); 293 } 294 295 public void request() { 296 deliveryRequestLock.lock(); 297 try { 298 deliveryRequestCondition.signal(); 299 } finally { 300 deliveryRequestLock.unlock(); 301 } 302 } 303 304 } 305 306}