001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.seda; 018 019import java.util.List; 020import java.util.concurrent.BlockingQueue; 021import java.util.concurrent.CountDownLatch; 022import java.util.concurrent.ExecutorService; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.camel.AsyncCallback; 027import org.apache.camel.AsyncProcessor; 028import org.apache.camel.Consumer; 029import org.apache.camel.Endpoint; 030import org.apache.camel.Exchange; 031import org.apache.camel.Processor; 032import org.apache.camel.ShutdownRunningTask; 033import org.apache.camel.Suspendable; 034import org.apache.camel.processor.MulticastProcessor; 035import org.apache.camel.spi.ExceptionHandler; 036import org.apache.camel.spi.ShutdownAware; 037import org.apache.camel.spi.Synchronization; 038import org.apache.camel.support.LoggingExceptionHandler; 039import org.apache.camel.support.ServiceSupport; 040import org.apache.camel.util.AsyncProcessorConverterHelper; 041import org.apache.camel.util.ExchangeHelper; 042import org.apache.camel.util.ObjectHelper; 043import org.apache.camel.util.UnitOfWorkHelper; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * A Consumer for the SEDA component. 049 * <p/> 050 * In this implementation there is a little <i>slack period</i> when you suspend/stop the consumer, by which 051 * the consumer may pickup a newly arrived messages and process it. That period is up till 1 second. 052 * 053 * @version 054 */ 055public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware, Suspendable { 056 private static final Logger LOG = LoggerFactory.getLogger(SedaConsumer.class); 057 058 private final AtomicInteger taskCount = new AtomicInteger(); 059 private volatile CountDownLatch latch; 060 private volatile boolean shutdownPending; 061 private volatile boolean forceShutdown; 062 private SedaEndpoint endpoint; 063 private AsyncProcessor processor; 064 private ExecutorService executor; 065 private ExceptionHandler exceptionHandler; 066 private final int pollTimeout; 067 068 public SedaConsumer(SedaEndpoint endpoint, Processor processor) { 069 this.endpoint = endpoint; 070 this.processor = AsyncProcessorConverterHelper.convert(processor); 071 this.pollTimeout = endpoint.getPollTimeout(); 072 this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass()); 073 } 074 075 @Override 076 public String toString() { 077 return "SedaConsumer[" + endpoint + "]"; 078 } 079 080 public Endpoint getEndpoint() { 081 return endpoint; 082 } 083 084 public ExceptionHandler getExceptionHandler() { 085 return exceptionHandler; 086 } 087 088 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 089 this.exceptionHandler = exceptionHandler; 090 } 091 092 public Processor getProcessor() { 093 return processor; 094 } 095 096 public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { 097 // deny stopping on shutdown as we want seda consumers to run in case some other queues 098 // depend on this consumer to run, so it can complete its exchanges 099 return true; 100 } 101 102 public int getPendingExchangesSize() { 103 // the route is shutting down, so either we should purge the queue, 104 // or return how many exchanges are still on the queue 105 if (endpoint.isPurgeWhenStopping()) { 106 endpoint.purgeQueue(); 107 } 108 return endpoint.getQueue().size(); 109 } 110 111 @Override 112 public void prepareShutdown(boolean suspendOnly, boolean forced) { 113 // if we are suspending then we want to keep the thread running but just not route the exchange 114 // this logic is only when we stop or shutdown the consumer 115 if (suspendOnly) { 116 LOG.debug("Skip preparing to shutdown as consumer is being suspended"); 117 return; 118 } 119 120 // signal we want to shutdown 121 shutdownPending = true; 122 forceShutdown = forced; 123 124 if (latch != null) { 125 LOG.debug("Preparing to shutdown, waiting for {} consumer threads to complete.", latch.getCount()); 126 127 // wait for all threads to end 128 try { 129 latch.await(); 130 } catch (InterruptedException e) { 131 // ignore 132 } 133 } 134 } 135 136 @Override 137 public boolean isRunAllowed() { 138 // if we force shutdown then do not allow running anymore 139 if (forceShutdown) { 140 return false; 141 } 142 143 if (isSuspending() || isSuspended()) { 144 // allow to run even if we are suspended as we want to 145 // keep the thread task running 146 return true; 147 } 148 return super.isRunAllowed(); 149 } 150 151 public void run() { 152 taskCount.incrementAndGet(); 153 try { 154 doRun(); 155 } finally { 156 taskCount.decrementAndGet(); 157 } 158 } 159 160 protected void doRun() { 161 BlockingQueue<Exchange> queue = endpoint.getQueue(); 162 // loop while we are allowed, or if we are stopping loop until the queue is empty 163 while (queue != null && (isRunAllowed())) { 164 165 // do not poll during CamelContext is starting, as we should only poll when CamelContext is fully started 166 if (getEndpoint().getCamelContext().getStatus().isStarting()) { 167 LOG.trace("CamelContext is starting so skip polling"); 168 try { 169 // sleep at most 1 sec 170 Thread.sleep(Math.min(pollTimeout, 1000)); 171 } catch (InterruptedException e) { 172 LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); 173 } 174 continue; 175 } 176 177 // do not poll if we are suspended or starting again after resuming 178 if (isSuspending() || isSuspended() || isStarting()) { 179 if (shutdownPending && queue.isEmpty()) { 180 LOG.trace("Consumer is suspended and shutdown is pending, so this consumer thread is breaking out because the task queue is empty."); 181 // we want to shutdown so break out if there queue is empty 182 break; 183 } else { 184 LOG.trace("Consumer is suspended so skip polling"); 185 try { 186 // sleep at most 1 sec 187 Thread.sleep(Math.min(pollTimeout, 1000)); 188 } catch (InterruptedException e) { 189 LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); 190 } 191 continue; 192 } 193 } 194 195 Exchange exchange = null; 196 try { 197 // use the end user configured poll timeout 198 exchange = queue.poll(pollTimeout, TimeUnit.MILLISECONDS); 199 if (LOG.isTraceEnabled()) { 200 LOG.trace("Polled queue {} with timeout {} ms. -> {}", new Object[]{ObjectHelper.getIdentityHashCode(queue), pollTimeout, exchange}); 201 } 202 if (exchange != null) { 203 try { 204 // send a new copied exchange with new camel context 205 Exchange newExchange = prepareExchange(exchange); 206 // process the exchange 207 sendToConsumers(newExchange); 208 // copy the message back 209 if (newExchange.hasOut()) { 210 exchange.setOut(newExchange.getOut().copy()); 211 } else { 212 exchange.setIn(newExchange.getIn()); 213 } 214 // log exception if an exception occurred and was not handled 215 if (newExchange.getException() != null) { 216 exchange.setException(newExchange.getException()); 217 getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); 218 } 219 } catch (Exception e) { 220 getExceptionHandler().handleException("Error processing exchange", exchange, e); 221 } 222 } else if (shutdownPending && queue.isEmpty()) { 223 LOG.trace("Shutdown is pending, so this consumer thread is breaking out because the task queue is empty."); 224 // we want to shutdown so break out if there queue is empty 225 break; 226 } 227 } catch (InterruptedException e) { 228 LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); 229 continue; 230 } catch (Throwable e) { 231 if (exchange != null) { 232 getExceptionHandler().handleException("Error processing exchange", exchange, e); 233 } else { 234 getExceptionHandler().handleException(e); 235 } 236 } 237 } 238 239 latch.countDown(); 240 LOG.debug("Ending this polling consumer thread, there are still {} consumer threads left.", latch.getCount()); 241 } 242 243 /** 244 * Strategy to prepare exchange for being processed by this consumer 245 * 246 * @param exchange the exchange 247 * @return the exchange to process by this consumer. 248 */ 249 protected Exchange prepareExchange(Exchange exchange) { 250 // send a new copied exchange with new camel context 251 Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext()); 252 // set the from endpoint 253 newExchange.setFromEndpoint(endpoint); 254 return newExchange; 255 } 256 257 /** 258 * Send the given {@link Exchange} to the consumer(s). 259 * <p/> 260 * If multiple consumers then they will each receive a copy of the Exchange. 261 * A multicast processor will send the exchange in parallel to the multiple consumers. 262 * <p/> 263 * If there is only a single consumer then its dispatched directly to it using same thread. 264 * 265 * @param exchange the exchange 266 * @throws Exception can be thrown if processing of the exchange failed 267 */ 268 protected void sendToConsumers(final Exchange exchange) throws Exception { 269 // validate multiple consumers has been enabled 270 int size = endpoint.getConsumers().size(); 271 if (size > 1 && !endpoint.isMultipleConsumersSupported()) { 272 throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + endpoint); 273 } 274 275 // if there are multiple consumers then multicast to them 276 if (endpoint.isMultipleConsumersSupported()) { 277 278 if (LOG.isTraceEnabled()) { 279 LOG.trace("Multicasting to {} consumers for Exchange: {}", size, exchange); 280 } 281 282 // handover completions, as we need to done this when the multicast is done 283 final List<Synchronization> completions = exchange.handoverCompletions(); 284 285 // use a multicast processor to process it 286 MulticastProcessor mp = endpoint.getConsumerMulticastProcessor(); 287 ObjectHelper.notNull(mp, "ConsumerMulticastProcessor", this); 288 289 // and use the asynchronous routing engine to support it 290 mp.process(exchange, new AsyncCallback() { 291 public void done(boolean doneSync) { 292 // done the uow on the completions 293 UnitOfWorkHelper.doneSynchronizations(exchange, completions, LOG); 294 } 295 }); 296 } else { 297 // use the regular processor and use the asynchronous routing engine to support it 298 processor.process(exchange, new AsyncCallback() { 299 public void done(boolean doneSync) { 300 // noop 301 } 302 }); 303 } 304 } 305 306 protected void doStart() throws Exception { 307 latch = new CountDownLatch(endpoint.getConcurrentConsumers()); 308 shutdownPending = false; 309 forceShutdown = false; 310 311 setupTasks(); 312 endpoint.onStarted(this); 313 } 314 315 @Override 316 protected void doSuspend() throws Exception { 317 endpoint.onStopped(this); 318 } 319 320 @Override 321 protected void doResume() throws Exception { 322 endpoint.onStarted(this); 323 } 324 325 protected void doStop() throws Exception { 326 // ensure queue is purged if we stop the consumer 327 if (endpoint.isPurgeWhenStopping()) { 328 endpoint.purgeQueue(); 329 } 330 331 endpoint.onStopped(this); 332 333 shutdownExecutor(); 334 } 335 336 @Override 337 protected void doShutdown() throws Exception { 338 shutdownExecutor(); 339 } 340 341 private void shutdownExecutor() { 342 if (executor != null) { 343 endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor); 344 executor = null; 345 } 346 } 347 348 /** 349 * Setup the thread pool and ensures tasks gets executed (if needed) 350 */ 351 private void setupTasks() { 352 int poolSize = endpoint.getConcurrentConsumers(); 353 354 // create thread pool if needed 355 if (executor == null) { 356 executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); 357 } 358 359 // submit needed number of tasks 360 int tasks = poolSize - taskCount.get(); 361 LOG.debug("Creating {} consumer tasks with poll timeout {} ms.", tasks, pollTimeout); 362 for (int i = 0; i < tasks; i++) { 363 executor.execute(this); 364 } 365 } 366 367}