001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.seda; 018 019 import java.util.List; 020 import java.util.concurrent.BlockingQueue; 021 import java.util.concurrent.CountDownLatch; 022 import java.util.concurrent.ExecutorService; 023 import java.util.concurrent.TimeUnit; 024 import java.util.concurrent.atomic.AtomicInteger; 025 026 import org.apache.camel.AsyncCallback; 027 import org.apache.camel.AsyncProcessor; 028 import org.apache.camel.Consumer; 029 import org.apache.camel.Endpoint; 030 import org.apache.camel.Exchange; 031 import org.apache.camel.Processor; 032 import org.apache.camel.ShutdownRunningTask; 033 import org.apache.camel.SuspendableService; 034 import org.apache.camel.impl.LoggingExceptionHandler; 035 import org.apache.camel.processor.MulticastProcessor; 036 import org.apache.camel.spi.ExceptionHandler; 037 import org.apache.camel.spi.ShutdownAware; 038 import org.apache.camel.spi.Synchronization; 039 import org.apache.camel.support.ServiceSupport; 040 import org.apache.camel.util.AsyncProcessorConverterHelper; 041 import org.apache.camel.util.AsyncProcessorHelper; 042 import org.apache.camel.util.ExchangeHelper; 043 import org.apache.camel.util.ObjectHelper; 044 import org.apache.camel.util.UnitOfWorkHelper; 045 import org.slf4j.Logger; 046 import org.slf4j.LoggerFactory; 047 048 /** 049 * A Consumer for the SEDA component. 050 * <p/> 051 * In this implementation there is a little <i>slack period</i> when you suspend/stop the consumer, by which 052 * the consumer may pickup a newly arrived messages and process it. That period is up till 1 second. 053 * 054 * @version 055 */ 056 public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware, SuspendableService { 057 private static final transient Logger LOG = LoggerFactory.getLogger(SedaConsumer.class); 058 059 private final AtomicInteger taskCount = new AtomicInteger(); 060 private volatile CountDownLatch latch; 061 private volatile boolean shutdownPending; 062 private SedaEndpoint endpoint; 063 private AsyncProcessor processor; 064 private ExecutorService executor; 065 private ExceptionHandler exceptionHandler; 066 private final int pollTimeout; 067 068 public SedaConsumer(SedaEndpoint endpoint, Processor processor) { 069 this.endpoint = endpoint; 070 this.processor = AsyncProcessorConverterHelper.convert(processor); 071 this.pollTimeout = endpoint.getPollTimeout(); 072 } 073 074 @Override 075 public String toString() { 076 return "SedaConsumer[" + endpoint + "]"; 077 } 078 079 public Endpoint getEndpoint() { 080 return endpoint; 081 } 082 083 public ExceptionHandler getExceptionHandler() { 084 if (exceptionHandler == null) { 085 exceptionHandler = new LoggingExceptionHandler(getClass()); 086 } 087 return exceptionHandler; 088 } 089 090 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 091 this.exceptionHandler = exceptionHandler; 092 } 093 094 public Processor getProcessor() { 095 return processor; 096 } 097 098 public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { 099 // deny stopping on shutdown as we want seda consumers to run in case some other queues 100 // depend on this consumer to run, so it can complete its exchanges 101 return true; 102 } 103 104 public int getPendingExchangesSize() { 105 // number of pending messages on the queue 106 return endpoint.getQueue().size(); 107 } 108 109 @Override 110 public void prepareShutdown(boolean forced) { 111 // signal we want to shutdown 112 shutdownPending = true; 113 114 if (latch != null) { 115 LOG.debug("Preparing to shutdown, waiting for {} consumer threads to complete.", latch.getCount()); 116 117 // wait for all threads to end 118 try { 119 latch.await(); 120 } catch (InterruptedException e) { 121 // ignore 122 } 123 } 124 } 125 126 @Override 127 public boolean isRunAllowed() { 128 if (isSuspending() || isSuspended()) { 129 // allow to run even if we are suspended as we want to 130 // keep the thread task running 131 return true; 132 } 133 return super.isRunAllowed(); 134 } 135 136 public void run() { 137 taskCount.incrementAndGet(); 138 try { 139 doRun(); 140 } finally { 141 taskCount.decrementAndGet(); 142 } 143 } 144 145 protected void doRun() { 146 BlockingQueue<Exchange> queue = endpoint.getQueue(); 147 // loop while we are allowed, or if we are stopping loop until the queue is empty 148 while (queue != null && (isRunAllowed())) { 149 150 // do not poll during CamelContext is starting, as we should only poll when CamelContext is fully started 151 if (getEndpoint().getCamelContext().getStatus().isStarting()) { 152 LOG.trace("CamelContext is starting so skip polling"); 153 try { 154 // sleep at most 1 sec 155 Thread.sleep(Math.min(pollTimeout, 1000)); 156 } catch (InterruptedException e) { 157 LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); 158 } 159 continue; 160 } 161 162 // do not poll if we are suspended 163 if (isSuspending() || isSuspended()) { 164 LOG.trace("Consumer is suspended so skip polling"); 165 try { 166 // sleep at most 1 sec 167 Thread.sleep(Math.min(pollTimeout, 1000)); 168 } catch (InterruptedException e) { 169 LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); 170 } 171 continue; 172 } 173 174 Exchange exchange = null; 175 try { 176 // use the end user configured poll timeout 177 exchange = queue.poll(pollTimeout, TimeUnit.MILLISECONDS); 178 if (exchange != null) { 179 try { 180 // send a new copied exchange with new camel context 181 Exchange newExchange = prepareExchange(exchange); 182 // process the exchange 183 sendToConsumers(newExchange); 184 // copy the message back 185 if (newExchange.hasOut()) { 186 exchange.setOut(newExchange.getOut().copy()); 187 } else { 188 exchange.setIn(newExchange.getIn()); 189 } 190 // log exception if an exception occurred and was not handled 191 if (newExchange.getException() != null) { 192 exchange.setException(newExchange.getException()); 193 getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); 194 } 195 } catch (Exception e) { 196 getExceptionHandler().handleException("Error processing exchange", exchange, e); 197 } 198 } else if (shutdownPending && queue.isEmpty()) { 199 LOG.trace("Shutdown is pending, so this consumer thread is breaking out because the task queue is empty."); 200 // we want to shutdown so break out if there queue is empty 201 break; 202 } 203 } catch (InterruptedException e) { 204 LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped()); 205 continue; 206 } catch (Throwable e) { 207 if (exchange != null) { 208 getExceptionHandler().handleException("Error processing exchange", exchange, e); 209 } else { 210 getExceptionHandler().handleException(e); 211 } 212 } 213 } 214 215 latch.countDown(); 216 LOG.debug("Ending this polling consumer thread, there are still {} consumer threads left.", latch.getCount()); 217 } 218 219 /** 220 * Strategy to prepare exchange for being processed by this consumer 221 * 222 * @param exchange the exchange 223 * @return the exchange to process by this consumer. 224 */ 225 protected Exchange prepareExchange(Exchange exchange) { 226 // send a new copied exchange with new camel context 227 Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext()); 228 // set the from endpoint 229 newExchange.setFromEndpoint(endpoint); 230 return newExchange; 231 } 232 233 /** 234 * Send the given {@link Exchange} to the consumer(s). 235 * <p/> 236 * If multiple consumers then they will each receive a copy of the Exchange. 237 * A multicast processor will send the exchange in parallel to the multiple consumers. 238 * <p/> 239 * If there is only a single consumer then its dispatched directly to it using same thread. 240 * 241 * @param exchange the exchange 242 * @throws Exception can be thrown if processing of the exchange failed 243 */ 244 protected void sendToConsumers(final Exchange exchange) throws Exception { 245 int size = endpoint.getConsumers().size(); 246 247 // if there are multiple consumers then multicast to them 248 if (size > 1) { 249 250 // validate multiple consumers has been enabled 251 if (!endpoint.isMultipleConsumersSupported()) { 252 throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + endpoint); 253 } 254 255 if (LOG.isDebugEnabled()) { 256 LOG.debug("Multicasting to {} consumers for Exchange: {}", endpoint.getConsumers().size(), exchange); 257 } 258 259 // handover completions, as we need to done this when the multicast is done 260 final List<Synchronization> completions = exchange.handoverCompletions(); 261 262 // use a multicast processor to process it 263 MulticastProcessor mp = endpoint.getConsumerMulticastProcessor(); 264 ObjectHelper.notNull(mp, "ConsumerMulticastProcessor", this); 265 266 // and use the asynchronous routing engine to support it 267 AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() { 268 public void done(boolean doneSync) { 269 // done the uow on the completions 270 UnitOfWorkHelper.doneSynchronizations(exchange, completions, LOG); 271 } 272 }); 273 } else { 274 // use the regular processor and use the asynchronous routing engine to support it 275 AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() { 276 public void done(boolean doneSync) { 277 // noop 278 } 279 }); 280 } 281 } 282 283 protected void doStart() throws Exception { 284 latch = new CountDownLatch(endpoint.getConcurrentConsumers()); 285 shutdownPending = false; 286 287 setupTasks(); 288 endpoint.onStarted(this); 289 } 290 291 @Override 292 protected void doSuspend() throws Exception { 293 endpoint.onStopped(this); 294 } 295 296 @Override 297 protected void doResume() throws Exception { 298 doStart(); 299 } 300 301 protected void doStop() throws Exception { 302 endpoint.onStopped(this); 303 304 shutdownExecutor(); 305 } 306 307 @Override 308 protected void doShutdown() throws Exception { 309 shutdownExecutor(); 310 } 311 312 private void shutdownExecutor() { 313 if (executor != null) { 314 endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor); 315 executor = null; 316 } 317 } 318 319 /** 320 * Setup the thread pool and ensures tasks gets executed (if needed) 321 */ 322 private void setupTasks() { 323 int poolSize = endpoint.getConcurrentConsumers(); 324 325 // create thread pool if needed 326 if (executor == null) { 327 executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize); 328 } 329 330 // submit needed number of tasks 331 int tasks = poolSize - taskCount.get(); 332 LOG.debug("Creating {} consumer tasks with poll timeout {} ms.", tasks, pollTimeout); 333 for (int i = 0; i < tasks; i++) { 334 executor.execute(this); 335 } 336 } 337 338 }