001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.seda;
018    
019    import java.util.List;
020    import java.util.concurrent.BlockingQueue;
021    import java.util.concurrent.CountDownLatch;
022    import java.util.concurrent.ExecutorService;
023    import java.util.concurrent.TimeUnit;
024    import java.util.concurrent.atomic.AtomicInteger;
025    
026    import org.apache.camel.AsyncCallback;
027    import org.apache.camel.AsyncProcessor;
028    import org.apache.camel.Consumer;
029    import org.apache.camel.Endpoint;
030    import org.apache.camel.Exchange;
031    import org.apache.camel.Processor;
032    import org.apache.camel.ShutdownRunningTask;
033    import org.apache.camel.SuspendableService;
034    import org.apache.camel.impl.LoggingExceptionHandler;
035    import org.apache.camel.processor.MulticastProcessor;
036    import org.apache.camel.spi.ExceptionHandler;
037    import org.apache.camel.spi.ShutdownAware;
038    import org.apache.camel.spi.Synchronization;
039    import org.apache.camel.support.ServiceSupport;
040    import org.apache.camel.util.AsyncProcessorConverterHelper;
041    import org.apache.camel.util.AsyncProcessorHelper;
042    import org.apache.camel.util.ExchangeHelper;
043    import org.apache.camel.util.ObjectHelper;
044    import org.apache.camel.util.UnitOfWorkHelper;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * A Consumer for the SEDA component.
050     * <p/>
051     * In this implementation there is a little <i>slack period</i> when you suspend/stop the consumer, by which
052     * the consumer may pickup a newly arrived messages and process it. That period is up till 1 second.
053     *
054     * @version 
055     */
056    public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware, SuspendableService {
057        private static final transient Logger LOG = LoggerFactory.getLogger(SedaConsumer.class);
058    
059        private final AtomicInteger taskCount = new AtomicInteger();
060        private volatile CountDownLatch latch;
061        private volatile boolean shutdownPending;
062        private SedaEndpoint endpoint;
063        private AsyncProcessor processor;
064        private ExecutorService executor;
065        private ExceptionHandler exceptionHandler;
066        private final int pollTimeout;
067    
068        public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
069            this.endpoint = endpoint;
070            this.processor = AsyncProcessorConverterHelper.convert(processor);
071            this.pollTimeout = endpoint.getPollTimeout();
072        }
073    
074        @Override
075        public String toString() {
076            return "SedaConsumer[" + endpoint + "]";
077        }
078    
079        public Endpoint getEndpoint() {
080            return endpoint;
081        }
082    
083        public ExceptionHandler getExceptionHandler() {
084            if (exceptionHandler == null) {
085                exceptionHandler = new LoggingExceptionHandler(getClass());
086            }
087            return exceptionHandler;
088        }
089    
090        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
091            this.exceptionHandler = exceptionHandler;
092        }
093    
094        public Processor getProcessor() {
095            return processor;
096        }
097    
098        public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
099            // deny stopping on shutdown as we want seda consumers to run in case some other queues
100            // depend on this consumer to run, so it can complete its exchanges
101            return true;
102        }
103    
104        public int getPendingExchangesSize() {
105            // number of pending messages on the queue
106            return endpoint.getQueue().size();
107        }
108    
109        @Override
110        public void prepareShutdown(boolean forced) {
111            // signal we want to shutdown
112            shutdownPending = true;
113    
114            if (latch != null) {
115                LOG.debug("Preparing to shutdown, waiting for {} consumer threads to complete.", latch.getCount());
116    
117                // wait for all threads to end
118                try {
119                    latch.await();
120                } catch (InterruptedException e) {
121                    // ignore
122                }
123            }
124        }
125    
126        @Override
127        public boolean isRunAllowed() {
128            if (isSuspending() || isSuspended()) {
129                // allow to run even if we are suspended as we want to
130                // keep the thread task running
131                return true;
132            }
133            return super.isRunAllowed();
134        }
135    
136        public void run() {
137            taskCount.incrementAndGet();
138            try {
139                doRun();
140            } finally {
141                taskCount.decrementAndGet();
142            }
143        }
144    
145        protected void doRun() {
146            BlockingQueue<Exchange> queue = endpoint.getQueue();
147            // loop while we are allowed, or if we are stopping loop until the queue is empty
148            while (queue != null && (isRunAllowed())) {
149    
150                // do not poll during CamelContext is starting, as we should only poll when CamelContext is fully started
151                if (getEndpoint().getCamelContext().getStatus().isStarting()) {
152                    LOG.trace("CamelContext is starting so skip polling");
153                    try {
154                        // sleep at most 1 sec
155                        Thread.sleep(Math.min(pollTimeout, 1000));
156                    } catch (InterruptedException e) {
157                        LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
158                    }
159                    continue;
160                }
161    
162                // do not poll if we are suspended
163                if (isSuspending() || isSuspended()) {
164                    LOG.trace("Consumer is suspended so skip polling");
165                    try {
166                        // sleep at most 1 sec
167                        Thread.sleep(Math.min(pollTimeout, 1000));
168                    } catch (InterruptedException e) {
169                        LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
170                    }
171                    continue;
172                }
173    
174                Exchange exchange = null;
175                try {
176                    // use the end user configured poll timeout
177                    exchange = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
178                    if (exchange != null) {
179                        try {
180                            // send a new copied exchange with new camel context
181                            Exchange newExchange = prepareExchange(exchange);
182                            // process the exchange
183                            sendToConsumers(newExchange);
184                            // copy the message back
185                            if (newExchange.hasOut()) {
186                                exchange.setOut(newExchange.getOut().copy());
187                            } else {
188                                exchange.setIn(newExchange.getIn());
189                            }
190                            // log exception if an exception occurred and was not handled
191                            if (newExchange.getException() != null) {
192                                exchange.setException(newExchange.getException());
193                                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
194                            }
195                        } catch (Exception e) {
196                            getExceptionHandler().handleException("Error processing exchange", exchange, e);
197                        }
198                    } else if (shutdownPending && queue.isEmpty()) {
199                        LOG.trace("Shutdown is pending, so this consumer thread is breaking out because the task queue is empty.");
200                        // we want to shutdown so break out if there queue is empty
201                        break;
202                    }
203                } catch (InterruptedException e) {
204                    LOG.debug("Sleep interrupted, are we stopping? {}", isStopping() || isStopped());
205                    continue;
206                } catch (Throwable e) {
207                    if (exchange != null) {
208                        getExceptionHandler().handleException("Error processing exchange", exchange, e);
209                    } else {
210                        getExceptionHandler().handleException(e);
211                    }
212                }
213            }
214    
215            latch.countDown();
216            LOG.debug("Ending this polling consumer thread, there are still {} consumer threads left.", latch.getCount());
217        }
218    
219        /**
220         * Strategy to prepare exchange for being processed by this consumer
221         *
222         * @param exchange the exchange
223         * @return the exchange to process by this consumer.
224         */
225        protected Exchange prepareExchange(Exchange exchange) {
226            // send a new copied exchange with new camel context
227            Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext());
228            // set the from endpoint
229            newExchange.setFromEndpoint(endpoint);
230            return newExchange;
231        }
232    
233        /**
234         * Send the given {@link Exchange} to the consumer(s).
235         * <p/>
236         * If multiple consumers then they will each receive a copy of the Exchange.
237         * A multicast processor will send the exchange in parallel to the multiple consumers.
238         * <p/>
239         * If there is only a single consumer then its dispatched directly to it using same thread.
240         * 
241         * @param exchange the exchange
242         * @throws Exception can be thrown if processing of the exchange failed
243         */
244        protected void sendToConsumers(final Exchange exchange) throws Exception {
245            int size = endpoint.getConsumers().size();
246    
247            // if there are multiple consumers then multicast to them
248            if (size > 1) {
249    
250                // validate multiple consumers has been enabled
251                if (!endpoint.isMultipleConsumersSupported()) {
252                    throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + endpoint);
253                }
254    
255                if (LOG.isDebugEnabled()) {
256                    LOG.debug("Multicasting to {} consumers for Exchange: {}", endpoint.getConsumers().size(), exchange);
257                }
258    
259                // handover completions, as we need to done this when the multicast is done
260                final List<Synchronization> completions = exchange.handoverCompletions();
261    
262                // use a multicast processor to process it
263                MulticastProcessor mp = endpoint.getConsumerMulticastProcessor();
264                ObjectHelper.notNull(mp, "ConsumerMulticastProcessor", this);
265    
266                // and use the asynchronous routing engine to support it
267                AsyncProcessorHelper.process(mp, exchange, new AsyncCallback() {
268                    public void done(boolean doneSync) {
269                        // done the uow on the completions
270                        UnitOfWorkHelper.doneSynchronizations(exchange, completions, LOG);
271                    }
272                });
273            } else {
274                // use the regular processor and use the asynchronous routing engine to support it
275                AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() {
276                    public void done(boolean doneSync) {
277                        // noop
278                    }
279                });
280            }
281        }
282    
283        protected void doStart() throws Exception {
284            latch = new CountDownLatch(endpoint.getConcurrentConsumers());
285            shutdownPending = false;
286    
287            setupTasks();
288            endpoint.onStarted(this);
289        }
290    
291        @Override
292        protected void doSuspend() throws Exception {
293            endpoint.onStopped(this);
294        }
295    
296        @Override
297        protected void doResume() throws Exception {
298            doStart();
299        }
300    
301        protected void doStop() throws Exception {
302            endpoint.onStopped(this);
303            
304            shutdownExecutor();
305        }
306    
307        @Override
308        protected void doShutdown() throws Exception {
309            shutdownExecutor();
310        }
311    
312        private void shutdownExecutor() {
313            if (executor != null) {
314                endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
315                executor = null;
316            }
317        }
318    
319        /**
320         * Setup the thread pool and ensures tasks gets executed (if needed)
321         */
322        private void setupTasks() {
323            int poolSize = endpoint.getConcurrentConsumers();
324    
325            // create thread pool if needed
326            if (executor == null) {
327                executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize);
328            }
329    
330            // submit needed number of tasks
331            int tasks = poolSize - taskCount.get();
332            LOG.debug("Creating {} consumer tasks with poll timeout {} ms.", tasks, pollTimeout);
333            for (int i = 0; i < tasks; i++) {
334                executor.execute(this);
335            }
336        }
337    
338    }