/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
import org.apache.camel.processor.aggregate.DelegateAggregationStrategy;
import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.TracedRouteNodes;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.KeyValueHolder;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.concurrent.AtomicException;
import org.apache.camel.util.concurrent.AtomicExchange;
import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.camel.util.ObjectHelper.notNull;

/**
 * Implements the Multicast pattern to send a message exchange to a number of
 * endpoints, each endpoint receiving a copy of the message exchange.
 *
 * @version
 * @see Pipeline
 */
public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware {

    private static final Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class);
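
    // Illustrative sketch (not part of this class): a typical route definition that ends up
    // using this processor via the multicast EIP. The endpoint URIs and the aggregation
    // strategy are placeholders.
    //
    //   from("direct:start")
    //       .multicast(new UseLatestAggregationStrategy())
    //           .parallelProcessing()
    //           .to("direct:a", "direct:b")
    //       .end()
    //       .to("mock:result");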

    /**
     * Class that represents each step in the multicast route
     */
    static final class DefaultProcessorExchangePair implements ProcessorExchangePair {
        private final int index;
        private final Processor processor;
        private final Processor prepared;
        private final Exchange exchange;

        private DefaultProcessorExchangePair(int index, Processor processor, Processor prepared, Exchange exchange) {
            this.index = index;
            this.processor = processor;
            this.prepared = prepared;
            this.exchange = exchange;
        }

        public int getIndex() {
            return index;
        }

        public Exchange getExchange() {
            return exchange;
        }

        public Producer getProducer() {
            if (processor instanceof Producer) {
                return (Producer) processor;
            }
            return null;
        }

        public Processor getProcessor() {
            return prepared;
        }

        public void begin() {
            // noop
        }

        public void done() {
            // noop
        }

    }

    /**
     * Class that represents prepared fine grained error handlers when processing multicasted/split exchanges
     * <p/>
     * See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods.
     */
    static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> {

        PreparedErrorHandler(RouteContext key, Processor value) {
            super(key, value);
        }

    }

    protected final Processor onPrepare;
    private final CamelContext camelContext;
    private String id;
    private Collection<Processor> processors;
    private final AggregationStrategy aggregationStrategy;
    private final boolean parallelProcessing;
    private final boolean streaming;
    private final boolean parallelAggregate;
    private final boolean stopOnException;
    private final ExecutorService executorService;
    private final boolean shutdownExecutorService;
    private ExecutorService aggregateExecutorService;
    private final long timeout;
    private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>();
    private final boolean shareUnitOfWork;

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) {
        this(camelContext, processors, null);
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
        this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false, false);
    }

    @Deprecated
    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                              boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
                              boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
        this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
                streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false);
    }

    public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                              boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming,
                              boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork,
                              boolean parallelAggregate) {
        notNull(camelContext, "camelContext");
        this.camelContext = camelContext;
        this.processors = processors;
        this.aggregationStrategy = aggregationStrategy;
        this.executorService = executorService;
        this.shutdownExecutorService = shutdownExecutorService;
        this.streaming = streaming;
        this.stopOnException = stopOnException;
        // must enable parallel if executor service is provided
        this.parallelProcessing = parallelProcessing || executorService != null;
        this.timeout = timeout;
        this.onPrepare = onPrepare;
        this.shareUnitOfWork = shareUnitOfWork;
        this.parallelAggregate = parallelAggregate;
    }
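
    // Illustrative construction (a minimal sketch; the destination processors, the thread pool
    // and the "source" object requesting the pool are assumed to be created elsewhere):
    //
    //   List<Processor> destinations = Arrays.asList(producerA, producerB);
    //   ExecutorService pool = camelContext.getExecutorServiceManager()
    //           .newFixedThreadPool(source, "Multicast", 4);
    //   MulticastProcessor multicast = new MulticastProcessor(camelContext, destinations,
    //           new UseLatestAggregationStrategy(), true, pool, true, false, false,
    //           0, null, false, false);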

    @Override
    public String toString() {
        return "Multicast[" + getProcessors() + "]";
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getTraceLabel() {
        return "multicast";
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

    public boolean process(Exchange exchange, AsyncCallback callback) {
        final AtomicExchange result = new AtomicExchange();
        Iterable<ProcessorExchangePair> pairs = null;

        try {
            boolean sync = true;

            pairs = createProcessorExchangePairs(exchange);

            if (isParallelProcessing()) {
                // ensure an executor is set when running in parallel
                ObjectHelper.notNull(executorService, "executorService", this);
                doProcessParallel(exchange, result, pairs, isStreaming(), callback);
            } else {
                sync = doProcessSequential(exchange, result, pairs, callback);
            }

            if (!sync) {
                // the remainder of the multicast will be completed async
                // so we break out now, then the callback will be invoked which will then continue routing from where we left off
                return false;
            }
        } catch (Throwable e) {
            exchange.setException(e);
            // unexpected exception was thrown, maybe from the iterator etc. so do not regard as exhausted
            // and do the done work
            doDone(exchange, null, pairs, callback, true, false);
            return true;
        }

        // multicasting was processed successfully
        // and do the done work
        Exchange subExchange = result.get() != null ? result.get() : null;
        doDone(exchange, subExchange, pairs, callback, true, true);
        return true;
    }

    protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs,
                                     final boolean streaming, final AsyncCallback callback) throws Exception {

        ObjectHelper.notNull(executorService, "ExecutorService", this);
        ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this);

        final CompletionService<Exchange> completion;
        if (streaming) {
            // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence)
            completion = new ExecutorCompletionService<Exchange>(executorService);
        } else {
            // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence)
            completion = new SubmitOrderedCompletionService<Exchange>(executorService);
        }

        final AtomicInteger total = new AtomicInteger(0);
        final Iterator<ProcessorExchangePair> it = pairs.iterator();

        if (it.hasNext()) {
            // when parallel then aggregate on the fly
            final AtomicBoolean running = new AtomicBoolean(true);
            final AtomicBoolean allTasksSubmitted = new AtomicBoolean();
            final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1);
            final AtomicException executionException = new AtomicException();

            // issue task to execute in separate thread so it can aggregate on-the-fly
            // while we submit new tasks, and those tasks complete concurrently
            // this allows us to optimize work and reduce memory consumption
            final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running,
                    aggregationOnTheFlyDone, allTasksSubmitted, executionException);
            final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean();

            LOG.trace("Starting to submit parallel tasks");

            while (it.hasNext()) {
                final ProcessorExchangePair pair = it.next();
                // in case the iterator returns null then continue to next
                if (pair == null) {
                    continue;
                }

                final Exchange subExchange = pair.getExchange();
                updateNewExchange(subExchange, total.intValue(), pairs, it);

                completion.submit(new Callable<Exchange>() {
                    public Exchange call() throws Exception {
                        // only start the aggregation task when the task is being executed to avoid starting
                        // the aggregation task too early and piling up too many threads
                        if (aggregationTaskSubmitted.compareAndSet(false, true)) {
                            // but only submit the task once
                            aggregateExecutorService.submit(aggregateOnTheFlyTask);
                        }

                        if (!running.get()) {
                            // do not start processing the task if we are not running
                            return subExchange;
                        }

                        try {
                            doProcessParallel(pair);
                        } catch (Throwable e) {
                            subExchange.setException(e);
                        }

                        // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                        Integer number = getExchangeIndex(subExchange);
                        boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
                        if (stopOnException && !continueProcessing) {
                            // signal to stop running
                            running.set(false);
                            // throw caused exception
                            if (subExchange.getException() != null) {
                                // wrap in exception to explain where it failed
                                CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException());
                                subExchange.setException(cause);
                            }
                        }

                        LOG.trace("Parallel processing complete for exchange: {}", subExchange);
                        return subExchange;
                    }
                });

                total.incrementAndGet();
            }

            // signal all tasks have been submitted
            LOG.trace("Signaling that all {} tasks have been submitted.", total.get());
            allTasksSubmitted.set(true);

            // it's too hard to do parallel async routing so we let the caller thread be synchronous
            // and have it pick up the replies and do the aggregation (eg we use a latch to wait)
            // wait for aggregation to be done
            LOG.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", total.get(), original.getExchangeId());
            aggregationOnTheFlyDone.await();

            // did we fail for whatever reason, if so throw that caused exception
            if (executionException.get() != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Parallel processing failed due to {}", executionException.get().getMessage());
                }
                throw executionException.get();
            }
        }

        // now everything is okay so we are done
        LOG.debug("Done parallel processing {} exchanges", total);
    }

    /**
     * Boss worker to control aggregate on-the-fly for completed tasks when using parallel processing.
     * <p/>
     * This ensures lower memory consumption as we do not need to keep all completed tasks in memory
     * before we perform aggregation. Instead this separate thread will run and aggregate as new
     * tasks complete.
     * <p/>
     * The logic is fairly complex as this implementation has to keep track of how far it got, and also
     * signal back to the <i>main</i> thread when it is done, so the <i>main</i> thread can continue
     * processing when the entire splitting is done.
     */
    private final class AggregateOnTheFlyTask implements Runnable {

        private final AtomicExchange result;
        private final Exchange original;
        private final AtomicInteger total;
        private final CompletionService<Exchange> completion;
        private final AtomicBoolean running;
        private final CountDownLatch aggregationOnTheFlyDone;
        private final AtomicBoolean allTasksSubmitted;
        private final AtomicException executionException;

        private AggregateOnTheFlyTask(AtomicExchange result, Exchange original, AtomicInteger total,
                                      CompletionService<Exchange> completion, AtomicBoolean running,
                                      CountDownLatch aggregationOnTheFlyDone, AtomicBoolean allTasksSubmitted,
                                      AtomicException executionException) {
            this.result = result;
            this.original = original;
            this.total = total;
            this.completion = completion;
            this.running = running;
            this.aggregationOnTheFlyDone = aggregationOnTheFlyDone;
            this.allTasksSubmitted = allTasksSubmitted;
            this.executionException = executionException;
        }

        public void run() {
            LOG.trace("Aggregate on the fly task started for exchangeId: {}", original.getExchangeId());

            try {
                aggregateOnTheFly();
            } catch (Throwable e) {
                if (e instanceof Exception) {
                    executionException.set((Exception) e);
                } else {
                    executionException.set(ObjectHelper.wrapRuntimeCamelException(e));
                }
            } finally {
                // must signal we are done so the latch can open and let the other thread continue processing
                LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId());
                LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId());
                aggregationOnTheFlyDone.countDown();
            }
        }

        private void aggregateOnTheFly() throws InterruptedException, ExecutionException {
            final AtomicBoolean timedOut = new AtomicBoolean();
            boolean stoppedOnException = false;
            final StopWatch watch = new StopWatch();
            final AtomicInteger aggregated = new AtomicInteger();
            boolean done = false;
            // not a for loop as on-the-fly tasks may still be running
            while (!done) {
                // check if we have already aggregated everything
                if (allTasksSubmitted.get() && aggregated.intValue() >= total.get()) {
                    LOG.debug("Done aggregating {} exchanges on the fly.", aggregated);
                    break;
                }

                Future<Exchange> future;
                if (timedOut.get()) {
                    // we are timed out but try to grab any tasks that have already been completed
                    // poll will return null if no task is present
                    future = completion.poll();
                    LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future);
                } else if (timeout > 0) {
                    long left = timeout - watch.taken();
                    if (left < 0) {
                        left = 0;
                    }
                    LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left);
                    future = completion.poll(left, TimeUnit.MILLISECONDS);
                } else {
                    LOG.trace("Polling completion task #{}", aggregated);
                    // we must not block so poll every second
                    future = completion.poll(1, TimeUnit.SECONDS);
                    if (future == null) {
                        // and continue loop which will recheck if we are done
                        continue;
                    }
                }

                if (future == null) {
                    ParallelAggregateTimeoutTask task = new ParallelAggregateTimeoutTask(original, result, completion, aggregated, total, timedOut);
                    if (parallelAggregate) {
                        aggregateExecutorService.submit(task);
                    } else {
                        // in non parallel mode then just run the task
                        task.run();
                    }
                } else {
                    // there is a result to aggregate
                    Exchange subExchange = future.get();

                    // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                    Integer number = getExchangeIndex(subExchange);
                    boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
                    if (stopOnException && !continueProcessing) {
                        // we want to stop on exception and an exception or failure occurred
                        // this is similar to what the pipeline does, so we should do the same to not surprise end users
                        // so we should set the failed exchange as the result and break out
                        result.set(subExchange);
                        stoppedOnException = true;
                        break;
                    }

                    // we got a result so aggregate it
                    ParallelAggregateTask task = new ParallelAggregateTask(result, subExchange, aggregated);
                    if (parallelAggregate) {
                        aggregateExecutorService.submit(task);
                    } else {
                        // in non parallel mode then just run the task
                        task.run();
                    }
                }
            }

            if (timedOut.get() || stoppedOnException) {
                if (timedOut.get()) {
                    LOG.debug("Cancelling tasks due to timeout after {} millis.", timeout);
                }
                if (stoppedOnException) {
                    LOG.debug("Cancelling tasks due to stopOnException.");
                }
                // cancel tasks as we timed out (it's safe to cancel done tasks)
                running.set(false);
            }
        }
    }

    /**
     * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing.
     */
    private final class ParallelAggregateTask implements Runnable {

        private final AtomicExchange result;
        private final Exchange subExchange;
        private final AtomicInteger aggregated;

        private ParallelAggregateTask(AtomicExchange result, Exchange subExchange, AtomicInteger aggregated) {
            this.result = result;
            this.subExchange = subExchange;
            this.aggregated = aggregated;
        }

        @Override
        public void run() {
            try {
                if (parallelAggregate) {
                    doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
                } else {
                    doAggregate(getAggregationStrategy(subExchange), result, subExchange);
                }
            } catch (Throwable e) {
                // wrap in exception to explain where it failed
                subExchange.setException(new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e));
            } finally {
                aggregated.incrementAndGet();
            }
        }
    }

    /**
     * Worker task to handle a timeout while aggregating completed tasks on-the-fly when using parallel processing.
     */
    private final class ParallelAggregateTimeoutTask implements Runnable {

        private final Exchange original;
        private final AtomicExchange result;
        private final CompletionService<Exchange> completion;
        private final AtomicInteger aggregated;
        private final AtomicInteger total;
        private final AtomicBoolean timedOut;

        private ParallelAggregateTimeoutTask(Exchange original, AtomicExchange result, CompletionService<Exchange> completion,
                                             AtomicInteger aggregated, AtomicInteger total, AtomicBoolean timedOut) {
            this.original = original;
            this.result = result;
            this.completion = completion;
            this.aggregated = aggregated;
            this.total = total;
            this.timedOut = timedOut;
        }

        @Override
        public void run() {
            AggregationStrategy strategy = getAggregationStrategy(null);
            if (strategy instanceof DelegateAggregationStrategy) {
                strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
            }
            if (strategy instanceof TimeoutAwareAggregationStrategy) {
                // notify the strategy we timed out
                Exchange oldExchange = result.get();
                if (oldExchange == null) {
                    // if they all timed out the result may not have been set yet, so use the original exchange
                    oldExchange = original;
                }
                ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout);
            } else {
                // log a WARN we timed out since it will not be aggregated and the Exchange will be lost
                LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated.intValue());
            }
            LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue());
            timedOut.set(true);

            // mark that index as timed out, which allows us to try to retrieve
            // any already completed tasks in the next loop
            if (completion instanceof SubmitOrderedCompletionService) {
                ((SubmitOrderedCompletionService<?>) completion).timeoutTask();
            }

            // we timed out so increment the counter
            aggregated.incrementAndGet();
        }
    }
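
    // Illustrative sketch of a strategy that is notified on timeout by the task above
    // (a minimal example against the Camel 2.x aggregation strategy interfaces; the class
    // name and property key are examples only):
    //
    //   public class MyTimeoutAwareStrategy implements TimeoutAwareAggregationStrategy {
    //       public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    //           // keep the latest reply; on the first call there is no old exchange yet
    //           return newExchange != null ? newExchange : oldExchange;
    //       }
    //       public void timeout(Exchange oldExchange, int index, int total, long timeout) {
    //           // invoked when the parallel task at the given index did not complete in time
    //           oldExchange.setProperty("timedOutIndex", index);
    //       }
    //   }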

    protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception {
        AtomicInteger total = new AtomicInteger();
        Iterator<ProcessorExchangePair> it = pairs.iterator();

        while (it.hasNext()) {
            ProcessorExchangePair pair = it.next();
            // in case the iterator returns null then continue to next
            if (pair == null) {
                continue;
            }
            Exchange subExchange = pair.getExchange();
            updateNewExchange(subExchange, total.get(), pairs, it);

            boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);
            if (!sync) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", pair.getExchange().getExchangeId());
                }
                // the remainder of the multicast will be completed async
                // so we break out now, then the callback will be invoked which will then continue routing from where we left off
                return false;
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("Processing exchangeId: {} is continued being processed synchronously", pair.getExchange().getExchangeId());
            }

            // Decide whether to continue with the multicast or not; similar logic to the Pipeline
            // remember to test for stop on exception and aggregate before copying back results
            boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
            if (stopOnException && !continueProcessing) {
                if (subExchange.getException() != null) {
                    // wrap in exception to explain where it failed
                    CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException());
                    subExchange.setException(cause);
                }
                // we want to stop on exception, and the exception was handled by the error handler
                // this is similar to what the pipeline does, so we should do the same to not surprise end users
                // so we should set the failed exchange as the result and be done
                result.set(subExchange);
                return true;
            }

            LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange);

            if (parallelAggregate) {
                doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
            } else {
                doAggregate(getAggregationStrategy(subExchange), result, subExchange);
            }

            total.incrementAndGet();
        }

        LOG.debug("Done sequential processing {} exchanges", total);

        return true;
    }

    private boolean doProcessSequential(final Exchange original, final AtomicExchange result,
                                        final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it,
                                        final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) {
        boolean sync = true;

        final Exchange exchange = pair.getExchange();
        Processor processor = pair.getProcessor();
        final Producer producer = pair.getProducer();

        TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;

        // compute time taken if sending to another endpoint
        final StopWatch watch = producer != null ? new StopWatch() : null;

        try {
            // prepare tracing starting from a new block
            if (traced != null) {
                traced.pushBlock();
            }

            if (producer != null) {
                EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
            }
            // let the prepared process it, remember to begin the exchange pair
            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
            pair.begin();
            sync = async.process(exchange, new AsyncCallback() {
                public void done(boolean doneSync) {
                    // we are done with the exchange pair
                    pair.done();

                    // okay we are done, so notify the exchange was sent
                    if (producer != null) {
                        long timeTaken = watch.stop();
                        Endpoint endpoint = producer.getEndpoint();
                        // emit event that the exchange was sent to the endpoint
                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
                    }

                    // we only have to handle async completion of the multicast
                    if (doneSync) {
                        return;
                    }

                    // continue processing the multicast asynchronously
                    Exchange subExchange = exchange;

                    // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                    // remember to test for stop on exception and aggregate before copying back results
                    boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
                    if (stopOnException && !continueProcessing) {
                        if (subExchange.getException() != null) {
                            // wrap in exception to explain where it failed
                            subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
                        } else {
                            // we want to stop on exception, and the exception was handled by the error handler
                            // this is similar to what the pipeline does, so we should do the same to not surprise end users
                            // so we should set the failed exchange as the result and be done
                            result.set(subExchange);
                        }
                        // and do the done work
                        doDone(original, subExchange, pairs, callback, false, true);
                        return;
                    }

                    try {
                        if (parallelAggregate) {
                            doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
                        } else {
                            doAggregate(getAggregationStrategy(subExchange), result, subExchange);
                        }
                    } catch (Throwable e) {
                        // wrap in exception to explain where it failed
                        subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
                        // and do the done work
                        doDone(original, subExchange, pairs, callback, false, true);
                        return;
                    }

                    total.incrementAndGet();

                    // maybe there are more processors to multicast
                    while (it.hasNext()) {

                        // prepare and run the next
                        ProcessorExchangePair pair = it.next();
                        subExchange = pair.getExchange();
                        updateNewExchange(subExchange, total.get(), pairs, it);
                        boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total);

                        if (!sync) {
                            LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId());
                            return;
                        }

                        // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                        // remember to test for stop on exception and aggregate before copying back results
                        continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG);
                        if (stopOnException && !continueProcessing) {
                            if (subExchange.getException() != null) {
                                // wrap in exception to explain where it failed
                                subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
                            } else {
                                // we want to stop on exception, and the exception was handled by the error handler
                                // this is similar to what the pipeline does, so we should do the same to not surprise end users
                                // so we should set the failed exchange as the result and be done
                                result.set(subExchange);
                            }
                            // and do the done work
                            doDone(original, subExchange, pairs, callback, false, true);
                            return;
                        }

                        // must catch any exceptions from aggregation
                        try {
                            if (parallelAggregate) {
                                doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
                            } else {
                                doAggregate(getAggregationStrategy(subExchange), result, subExchange);
                            }
                        } catch (Throwable e) {
                            // wrap in exception to explain where it failed
                            subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e));
                            // and do the done work
                            doDone(original, subExchange, pairs, callback, false, true);
                            return;
                        }

                        total.incrementAndGet();
                    }

                    // do the done work
                    subExchange = result.get() != null ? result.get() : null;
                    doDone(original, subExchange, pairs, callback, false, true);
                }
            });
        } finally {
            // pop the block so by next round we have the same starting point and thus the tracing looks accurate
            if (traced != null) {
                traced.popBlock();
            }
        }

        return sync;
    }

    private void doProcessParallel(final ProcessorExchangePair pair) throws Exception {
        final Exchange exchange = pair.getExchange();
        Processor processor = pair.getProcessor();
        Producer producer = pair.getProducer();

        TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;

        // compute time taken if sending to another endpoint
        StopWatch watch = null;
        if (producer != null) {
            watch = new StopWatch();
        }

        try {
            // prepare tracing starting from a new block
            if (traced != null) {
                traced.pushBlock();
            }

            if (producer != null) {
                EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint());
            }
            // let the prepared process it, remember to begin the exchange pair
            AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
            pair.begin();
            // we invoke it synchronously as parallel async routing is too hard
            AsyncProcessorHelper.process(async, exchange);
        } finally {
            pair.done();
            // pop the block so by next round we have the same starting point and thus the tracing looks accurate
            if (traced != null) {
                traced.popBlock();
            }
            if (producer != null) {
                long timeTaken = watch.stop();
                Endpoint endpoint = producer.getEndpoint();
                // emit event that the exchange was sent to the endpoint
                // this is okay to do here in the finally block, as the processing is not using the async routing engine
                // (we invoke it synchronously as parallel async routing is too hard)
                EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
            }
        }
    }

    /**
     * Common work which must be done when we are done multicasting.
     * <p/>
     * This logic applies for both running synchronous and asynchronous as there are multiple exit points
     * when using the asynchronous routing engine, and therefore we want the logic in one method instead
     * of being scattered.
     *
     * @param original     the original exchange
     * @param subExchange  the current sub exchange, can be <tt>null</tt> for the synchronous part
     * @param pairs        the pairs with the exchanges to process
     * @param callback     the callback
     * @param doneSync     the <tt>doneSync</tt> parameter to call on callback
     * @param forceExhaust whether or not error handling is exhausted
     */
    protected void doDone(Exchange original, Exchange subExchange, final Iterable<ProcessorExchangePair> pairs,
                          AsyncCallback callback, boolean doneSync, boolean forceExhaust) {

        // we are done so close the pairs iterator
        if (pairs != null && pairs instanceof Closeable) {
            IOHelper.close((Closeable) pairs, "pairs", LOG);
        }

        AggregationStrategy strategy = getAggregationStrategy(subExchange);
        if (strategy instanceof DelegateAggregationStrategy) {
            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
        }
        // invoke the on completion callback
        if (strategy instanceof CompletionAwareAggregationStrategy) {
            ((CompletionAwareAggregationStrategy) strategy).onCompletion(subExchange);
        }

        // cleanup any per exchange aggregation strategy
        removeAggregationStrategyFromExchange(original);

        // we need to know if there was an exception, and if the stopOnException option was enabled
        // also we would need to know if any error handler has attempted redelivery and exhausted
        boolean stoppedOnException = false;
        boolean exception = false;
        boolean exhaust = forceExhaust || subExchange != null && (subExchange.getException() != null || ExchangeHelper.isRedeliveryExhausted(subExchange));
        if (original.getException() != null || subExchange != null && subExchange.getException() != null) {
            // there was an exception and we stopped
            stoppedOnException = isStopOnException();
            exception = true;
        }

        // must copy results at this point
        if (subExchange != null) {
            if (stoppedOnException) {
                // if we stopped due to an exception then only propagate the exception
                original.setException(subExchange.getException());
            } else {
                // copy the current result to original so it will contain the result of this EIP
                ExchangeHelper.copyResults(original, subExchange);
            }
        }

        // .. and then if there was an exception we need to configure the redelivery exhaust
        // for example the noErrorHandler will not cause redelivery exhaust so if this error
        // handler has been in use, then the exhaust would be false (if not forced)
        if (exception) {
            // multicast uses error handling on its output processors and they have tried to redeliver
            // so we shall signal back to the other error handlers that we are exhausted and they should not
            // also try to redeliver as we will then do that twice
            original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
        }

        callback.done(doneSync);
    }

    /**
     * Aggregate the {@link Exchange} with the current result.
     * This method is synchronized and is called directly when parallelAggregate is disabled (the default).
     *
     * @param strategy the aggregation strategy to use
     * @param result   the current result
     * @param exchange the exchange to be added to the result
     * @see #doAggregateInternal(org.apache.camel.processor.aggregate.AggregationStrategy, org.apache.camel.util.concurrent.AtomicExchange, org.apache.camel.Exchange)
     */
    protected synchronized void doAggregate(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) {
        doAggregateInternal(strategy, result, exchange);
    }

    /**
     * Aggregate the {@link Exchange} with the current result.
     * This method is unsynchronized and is called directly when parallelAggregate is enabled.
     * In all other cases, this method is called from doAggregate, which is a synchronized method.
     *
     * @param strategy the aggregation strategy to use
     * @param result   the current result
     * @param exchange the exchange to be added to the result
     * @see #doAggregate(org.apache.camel.processor.aggregate.AggregationStrategy, org.apache.camel.util.concurrent.AtomicExchange, org.apache.camel.Exchange)
     */
    protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) {
        if (strategy != null) {
            // prepare the exchanges for aggregation
            Exchange oldExchange = result.get();
            ExchangeHelper.prepareAggregation(oldExchange, exchange);
            result.set(strategy.aggregate(oldExchange, exchange));
        }
    }
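
    // Illustrative sketch of how a custom strategy folds each reply into the running result
    // that doAggregateInternal maintains (the class name and "+" separator are examples only):
    //
    //   public class ConcatBodyStrategy implements AggregationStrategy {
    //       public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    //           if (oldExchange == null) {
    //               return newExchange;          // the first reply becomes the result
    //           }
    //           String merged = oldExchange.getIn().getBody(String.class)
    //                   + "+" + newExchange.getIn().getBody(String.class);
    //           oldExchange.getIn().setBody(merged);
    //           return oldExchange;              // keep folding replies into the same exchange
    //       }
    //   }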

    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
                                     Iterator<ProcessorExchangePair> it) {
        exchange.setProperty(Exchange.MULTICAST_INDEX, index);
        if (it.hasNext()) {
            exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE);
        } else {
            exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE);
        }
    }

    protected Integer getExchangeIndex(Exchange exchange) {
        return exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class);
    }

    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
        List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());

        StreamCache streamCache = null;
        if (isParallelProcessing() && exchange.getIn().getBody() instanceof StreamCache) {
            // in the parallel processing case the stream must be copied, therefore get the stream
            streamCache = (StreamCache) exchange.getIn().getBody();
        }

        int index = 0;
        for (Processor processor : processors) {
            // copy exchange, and do not share the unit of work
            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);

            if (streamCache != null) {
                if (index > 0) {
                    // copy it otherwise parallel processing is not possible,
                    // because streams can only be read once
                    StreamCache copiedStreamCache = streamCache.copy(copy);
                    if (copiedStreamCache != null) {
                        copy.getIn().setBody(copiedStreamCache);
                    }
                }
            }

            // If the multicast processor has an aggregation strategy
            // then the StreamCache created by the child routes must not be
            // closed by the unit of work of the child route, but by the unit of
            // work of the parent route, grandparent route, and so on (in case of nesting).
            // Therefore set the unit of work of the parent route as the stream cache unit of work,
            // if it is not already set.
            if (copy.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
                copy.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork());
            }
            // if we share unit of work, we need to prepare the child exchange
            if (isShareUnitOfWork()) {
                prepareSharedUnitOfWork(copy, exchange);
            }

            // and add the pair
            RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
            result.add(createProcessorExchangePair(index++, processor, copy, routeContext));
        }

        if (exchange.getException() != null) {
            // force any exceptions that occurred during creation of exchange pairs to be thrown
            // before returning the answer
            throw exchange.getException();
        }

        return result;
    }

    /**
     * Creates the {@link ProcessorExchangePair} which holds the processor and exchange to be sent out.
     * <p/>
     * You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they
     * need to be specially prepared before use.
     *
     * @param index        the index
     * @param processor    the processor
     * @param exchange     the exchange
     * @param routeContext the route context
     * @return prepared for use
     */
    protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange,
                                                                RouteContext routeContext) {
        Processor prepared = processor;

        // set property which endpoint we send to
        setToEndpoint(exchange, prepared);

        // rework error handling to support fine grained error handling
        prepared = createErrorHandler(routeContext, exchange, prepared);

        // invoke on prepare on the exchange if specified
        if (onPrepare != null) {
            try {
                onPrepare.process(exchange);
            } catch (Exception e) {
                exchange.setException(e);
            }
        }
        return new DefaultProcessorExchangePair(index, processor, prepared, exchange);
    }
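
    // Illustrative sketch of an onPrepare processor (MyPayload and deepClone are hypothetical):
    // it is invoked above on every copied exchange before it is sent to a destination, e.g. to
    // deep clone a mutable body so the parallel branches cannot interfere with each other:
    //
    //   Processor onPrepare = new Processor() {
    //       public void process(Exchange exchange) throws Exception {
    //           MyPayload payload = exchange.getIn().getBody(MyPayload.class);
    //           exchange.getIn().setBody(payload.deepClone());
    //       }
    //   };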
1045
1046    protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) {
1047        Processor answer;
1048
1049        boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class);
1050
1051        // do not wrap in error handler if we are inside a try block
1052        if (!tryBlock && routeContext != null) {
1053            // wrap the producer in error handler so we have fine grained error handling on
1054            // the output side instead of the input side
1055            // this is needed to support redelivery on that output alone and not doing redelivery
1056            // for the entire multicast block again which will start from scratch again
1057
1058            // create key for cache
1059            final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor);
1060
1061            // lookup cached first to reuse and preserve memory
1062            answer = errorHandlers.get(key);
1063            if (answer != null) {
1064                LOG.trace("Using existing error handler for: {}", processor);
1065                return answer;
1066            }
1067
1068            LOG.trace("Creating error handler for: {}", processor);
1069            ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
1070            // create error handler (create error handler directly to keep it light weight,
1071            // instead of using ProcessorDefinition.wrapInErrorHandler)
1072            try {
1073                processor = builder.createErrorHandler(routeContext, processor);
1074
1075                // and wrap in unit of work processor so the copy exchange also can run under UoW
1076                answer = createUnitOfWorkProcessor(routeContext, processor, exchange);
1077
1078                boolean child = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null;
1079
1080                // must start the error handler
1081                ServiceHelper.startServices(answer);
1082
1083                // here we don't cache the child unit of work
1084                if (!child) {
1085                    // add to cache
1086                    errorHandlers.putIfAbsent(key, answer);
1087                }
1088
1089            } catch (Exception e) {
1090                throw ObjectHelper.wrapRuntimeCamelException(e);
1091            }
1092        } else {
1093            // and wrap in unit of work processor so the copy exchange also can run under UoW
1094            answer = createUnitOfWorkProcessor(routeContext, processor, exchange);
1095        }
1096
1097        return answer;
1098    }
1099
1100    /**
1101     * Strategy to create the unit of work to be used for the sub route
1102     *
1103     * @param routeContext the route context
1104     * @param processor    the processor
1105     * @param exchange     the exchange
1106     * @return the unit of work processor
1107     */
1108    protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) {
1109        CamelInternalProcessor internal = new CamelInternalProcessor(processor);
1110
1111        // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
1112        UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
1113        if (parent != null) {
1114            internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeContext, parent));
1115        } else {
1116            internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
1117        }
1118
1119        return internal;
1120    }
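
    // Illustrative sketch (endpoint URIs are hypothetical; assumed to be inside a RouteBuilder.configure()):
    // enabling shareUnitOfWork() causes prepareSharedUnitOfWork to set the PARENT_UNIT_OF_WORK property,
    // so the child unit of work advice branch above is used for each copy:
    //
    //     from("direct:start")
    //         .multicast().shareUnitOfWork()
    //             .to("direct:a", "direct:b")
    //         .end();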
1121
1122    /**
1123     * Prepares the exchange for participating in a shared unit of work
1124     * <p/>
1125     * This ensures a child exchange can access its parent {@link UnitOfWork} when it participates
1126     * in a shared unit of work.
1127     *
1128     * @param childExchange  the child exchange
1129     * @param parentExchange the parent exchange
1130     */
1131    protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange parentExchange) {
1132        childExchange.setProperty(Exchange.PARENT_UNIT_OF_WORK, parentExchange.getUnitOfWork());
1133    }
1134
1135    protected void doStart() throws Exception {
1136        if (isParallelProcessing() && executorService == null) {
1137            throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set");
1138        }
1139        if (timeout > 0 && !isParallelProcessing()) {
1140            throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
1141        }
1142        if (isParallelProcessing() && aggregateExecutorService == null) {
1143            // use an unbounded thread pool so the on-the-fly aggregation task is always assigned a thread
1144            // and can run as soon as it is submitted; otherwise the aggregation task may not be able to run
1145            // and signal completion during processing, which would appear as a deadlock or very slow processing
1146            String name = getClass().getSimpleName() + "-AggregateTask";
1147            aggregateExecutorService = createAggregateExecutorService(name);
1148        }
1149        ServiceHelper.startServices(aggregationStrategy, processors);
1150    }
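
    // Illustrative sketch (endpoint URIs are hypothetical; assumed to be inside a RouteBuilder.configure()):
    // a timeout is only valid together with parallel processing, matching the checks above:
    //
    //     from("direct:start")
    //         .multicast(new GroupedExchangeAggregationStrategy())
    //             .parallelProcessing().timeout(5000)
    //             .to("direct:a", "direct:b")
    //         .end();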
1151
1152    /**
1153     * Strategy to create the thread pool for the aggregator background task which waits for and aggregates
1154     * completed tasks when running in parallel mode.
1155     *
1156     * @param name  the suggested name for the background thread
1157     * @return the thread pool
1158     */
1159    protected synchronized ExecutorService createAggregateExecutorService(String name) {
1160        // use a cached thread pool so each on-the-fly task has a dedicated thread to process completions as they come in
1161        return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name);
1162    }
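
    // Illustrative override sketch (the thread pool profile name "myAggregateProfile" is hypothetical):
    // a subclass may supply its own aggregate thread pool, but it should remain effectively unbounded
    // to avoid starving the on-the-fly aggregation task:
    //
    //     @Override
    //     protected ExecutorService createAggregateExecutorService(String name) {
    //         return getCamelContext().getExecutorServiceManager().newThreadPool(this, name, "myAggregateProfile");
    //     }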
1163
1164    @Override
1165    protected void doStop() throws Exception {
1166        ServiceHelper.stopServices(processors, errorHandlers, aggregationStrategy);
1167    }
1168
1169    @Override
1170    protected void doShutdown() throws Exception {
1171        ServiceHelper.stopAndShutdownServices(processors, errorHandlers, aggregationStrategy);
1172        // only clear error handlers when shutting down
1173        errorHandlers.clear();
1174
1175        if (shutdownExecutorService && executorService != null) {
1176            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
1177        }
1178        if (aggregateExecutorService != null) {
1179            getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService);
1180        }
1181    }
1182
1183    protected static void setToEndpoint(Exchange exchange, Processor processor) {
1184        if (processor instanceof Producer) {
1185            Producer producer = (Producer) processor;
1186            exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
1187        }
1188    }
1189
1190    protected AggregationStrategy getAggregationStrategy(Exchange exchange) {
1191        AggregationStrategy answer = null;
1192
1193        // prefer to use per Exchange aggregation strategy over a global strategy
1194        if (exchange != null) {
1195            Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
1196            Map<Object, AggregationStrategy> map = CastUtils.cast(property);
1197            if (map != null) {
1198                answer = map.get(this);
1199            }
1200        }
1201        if (answer == null) {
1202            // fallback to global strategy
1203            answer = getAggregationStrategy();
1204        }
1205        return answer;
1206    }
1207
1208    /**
1209     * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}.
1210     *
1211     * @param exchange            the exchange
1212     * @param aggregationStrategy the strategy
1213     */
1214    protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) {
1215        Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
1216        Map<Object, AggregationStrategy> map = CastUtils.cast(property);
1217        if (map == null) {
1218            map = new ConcurrentHashMap<Object, AggregationStrategy>();
1219        } else {
1220            // it is not safe to mutate the map directly as the exchange does not hold a deep copy of its properties
1221            // so we create a new copy whenever the map needs to change
1222            map = new ConcurrentHashMap<Object, AggregationStrategy>(map);
1223        }
1224        // store the strategy using this processor as the key
1225        // (so we can store multiple strategies on the same exchange)
1226        map.put(this, aggregationStrategy);
1227        exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map);
1228    }
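
    // Illustrative sketch of the per-exchange strategy map handled above (the processor variables and
    // strategies are hypothetical): the map is keyed by processor instance, so nested multicasts can
    // each keep their own strategy on the same exchange:
    //
    //     Map<Object, AggregationStrategy> map = new ConcurrentHashMap<Object, AggregationStrategy>();
    //     map.put(outerMulticast, new GroupedExchangeAggregationStrategy());
    //     map.put(innerMulticast, new UseLatestAggregationStrategy());
    //     exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map);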
1229
1230    /**
1231     * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange}
1232     * which must be done after use.
1233     *
1234     * @param exchange the current exchange
1235     */
1236    protected void removeAggregationStrategyFromExchange(Exchange exchange) {
1237        Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
1238        Map<Object, AggregationStrategy> map = CastUtils.cast(property);
1239        if (map == null) {
1240            return;
1241        }
1242        // remove the strategy using this processor as the key
1243        map.remove(this);
1244    }
1245
1246    /**
1247     * Is the multicast processor working in streaming mode?
1248     * <p/>
1249     * In streaming mode:
1250     * <ul>
1251     * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li>
1252     * <li>for parallel processing, we start aggregating responses as they are sent back to the processor;
1253     * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li>
1254     * </ul>
1255     */
1256    public boolean isStreaming() {
1257        return streaming;
1258    }
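
    // Illustrative sketch (endpoint URIs are hypothetical; assumed to be inside a RouteBuilder.configure()):
    // with streaming enabled, replies are aggregated in completion order rather than in the order the
    // outputs are defined, so the strategy must tolerate out-of-order arrival:
    //
    //     from("direct:start")
    //         .multicast(new GroupedExchangeAggregationStrategy())
    //             .parallelProcessing().streaming()
    //             .to("direct:slow", "direct:fast")
    //         .end();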
1259
1260    /**
1261     * Should the multicast processor stop processing further exchanges when an exception occurs?
1262     */
1263    public boolean isStopOnException() {
1264        return stopOnException;
1265    }
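
    // Illustrative sketch (endpoint URIs are hypothetical; assumed to be inside a RouteBuilder.configure()):
    // with stopOnException() the multicast stops sending to the remaining outputs once one of them fails:
    //
    //     from("direct:start")
    //         .multicast()
    //             .stopOnException()
    //             .to("direct:a", "direct:mayFail", "direct:b")
    //         .end();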
1266
1267    /**
1268     * Returns the producers to multicast to
1269     */
1270    public Collection<Processor> getProcessors() {
1271        return processors;
1272    }
1273
1274    /**
1275     * An optional timeout in millis when using parallel processing
1276     */
1277    public long getTimeout() {
1278        return timeout;
1279    }
1280
1281    /**
1282     * Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead.
1283     */
1284    public AggregationStrategy getAggregationStrategy() {
1285        return aggregationStrategy;
1286    }
1287
1288    public boolean isParallelProcessing() {
1289        return parallelProcessing;
1290    }
1291
1292    public boolean isParallelAggregate() {
1293        return parallelAggregate;
1294    }
1295
1296    public boolean isShareUnitOfWork() {
1297        return shareUnitOfWork;
1298    }
1299
1300    public List<Processor> next() {
1301        if (!hasNext()) {
1302            return null;
1303        }
1304        return new ArrayList<Processor>(processors);
1305    }
1306
1307    public boolean hasNext() {
1308        return processors != null && !processors.isEmpty();
1309    }
1310}