/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
import org.apache.camel.processor.aggregate.DelegateAggregationStrategy;
import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.TracedRouteNodes;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.concurrent.AtomicException;
import org.apache.camel.util.concurrent.AtomicExchange;
import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.camel.util.ObjectHelper.notNull;

/**
 * Implements the Multicast pattern to send a message exchange to a number of
 * endpoints, each endpoint receiving a copy of the message exchange.
 *
 * @version
 * @see Pipeline
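 * <p/>
 * A minimal route sketch using this EIP from the Java DSL (hypothetical endpoint URIs, shown
 * only for illustration; the aggregation strategy is the UseLatestAggregationStrategy shipped
 * with Camel):
 * <pre>
 * from("direct:start")
 *     .multicast(new UseLatestAggregationStrategy())
 *         .parallelProcessing()
 *         .to("direct:a", "direct:b")
 *     .end()
 *     .to("mock:result");
 * </pre>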
 */
public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware {

    private static final Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class);

    /**
     * Class that represents each step in the multicast route to process
     */
    static final class DefaultProcessorExchangePair implements ProcessorExchangePair {
        private final int index;
        private final Processor processor;
        private final Processor prepared;
        private final Exchange exchange;

        private DefaultProcessorExchangePair(int index, Processor processor, Processor prepared, Exchange exchange) {
            this.index = index;
            this.processor = processor;
            this.prepared = prepared;
            this.exchange = exchange;
        }

        public int getIndex() {
            return index;
        }

        public Exchange getExchange() {
            return exchange;
        }

        public Producer getProducer() {
            if (processor instanceof Producer) {
                return (Producer) processor;
            }
            return null;
        }

        public Processor getProcessor() {
            return prepared;
        }

        public void begin() {
            // noop
        }

        public void done() {
            // noop
        }

    }

    /**
     * Class that represents prepared fine grained error handlers when processing multicast/split exchanges
     * <p/>
     * See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods.
     */
    static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> {

        PreparedErrorHandler(RouteContext key, Processor value) {
            super(key, value);
        }

    }

    protected final Processor onPrepare;
    private final CamelContext camelContext;
    private String id;
    private Collection<Processor> processors;
    private final AggregationStrategy aggregationStrategy;
    private final boolean parallelProcessing;
    private final boolean streaming;
    private final boolean parallelAggregate;
    private final boolean stopOnAggregateException;
    private final boolean stopOnException;
    private final ExecutorService executorService;
    private final boolean shutdownExecutorService;
    private ExecutorService aggregateExecutorService;
    private final long timeout;
    private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>();
    private final boolean shareUnitOfWork;

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) {
        this(camelContext, processors, null);
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
        this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false, false);
    }

    @Deprecated
    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                              boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                              boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
        this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
                streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false);
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
                              ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare,
                              boolean shareUnitOfWork, boolean parallelAggregate) {
        this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare,
             shareUnitOfWork, parallelAggregate, false);
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                              boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming,
                              boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork,
                              boolean parallelAggregate, boolean stopOnAggregateException) {
        notNull(camelContext, "camelContext");
        this.camelContext = camelContext;
        this.processors = processors;
        this.aggregationStrategy = aggregationStrategy;
        this.executorService = executorService;
        this.shutdownExecutorService = shutdownExecutorService;
        this.streaming = streaming;
        this.stopOnException = stopOnException;
        // must enable parallel if executor service is provided
        this.parallelProcessing = parallelProcessing || executorService != null;
        this.timeout = timeout;
        this.onPrepare = onPrepare;
        this.shareUnitOfWork = shareUnitOfWork;
        this.parallelAggregate = parallelAggregate;
        this.stopOnAggregateException = stopOnAggregateException;
    }

    @Override
    public String toString() {
        return "Multicast[" + getProcessors() + "]";
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getTraceLabel() {
        return "multicast";
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

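    /**
     * Processes the exchange by multicasting it to each configured processor, either in parallel
     * or sequentially, and aggregating the results with the configured {@link AggregationStrategy}.
     *
     * @return <tt>true</tt> if the multicast completed synchronously, <tt>false</tt> if processing
     *         continues asynchronously and the callback will be invoked when done
     */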
    public boolean process(Exchange exchange, AsyncCallback callback) {
        final AtomicExchange result = new AtomicExchange();
        Iterable<ProcessorExchangePair> pairs = null;

        try {
            boolean sync = true;

            pairs = createProcessorExchangePairs(exchange);

            if (isParallelProcessing()) {
                // ensure an executor is set when running in parallel
                ObjectHelper.notNull(executorService, "executorService", this);
                doProcessParallel(exchange, result, pairs, isStreaming(), callback);
            } else {
                sync = doProcessSequential(exchange, result, pairs, callback);
            }

            if (!sync) {
                // the remainder of the multicast will be completed async
                // so we break out now, then the callback will be invoked which will then continue routing from where we left off here
                return false;
            }
        } catch (Throwable e) {
            exchange.setException(e);
            // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted
            // and do the done work
            doDone(exchange, null, pairs, callback, true, false);
            return true;
        }

        // multicasting was processed successfully
        // and do the done work
        Exchange subExchange = result.get();
        doDone(exchange, subExchange, pairs, callback, true, true);
        return true;
    }

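    /**
     * Processes the exchange pairs in parallel by submitting each pair as a task to the executor service,
     * while a separate {@link AggregateOnTheFlyTask} aggregates the replies as they complete.
     * The calling thread blocks until the on-the-fly aggregation has finished (or timed out).
     */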
    protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs,
                                     final boolean streaming, final AsyncCallback callback) throws Exception {

        ObjectHelper.notNull(executorService, "ExecutorService", this);
        ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this);

        final CompletionService<Exchange> completion;
        if (streaming) {
            // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence)
            completion = new ExecutorCompletionService<Exchange>(executorService);
        } else {
            // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence)
            completion = new SubmitOrderedCompletionService<Exchange>(executorService);
        }

        final AtomicInteger total = new AtomicInteger(0);
        final Iterator<ProcessorExchangePair> it = pairs.iterator();

        if (it.hasNext()) {
            // when parallel then aggregate on the fly
            final AtomicBoolean running = new AtomicBoolean(true);
            final AtomicBoolean allTasksSubmitted = new AtomicBoolean();
            final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1);
            final AtomicException executionException = new AtomicException();

            // issue task to execute in separate thread so it can aggregate on-the-fly
            // while we submit new tasks, and those tasks complete concurrently
            // this allows us to optimize work and reduce memory consumption
            final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running,
                    aggregationOnTheFlyDone, allTasksSubmitted, executionException);
            final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean();

            LOG.trace("Starting to submit parallel tasks");

            try {
                while (it.hasNext()) {
                    final ProcessorExchangePair pair = it.next();
                    // in case the iterator returns null then continue to next
                    if (pair == null) {
                        continue;
                    }

                    final Exchange subExchange = pair.getExchange();
                    updateNewExchange(subExchange, total.intValue(), pairs, it);

                    completion.submit(new Callable<Exchange>() {
                        public Exchange call() throws Exception {
                            // start the aggregation task at this stage only in order not to pile up too many threads
                            if (aggregationTaskSubmitted.compareAndSet(false, true)) {
                                // but only submit the aggregation task once
                                aggregateExecutorService.submit(aggregateOnTheFlyTask);
                            }

                            if (!running.get()) {
                                // do not start processing the task if we are not running
                                return subExchange;
                            }

                            try {
                                doProcessParallel(pair);
                            } catch (Throwable e) {
                                subExchange.setException(e);
                            }

                            // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                            Integer number = getExchangeIndex(subExchange);
                            boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
                            if (stopOnException && !continueProcessing) {
                                // signal to stop running
                                running.set(false);
                                // throw caused exception
                                if (subExchange.getException() != null) {
                                    // wrap in exception to explain where it failed
                                    CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException());
                                    subExchange.setException(cause);
                                }
                            }

                            LOG.trace("Parallel processing complete for exchange: {}", subExchange);
                            return subExchange;
                        }
                    });

                    total.incrementAndGet();
                }
            } catch (Throwable e) {
                // The methods it.hasNext and it.next can throw RuntimeExceptions when custom iterators are implemented.
                // We have to catch the exception here otherwise the aggregator threads would pile up.
                if (e instanceof Exception) {
                    executionException.set((Exception) e);
                } else {
                    executionException.set(ObjectHelper.wrapRuntimeCamelException(e));
                }
                // and because of the exception we must signal we are done so the latch can open and let the other thread continue processing
                LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId());
                LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId());
                aggregationOnTheFlyDone.countDown();
            }

            // signal all tasks have been submitted
            LOG.trace("Signaling that all {} tasks have been submitted.", total.get());
            allTasksSubmitted.set(true);

            // it is too hard to do parallel async routing so we let the caller thread be synchronous
            // and have it pick up the replies and do the aggregation (eg we use a latch to wait)
            // wait for aggregation to be done
            LOG.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", total.get(), original.getExchangeId());
            aggregationOnTheFlyDone.await();

            // if we failed for whatever reason then throw the caused exception
            if (executionException.get() != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Parallel processing failed due to {}", executionException.get().getMessage());
                }
                throw executionException.get();
            }
        }

        // now everything is okay so we are done
        LOG.debug("Done parallel processing {} exchanges", total);
    }

    /**
     * Boss worker to control aggregate on-the-fly for completed tasks when using parallel processing.
     * <p/>
     * This ensures lower memory consumption as we do not need to keep all completed tasks in memory
     * before we perform aggregation. Instead this separate thread will run and aggregate as new
     * tasks complete.
     * <p/>
     * The logic is fairly complex as this implementation has to keep track of how far it got, and also
     * signal back to the <i>main</i> thread when it is done, so the <i>main</i> thread can continue
     * processing when the entire splitting is done.
     */
    private final class AggregateOnTheFlyTask implements Runnable {

        private final AtomicExchange result;
        private final Exchange original;
        private final AtomicInteger total;
        private final CompletionService<Exchange> completion;
        private final AtomicBoolean running;
        private final CountDownLatch aggregationOnTheFlyDone;
        private final AtomicBoolean allTasksSubmitted;
        private final AtomicException executionException;

        private AggregateOnTheFlyTask(AtomicExchange result, Exchange original, AtomicInteger total,
                                      CompletionService<Exchange> completion, AtomicBoolean running,
                                      CountDownLatch aggregationOnTheFlyDone, AtomicBoolean allTasksSubmitted,
                                      AtomicException executionException) {
            this.result = result;
            this.original = original;
            this.total = total;
            this.completion = completion;
            this.running = running;
            this.aggregationOnTheFlyDone = aggregationOnTheFlyDone;
            this.allTasksSubmitted = allTasksSubmitted;
            this.executionException = executionException;
        }

        public void run() {
            LOG.trace("Aggregate on the fly task started for exchangeId: {}", original.getExchangeId());

            try {
                aggregateOnTheFly();
            } catch (Throwable e) {
                if (e instanceof Exception) {
                    executionException.set((Exception) e);
                } else {
                    executionException.set(ObjectHelper.wrapRuntimeCamelException(e));
                }
            } finally {
                // must signal we are done so the latch can open and let the other thread continue processing
                LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId());
                LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId());
                aggregationOnTheFlyDone.countDown();
            }
        }

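        /**
         * Polls the completion service for finished tasks and aggregates each reply as it arrives,
         * until all submitted tasks have been aggregated, a timeout occurs, or processing is stopped
         * due to an exception (when stopOnException is enabled).
         */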
        private void aggregateOnTheFly() throws InterruptedException, ExecutionException {
            final AtomicBoolean timedOut = new AtomicBoolean();
            boolean stoppedOnException = false;
            final StopWatch watch = new StopWatch();
            final AtomicInteger aggregated = new AtomicInteger();
            boolean done = false;
            // not a for loop as on the fly may still run
            while (!done) {
                // check if we have already aggregated everything
                if (allTasksSubmitted.get() && aggregated.intValue() >= total.get()) {
                    LOG.debug("Done aggregating {} exchanges on the fly.", aggregated);
                    break;
                }

                Future<Exchange> future;
                if (timedOut.get()) {
                    // we have timed out but try to grab any tasks that have already been completed
                    // poll will return null if no task is present
                    future = completion.poll();
                    LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future);
                } else if (timeout > 0) {
                    long left = timeout - watch.taken();
                    if (left < 0) {
                        left = 0;
                    }
                    LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left);
                    future = completion.poll(left, TimeUnit.MILLISECONDS);
                } else {
                    LOG.trace("Polling completion task #{}", aggregated);
                    // we must not block so poll every second
                    future = completion.poll(1, TimeUnit.SECONDS);
                    if (future == null) {
                        // and continue loop which will recheck if we are done
                        continue;
                    }
                }

                if (future == null) {
                    ParallelAggregateTimeoutTask task = new ParallelAggregateTimeoutTask(original, result, completion, aggregated, total, timedOut);
                    if (parallelAggregate) {
                        aggregateExecutorService.submit(task);
                    } else {
                        // in non parallel mode then just run the task
                        task.run();
                    }
                } else {
                    // there is a result to aggregate
                    Exchange subExchange = future.get();

                    // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                    Integer number = getExchangeIndex(subExchange);
                    boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
                    if (stopOnException && !continueProcessing) {
                        // we want to stop on exception and an exception or failure occurred
                        // this is similar to what the pipeline does, so we should do the same to not surprise end users
                        // so we should set the failed exchange as the result and break out
                        result.set(subExchange);
                        stoppedOnException = true;
                        break;
                    }

                    // we got a result so aggregate it
                    ParallelAggregateTask task = new ParallelAggregateTask(result, subExchange, aggregated);
                    if (parallelAggregate) {
                        aggregateExecutorService.submit(task);
                    } else {
                        // in non parallel mode then just run the task
                        task.run();
                    }
                }
            }

            if (timedOut.get() || stoppedOnException) {
                if (timedOut.get()) {
                    LOG.debug("Cancelling tasks due to timeout after {} millis.", timeout);
                }
                if (stoppedOnException) {
                    LOG.debug("Cancelling tasks due to stopOnException.");
                }
                // cancel tasks as we timed out (it is safe to cancel done tasks)
                running.set(false);
            }
        }
    }

    /**
     * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing.
     */
    private final class ParallelAggregateTask implements Runnable {

        private final AtomicExchange result;
        private final Exchange subExchange;
        private final AtomicInteger aggregated;

        private ParallelAggregateTask(AtomicExchange result, Exchange subExchange, AtomicInteger aggregated) {
            this.result = result;
            this.subExchange = subExchange;
            this.aggregated = aggregated;
        }

        @Override
        public void run() {
            try {
                if (parallelAggregate) {
                    doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
                } else {
                    doAggregate(getAggregationStrategy(subExchange), result, subExchange);
                }
            } catch (Throwable e) {
                if (isStopOnAggregateException()) {
                    throw e;
                } else {
                    // wrap in exception to explain where it failed
                    CamelExchangeException cex = new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e);
                    subExchange.setException(cex);
                    LOG.debug(cex.getMessage(), cex);
                }
            } finally {
                aggregated.incrementAndGet();
            }
        }
    }

    /**
     * Worker task to handle the timeout when aggregating on-the-fly for completed tasks when using parallel processing.
     */
    private final class ParallelAggregateTimeoutTask implements Runnable {

        private final Exchange original;
        private final AtomicExchange result;
        private final CompletionService<Exchange> completion;
        private final AtomicInteger aggregated;
        private final AtomicInteger total;
        private final AtomicBoolean timedOut;

        private ParallelAggregateTimeoutTask(Exchange original, AtomicExchange result, CompletionService<Exchange> completion,
                                             AtomicInteger aggregated, AtomicInteger total, AtomicBoolean timedOut) {
            this.original = original;
            this.result = result;
            this.completion = completion;
            this.aggregated = aggregated;
            this.total = total;
            this.timedOut = timedOut;
        }

        @Override
        public void run() {
            AggregationStrategy strategy = getAggregationStrategy(null);
            if (strategy instanceof DelegateAggregationStrategy) {
                strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
            }
            if (strategy instanceof TimeoutAwareAggregationStrategy) {
                // notify the strategy we timed out
                Exchange oldExchange = result.get();
                if (oldExchange == null) {
                    // if they all timed out the result may not have been set yet, so use the original exchange
                    oldExchange = original;
                }
                ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout);
            } else {
                // log a WARN we timed out since it will not be aggregated and the Exchange will be lost
                LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated.intValue());
            }
            LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue());
            timedOut.set(true);

            // mark that index as timed out, which allows us to try to retrieve
            // any already completed tasks in the next loop
            if (completion instanceof SubmitOrderedCompletionService) {
                ((SubmitOrderedCompletionService<?>) completion).timeoutTask();
            }

            // we timed out so increment the counter
            aggregated.incrementAndGet();
        }
    }

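    /**
     * Processes the exchange pairs one by one in the calling thread, aggregating each reply before
     * moving on to the next pair.
     *
     * @return <tt>true</tt> if all pairs were processed synchronously, <tt>false</tt> if processing
     *         continues asynchronously and the callback will be invoked when done
     */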
    protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception {
        AtomicInteger total = new AtomicInteger();
        Iterator<ProcessorExchangePair> it = pairs.iterator();

        while (it.hasNext()) {
            ProcessorExchangePair pair = it.next();
            // in case the iterator returns null then continue to next
            if (pair == null) {
                continue;
            }
            Exchange subExchange = pair.getExchange();
            updateNewExchange(subExchange, total.get(), pairs, it);

            boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);
            if (!sync) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Processing exchangeId: {} continues being processed asynchronously", pair.getExchange().getExchangeId());
                }
                // the remainder of the multicast will be completed async
                // so we break out now, then the callback will be invoked which will then continue routing from where we left off here
                return false;
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("Processing exchangeId: {} continues being processed synchronously", pair.getExchange().getExchangeId());
            }

            // Decide whether to continue with the multicast or not; similar logic to the Pipeline
            // remember to test for stop on exception and aggregate before copying back results
            boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
            if (stopOnException && !continueProcessing) {
                if (subExchange.getException() != null) {
                    // wrap in exception to explain where it failed
                    CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException());
                    subExchange.setException(cause);
                }
                // we want to stop on exception, and the exception was handled by the error handler
                // this is similar to what the pipeline does, so we should do the same to not surprise end users
                // so we should set the failed exchange as the result and be done
                result.set(subExchange);
                return true;
            }

            LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange);

            if (parallelAggregate) {
                doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
            } else {
                doAggregate(getAggregationStrategy(subExchange), result, subExchange);
            }

            total.incrementAndGet();
        }

        LOG.debug("Done sequential processing {} exchanges", total);

        return true;
    }

    private boolean doProcessSequential(final Exchange original, final AtomicExchange result,
                                        final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it,
                                        final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) {
        boolean sync = true;

        final Exchange exchange = pair.getExchange();
        Processor processor = pair.getProcessor();
        final Producer producer = pair.getProducer();

        TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;

        try {
            // prepare tracing starting from a new block
            if (traced != null) {
                traced.pushBlock();
            }

            StopWatch sw = null;
            if (producer != null) {
                boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
                if (sending) {
                    sw = new StopWatch();
                }
            }

            // compute time taken if sending to another endpoint
            final StopWatch watch = sw;

            // let the prepared process it, remember to begin the exchange pair
            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
            pair.begin();
            sync = async.process(exchange, new AsyncCallback() {
                public void done(boolean doneSync) {
                    // we are done with the exchange pair
                    pair.done();

                    // okay we are done, so notify the exchange was sent
                    if (producer != null && watch != null) {
                        long timeTaken = watch.taken();
                        Endpoint endpoint = producer.getEndpoint();
                        // emit event that the exchange was sent to the endpoint
                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
                    }

                    // we only have to handle async completion of the multicast
                    if (doneSync) {
                        return;
                    }

                    // continue processing the multicast asynchronously
                    Exchange subExchange = exchange;

                    // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                    // remember to test for stop on exception and aggregate before copying back results
                    boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
                    if (stopOnException && !continueProcessing) {
                        if (subExchange.getException() != null) {
                            // wrap in exception to explain where it failed
                            subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
                        } else {
                            // we want to stop on exception, and the exception was handled by the error handler
                            // this is similar to what the pipeline does, so we should do the same to not surprise end users
                            // so we should set the failed exchange as the result and be done
                            result.set(subExchange);
                        }
                        // and do the done work
                        doDone(original, subExchange, pairs, callback, false, true);
                        return;
                    }

                    try {
                        if (parallelAggregate) {
                            doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
                        } else {
                            doAggregate(getAggregationStrategy(subExchange), result, subExchange);
                        }
                    } catch (Throwable e) {
                        // wrap in exception to explain where it failed
                        subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
                        // and do the done work
                        doDone(original, subExchange, pairs, callback, false, true);
                        return;
                    }

                    total.incrementAndGet();

                    // maybe there are more processors to multicast
                    while (it.hasNext()) {

                        // prepare and run the next
                        ProcessorExchangePair pair = it.next();
                        subExchange = pair.getExchange();
                        updateNewExchange(subExchange, total.get(), pairs, it);
                        boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);

                        if (!sync) {
                            LOG.trace("Processing exchangeId: {} continues being processed asynchronously", original.getExchangeId());
                            return;
                        }

                        // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                        // remember to test for stop on exception and aggregate before copying back results
                        continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
                        if (stopOnException && !continueProcessing) {
                            if (subExchange.getException() != null) {
                                // wrap in exception to explain where it failed
                                subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
                            } else {
                                // we want to stop on exception, and the exception was handled by the error handler
                                // this is similar to what the pipeline does, so we should do the same to not surprise end users
                                // so we should set the failed exchange as the result and be done
                                result.set(subExchange);
                            }
                            // and do the done work
                            doDone(original, subExchange, pairs, callback, false, true);
                            return;
                        }

                        // must catch any exceptions from aggregation
                        try {
                            if (parallelAggregate) {
                                doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
                            } else {
                                doAggregate(getAggregationStrategy(subExchange), result, subExchange);
                            }
                        } catch (Throwable e) {
                            // wrap in exception to explain where it failed
                            subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
                            // and do the done work
                            doDone(original, subExchange, pairs, callback, false, true);
                            return;
                        }

                        total.incrementAndGet();
                    }

                    // do the done work
                    subExchange = result.get();
                    doDone(original, subExchange, pairs, callback, false, true);
                }
            });
        } finally {
            // pop the block so by next round we have the same starting point and thus the tracing looks accurate
            if (traced != null) {
                traced.popBlock();
            }
        }

        return sync;
    }

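    /**
     * Processes a single exchange pair synchronously; used as the unit of work for each parallel task.
     */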
    private void doProcessParallel(final ProcessorExchangePair pair) throws Exception {
        final Exchange exchange = pair.getExchange();
        Processor processor = pair.getProcessor();
        Producer producer = pair.getProducer();

        TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;

        // compute time taken if sending to another endpoint
        StopWatch watch = null;
        try {
            // prepare tracing starting from a new block
            if (traced != null) {
                traced.pushBlock();
            }

            if (producer != null) {
                boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
                if (sending) {
                    watch = new StopWatch();
                }
            }
            // let the prepared process it, remember to begin the exchange pair
            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
            pair.begin();
            // we invoke it synchronously as parallel async routing is too hard
            AsyncProcessorHelper.process(async, exchange);
        } finally {
            pair.done();
            // pop the block so by next round we have the same starting point and thus the tracing looks accurate
            if (traced != null) {
                traced.popBlock();
            }
            if (producer != null && watch != null) {
                Endpoint endpoint = producer.getEndpoint();
                long timeTaken = watch.taken();
                // emit event that the exchange was sent to the endpoint
                // this is okay to do here in the finally block, as the processing is not using the async routing engine
                // (we invoke it synchronously as parallel async routing is too hard)
                EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
            }
        }
    }

    /**
     * Common work which must be done when we are done multicasting.
     * <p/>
     * This logic applies both for running synchronously and asynchronously, as there are multiple exit points
     * when using the asynchronous routing engine, and therefore we want the logic in one method instead
     * of being scattered.
     *
     * @param original     the original exchange
     * @param subExchange  the current sub exchange, can be <tt>null</tt> for the synchronous part
     * @param pairs        the pairs with the exchanges to process
     * @param callback     the callback
     * @param doneSync     the <tt>doneSync</tt> parameter to call on callback
     * @param forceExhaust whether or not error handling is exhausted
     */
    protected void doDone(Exchange original, Exchange subExchange, final Iterable<ProcessorExchangePair> pairs,
                          AsyncCallback callback, boolean doneSync, boolean forceExhaust) {

        // we are done so close the pairs iterator
        if (pairs instanceof Closeable) {
            IOHelper.close((Closeable) pairs, "pairs", LOG);
        }

        AggregationStrategy strategy = getAggregationStrategy(subExchange);
        if (strategy instanceof DelegateAggregationStrategy) {
            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
        }
        // invoke the on completion callback
        if (strategy instanceof CompletionAwareAggregationStrategy) {
            ((CompletionAwareAggregationStrategy) strategy).onCompletion(subExchange);
        }

        // cleanup any per exchange aggregation strategy
        removeAggregationStrategyFromExchange(original);

        // we need to know if there was an exception, and if the stopOnException option was enabled
        // also we would need to know if any error handler has attempted redelivery and exhausted
        boolean stoppedOnException = false;
        boolean exception = false;
        boolean exhaust = forceExhaust || subExchange != null && (subExchange.getException() != null || ExchangeHelper.isRedeliveryExhausted(subExchange));
        if (original.getException() != null || subExchange != null && subExchange.getException() != null) {
            // there was an exception and we stopped
            stoppedOnException = isStopOnException();
            exception = true;
        }

        // must copy results at this point
        if (subExchange != null) {
            if (stoppedOnException) {
                // if we stopped due to an exception then only propagate the exception
                original.setException(subExchange.getException());
            } else {
                // copy the current result to original so it will contain the result of this EIP
                ExchangeHelper.copyResults(original, subExchange);
            }
        }

        // .. and then if there was an exception we need to configure the redelivery exhaust
        // for example the noErrorHandler will not cause redelivery exhaust so if this error
        // handler has been in use, then the exhaust would be false (if not forced)
        if (exception) {
            // multicast uses error handling on its output processors and they have tried to redeliver
            // so we shall signal back to the other error handlers that we are exhausted and they should not
            // also try to redeliver as we will then do that twice
            original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
        }

        callback.done(doneSync);
    }

    /**
     * Aggregate the {@link Exchange} with the current result.
     * This method is synchronized and is called directly when parallelAggregate is disabled (by default).
     *
     * @param strategy the aggregation strategy to use
     * @param result   the current result
     * @param exchange the exchange to be added to the result
     * @see #doAggregateInternal(org.apache.camel.processor.aggregate.AggregationStrategy, org.apache.camel.util.concurrent.AtomicExchange, org.apache.camel.Exchange)
     */
    protected synchronized void doAggregate(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) {
        doAggregateInternal(strategy, result, exchange);
    }

    /**
     * Aggregate the {@link Exchange} with the current result.
     * This method is not synchronized and is called directly when parallelAggregate is enabled.
     * In all other cases, this method is called from doAggregate, which is synchronized.
     *
     * @param strategy the aggregation strategy to use
     * @param result   the current result
     * @param exchange the exchange to be added to the result
     * @see #doAggregate(org.apache.camel.processor.aggregate.AggregationStrategy, org.apache.camel.util.concurrent.AtomicExchange, org.apache.camel.Exchange)
     */
    protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) {
        if (strategy != null) {
            // prepare the exchanges for aggregation
            Exchange oldExchange = result.get();
            ExchangeHelper.prepareAggregation(oldExchange, exchange);
            result.set(strategy.aggregate(oldExchange, exchange));
        }
    }

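    /**
     * Sets the {@link Exchange#MULTICAST_INDEX} and {@link Exchange#MULTICAST_COMPLETE} properties
     * on the sub exchange before it is processed.
     */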
    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
                                     Iterator<ProcessorExchangePair> it) {
        exchange.setProperty(Exchange.MULTICAST_INDEX, index);
        if (it.hasNext()) {
            exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE);
        } else {
            exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE);
        }
    }

    protected Integer getExchangeIndex(Exchange exchange) {
        return exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class);
    }

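    /**
     * Creates one {@link ProcessorExchangePair} per configured processor, each holding a correlated
     * copy of the given exchange, prepared with fine grained error handling and, if enabled, a shared
     * unit of work and a copied stream cache for parallel processing.
     */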
    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
        List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());

        StreamCache streamCache = null;
        if (isParallelProcessing() && exchange.getIn().getBody() instanceof StreamCache) {
            // in the parallel processing case the stream must be copied, therefore get the stream
            streamCache = (StreamCache) exchange.getIn().getBody();
        }

        int index = 0;
        for (Processor processor : processors) {
            // copy exchange, and do not share the unit of work
            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);

            if (streamCache != null) {
                if (index > 0) {
                    // copy it otherwise parallel processing is not possible,
                    // because streams can only be read once
                    StreamCache copiedStreamCache = streamCache.copy(copy);
                    if (copiedStreamCache != null) {
                        copy.getIn().setBody(copiedStreamCache);
                    }
                }
            }

            // If the multicast processor has an aggregation strategy
            // then the StreamCache created by the child routes must not be
            // closed by the unit of work of the child route, but by the unit of
            // work of the parent route or grand parent route or grand grand parent route ... (in case of nesting).
            // Therefore set the unit of work of the parent route as the stream cache unit of work,
            // if it is not already set.
            if (copy.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
                copy.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork());
            }
            // if we share the unit of work, we need to prepare the child exchange
            if (isShareUnitOfWork()) {
                prepareSharedUnitOfWork(copy, exchange);
            }

            // and add the pair
            RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
            result.add(createProcessorExchangePair(index++, processor, copy, routeContext));
        }

        if (exchange.getException() != null) {
            // force any exceptions that occurred during creation of the exchange pairs to be thrown
            // before returning the answer
            throw exchange.getException();
        }

        return result;
    }

    /**
     * Creates the {@link ProcessorExchangePair} which holds the processor and exchange to be sent out.
     * <p/>
     * You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they
     * need to be specially prepared before use.
     *
     * @param index        the index
     * @param processor    the processor
     * @param exchange     the exchange
     * @param routeContext the route context
     * @return prepared for use
     */
    protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange,
                                                                RouteContext routeContext) {
        Processor prepared = processor;

        // set property which endpoint we send to
        setToEndpoint(exchange, prepared);

        // rework error handling to support fine grained error handling
        prepared = createErrorHandler(routeContext, exchange, prepared);

        // invoke on prepare on the exchange if specified
        if (onPrepare != null) {
            try {
                onPrepare.process(exchange);
            } catch (Exception e) {
                exchange.setException(e);
            }
        }
        return new DefaultProcessorExchangePair(index, processor, prepared, exchange);
    }

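    /**
     * Wraps the given processor in a fine grained error handler (unless running inside a try block) and
     * in a unit of work processor, caching the created error handlers for reuse where possible.
     */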
1079    protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) {
1080        Processor answer;
1081
1082        boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class);
1083
1084        // do not wrap in error handler if we are inside a try block
1085        if (!tryBlock && routeContext != null) {
1086            // wrap the producer in error handler so we have fine grained error handling on
1087            // the output side instead of the input side
1088            // this is needed to support redelivery on that output alone and not doing redelivery
1089            // for the entire multicast block again which will start from scratch again
1090
1091            // create key for cache
1092            final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor);
1093
1094            // lookup cached first to reuse and preserve memory
1095            answer = errorHandlers.get(key);
1096            if (answer != null) {
1097                LOG.trace("Using existing error handler for: {}", processor);
1098                return answer;
1099            }
1100
1101            LOG.trace("Creating error handler for: {}", processor);
1102            ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
1103            // create error handler (create error handler directly to keep it light weight,
1104            // instead of using ProcessorDefinition.wrapInErrorHandler)
1105            try {
1106                processor = builder.createErrorHandler(routeContext, processor);
1107
                // and wrap in a unit of work processor so the copied exchange can also run under a UoW
1109                answer = createUnitOfWorkProcessor(routeContext, processor, exchange);
1110
1111                boolean child = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null;
1112
1113                // must start the error handler
1114                ServiceHelper.startServices(answer);
1115
                // do not cache the error handler if the exchange runs under a parent (shared) unit of work
1117                if (!child) {
1118                    // add to cache
1119                    errorHandlers.putIfAbsent(key, answer);
1120                }
1121
1122            } catch (Exception e) {
1123                throw ObjectHelper.wrapRuntimeCamelException(e);
1124            }
1125        } else {
            // and wrap in a unit of work processor so the copied exchange can also run under a UoW
1127            answer = createUnitOfWorkProcessor(routeContext, processor, exchange);
1128        }
1129
1130        return answer;
1131    }
1132
1133    /**
1134     * Strategy to create the unit of work to be used for the sub route
1135     *
1136     * @param routeContext the route context
1137     * @param processor    the processor
1138     * @param exchange     the exchange
1139     * @return the unit of work processor
1140     */
1141    protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) {
1142        CamelInternalProcessor internal = new CamelInternalProcessor(processor);
1143
        // and wrap it in a unit of work advice so the UoW is on top and the entire sub route runs in the same UoW
1145        UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
1146        if (parent != null) {
1147            internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeContext, parent));
1148        } else {
1149            internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
1150        }
1151
1152        return internal;
1153    }
1154
1155    /**
1156     * Prepares the exchange for participating in a shared unit of work
1157     * <p/>
     * This ensures a child exchange can access its parent {@link UnitOfWork} when it participates
1159     * in a shared unit of work.
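     * <p/>
     * The parent unit of work is stored as an exchange property, so it can later be looked up from
     * the child exchange, for example:
     * <pre>{@code
     * UnitOfWork parent = childExchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
     * }</pre>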
1160     *
1161     * @param childExchange  the child exchange
1162     * @param parentExchange the parent exchange
1163     */
1164    protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange parentExchange) {
1165        childExchange.setProperty(Exchange.PARENT_UNIT_OF_WORK, parentExchange.getUnitOfWork());
1166    }
1167
1168    protected void doStart() throws Exception {
1169        if (isParallelProcessing() && executorService == null) {
1170            throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set");
1171        }
1172        if (timeout > 0 && !isParallelProcessing()) {
1173            throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
1174        }
1175        if (isParallelProcessing() && aggregateExecutorService == null) {
            // use an unbounded thread pool to ensure the on-the-fly aggregate task is always assigned a thread
            // and runs as soon as it is submitted; otherwise the aggregate task may not be able to run
            // and signal completion during processing, which would appear as a deadlock or very slow processing
1179            String name = getClass().getSimpleName() + "-AggregateTask";
1180            aggregateExecutorService = createAggregateExecutorService(name);
1181        }
1182        if (aggregationStrategy instanceof CamelContextAware) {
1183            ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
1184        }
1185
1186        ServiceHelper.startServices(aggregationStrategy, processors);
1187    }
1188
1189    /**
1190     * Strategy to create the thread pool for the aggregator background task which waits for and aggregates
1191     * completed tasks when running in parallel mode.
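     * <p/>
     * Subclasses may override this method to supply a different pool. A minimal sketch, assuming an
     * unbounded pool is still wanted so the aggregate tasks are never starved of threads:
     * <pre>{@code
     * protected ExecutorService createAggregateExecutorService(String name) {
     *     // note: a pool created directly via the JDK is not managed by Camel;
     *     // the default implementation delegates to the CamelContext ExecutorServiceManager instead
     *     return java.util.concurrent.Executors.newCachedThreadPool();
     * }
     * }</pre>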
1192     *
1193     * @param name  the suggested name for the background thread
1194     * @return the thread pool
1195     */
1196    protected synchronized ExecutorService createAggregateExecutorService(String name) {
        // use a cached thread pool so each on-the-fly task has a dedicated thread to process completions as they come in
1198        return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
1199    }
1200
1201    @Override
1202    protected void doStop() throws Exception {
1203        ServiceHelper.stopServices(processors, errorHandlers, aggregationStrategy);
1204    }
1205
1206    @Override
1207    protected void doShutdown() throws Exception {
1208        ServiceHelper.stopAndShutdownServices(processors, errorHandlers, aggregationStrategy);
1209        // only clear error handlers when shutting down
1210        errorHandlers.clear();
1211
1212        if (shutdownExecutorService && executorService != null) {
1213            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
1214        }
1215        if (aggregateExecutorService != null) {
1216            getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService);
1217        }
1218    }
1219
1220    protected static void setToEndpoint(Exchange exchange, Processor processor) {
1221        if (processor instanceof Producer) {
1222            Producer producer = (Producer) processor;
1223            exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
1224        }
1225    }
1226
1227    protected AggregationStrategy getAggregationStrategy(Exchange exchange) {
1228        AggregationStrategy answer = null;
1229
1230        // prefer to use per Exchange aggregation strategy over a global strategy
1231        if (exchange != null) {
1232            Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
1233            Map<Object, AggregationStrategy> map = CastUtils.cast(property);
1234            if (map != null) {
1235                answer = map.get(this);
1236            }
1237        }
1238        if (answer == null) {
1239            // fallback to global strategy
1240            answer = getAggregationStrategy();
1241        }
1242        return answer;
1243    }
1244
1245    /**
1246     * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}.
1247     *
1248     * @param exchange            the exchange
1249     * @param aggregationStrategy the strategy
1250     */
1251    protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) {
1252        Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
1253        Map<Object, AggregationStrategy> map = CastUtils.cast(property);
1254        if (map == null) {
1255            map = new ConcurrentHashMap<Object, AggregationStrategy>();
1256        } else {
            // it is not safe to modify the map directly, as the exchange does not hold a deep copy of its properties;
            // instead create a new copy whenever we need to change the map
1259            map = new ConcurrentHashMap<Object, AggregationStrategy>(map);
1260        }
1261        // store the strategy using this processor as the key
1262        // (so we can store multiple strategies on the same exchange)
1263        map.put(this, aggregationStrategy);
1264        exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map);
1265    }
1266
1267    /**
1268     * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange}
1269     * which must be done after use.
1270     *
1271     * @param exchange the current exchange
1272     */
1273    protected void removeAggregationStrategyFromExchange(Exchange exchange) {
1274        Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
1275        Map<Object, AggregationStrategy> map = CastUtils.cast(property);
1276        if (map == null) {
1277            return;
1278        }
1279        // remove the strategy using this processor as the key
1280        map.remove(this);
1281    }
1282
1283    /**
1284     * Is the multicast processor working in streaming mode?
1285     * <p/>
1286     * In streaming mode:
1287     * <ul>
1288     * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li>
     * <li>for parallel processing, we start aggregating responses as they are sent back to the processor;
     * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges (see the sketch below)</li>
1291     * </ul>
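     * <p/>
     * A minimal route sketch enabling streaming together with parallel processing (the endpoint URIs
     * and {@code MyOutOfOrderTolerantStrategy} are assumptions for illustration only):
     * <pre>{@code
     * from("direct:start")
     *     .multicast(new MyOutOfOrderTolerantStrategy()).streaming().parallelProcessing()
     *     .to("direct:a", "direct:b", "direct:c");
     * }</pre>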
1292     */
1293    public boolean isStreaming() {
1294        return streaming;
1295    }
1296
1297    /**
     * Whether the multicast processor should stop processing further exchanges if an exception occurs.
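     * <p/>
     * This corresponds to the {@code stopOnException()} option of the multicast DSL, for example
     * (the endpoint URIs are assumptions for illustration only):
     * <pre>{@code
     * from("direct:start")
     *     .multicast().stopOnException()
     *     .to("direct:a", "direct:b");
     * }</pre>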
1299     */
1300    public boolean isStopOnException() {
1301        return stopOnException;
1302    }
1303
1304    /**
     * Returns the processors to multicast to
1306     */
1307    public Collection<Processor> getProcessors() {
1308        return processors;
1309    }
1310
1311    /**
     * An optional timeout in milliseconds, used when parallel processing is enabled
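     * <p/>
     * For example, a multicast running in parallel with a total timeout of 5 seconds could be
     * configured as follows (the endpoint URIs are assumptions for illustration only):
     * <pre>{@code
     * from("direct:start")
     *     .multicast().parallelProcessing().timeout(5000)
     *     .to("direct:a", "direct:b");
     * }</pre>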
1313     */
1314    public long getTimeout() {
1315        return timeout;
1316    }
1317
1318    /**
1319     * Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead.
1320     */
1321    public AggregationStrategy getAggregationStrategy() {
1322        return aggregationStrategy;
1323    }
1324
1325    public boolean isParallelProcessing() {
1326        return parallelProcessing;
1327    }
1328
1329    public boolean isParallelAggregate() {
1330        return parallelAggregate;
1331    }
1332
1333    public boolean isStopOnAggregateException() {
1334        return stopOnAggregateException;
1335    }
1336
1337    public boolean isShareUnitOfWork() {
1338        return shareUnitOfWork;
1339    }
1340
1341    public List<Processor> next() {
1342        if (!hasNext()) {
1343            return null;
1344        }
1345        return new ArrayList<Processor>(processors);
1346    }
1347
1348    public boolean hasNext() {
1349        return processors != null && !processors.isEmpty();
1350    }
1351}