001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 import java.util.concurrent.TimeUnit; 022 import java.util.concurrent.locks.Condition; 023 import java.util.concurrent.locks.Lock; 024 import java.util.concurrent.locks.ReentrantLock; 025 026 import org.apache.camel.CamelContext; 027 import org.apache.camel.CamelExchangeException; 028 import org.apache.camel.Exchange; 029 import org.apache.camel.Navigate; 030 import org.apache.camel.Processor; 031 import org.apache.camel.Traceable; 032 import org.apache.camel.impl.LoggingExceptionHandler; 033 import org.apache.camel.processor.resequencer.ResequencerEngine; 034 import org.apache.camel.processor.resequencer.SequenceElementComparator; 035 import org.apache.camel.processor.resequencer.SequenceSender; 036 import org.apache.camel.spi.ExceptionHandler; 037 import org.apache.camel.support.ServiceSupport; 038 import org.apache.camel.util.ObjectHelper; 039 import org.apache.camel.util.ServiceHelper; 040 import org.slf4j.Logger; 041 import org.slf4j.LoggerFactory; 042 043 /** 044 * A resequencer that re-orders a (continuous) stream of {@link Exchange}s. The 045 * algorithm implemented by {@link ResequencerEngine} is based on the detection 046 * of gaps in a message stream rather than on a fixed batch size. Gap detection 047 * in combination with timeouts removes the constraint of having to know the 048 * number of messages of a sequence (i.e. the batch size) in advance. 049 * <p> 050 * Messages must contain a unique sequence number for which a predecessor and a 051 * successor is known. For example a message with the sequence number 3 has a 052 * predecessor message with the sequence number 2 and a successor message with 053 * the sequence number 4. The message sequence 2,3,5 has a gap because the 054 * successor of 3 is missing. The resequencer therefore has to retain message 5 055 * until message 4 arrives (or a timeout occurs). 056 * <p> 057 * Instances of this class poll for {@link Exchange}s from a given 058 * <code>endpoint</code>. Resequencing work and the delivery of messages to 059 * the next <code>processor</code> is done within the single polling thread. 060 * 061 * @version 062 * 063 * @see ResequencerEngine 064 */ 065 public class StreamResequencer extends ServiceSupport implements SequenceSender<Exchange>, Processor, Navigate<Processor>, Traceable { 066 067 private static final long DELIVERY_ATTEMPT_INTERVAL = 1000L; 068 private static final Logger LOG = LoggerFactory.getLogger(StreamResequencer.class); 069 070 private final CamelContext camelContext; 071 private final ExceptionHandler exceptionHandler; 072 private final ResequencerEngine<Exchange> engine; 073 private final Processor processor; 074 private Delivery delivery; 075 private int capacity; 076 private boolean ignoreInvalidExchanges; 077 078 /** 079 * Creates a new {@link StreamResequencer} instance. 080 * 081 * @param processor next processor that processes re-ordered exchanges. 082 * @param comparator a sequence element comparator for exchanges. 083 */ 084 public StreamResequencer(CamelContext camelContext, Processor processor, SequenceElementComparator<Exchange> comparator) { 085 ObjectHelper.notNull(camelContext, "CamelContext"); 086 this.camelContext = camelContext; 087 this.exceptionHandler = new LoggingExceptionHandler(getClass()); 088 this.engine = new ResequencerEngine<Exchange>(comparator); 089 this.engine.setSequenceSender(this); 090 this.processor = processor; 091 } 092 093 /** 094 * Returns this resequencer's exception handler. 095 */ 096 public ExceptionHandler getExceptionHandler() { 097 return exceptionHandler; 098 } 099 100 /** 101 * Returns the next processor. 102 */ 103 public Processor getProcessor() { 104 return processor; 105 } 106 107 /** 108 * Returns this resequencer's capacity. The capacity is the maximum number 109 * of exchanges that can be managed by this resequencer at a given point in 110 * time. If the capacity if reached, polling from the endpoint will be 111 * skipped for <code>timeout</code> milliseconds giving exchanges the 112 * possibility to time out and to be delivered after the waiting period. 113 * 114 * @return this resequencer's capacity. 115 */ 116 public int getCapacity() { 117 return capacity; 118 } 119 120 /** 121 * Returns this resequencer's timeout. This sets the resequencer engine's 122 * timeout via {@link ResequencerEngine#setTimeout(long)}. This value is 123 * also used to define the polling timeout from the endpoint. 124 * 125 * @return this resequencer's timeout. (Processor) 126 * @see ResequencerEngine#setTimeout(long) 127 */ 128 public long getTimeout() { 129 return engine.getTimeout(); 130 } 131 132 public void setCapacity(int capacity) { 133 this.capacity = capacity; 134 } 135 136 public void setTimeout(long timeout) { 137 engine.setTimeout(timeout); 138 } 139 140 public boolean isIgnoreInvalidExchanges() { 141 return ignoreInvalidExchanges; 142 } 143 144 /** 145 * Sets whether to ignore invalid exchanges which cannot be used by this stream resequencer. 146 * <p/> 147 * Default is <tt>false</tt>, by which an {@link CamelExchangeException} is thrown if the {@link Exchange} 148 * is invalid. 149 */ 150 public void setIgnoreInvalidExchanges(boolean ignoreInvalidExchanges) { 151 this.ignoreInvalidExchanges = ignoreInvalidExchanges; 152 } 153 154 @Override 155 public String toString() { 156 return "StreamResequencer[to: " + processor + "]"; 157 } 158 159 public String getTraceLabel() { 160 return "streamResequence"; 161 } 162 163 @Override 164 protected void doStart() throws Exception { 165 ServiceHelper.startServices(processor); 166 delivery = new Delivery(); 167 engine.start(); 168 delivery.start(); 169 } 170 171 @Override 172 protected void doStop() throws Exception { 173 // let's stop everything in the reverse order 174 // no need to stop the worker thread -- it will stop automatically when this service is stopped 175 engine.stop(); 176 ServiceHelper.stopServices(processor); 177 } 178 179 /** 180 * Sends the <code>exchange</code> to the next <code>processor</code>. 181 * 182 * @param exchange exchange to send. 183 */ 184 public void sendElement(Exchange exchange) throws Exception { 185 processor.process(exchange); 186 } 187 188 public void process(Exchange exchange) throws Exception { 189 while (engine.size() >= capacity) { 190 Thread.sleep(getTimeout()); 191 } 192 193 try { 194 engine.insert(exchange); 195 delivery.request(); 196 } catch (IllegalArgumentException e) { 197 if (isIgnoreInvalidExchanges()) { 198 LOG.debug("Invalid Exchange. This Exchange will be ignored: {}", exchange); 199 return; 200 } else { 201 throw new CamelExchangeException("Error processing Exchange in StreamResequencer", exchange, e); 202 } 203 } 204 } 205 206 public boolean hasNext() { 207 return processor != null; 208 } 209 210 public List<Processor> next() { 211 if (!hasNext()) { 212 return null; 213 } 214 List<Processor> answer = new ArrayList<Processor>(1); 215 answer.add(processor); 216 return answer; 217 } 218 219 class Delivery extends Thread { 220 221 private Lock deliveryRequestLock = new ReentrantLock(); 222 private Condition deliveryRequestCondition = deliveryRequestLock.newCondition(); 223 224 public Delivery() { 225 super(camelContext.getExecutorServiceManager().resolveThreadName("Resequencer Delivery")); 226 } 227 228 @Override 229 public void run() { 230 while (isRunAllowed()) { 231 try { 232 deliveryRequestLock.lock(); 233 try { 234 deliveryRequestCondition.await(DELIVERY_ATTEMPT_INTERVAL, TimeUnit.MILLISECONDS); 235 } finally { 236 deliveryRequestLock.unlock(); 237 } 238 } catch (InterruptedException e) { 239 break; 240 } 241 try { 242 engine.deliver(); 243 } catch (Throwable t) { 244 // a fail safe to handle all exceptions being thrown 245 getExceptionHandler().handleException(t); 246 } 247 } 248 } 249 250 public void cancel() { 251 interrupt(); 252 } 253 254 public void request() { 255 deliveryRequestLock.lock(); 256 try { 257 deliveryRequestCondition.signal(); 258 } finally { 259 deliveryRequestLock.unlock(); 260 } 261 } 262 263 } 264 265 }