001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.seda;
018
019import java.util.List;
020import java.util.concurrent.BlockingQueue;
021import java.util.concurrent.CountDownLatch;
022import java.util.concurrent.ExecutorService;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.apache.camel.AsyncCallback;
027import org.apache.camel.AsyncProcessor;
028import org.apache.camel.Consumer;
029import org.apache.camel.Endpoint;
030import org.apache.camel.Exchange;
031import org.apache.camel.Processor;
032import org.apache.camel.ShutdownRunningTask;
033import org.apache.camel.Suspendable;
034import org.apache.camel.processor.MulticastProcessor;
035import org.apache.camel.spi.ExceptionHandler;
036import org.apache.camel.spi.ShutdownAware;
037import org.apache.camel.spi.Synchronization;
038import org.apache.camel.support.LoggingExceptionHandler;
039import org.apache.camel.support.ServiceSupport;
040import org.apache.camel.util.AsyncProcessorConverterHelper;
041import org.apache.camel.util.ExchangeHelper;
042import org.apache.camel.util.ObjectHelper;
043import org.apache.camel.util.UnitOfWorkHelper;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047/**
048 * A Consumer for the SEDA component.
049 * <p/>
 * In this implementation there is a short <i>slack period</i> when you suspend/stop the consumer, during which
 * the consumer may pick up a newly arrived message and process it. That period lasts up to 1 second.
052 *
053 * @version 
054 */
055public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware, Suspendable {
056    private static final Logger LOG = LoggerFactory.getLogger(SedaConsumer.class);
057
058    private final AtomicInteger taskCount = new AtomicInteger();
059    private volatile CountDownLatch latch;
060    private volatile boolean shutdownPending;
061    private volatile boolean forceShutdown;
062    private SedaEndpoint endpoint;
063    private AsyncProcessor processor;
064    private ExecutorService executor;
065    private ExceptionHandler exceptionHandler;
066    private final int pollTimeout;
067
068    public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
069        this.endpoint = endpoint;
070        this.processor = AsyncProcessorConverterHelper.convert(processor);
071        this.pollTimeout = endpoint.getPollTimeout();
072        this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
073    }
074
075    @Override
076    public String toString() {
077        return "SedaConsumer[" + endpoint + "]";
078    }
079
080    public Endpoint getEndpoint() {
081        return endpoint;
082    }
083
084    public ExceptionHandler getExceptionHandler() {
085        return exceptionHandler;
086    }
087
088    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
089        this.exceptionHandler = exceptionHandler;
090    }
091
092    public Processor getProcessor() {
093        return processor;
094    }
095
096    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
097        // deny stopping on shutdown as we want seda consumers to run in case some other queues
098        // depend on this consumer to run, so it can complete its exchanges
099        return true;
100    }
101
102    public int getPendingExchangesSize() {
103        // the route is shutting down, so either we should purge the queue,
104        // or return how many exchanges are still on the queue
105        if (endpoint.isPurgeWhenStopping()) {
106            endpoint.purgeQueue();
107        }
108        return endpoint.getQueue().size();
109    }
110
111    @Override
112    public void prepareShutdown(boolean suspendOnly, boolean forced) {
113        // if we are suspending then we want to keep the thread running but just not route the exchange
114        // this logic is only when we stop or shutdown the consumer
115        if (suspendOnly) {
116            LOG.debug("Skip preparing to shutdown as consumer is being suspended");
117            return;
118        }
119
120        // signal we want to shutdown
121        shutdownPending = true;
122        forceShutdown = forced;
123
124        if (latch != null) {
125            LOG.debug("Preparing to shutdown, waiting for {} consumer threads to complete.", latch.getCount());
126
127            // wait for all threads to end
128            try {
129                latch.await();
130            } catch (InterruptedException e) {
131                // ignore
132            }
133        }
134    }
135
136    @Override
137    public boolean isRunAllowed() {
138        // if we force shutdown then do not allow running anymore
139        if (forceShutdown) {
140            return false;
141        }
142
143        if (isSuspending() || isSuspended()) {
144            // allow to run even if we are suspended as we want to
145            // keep the thread task running
146            return true;
147        }
148        return super.isRunAllowed();
149    }
150
151    public void run() {
152        taskCount.incrementAndGet();
153        try {
154            doRun();
155        } finally {
156            taskCount.decrementAndGet();
157        }
158    }
159
160    protected void doRun() {
161        BlockingQueue<Exchange> queue = endpoint.getQueue();
162        // loop while we are allowed, or if we are stopping loop until the queue is empty
163        while (queue != null && (isRunAllowed())) {
164
165            // do not poll during CamelContext is starting, as we should only poll when CamelContext is fully started
166            if (getEndpoint().getCamelContext().getStatus().isStarting()) {
167                LOG.trace("CamelContext is starting so skip polling");
168                try {
169                    // sleep at most 1 sec
170                    Thread.sleep(Math.min(pollTimeout, 1000));
171                } catch (InterruptedException e) {
172                    LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
173                }
174                continue;
175            }
176
177            // do not poll if we are suspended or starting again after resuming
178            if (isSuspending() || isSuspended() || isStarting()) {
179                if (shutdownPending && queue.isEmpty()) {
180                    LOG.trace("Consumer is suspended and shutdown is pending, so this consumer thread is breaking out because the task queue is empty.");
181                    // we want to shutdown so break out if there queue is empty
182                    break;
183                } else {
184                    LOG.trace("Consumer is suspended so skip polling");
185                    try {
186                        // sleep at most 1 sec
187                        Thread.sleep(Math.min(pollTimeout, 1000));
188                    } catch (InterruptedException e) {
189                        LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
190                    }
191                    continue;
192                }
193            }
194
195            Exchange exchange = null;
196            try {
197                // use the end user configured poll timeout
198                exchange = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
199                if (LOG.isTraceEnabled()) {
200                    LOG.trace("Polled queue {} with timeout {} ms. -> {}", new Object[]{ObjectHelper.getIdentityHashCode(queue), pollTimeout, exchange});
201                }
202                if (exchange != null) {
203                    try {
204                        // send a new copied exchange with new camel context
205                        Exchange newExchange = prepareExchange(exchange);
206                        // process the exchange
207                        sendToConsumers(newExchange);
208                        // copy the message back
209                        if (newExchange.hasOut()) {
210                            exchange.setOut(newExchange.getOut().copy());
211                        } else {
212                            exchange.setIn(newExchange.getIn());
213                        }
214                        // log exception if an exception occurred and was not handled
215                        if (newExchange.getException() != null) {
216                            exchange.setException(newExchange.getException());
217                            getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
218                        }
219                    } catch (Exception e) {
220                        getExceptionHandler().handleException("Error processing exchange", exchange, e);
221                    }
222                } else if (shutdownPending && queue.isEmpty()) {
223                    LOG.trace("Shutdown is pending, so this consumer thread is breaking out because the task queue is empty.");
224                    // we want to shutdown so break out if there queue is empty
225                    break;
226                }
227            } catch (InterruptedException e) {
228                LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
229                continue;
230            } catch (Throwable e) {
231                if (exchange != null) {
232                    getExceptionHandler().handleException("Error processing exchange", exchange, e);
233                } else {
234                    getExceptionHandler().handleException(e);
235                }
236            }
237        }
238
239        latch.countDown();
240        LOG.debug("Ending this polling consumer thread, there are still {} consumer threads left.", latch.getCount());
241    }
242
243    /**
244     * Strategy to prepare exchange for being processed by this consumer
245     *
246     * @param exchange the exchange
247     * @return the exchange to process by this consumer.
248     */
249    protected Exchange prepareExchange(Exchange exchange) {
250        // send a new copied exchange with new camel context
251        Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext());
252        // set the from endpoint
253        newExchange.setFromEndpoint(endpoint);
254        return newExchange;
255    }
256
257    /**
258     * Send the given {@link Exchange} to the consumer(s).
259     * <p/>
260     * If multiple consumers then they will each receive a copy of the Exchange.
261     * A multicast processor will send the exchange in parallel to the multiple consumers.
262     * <p/>
263     * If there is only a single consumer then its dispatched directly to it using same thread.
264     * 
265     * @param exchange the exchange
266     * @throws Exception can be thrown if processing of the exchange failed
267     */
268    protected void sendToConsumers(final Exchange exchange) throws Exception {
269        // validate multiple consumers has been enabled
270        int size = endpoint.getConsumers().size();
271        if (size > 1 && !endpoint.isMultipleConsumersSupported()) {
272            throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + endpoint);
273        }
274
275        // if there are multiple consumers then multicast to them
276        if (endpoint.isMultipleConsumersSupported()) {
277
278            if (LOG.isTraceEnabled()) {
279                LOG.trace("Multicasting to {} consumers for Exchange: {}", size, exchange);
280            }
281
282            // handover completions, as we need to done this when the multicast is done
283            final List<Synchronization> completions = exchange.handoverCompletions();
284
285            // use a multicast processor to process it
286            MulticastProcessor mp = endpoint.getConsumerMulticastProcessor();
287            ObjectHelper.notNull(mp, "ConsumerMulticastProcessor", this);
288
289            // and use the asynchronous routing engine to support it
290            mp.process(exchange, new AsyncCallback() {
291                public void done(boolean doneSync) {
292                    // done the uow on the completions
293                    UnitOfWorkHelper.doneSynchronizations(exchange, completions, LOG);
294                }
295            });
296        } else {
297            // use the regular processor and use the asynchronous routing engine to support it
298            processor.process(exchange, new AsyncCallback() {
299                public void done(boolean doneSync) {
300                    // noop
301                }
302            });
303        }
304    }
305
306    protected void doStart() throws Exception {
307        latch = new CountDownLatch(endpoint.getConcurrentConsumers());
308        shutdownPending = false;
309        forceShutdown = false;
310
311        setupTasks();
312        endpoint.onStarted(this);
313    }
314
315    @Override
316    protected void doSuspend() throws Exception {
317        endpoint.onStopped(this);
318    }
319
320    @Override
321    protected void doResume() throws Exception {
322        endpoint.onStarted(this);
323    }
324
325    protected void doStop() throws Exception {
326        // ensure queue is purged if we stop the consumer
327        if (endpoint.isPurgeWhenStopping()) {
328            endpoint.purgeQueue();
329        }
330
331        endpoint.onStopped(this);
332        
333        shutdownExecutor();
334    }
335
336    @Override
337    protected void doShutdown() throws Exception {
338        shutdownExecutor();
339    }
340
341    private void shutdownExecutor() {
342        if (executor != null) {
343            endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
344            executor = null;
345        }
346    }
347
348    /**
349     * Setup the thread pool and ensures tasks gets executed (if needed)
350     */
351    private void setupTasks() {
352        int poolSize = endpoint.getConcurrentConsumers();
353
354        // create thread pool if needed
355        if (executor == null) {
356            executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize);
357        }
358
359        // submit needed number of tasks
360        int tasks = poolSize - taskCount.get();
361        LOG.debug("Creating {} consumer tasks with poll timeout {} ms.", tasks, pollTimeout);
362        for (int i = 0; i < tasks; i++) {
363            executor.execute(this);
364        }
365    }
366
367}