/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Traceable;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.TracedRouteNodes;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.concurrent.AtomicException;
import org.apache.camel.util.concurrent.AtomicExchange;
import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.camel.util.ObjectHelper.notNull;


/**
 * Implements the Multicast pattern to send a message exchange to a number of
 * endpoints, each endpoint receiving a copy of the message exchange.
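 * <p/>
 * For example, a route may use this EIP from the Java DSL. This is only a minimal sketch;
 * the "direct:a", "direct:b" and "mock:result" endpoints and the chosen aggregation strategy
 * are assumptions for illustration:
 * <pre>
 * from("direct:start")
 *     .multicast(new UseLatestAggregationStrategy())
 *         .parallelProcessing()
 *         .to("direct:a", "direct:b")
 *     .end()
 *     .to("mock:result");
 * </pre>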
 *
 * @version 
 * @see Pipeline
 */
public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable {

    private static final transient Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class);

    /**
     * Class that represents each step in the multicast route to process.
     */
    static final class DefaultProcessorExchangePair implements ProcessorExchangePair {
        private final int index;
        private final Processor processor;
        private final Processor prepared;
        private final Exchange exchange;

        private DefaultProcessorExchangePair(int index, Processor processor, Processor prepared, Exchange exchange) {
            this.index = index;
            this.processor = processor;
            this.prepared = prepared;
            this.exchange = exchange;
        }


        public int getIndex() {
            return index;
        }

        public Exchange getExchange() {
            return exchange;
        }

        public Producer getProducer() {
            if (processor instanceof Producer) {
                return (Producer) processor;
            }
            return null;
        }

        public Processor getProcessor() {
            return prepared;
        }

        public void begin() {
            // noop
        }

        public void done() {
            // noop
        }

    }

    /**
     * Class that represents the prepared fine grained error handlers when processing multicasted/split exchanges
     * <p/>
     * See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods.
     */
    static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> {

        public PreparedErrorHandler(RouteContext key, Processor value) {
            super(key, value);
        }

    }

    protected final Processor onPrepare;
    private final CamelContext camelContext;
    private Collection<Processor> processors;
    private final AggregationStrategy aggregationStrategy;
    private final boolean parallelProcessing;
    private final boolean streaming;
    private final boolean stopOnException;
    private final ExecutorService executorService;
    private final boolean shutdownExecutorService;
    private ExecutorService aggregateExecutorService;
    private final long timeout;
    private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>();
    private final boolean shareUnitOfWork;

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) {
        this(camelContext, processors, null);
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
        this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false);
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                              boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                              boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
        notNull(camelContext, "camelContext");
        this.camelContext = camelContext;
        this.processors = processors;
        this.aggregationStrategy = aggregationStrategy;
        this.executorService = executorService;
        this.shutdownExecutorService = shutdownExecutorService;
        this.streaming = streaming;
        this.stopOnException = stopOnException;
        // must enable parallel if executor service is provided
        this.parallelProcessing = parallelProcessing || executorService != null;
        this.timeout = timeout;
        this.onPrepare = onPrepare;
        this.shareUnitOfWork = shareUnitOfWork;
    }

    @Override
    public String toString() {
        return "Multicast[" + getProcessors() + "]";
    }

    public String getTraceLabel() {
        return "multicast";
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

    public boolean process(Exchange exchange, AsyncCallback callback) {
        final AtomicExchange result = new AtomicExchange();
        final Iterable<ProcessorExchangePair> pairs;

        try {
            boolean sync = true;

            pairs = createProcessorExchangePairs(exchange);

            if (isParallelProcessing()) {
                // ensure an executor is set when running in parallel
                ObjectHelper.notNull(executorService, "executorService", this);
                doProcessParallel(exchange, result, pairs, isStreaming(), callback);
            } else {
                sync = doProcessSequential(exchange, result, pairs, callback);
            }

            if (!sync) {
                // the remainder of the multicast will be completed async
                // so we break out now, then the callback will be invoked which will then continue routing from where we left off
                return false;
            }
        } catch (Throwable e) {
            exchange.setException(e);
            // an unexpected exception was thrown, maybe from the iterator etc., so do not regard it as exhausted
            // and do the done work
            doDone(exchange, null, callback, true, false);
            return true;
        }

        // multicasting was processed successfully
        // and do the done work
        Exchange subExchange = result.get() != null ? result.get() : null;
        doDone(exchange, subExchange, callback, true, true);
        return true;
    }

    protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs,
                                     final boolean streaming, final AsyncCallback callback) throws Exception {

        ObjectHelper.notNull(executorService, "ExecutorService", this);
        ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this);

        final CompletionService<Exchange> completion;
        if (streaming) {
            // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence)
            completion = new ExecutorCompletionService<Exchange>(executorService);
        } else {
            // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence)
            completion = new SubmitOrderedCompletionService<Exchange>(executorService);
        }

        final AtomicInteger total = new AtomicInteger(0);
        final Iterator<ProcessorExchangePair> it = pairs.iterator();

        if (it.hasNext()) {
            // when parallel then aggregate on the fly
            final AtomicBoolean running = new AtomicBoolean(true);
            final AtomicBoolean allTasksSubmitted = new AtomicBoolean();
            final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1);
            final AtomicException executionException = new AtomicException();

            // issue task to execute in separate thread so it can aggregate on-the-fly
            // while we submit new tasks, and those tasks complete concurrently
            // this allows us to optimize work and reduce memory consumption
            final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running,
                    aggregationOnTheFlyDone, allTasksSubmitted, executionException);
            final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean();

            LOG.trace("Starting to submit parallel tasks");

            while (it.hasNext()) {
                final ProcessorExchangePair pair = it.next();
                final Exchange subExchange = pair.getExchange();
                updateNewExchange(subExchange, total.intValue(), pairs, it);

                completion.submit(new Callable<Exchange>() {
                    public Exchange call() throws Exception {
                        // only start the aggregation task when the first task is being executed, to avoid starting
                        // the aggregation task too early and piling up too many threads
                        if (aggregationTaskSubmitted.compareAndSet(false, true)) {
                            // but only submit the task once
                            aggregateExecutorService.submit(aggregateOnTheFlyTask);
                        }

                        if (!running.get()) {
                            // do not start processing the task if we are not running
                            return subExchange;
                        }

                        try {
                            doProcessParallel(pair);
                        } catch (Throwable e) {
                            subExchange.setException(e);
                        }

                        // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                        Integer number = getExchangeIndex(subExchange);
                        boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
                        if (stopOnException && !continueProcessing) {
                            // signal to stop running
                            running.set(false);
                            // throw caused exception
                            if (subExchange.getException() != null) {
                                // wrap in exception to explain where it failed
                                CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException());
                                subExchange.setException(cause);
                            }
                        }

                        LOG.trace("Parallel processing complete for exchange: {}", subExchange);
                        return subExchange;
                    }
                });

                total.incrementAndGet();
            }

            // signal that all tasks have been submitted
            LOG.trace("Signaling that all {} tasks have been submitted.", total.get());
            allTasksSubmitted.set(true);

            // it is too hard to do parallel async routing so we let the caller thread be synchronous
            // and have it pick up the replies and do the aggregation (eg we use a latch to wait)
            // wait for aggregation to be done
            LOG.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", total.get(), original.getExchangeId());
            aggregationOnTheFlyDone.await();

            // did we fail for whatever reason, if so throw that caused exception
            if (executionException.get() != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Parallel processing failed due to {}", executionException.get().getMessage());
                }
                throw executionException.get();
            }
        }

        // now everything is okay so we are done
        LOG.debug("Done parallel processing {} exchanges", total);
    }

    /**
     * Task to aggregate on-the-fly for completed tasks when using parallel processing.
     * <p/>
     * This ensures lower memory consumption as we do not need to keep all completed tasks in memory
     * before we perform aggregation. Instead this separate thread will run and aggregate as new
     * tasks complete.
     * <p/>
     * The logic is fairly complex as this implementation has to keep track of how far it got, and also
     * signal back to the <i>main</i> thread when it is done, so the <i>main</i> thread can continue
     * processing when the entire splitting is done.
     */
    private final class AggregateOnTheFlyTask implements Runnable {

        private final AtomicExchange result;
        private final Exchange original;
        private final AtomicInteger total;
        private final CompletionService<Exchange> completion;
        private final AtomicBoolean running;
        private final CountDownLatch aggregationOnTheFlyDone;
        private final AtomicBoolean allTasksSubmitted;
        private final AtomicException executionException;

        private AggregateOnTheFlyTask(AtomicExchange result, Exchange original, AtomicInteger total,
                                      CompletionService<Exchange> completion, AtomicBoolean running,
                                      CountDownLatch aggregationOnTheFlyDone, AtomicBoolean allTasksSubmitted,
                                      AtomicException executionException) {
            this.result = result;
            this.original = original;
            this.total = total;
            this.completion = completion;
            this.running = running;
            this.aggregationOnTheFlyDone = aggregationOnTheFlyDone;
            this.allTasksSubmitted = allTasksSubmitted;
            this.executionException = executionException;
        }

        public void run() {
            LOG.trace("Aggregate on the fly task started for exchangeId: {}", original.getExchangeId());

            try {
                aggregateOnTheFly();
            } catch (Throwable e) {
                if (e instanceof Exception) {
                    executionException.set((Exception) e);
                } else {
                    executionException.set(ObjectHelper.wrapRuntimeCamelException(e));
                }
            } finally {
                // must signal we are done so the latch can open and let the other thread continue processing
                LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId());
                LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId());
                aggregationOnTheFlyDone.countDown();
            }
        }

        private void aggregateOnTheFly() throws InterruptedException, ExecutionException {
            boolean timedOut = false;
            boolean stoppedOnException = false;
            final StopWatch watch = new StopWatch();
            int aggregated = 0;
            boolean done = false;
            // not a for loop as new tasks may still be submitted while we aggregate on the fly
            while (!done) {
                // check if we have already aggregated everything
                if (allTasksSubmitted.get() && aggregated >= total.get()) {
                    LOG.debug("Done aggregating {} exchanges on the fly.", aggregated);
                    break;
                }

                Future<Exchange> future;
                if (timedOut) {
                    // we have timed out but try to grab any tasks that have already completed
                    // poll will return null if no task is present
                    future = completion.poll();
                    LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future);
                } else if (timeout > 0) {
                    long left = timeout - watch.taken();
                    if (left < 0) {
                        left = 0;
                    }
                    LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left);
                    future = completion.poll(left, TimeUnit.MILLISECONDS);
                } else {
                    LOG.trace("Polling completion task #{}", aggregated);
                    // we must not block so poll every second
                    future = completion.poll(1, TimeUnit.SECONDS);
                    if (future == null) {
                        // and continue loop which will recheck if we are done
                        continue;
                    }
                }

                if (future == null && timedOut) {
                    // we have timed out and no more tasks are completing so break out
                    break;
                } else if (future == null) {
                    // timeout occurred
                    AggregationStrategy strategy = getAggregationStrategy(null);
                    if (strategy instanceof TimeoutAwareAggregationStrategy) {
                        // notify the strategy we timed out
                        Exchange oldExchange = result.get();
                        if (oldExchange == null) {
                            // if they all timed out the result may not have been set yet, so use the original exchange
                            oldExchange = original;
                        }
                        ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated, total.intValue(), timeout);
                    } else {
                        // log a WARN we timed out since it will not be aggregated and the Exchange will be lost
                        LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated);
                    }
                    LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated);
                    timedOut = true;

                    // mark that index as timed out, which allows us to try to retrieve
                    // any already completed tasks in the next loop
                    if (completion instanceof SubmitOrderedCompletionService) {
                        ((SubmitOrderedCompletionService<?>) completion).timeoutTask();
                    }
                } else {
                    // there is a result to aggregate
                    Exchange subExchange = future.get();

                    // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                    Integer number = getExchangeIndex(subExchange);
                    boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
                    if (stopOnException && !continueProcessing) {
                        // we want to stop on exception and an exception or failure occurred
                        // this is similar to what the pipeline does, so we should do the same to not surprise end users
                        // so we should set the failed exchange as the result and break out
                        result.set(subExchange);
                        stoppedOnException = true;
                        break;
                    }

                    // we got a result so aggregate it
                    AggregationStrategy strategy = getAggregationStrategy(subExchange);
                    doAggregate(strategy, result, subExchange);
                }

                aggregated++;
            }

            if (timedOut || stoppedOnException) {
                if (timedOut) {
                    LOG.debug("Cancelling tasks due to timeout after {} millis.", timeout);
                }
                if (stoppedOnException) {
                    LOG.debug("Cancelling tasks due to stopOnException.");
                }
                // cancel tasks as we timed out or stopped on exception (it is safe to cancel already completed tasks)
                running.set(false);
            }
        }
    }

    protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception {
        AtomicInteger total = new AtomicInteger();
        Iterator<ProcessorExchangePair> it = pairs.iterator();

        while (it.hasNext()) {
            ProcessorExchangePair pair = it.next();
            Exchange subExchange = pair.getExchange();
            updateNewExchange(subExchange, total.get(), pairs, it);

            boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);
            if (!sync) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", pair.getExchange().getExchangeId());
                }
                // the remainder of the multicast will be completed async
                // so we break out now, then the callback will be invoked which will then continue routing from where we left off
                return false;
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("Processing exchangeId: {} is continued being processed synchronously", pair.getExchange().getExchangeId());
            }

            // Decide whether to continue with the multicast or not; similar logic to the Pipeline
            // remember to test for stop on exception and aggregate before copying back results
            boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
            if (stopOnException && !continueProcessing) {
                if (subExchange.getException() != null) {
                    // wrap in exception to explain where it failed
                    CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException());
                    subExchange.setException(cause);
                }
                // we want to stop on exception, and the exception was handled by the error handler
                // this is similar to what the pipeline does, so we should do the same to not surprise end users
                // so we should set the failed exchange as the result and be done
                result.set(subExchange);
                return true;
            }

            LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange);

            doAggregate(getAggregationStrategy(subExchange), result, subExchange);
            total.incrementAndGet();
        }

        LOG.debug("Done sequential processing {} exchanges", total);

        return true;
    }

    private boolean doProcessSequential(final Exchange original, final AtomicExchange result,
                                        final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it,
                                        final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) {
        boolean sync = true;

        final Exchange exchange = pair.getExchange();
        Processor processor = pair.getProcessor();
        final Producer producer = pair.getProducer();

        TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;

        // compute time taken if sending to another endpoint
        final StopWatch watch = producer != null ? new StopWatch() : null;

        try {
            // prepare tracing starting from a new block
            if (traced != null) {
                traced.pushBlock();
            }

            if (producer != null) {
                EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
            }
            // let the prepared processor process it, remember to begin the exchange pair
            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
            pair.begin();
            sync = AsyncProcessorHelper.process(async, exchange, new AsyncCallback() {
                public void done(boolean doneSync) {
                    // we are done with the exchange pair
                    pair.done();

                    // okay we are done, so notify the exchange was sent
                    if (producer != null) {
                        long timeTaken = watch.stop();
                        Endpoint endpoint = producer.getEndpoint();
                        // emit event that the exchange was sent to the endpoint
                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
                    }

                    // we only have to handle async completion of the multicast
                    if (doneSync) {
                        return;
                    }

                    // continue processing the multicast asynchronously
                    Exchange subExchange = exchange;

                    // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                    // remember to test for stop on exception and aggregate before copying back results
                    boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
                    if (stopOnException && !continueProcessing) {
                        if (subExchange.getException() != null) {
                            // wrap in exception to explain where it failed
                            subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
                        } else {
                            // we want to stop on exception, and the exception was handled by the error handler
                            // this is similar to what the pipeline does, so we should do the same to not surprise end users
                            // so we should set the failed exchange as the result and be done
                            result.set(subExchange);
                        }
                        // and do the done work
                        doDone(original, subExchange, callback, false, true);
                        return;
                    }

                    try {
                        doAggregate(getAggregationStrategy(subExchange), result, subExchange);
                    } catch (Throwable e) {
                        // wrap in exception to explain where it failed
                        subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
                        // and do the done work
                        doDone(original, subExchange, callback, false, true);
                        return;
                    }

                    total.incrementAndGet();

                    // maybe there are more processors to multicast
                    while (it.hasNext()) {

                        // prepare and run the next
                        ProcessorExchangePair pair = it.next();
                        subExchange = pair.getExchange();
                        updateNewExchange(subExchange, total.get(), pairs, it);
                        boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);

                        if (!sync) {
                            LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId());
                            return;
                        }

                        // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                        // remember to test for stop on exception and aggregate before copying back results
                        continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
                        if (stopOnException && !continueProcessing) {
                            if (subExchange.getException() != null) {
                                // wrap in exception to explain where it failed
                                subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
                            } else {
                                // we want to stop on exception, and the exception was handled by the error handler
                                // this is similar to what the pipeline does, so we should do the same to not surprise end users
                                // so we should set the failed exchange as the result and be done
                                result.set(subExchange);
                            }
                            // and do the done work
                            doDone(original, subExchange, callback, false, true);
                            return;
                        }

                        // must catch any exceptions from aggregation
                        try {
                            doAggregate(getAggregationStrategy(subExchange), result, subExchange);
                        } catch (Throwable e) {
                            // wrap in exception to explain where it failed
                            subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
                            // and do the done work
                            doDone(original, subExchange, callback, false, true);
                            return;
                        }

                        total.incrementAndGet();
                    }

                    // do the done work
                    subExchange = result.get() != null ? result.get() : null;
                    doDone(original, subExchange, callback, false, true);
                }
            });
        } finally {
            // pop the block so by next round we have the same starting point and thus the tracing looks accurate
            if (traced != null) {
                traced.popBlock();
            }
        }

        return sync;
    }

    private void doProcessParallel(final ProcessorExchangePair pair) throws Exception {
        final Exchange exchange = pair.getExchange();
        Processor processor = pair.getProcessor();
        Producer producer = pair.getProducer();

        TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;

        // compute time taken if sending to another endpoint
        StopWatch watch = null;
        if (producer != null) {
            watch = new StopWatch();
        }

        try {
            // prepare tracing starting from a new block
            if (traced != null) {
                traced.pushBlock();
            }

            if (producer != null) {
                EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
            }
            // let the prepared processor process it, remember to begin the exchange pair
            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
            pair.begin();
            // we invoke it synchronously as parallel async routing is too hard
            AsyncProcessorHelper.process(async, exchange);
        } finally {
            pair.done();
            // pop the block so by next round we have the same starting point and thus the tracing looks accurate
            if (traced != null) {
                traced.popBlock();
            }
            if (producer != null) {
                long timeTaken = watch.stop();
                Endpoint endpoint = producer.getEndpoint();
                // emit event that the exchange was sent to the endpoint
                // this is okay to do here in the finally block, as the processing is not using the async routing engine
                // (we invoke it synchronously as parallel async routing is too hard)
                EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
            }
        }
    }

    /**
     * Common work which must be done when we are done multicasting.
     * <p/>
     * This logic applies for both synchronous and asynchronous processing as there are multiple exit points
     * when using the asynchronous routing engine, and therefore we want the logic in one method instead
     * of scattered in multiple places.
     *
     * @param original    the original exchange
     * @param subExchange the current sub exchange, can be <tt>null</tt> for the synchronous part
     * @param callback    the callback
     * @param doneSync    the <tt>doneSync</tt> parameter to call on callback
     * @param exhaust     whether or not error handling is exhausted
     */
    protected void doDone(Exchange original, Exchange subExchange, AsyncCallback callback, boolean doneSync, boolean exhaust) {
        // cleanup any per exchange aggregation strategy
        removeAggregationStrategyFromExchange(original);
        if (original.getException() != null || subExchange != null && subExchange.getException() != null) {
            // multicast uses error handling on its output processors and they have tried to redeliver
            // so we shall signal back to the other error handlers that we are exhausted and they should not
            // also try to redeliver as we will then do that twice
            original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
        }
        if (subExchange != null) {
            // and copy the current result to original so it will contain the result of this EIP
            ExchangeHelper.copyResults(original, subExchange);
        }
        callback.done(doneSync);
    }

    /**
     * Aggregate the {@link Exchange} with the current result
     *
     * @param strategy the aggregation strategy to use
     * @param result   the current result
     * @param exchange the exchange to be added to the result
     */
    protected synchronized void doAggregate(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) {
        if (strategy != null) {
            // prepare the exchanges for aggregation
            Exchange oldExchange = result.get();
            ExchangeHelper.prepareAggregation(oldExchange, exchange);
            result.set(strategy.aggregate(oldExchange, exchange));
        }
    }

    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
                                     Iterator<ProcessorExchangePair> it) {
        exchange.setProperty(Exchange.MULTICAST_INDEX, index);
        if (it.hasNext()) {
            exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE);
        } else {
            exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE);
        }
    }

    protected Integer getExchangeIndex(Exchange exchange) {
        return exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class);
    }

    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
        List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());

        int index = 0;
        for (Processor processor : processors) {
            // copy exchange, and do not share the unit of work
            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);

            // if we share unit of work, we need to prepare the child exchange
            if (isShareUnitOfWork()) {
                prepareSharedUnitOfWork(copy, exchange);
            }

            // and add the pair
            RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
            result.add(createProcessorExchangePair(index++, processor, copy, routeContext));
        }

        if (exchange.getException() != null) {
            // force any exceptions occurred during creation of the exchange pairs to be thrown
            // before returning the answer
            throw exchange.getException();
        }

        return result;
    }

    /**
     * Creates the {@link ProcessorExchangePair} which holds the processor and exchange to be sent out.
     * <p/>
     * You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they
     * need to be specially prepared before use.
     *
     * @param index        the index
     * @param processor    the processor
     * @param exchange     the exchange
     * @param routeContext the route context
     * @return prepared for use
     */
    protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange,
                                                                RouteContext routeContext) {
        Processor prepared = processor;

        // set property which endpoint we send to
        setToEndpoint(exchange, prepared);

        // rework error handling to support fine grained error handling
        prepared = createErrorHandler(routeContext, exchange, prepared);

        // invoke onPrepare on the exchange if specified
        if (onPrepare != null) {
            try {
                onPrepare.process(exchange);
            } catch (Exception e) {
                exchange.setException(e);
            }
        }
        return new DefaultProcessorExchangePair(index, processor, prepared, exchange);
    }

    protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) {
        Processor answer;

        if (routeContext != null) {
            // wrap the producer in error handler so we have fine grained error handling on
            // the output side instead of the input side
            // this is needed to support redelivery on that output alone, instead of redelivering
            // the entire multicast block again which would start from scratch

            // create key for cache
            final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor);

            // lookup cached first to reuse and preserve memory
            answer = errorHandlers.get(key);
            if (answer != null) {
                LOG.trace("Using existing error handler for: {}", processor);
                return answer;
            }

            LOG.trace("Creating error handler for: {}", processor);
            ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
            // create error handler (create error handler directly to keep it light weight,
            // instead of using ProcessorDefinition.wrapInErrorHandler)
            try {
                processor = builder.createErrorHandler(routeContext, processor);

                // and wrap in unit of work processor so the copied exchange can also run under UoW
                answer = createUnitOfWorkProcessor(routeContext, processor, exchange);

                // must start the error handler
                ServiceHelper.startServices(answer);
            } catch (Exception e) {
                throw ObjectHelper.wrapRuntimeCamelException(e);
            }
            // here we do not cache the ChildUnitOfWorkProcessor
            // as the unit of work will be delegated to the parent
            if (!(answer instanceof ChildUnitOfWorkProcessor)) {
                // add to cache
                errorHandlers.putIfAbsent(key, answer);
            }
        } else {
            // and wrap in unit of work processor so the copied exchange can also run under UoW
            answer = createUnitOfWorkProcessor(routeContext, processor, exchange);
        }

        return answer;
    }

    /**
     * Strategy to create the {@link UnitOfWorkProcessor} to be used for the sub route
     *
     * @param routeContext the route context
     * @param processor    the processor wrapped in this unit of work processor
     * @param exchange     the exchange
     * @return the unit of work processor
     */
    protected UnitOfWorkProcessor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) {
        UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
        if (parent != null) {
            return new ChildUnitOfWorkProcessor(parent, routeContext, processor);
        } else {
            return new UnitOfWorkProcessor(routeContext, processor);
        }
    }

    /**
     * Prepares the exchange for participating in a shared unit of work
     * <p/>
     * This ensures a child exchange can access its parent {@link UnitOfWork} when it participates
     * in a shared unit of work.
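     * <p/>
     * A minimal DSL sketch enabling a shared unit of work (the endpoint URIs are assumptions
     * for illustration):
     * <pre>
     * from("direct:start")
     *     .multicast().shareUnitOfWork()
     *         .to("direct:a", "direct:b");
     * </pre>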
     *
     * @param childExchange  the child exchange
     * @param parentExchange the parent exchange
     */
    protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange parentExchange) {
        childExchange.setProperty(Exchange.PARENT_UNIT_OF_WORK, parentExchange.getUnitOfWork());
    }

    protected void doStart() throws Exception {
        if (isParallelProcessing() && executorService == null) {
            throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set");
        }
        if (timeout > 0 && !isParallelProcessing()) {
            throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
        }
        if (isParallelProcessing() && aggregateExecutorService == null) {
            // use an unbounded thread pool so we ensure the aggregate on-the-fly task always has a thread assigned
            // and can run as soon as it is submitted. If not, the aggregate task may not be able to run
            // and signal completion during processing, which would appear as a dead-lock or very slow processing
            String name = getClass().getSimpleName() + "-AggregateTask";
            aggregateExecutorService = createAggregateExecutorService(name);
        }
        ServiceHelper.startServices(processors);
    }

    /**
     * Strategy to create the thread pool for the aggregator background task which waits for and aggregates
     * completed tasks when running in parallel mode.
     *
     * @param name  the suggested name for the background thread
     * @return the thread pool
     */
    protected synchronized ExecutorService createAggregateExecutorService(String name) {
        // use a cached thread pool so each on-the-fly task has a dedicated thread to process completions as they come in
        return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
    }

    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopServices(processors, errorHandlers);
    }

    @Override
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownServices(processors, errorHandlers);
        // only clear error handlers when shutting down
        errorHandlers.clear();

        if (shutdownExecutorService && executorService != null) {
            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
        }
        if (aggregateExecutorService != null) {
            getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService);
        }
    }

    protected static void setToEndpoint(Exchange exchange, Processor processor) {
        if (processor instanceof Producer) {
            Producer producer = (Producer) processor;
            exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
        }
    }

    protected AggregationStrategy getAggregationStrategy(Exchange exchange) {
        AggregationStrategy answer = null;

        // prefer to use per Exchange aggregation strategy over a global strategy
        if (exchange != null) {
            Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
            Map<Object, AggregationStrategy> map = CastUtils.cast(property);
            if (map != null) {
                answer = map.get(this);
            }
        }
        if (answer == null) {
            // fallback to global strategy
            answer = getAggregationStrategy();
        }
        return answer;
    }

    /**
     * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}.
     *
     * @param exchange            the exchange
     * @param aggregationStrategy the strategy
     */
    protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) {
        Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
        Map<Object, AggregationStrategy> map = CastUtils.cast(property);
        if (map == null) {
            map = new HashMap<Object, AggregationStrategy>();
        } else {
            // it is not safe to use the map directly as the exchange does not have a deep copy of its properties
            // so we just create a new copy if we need to change the map
            map = new HashMap<Object, AggregationStrategy>(map);
        }
        // store the strategy using this processor as the key
        // (so we can store multiple strategies on the same exchange)
        map.put(this, aggregationStrategy);
        exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map);
    }

    /**
     * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange}
     * which must be done after use.
     *
     * @param exchange the current exchange
     */
    protected void removeAggregationStrategyFromExchange(Exchange exchange) {
        Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
        Map<Object, AggregationStrategy> map = CastUtils.cast(property);
        if (map == null) {
            return;
        }
        // remove the strategy using this processor as the key
        map.remove(this);
    }

    /**
     * Is the multicast processor working in streaming mode?
     * <p/>
     * In streaming mode:
     * <ul>
     * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li>
     * <li>for parallel processing, we start aggregating responses as they are sent back to the processor;
     * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li>
     * </ul>
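     * <p/>
     * A minimal DSL sketch enabling streaming together with parallel processing (the endpoint
     * URIs are assumptions for illustration):
     * <pre>
     * from("direct:start")
     *     .multicast().parallelProcessing().streaming()
     *         .to("direct:a", "direct:b");
     * </pre>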
     */
    public boolean isStreaming() {
        return streaming;
    }

    /**
     * Should the multicast processor stop processing further exchanges if an exception occurred?
     */
    public boolean isStopOnException() {
        return stopOnException;
    }

    /**
     * Returns the producers to multicast to
     */
    public Collection<Processor> getProcessors() {
        return processors;
    }

    /**
     * An optional timeout in millis when using parallel processing
     */
    public long getTimeout() {
        return timeout;
    }

    /**
     * Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead.
     */
    public AggregationStrategy getAggregationStrategy() {
        return aggregationStrategy;
    }

    public boolean isParallelProcessing() {
        return parallelProcessing;
    }

    public boolean isShareUnitOfWork() {
        return shareUnitOfWork;
    }

    public List<Processor> next() {
        if (!hasNext()) {
            return null;
        }
        return new ArrayList<Processor>(processors);
    }

    public boolean hasNext() {
        return processors != null && !processors.isEmpty();
    }
}