001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.io.Closeable; 020import java.util.ArrayList; 021import java.util.Collection; 022import java.util.Iterator; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.Callable; 026import java.util.concurrent.CompletionService; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.ExecutionException; 031import java.util.concurrent.ExecutorCompletionService; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Future; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicBoolean; 036import java.util.concurrent.atomic.AtomicInteger; 037 038import org.apache.camel.AsyncCallback; 039import org.apache.camel.AsyncProcessor; 040import org.apache.camel.CamelContext; 041import org.apache.camel.CamelExchangeException; 042import org.apache.camel.Endpoint; 043import org.apache.camel.ErrorHandlerFactory; 044import org.apache.camel.Exchange; 045import org.apache.camel.Navigate; 046import org.apache.camel.Processor; 047import org.apache.camel.Producer; 048import org.apache.camel.StreamCache; 049import org.apache.camel.Traceable; 050import org.apache.camel.processor.aggregate.AggregationStrategy; 051import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy; 052import org.apache.camel.processor.aggregate.DelegateAggregationStrategy; 053import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy; 054import org.apache.camel.spi.IdAware; 055import org.apache.camel.spi.RouteContext; 056import org.apache.camel.spi.TracedRouteNodes; 057import org.apache.camel.spi.UnitOfWork; 058import org.apache.camel.support.ServiceSupport; 059import org.apache.camel.util.AsyncProcessorConverterHelper; 060import org.apache.camel.util.AsyncProcessorHelper; 061import org.apache.camel.util.CastUtils; 062import org.apache.camel.util.EventHelper; 063import org.apache.camel.util.ExchangeHelper; 064import org.apache.camel.util.IOHelper; 065import org.apache.camel.util.KeyValueHolder; 066import org.apache.camel.util.ObjectHelper; 067import org.apache.camel.util.ServiceHelper; 068import org.apache.camel.util.StopWatch; 069import org.apache.camel.util.concurrent.AtomicException; 070import org.apache.camel.util.concurrent.AtomicExchange; 071import org.apache.camel.util.concurrent.SubmitOrderedCompletionService; 072import org.slf4j.Logger; 073import org.slf4j.LoggerFactory; 074 075import static org.apache.camel.util.ObjectHelper.notNull; 076 077 078/** 079 * Implements the Multicast pattern to send a message exchange to a number of 080 * endpoints, each endpoint receiving a copy of the message exchange. 081 * 082 * @version 083 * @see Pipeline 084 */ 085public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware { 086 087 private static final Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class); 088 089 /** 090 * Class that represent each step in the multicast route to do 091 */ 092 static final class DefaultProcessorExchangePair implements ProcessorExchangePair { 093 private final int index; 094 private final Processor processor; 095 private final Processor prepared; 096 private final Exchange exchange; 097 098 private DefaultProcessorExchangePair(int index, Processor processor, Processor prepared, Exchange exchange) { 099 this.index = index; 100 this.processor = processor; 101 this.prepared = prepared; 102 this.exchange = exchange; 103 } 104 105 public int getIndex() { 106 return index; 107 } 108 109 public Exchange getExchange() { 110 return exchange; 111 } 112 113 public Producer getProducer() { 114 if (processor instanceof Producer) { 115 return (Producer) processor; 116 } 117 return null; 118 } 119 120 public Processor getProcessor() { 121 return prepared; 122 } 123 124 public void begin() { 125 // noop 126 } 127 128 public void done() { 129 // noop 130 } 131 132 } 133 134 /** 135 * Class that represents prepared fine grained error handlers when processing multicasted/splitted exchanges 136 * <p/> 137 * See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods. 138 */ 139 static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> { 140 141 PreparedErrorHandler(RouteContext key, Processor value) { 142 super(key, value); 143 } 144 145 } 146 147 protected final Processor onPrepare; 148 private final CamelContext camelContext; 149 private String id; 150 private Collection<Processor> processors; 151 private final AggregationStrategy aggregationStrategy; 152 private final boolean parallelProcessing; 153 private final boolean streaming; 154 private final boolean parallelAggregate; 155 private final boolean stopOnException; 156 private final ExecutorService executorService; 157 private final boolean shutdownExecutorService; 158 private ExecutorService aggregateExecutorService; 159 private final long timeout; 160 private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>(); 161 private final boolean shareUnitOfWork; 162 163 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) { 164 this(camelContext, processors, null); 165 } 166 167 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) { 168 this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false, false); 169 } 170 171 @Deprecated 172 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, 173 boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, 174 boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) { 175 this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, 176 streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false); 177 } 178 179 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, 180 boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, 181 boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, 182 boolean parallelAggregate) { 183 notNull(camelContext, "camelContext"); 184 this.camelContext = camelContext; 185 this.processors = processors; 186 this.aggregationStrategy = aggregationStrategy; 187 this.executorService = executorService; 188 this.shutdownExecutorService = shutdownExecutorService; 189 this.streaming = streaming; 190 this.stopOnException = stopOnException; 191 // must enable parallel if executor service is provided 192 this.parallelProcessing = parallelProcessing || executorService != null; 193 this.timeout = timeout; 194 this.onPrepare = onPrepare; 195 this.shareUnitOfWork = shareUnitOfWork; 196 this.parallelAggregate = parallelAggregate; 197 } 198 199 @Override 200 public String toString() { 201 return "Multicast[" + getProcessors() + "]"; 202 } 203 204 public String getId() { 205 return id; 206 } 207 208 public void setId(String id) { 209 this.id = id; 210 } 211 212 public String getTraceLabel() { 213 return "multicast"; 214 } 215 216 public CamelContext getCamelContext() { 217 return camelContext; 218 } 219 220 public void process(Exchange exchange) throws Exception { 221 AsyncProcessorHelper.process(this, exchange); 222 } 223 224 public boolean process(Exchange exchange, AsyncCallback callback) { 225 final AtomicExchange result = new AtomicExchange(); 226 Iterable<ProcessorExchangePair> pairs = null; 227 228 try { 229 boolean sync = true; 230 231 pairs = createProcessorExchangePairs(exchange); 232 233 if (isParallelProcessing()) { 234 // ensure an executor is set when running in parallel 235 ObjectHelper.notNull(executorService, "executorService", this); 236 doProcessParallel(exchange, result, pairs, isStreaming(), callback); 237 } else { 238 sync = doProcessSequential(exchange, result, pairs, callback); 239 } 240 241 if (!sync) { 242 // the remainder of the multicast will be completed async 243 // so we break out now, then the callback will be invoked which then continue routing from where we left here 244 return false; 245 } 246 } catch (Throwable e) { 247 exchange.setException(e); 248 // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted 249 // and do the done work 250 doDone(exchange, null, pairs, callback, true, false); 251 return true; 252 } 253 254 // multicasting was processed successfully 255 // and do the done work 256 Exchange subExchange = result.get() != null ? result.get() : null; 257 doDone(exchange, subExchange, pairs, callback, true, true); 258 return true; 259 } 260 261 protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs, 262 final boolean streaming, final AsyncCallback callback) throws Exception { 263 264 ObjectHelper.notNull(executorService, "ExecutorService", this); 265 ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this); 266 267 final CompletionService<Exchange> completion; 268 if (streaming) { 269 // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence) 270 completion = new ExecutorCompletionService<Exchange>(executorService); 271 } else { 272 // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence) 273 completion = new SubmitOrderedCompletionService<Exchange>(executorService); 274 } 275 276 final AtomicInteger total = new AtomicInteger(0); 277 final Iterator<ProcessorExchangePair> it = pairs.iterator(); 278 279 if (it.hasNext()) { 280 // when parallel then aggregate on the fly 281 final AtomicBoolean running = new AtomicBoolean(true); 282 final AtomicBoolean allTasksSubmitted = new AtomicBoolean(); 283 final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1); 284 final AtomicException executionException = new AtomicException(); 285 286 // issue task to execute in separate thread so it can aggregate on-the-fly 287 // while we submit new tasks, and those tasks complete concurrently 288 // this allows us to optimize work and reduce memory consumption 289 final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running, 290 aggregationOnTheFlyDone, allTasksSubmitted, executionException); 291 final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean(); 292 293 LOG.trace("Starting to submit parallel tasks"); 294 295 while (it.hasNext()) { 296 final ProcessorExchangePair pair = it.next(); 297 // in case the iterator returns null then continue to next 298 if (pair == null) { 299 continue; 300 } 301 302 final Exchange subExchange = pair.getExchange(); 303 updateNewExchange(subExchange, total.intValue(), pairs, it); 304 305 completion.submit(new Callable<Exchange>() { 306 public Exchange call() throws Exception { 307 // only start the aggregation task when the task is being executed to avoid staring 308 // the aggregation task to early and pile up too many threads 309 if (aggregationTaskSubmitted.compareAndSet(false, true)) { 310 // but only submit the task once 311 aggregateExecutorService.submit(aggregateOnTheFlyTask); 312 } 313 314 if (!running.get()) { 315 // do not start processing the task if we are not running 316 return subExchange; 317 } 318 319 try { 320 doProcessParallel(pair); 321 } catch (Throwable e) { 322 subExchange.setException(e); 323 } 324 325 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 326 Integer number = getExchangeIndex(subExchange); 327 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); 328 if (stopOnException && !continueProcessing) { 329 // signal to stop running 330 running.set(false); 331 // throw caused exception 332 if (subExchange.getException() != null) { 333 // wrap in exception to explain where it failed 334 CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException()); 335 subExchange.setException(cause); 336 } 337 } 338 339 LOG.trace("Parallel processing complete for exchange: {}", subExchange); 340 return subExchange; 341 } 342 }); 343 344 total.incrementAndGet(); 345 } 346 347 // signal all tasks has been submitted 348 LOG.trace("Signaling that all {} tasks has been submitted.", total.get()); 349 allTasksSubmitted.set(true); 350 351 // its to hard to do parallel async routing so we let the caller thread be synchronously 352 // and have it pickup the replies and do the aggregation (eg we use a latch to wait) 353 // wait for aggregation to be done 354 LOG.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", total.get(), original.getExchangeId()); 355 aggregationOnTheFlyDone.await(); 356 357 // did we fail for whatever reason, if so throw that caused exception 358 if (executionException.get() != null) { 359 if (LOG.isDebugEnabled()) { 360 LOG.debug("Parallel processing failed due {}", executionException.get().getMessage()); 361 } 362 throw executionException.get(); 363 } 364 } 365 366 // no everything is okay so we are done 367 LOG.debug("Done parallel processing {} exchanges", total); 368 } 369 370 /** 371 * Boss worker to control aggregate on-the-fly for completed tasks when using parallel processing. 372 * <p/> 373 * This ensures lower memory consumption as we do not need to keep all completed tasks in memory 374 * before we perform aggregation. Instead this separate thread will run and aggregate when new 375 * completed tasks is done. 376 * <p/> 377 * The logic is fairly complex as this implementation has to keep track how far it got, and also 378 * signal back to the <i>main</t> thread when its done, so the <i>main</t> thread can continue 379 * processing when the entire splitting is done. 380 */ 381 private final class AggregateOnTheFlyTask implements Runnable { 382 383 private final AtomicExchange result; 384 private final Exchange original; 385 private final AtomicInteger total; 386 private final CompletionService<Exchange> completion; 387 private final AtomicBoolean running; 388 private final CountDownLatch aggregationOnTheFlyDone; 389 private final AtomicBoolean allTasksSubmitted; 390 private final AtomicException executionException; 391 392 private AggregateOnTheFlyTask(AtomicExchange result, Exchange original, AtomicInteger total, 393 CompletionService<Exchange> completion, AtomicBoolean running, 394 CountDownLatch aggregationOnTheFlyDone, AtomicBoolean allTasksSubmitted, 395 AtomicException executionException) { 396 this.result = result; 397 this.original = original; 398 this.total = total; 399 this.completion = completion; 400 this.running = running; 401 this.aggregationOnTheFlyDone = aggregationOnTheFlyDone; 402 this.allTasksSubmitted = allTasksSubmitted; 403 this.executionException = executionException; 404 } 405 406 public void run() { 407 LOG.trace("Aggregate on the fly task started for exchangeId: {}", original.getExchangeId()); 408 409 try { 410 aggregateOnTheFly(); 411 } catch (Throwable e) { 412 if (e instanceof Exception) { 413 executionException.set((Exception) e); 414 } else { 415 executionException.set(ObjectHelper.wrapRuntimeCamelException(e)); 416 } 417 } finally { 418 // must signal we are done so the latch can open and let the other thread continue processing 419 LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId()); 420 LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId()); 421 aggregationOnTheFlyDone.countDown(); 422 } 423 } 424 425 private void aggregateOnTheFly() throws InterruptedException, ExecutionException { 426 final AtomicBoolean timedOut = new AtomicBoolean(); 427 boolean stoppedOnException = false; 428 final StopWatch watch = new StopWatch(); 429 final AtomicInteger aggregated = new AtomicInteger(); 430 boolean done = false; 431 // not a for loop as on the fly may still run 432 while (!done) { 433 // check if we have already aggregate everything 434 if (allTasksSubmitted.get() && aggregated.intValue() >= total.get()) { 435 LOG.debug("Done aggregating {} exchanges on the fly.", aggregated); 436 break; 437 } 438 439 Future<Exchange> future; 440 if (timedOut.get()) { 441 // we are timed out but try to grab if some tasks has been completed 442 // poll will return null if no tasks is present 443 future = completion.poll(); 444 LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future); 445 } else if (timeout > 0) { 446 long left = timeout - watch.taken(); 447 if (left < 0) { 448 left = 0; 449 } 450 LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left); 451 future = completion.poll(left, TimeUnit.MILLISECONDS); 452 } else { 453 LOG.trace("Polling completion task #{}", aggregated); 454 // we must not block so poll every second 455 future = completion.poll(1, TimeUnit.SECONDS); 456 if (future == null) { 457 // and continue loop which will recheck if we are done 458 continue; 459 } 460 } 461 462 if (future == null) { 463 ParallelAggregateTimeoutTask task = new ParallelAggregateTimeoutTask(original, result, completion, aggregated, total, timedOut); 464 if (parallelAggregate) { 465 aggregateExecutorService.submit(task); 466 } else { 467 // in non parallel mode then just run the task 468 task.run(); 469 } 470 } else { 471 // there is a result to aggregate 472 Exchange subExchange = future.get(); 473 474 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 475 Integer number = getExchangeIndex(subExchange); 476 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); 477 if (stopOnException && !continueProcessing) { 478 // we want to stop on exception and an exception or failure occurred 479 // this is similar to what the pipeline does, so we should do the same to not surprise end users 480 // so we should set the failed exchange as the result and break out 481 result.set(subExchange); 482 stoppedOnException = true; 483 break; 484 } 485 486 // we got a result so aggregate it 487 ParallelAggregateTask task = new ParallelAggregateTask(result, subExchange, aggregated); 488 if (parallelAggregate) { 489 aggregateExecutorService.submit(task); 490 } else { 491 // in non parallel mode then just run the task 492 task.run(); 493 } 494 } 495 } 496 497 if (timedOut.get() || stoppedOnException) { 498 if (timedOut.get()) { 499 LOG.debug("Cancelling tasks due timeout after {} millis.", timeout); 500 } 501 if (stoppedOnException) { 502 LOG.debug("Cancelling tasks due stopOnException."); 503 } 504 // cancel tasks as we timed out (its safe to cancel done tasks) 505 running.set(false); 506 } 507 } 508 } 509 510 /** 511 * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing. 512 */ 513 private final class ParallelAggregateTask implements Runnable { 514 515 private final AtomicExchange result; 516 private final Exchange subExchange; 517 private final AtomicInteger aggregated; 518 519 private ParallelAggregateTask(AtomicExchange result, Exchange subExchange, AtomicInteger aggregated) { 520 this.result = result; 521 this.subExchange = subExchange; 522 this.aggregated = aggregated; 523 } 524 525 @Override 526 public void run() { 527 try { 528 if (parallelAggregate) { 529 doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); 530 } else { 531 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 532 } 533 } catch (Throwable e) { 534 // wrap in exception to explain where it failed 535 subExchange.setException(new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e)); 536 } finally { 537 aggregated.incrementAndGet(); 538 } 539 } 540 } 541 542 /** 543 * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing. 544 */ 545 private final class ParallelAggregateTimeoutTask implements Runnable { 546 547 private final Exchange original; 548 private final AtomicExchange result; 549 private final CompletionService<Exchange> completion; 550 private final AtomicInteger aggregated; 551 private final AtomicInteger total; 552 private final AtomicBoolean timedOut; 553 554 private ParallelAggregateTimeoutTask(Exchange original, AtomicExchange result, CompletionService<Exchange> completion, 555 AtomicInteger aggregated, AtomicInteger total, AtomicBoolean timedOut) { 556 this.original = original; 557 this.result = result; 558 this.completion = completion; 559 this.aggregated = aggregated; 560 this.total = total; 561 this.timedOut = timedOut; 562 } 563 564 @Override 565 public void run() { 566 AggregationStrategy strategy = getAggregationStrategy(null); 567 if (strategy instanceof DelegateAggregationStrategy) { 568 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 569 } 570 if (strategy instanceof TimeoutAwareAggregationStrategy) { 571 // notify the strategy we timed out 572 Exchange oldExchange = result.get(); 573 if (oldExchange == null) { 574 // if they all timed out the result may not have been set yet, so use the original exchange 575 oldExchange = original; 576 } 577 ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout); 578 } else { 579 // log a WARN we timed out since it will not be aggregated and the Exchange will be lost 580 LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated.intValue()); 581 } 582 LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue()); 583 timedOut.set(true); 584 585 // mark that index as timed out, which allows us to try to retrieve 586 // any already completed tasks in the next loop 587 if (completion instanceof SubmitOrderedCompletionService) { 588 ((SubmitOrderedCompletionService<?>) completion).timeoutTask(); 589 } 590 591 // we timed out so increment the counter 592 aggregated.incrementAndGet(); 593 } 594 } 595 596 protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception { 597 AtomicInteger total = new AtomicInteger(); 598 Iterator<ProcessorExchangePair> it = pairs.iterator(); 599 600 while (it.hasNext()) { 601 ProcessorExchangePair pair = it.next(); 602 // in case the iterator returns null then continue to next 603 if (pair == null) { 604 continue; 605 } 606 Exchange subExchange = pair.getExchange(); 607 updateNewExchange(subExchange, total.get(), pairs, it); 608 609 boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total); 610 if (!sync) { 611 if (LOG.isTraceEnabled()) { 612 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", pair.getExchange().getExchangeId()); 613 } 614 // the remainder of the multicast will be completed async 615 // so we break out now, then the callback will be invoked which then continue routing from where we left here 616 return false; 617 } 618 619 if (LOG.isTraceEnabled()) { 620 LOG.trace("Processing exchangeId: {} is continued being processed synchronously", pair.getExchange().getExchangeId()); 621 } 622 623 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 624 // remember to test for stop on exception and aggregate before copying back results 625 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); 626 if (stopOnException && !continueProcessing) { 627 if (subExchange.getException() != null) { 628 // wrap in exception to explain where it failed 629 CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException()); 630 subExchange.setException(cause); 631 } 632 // we want to stop on exception, and the exception was handled by the error handler 633 // this is similar to what the pipeline does, so we should do the same to not surprise end users 634 // so we should set the failed exchange as the result and be done 635 result.set(subExchange); 636 return true; 637 } 638 639 LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange); 640 641 if (parallelAggregate) { 642 doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); 643 } else { 644 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 645 } 646 647 total.incrementAndGet(); 648 } 649 650 LOG.debug("Done sequential processing {} exchanges", total); 651 652 return true; 653 } 654 655 private boolean doProcessSequential(final Exchange original, final AtomicExchange result, 656 final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it, 657 final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) { 658 boolean sync = true; 659 660 final Exchange exchange = pair.getExchange(); 661 Processor processor = pair.getProcessor(); 662 final Producer producer = pair.getProducer(); 663 664 TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null; 665 666 // compute time taken if sending to another endpoint 667 final StopWatch watch = producer != null ? new StopWatch() : null; 668 669 try { 670 // prepare tracing starting from a new block 671 if (traced != null) { 672 traced.pushBlock(); 673 } 674 675 if (producer != null) { 676 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); 677 } 678 // let the prepared process it, remember to begin the exchange pair 679 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); 680 pair.begin(); 681 sync = async.process(exchange, new AsyncCallback() { 682 public void done(boolean doneSync) { 683 // we are done with the exchange pair 684 pair.done(); 685 686 // okay we are done, so notify the exchange was sent 687 if (producer != null) { 688 long timeTaken = watch.stop(); 689 Endpoint endpoint = producer.getEndpoint(); 690 // emit event that the exchange was sent to the endpoint 691 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 692 } 693 694 // we only have to handle async completion of the routing slip 695 if (doneSync) { 696 return; 697 } 698 699 // continue processing the multicast asynchronously 700 Exchange subExchange = exchange; 701 702 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 703 // remember to test for stop on exception and aggregate before copying back results 704 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); 705 if (stopOnException && !continueProcessing) { 706 if (subExchange.getException() != null) { 707 // wrap in exception to explain where it failed 708 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException())); 709 } else { 710 // we want to stop on exception, and the exception was handled by the error handler 711 // this is similar to what the pipeline does, so we should do the same to not surprise end users 712 // so we should set the failed exchange as the result and be done 713 result.set(subExchange); 714 } 715 // and do the done work 716 doDone(original, subExchange, pairs, callback, false, true); 717 return; 718 } 719 720 try { 721 if (parallelAggregate) { 722 doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); 723 } else { 724 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 725 } 726 } catch (Throwable e) { 727 // wrap in exception to explain where it failed 728 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); 729 // and do the done work 730 doDone(original, subExchange, pairs, callback, false, true); 731 return; 732 } 733 734 total.incrementAndGet(); 735 736 // maybe there are more processors to multicast 737 while (it.hasNext()) { 738 739 // prepare and run the next 740 ProcessorExchangePair pair = it.next(); 741 subExchange = pair.getExchange(); 742 updateNewExchange(subExchange, total.get(), pairs, it); 743 boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total); 744 745 if (!sync) { 746 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId()); 747 return; 748 } 749 750 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 751 // remember to test for stop on exception and aggregate before copying back results 752 continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); 753 if (stopOnException && !continueProcessing) { 754 if (subExchange.getException() != null) { 755 // wrap in exception to explain where it failed 756 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException())); 757 } else { 758 // we want to stop on exception, and the exception was handled by the error handler 759 // this is similar to what the pipeline does, so we should do the same to not surprise end users 760 // so we should set the failed exchange as the result and be done 761 result.set(subExchange); 762 } 763 // and do the done work 764 doDone(original, subExchange, pairs, callback, false, true); 765 return; 766 } 767 768 // must catch any exceptions from aggregation 769 try { 770 if (parallelAggregate) { 771 doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); 772 } else { 773 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 774 } 775 } catch (Throwable e) { 776 // wrap in exception to explain where it failed 777 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); 778 // and do the done work 779 doDone(original, subExchange, pairs, callback, false, true); 780 return; 781 } 782 783 total.incrementAndGet(); 784 } 785 786 // do the done work 787 subExchange = result.get() != null ? result.get() : null; 788 doDone(original, subExchange, pairs, callback, false, true); 789 } 790 }); 791 } finally { 792 // pop the block so by next round we have the same staring point and thus the tracing looks accurate 793 if (traced != null) { 794 traced.popBlock(); 795 } 796 } 797 798 return sync; 799 } 800 801 private void doProcessParallel(final ProcessorExchangePair pair) throws Exception { 802 final Exchange exchange = pair.getExchange(); 803 Processor processor = pair.getProcessor(); 804 Producer producer = pair.getProducer(); 805 806 TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null; 807 808 // compute time taken if sending to another endpoint 809 StopWatch watch = null; 810 if (producer != null) { 811 watch = new StopWatch(); 812 } 813 814 try { 815 // prepare tracing starting from a new block 816 if (traced != null) { 817 traced.pushBlock(); 818 } 819 820 if (producer != null) { 821 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); 822 } 823 // let the prepared process it, remember to begin the exchange pair 824 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); 825 pair.begin(); 826 // we invoke it synchronously as parallel async routing is too hard 827 AsyncProcessorHelper.process(async, exchange); 828 } finally { 829 pair.done(); 830 // pop the block so by next round we have the same staring point and thus the tracing looks accurate 831 if (traced != null) { 832 traced.popBlock(); 833 } 834 if (producer != null) { 835 long timeTaken = watch.stop(); 836 Endpoint endpoint = producer.getEndpoint(); 837 // emit event that the exchange was sent to the endpoint 838 // this is okay to do here in the finally block, as the processing is not using the async routing engine 839 //( we invoke it synchronously as parallel async routing is too hard) 840 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 841 } 842 } 843 } 844 845 /** 846 * Common work which must be done when we are done multicasting. 847 * <p/> 848 * This logic applies for both running synchronous and asynchronous as there are multiple exist points 849 * when using the asynchronous routing engine. And therefore we want the logic in one method instead 850 * of being scattered. 851 * 852 * @param original the original exchange 853 * @param subExchange the current sub exchange, can be <tt>null</tt> for the synchronous part 854 * @param pairs the pairs with the exchanges to process 855 * @param callback the callback 856 * @param doneSync the <tt>doneSync</tt> parameter to call on callback 857 * @param forceExhaust whether or not error handling is exhausted 858 */ 859 protected void doDone(Exchange original, Exchange subExchange, final Iterable<ProcessorExchangePair> pairs, 860 AsyncCallback callback, boolean doneSync, boolean forceExhaust) { 861 862 // we are done so close the pairs iterator 863 if (pairs != null && pairs instanceof Closeable) { 864 IOHelper.close((Closeable) pairs, "pairs", LOG); 865 } 866 867 AggregationStrategy strategy = getAggregationStrategy(subExchange); 868 if (strategy instanceof DelegateAggregationStrategy) { 869 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 870 } 871 // invoke the on completion callback 872 if (strategy instanceof CompletionAwareAggregationStrategy) { 873 ((CompletionAwareAggregationStrategy) strategy).onCompletion(subExchange); 874 } 875 876 // cleanup any per exchange aggregation strategy 877 removeAggregationStrategyFromExchange(original); 878 879 // we need to know if there was an exception, and if the stopOnException option was enabled 880 // also we would need to know if any error handler has attempted redelivery and exhausted 881 boolean stoppedOnException = false; 882 boolean exception = false; 883 boolean exhaust = forceExhaust || subExchange != null && (subExchange.getException() != null || ExchangeHelper.isRedeliveryExhausted(subExchange)); 884 if (original.getException() != null || subExchange != null && subExchange.getException() != null) { 885 // there was an exception and we stopped 886 stoppedOnException = isStopOnException(); 887 exception = true; 888 } 889 890 // must copy results at this point 891 if (subExchange != null) { 892 if (stoppedOnException) { 893 // if we stopped due an exception then only propagate the exception 894 original.setException(subExchange.getException()); 895 } else { 896 // copy the current result to original so it will contain this result of this eip 897 ExchangeHelper.copyResults(original, subExchange); 898 } 899 } 900 901 // .. and then if there was an exception we need to configure the redelivery exhaust 902 // for example the noErrorHandler will not cause redelivery exhaust so if this error 903 // handled has been in use, then the exhaust would be false (if not forced) 904 if (exception) { 905 // multicast uses error handling on its output processors and they have tried to redeliver 906 // so we shall signal back to the other error handlers that we are exhausted and they should not 907 // also try to redeliver as we will then do that twice 908 original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); 909 } 910 911 callback.done(doneSync); 912 } 913 914 /** 915 * Aggregate the {@link Exchange} with the current result. 916 * This method is synchronized and is called directly when parallelAggregate is disabled (by default). 917 * 918 * @param strategy the aggregation strategy to use 919 * @param result the current result 920 * @param exchange the exchange to be added to the result 921 * @see #doAggregateInternal(org.apache.camel.processor.aggregate.AggregationStrategy, org.apache.camel.util.concurrent.AtomicExchange, org.apache.camel.Exchange) 922 */ 923 protected synchronized void doAggregate(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) { 924 doAggregateInternal(strategy, result, exchange); 925 } 926 927 /** 928 * Aggregate the {@link Exchange} with the current result. 929 * This method is unsynchronized and is called directly when parallelAggregate is enabled. 930 * In all other cases, this method is called from the doAggregate which is a synchronized method 931 * 932 * @param strategy the aggregation strategy to use 933 * @param result the current result 934 * @param exchange the exchange to be added to the result 935 * @see #doAggregate(org.apache.camel.processor.aggregate.AggregationStrategy, org.apache.camel.util.concurrent.AtomicExchange, org.apache.camel.Exchange) 936 */ 937 protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) { 938 if (strategy != null) { 939 // prepare the exchanges for aggregation 940 Exchange oldExchange = result.get(); 941 ExchangeHelper.prepareAggregation(oldExchange, exchange); 942 result.set(strategy.aggregate(oldExchange, exchange)); 943 } 944 } 945 946 protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, 947 Iterator<ProcessorExchangePair> it) { 948 exchange.setProperty(Exchange.MULTICAST_INDEX, index); 949 if (it.hasNext()) { 950 exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE); 951 } else { 952 exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE); 953 } 954 } 955 956 protected Integer getExchangeIndex(Exchange exchange) { 957 return exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class); 958 } 959 960 protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { 961 List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size()); 962 963 StreamCache streamCache = null; 964 if (isParallelProcessing() && exchange.getIn().getBody() instanceof StreamCache) { 965 // in parallel processing case, the stream must be copied, therefore get the stream 966 streamCache = (StreamCache) exchange.getIn().getBody(); 967 } 968 969 int index = 0; 970 for (Processor processor : processors) { 971 // copy exchange, and do not share the unit of work 972 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 973 974 if (streamCache != null) { 975 if (index > 0) { 976 // copy it otherwise parallel processing is not possible, 977 // because streams can only be read once 978 StreamCache copiedStreamCache = streamCache.copy(copy); 979 if (copiedStreamCache != null) { 980 copy.getIn().setBody(copiedStreamCache); 981 } 982 } 983 } 984 985 // If the multi-cast processor has an aggregation strategy 986 // then the StreamCache created by the child routes must not be 987 // closed by the unit of work of the child route, but by the unit of 988 // work of the parent route or grand parent route or grand grand parent route ...(in case of nesting). 989 // Set therefore the unit of work of the parent route as stream cache unit of work, 990 // if it is not already set. 991 if (copy.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) { 992 copy.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork()); 993 } 994 // if we share unit of work, we need to prepare the child exchange 995 if (isShareUnitOfWork()) { 996 prepareSharedUnitOfWork(copy, exchange); 997 } 998 999 // and add the pair 1000 RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; 1001 result.add(createProcessorExchangePair(index++, processor, copy, routeContext)); 1002 } 1003 1004 if (exchange.getException() != null) { 1005 // force any exceptions occurred during creation of exchange paris to be thrown 1006 // before returning the answer; 1007 throw exchange.getException(); 1008 } 1009 1010 return result; 1011 } 1012 1013 /** 1014 * Creates the {@link ProcessorExchangePair} which holds the processor and exchange to be send out. 1015 * <p/> 1016 * You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they 1017 * need to be specially prepared before use. 1018 * 1019 * @param index the index 1020 * @param processor the processor 1021 * @param exchange the exchange 1022 * @param routeContext the route context 1023 * @return prepared for use 1024 */ 1025 protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange, 1026 RouteContext routeContext) { 1027 Processor prepared = processor; 1028 1029 // set property which endpoint we send to 1030 setToEndpoint(exchange, prepared); 1031 1032 // rework error handling to support fine grained error handling 1033 prepared = createErrorHandler(routeContext, exchange, prepared); 1034 1035 // invoke on prepare on the exchange if specified 1036 if (onPrepare != null) { 1037 try { 1038 onPrepare.process(exchange); 1039 } catch (Exception e) { 1040 exchange.setException(e); 1041 } 1042 } 1043 return new DefaultProcessorExchangePair(index, processor, prepared, exchange); 1044 } 1045 1046 protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) { 1047 Processor answer; 1048 1049 boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class); 1050 1051 // do not wrap in error handler if we are inside a try block 1052 if (!tryBlock && routeContext != null) { 1053 // wrap the producer in error handler so we have fine grained error handling on 1054 // the output side instead of the input side 1055 // this is needed to support redelivery on that output alone and not doing redelivery 1056 // for the entire multicast block again which will start from scratch again 1057 1058 // create key for cache 1059 final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor); 1060 1061 // lookup cached first to reuse and preserve memory 1062 answer = errorHandlers.get(key); 1063 if (answer != null) { 1064 LOG.trace("Using existing error handler for: {}", processor); 1065 return answer; 1066 } 1067 1068 LOG.trace("Creating error handler for: {}", processor); 1069 ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder(); 1070 // create error handler (create error handler directly to keep it light weight, 1071 // instead of using ProcessorDefinition.wrapInErrorHandler) 1072 try { 1073 processor = builder.createErrorHandler(routeContext, processor); 1074 1075 // and wrap in unit of work processor so the copy exchange also can run under UoW 1076 answer = createUnitOfWorkProcessor(routeContext, processor, exchange); 1077 1078 boolean child = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null; 1079 1080 // must start the error handler 1081 ServiceHelper.startServices(answer); 1082 1083 // here we don't cache the child unit of work 1084 if (!child) { 1085 // add to cache 1086 errorHandlers.putIfAbsent(key, answer); 1087 } 1088 1089 } catch (Exception e) { 1090 throw ObjectHelper.wrapRuntimeCamelException(e); 1091 } 1092 } else { 1093 // and wrap in unit of work processor so the copy exchange also can run under UoW 1094 answer = createUnitOfWorkProcessor(routeContext, processor, exchange); 1095 } 1096 1097 return answer; 1098 } 1099 1100 /** 1101 * Strategy to create the unit of work to be used for the sub route 1102 * 1103 * @param routeContext the route context 1104 * @param processor the processor 1105 * @param exchange the exchange 1106 * @return the unit of work processor 1107 */ 1108 protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) { 1109 CamelInternalProcessor internal = new CamelInternalProcessor(processor); 1110 1111 // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW 1112 UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); 1113 if (parent != null) { 1114 internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeContext, parent)); 1115 } else { 1116 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); 1117 } 1118 1119 return internal; 1120 } 1121 1122 /** 1123 * Prepares the exchange for participating in a shared unit of work 1124 * <p/> 1125 * This ensures a child exchange can access its parent {@link UnitOfWork} when it participate 1126 * in a shared unit of work. 1127 * 1128 * @param childExchange the child exchange 1129 * @param parentExchange the parent exchange 1130 */ 1131 protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange parentExchange) { 1132 childExchange.setProperty(Exchange.PARENT_UNIT_OF_WORK, parentExchange.getUnitOfWork()); 1133 } 1134 1135 protected void doStart() throws Exception { 1136 if (isParallelProcessing() && executorService == null) { 1137 throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set"); 1138 } 1139 if (timeout > 0 && !isParallelProcessing()) { 1140 throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled"); 1141 } 1142 if (isParallelProcessing() && aggregateExecutorService == null) { 1143 // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread 1144 // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run 1145 // and signal completion during processing, which would lead to what would appear as a dead-lock or a slow processing 1146 String name = getClass().getSimpleName() + "-AggregateTask"; 1147 aggregateExecutorService = createAggregateExecutorService(name); 1148 } 1149 ServiceHelper.startServices(aggregationStrategy, processors); 1150 } 1151 1152 /** 1153 * Strategy to create the thread pool for the aggregator background task which waits for and aggregates 1154 * completed tasks when running in parallel mode. 1155 * 1156 * @param name the suggested name for the background thread 1157 * @return the thread pool 1158 */ 1159 protected synchronized ExecutorService createAggregateExecutorService(String name) { 1160 // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in 1161 return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name); 1162 } 1163 1164 @Override 1165 protected void doStop() throws Exception { 1166 ServiceHelper.stopServices(processors, errorHandlers, aggregationStrategy); 1167 } 1168 1169 @Override 1170 protected void doShutdown() throws Exception { 1171 ServiceHelper.stopAndShutdownServices(processors, errorHandlers, aggregationStrategy); 1172 // only clear error handlers when shutting down 1173 errorHandlers.clear(); 1174 1175 if (shutdownExecutorService && executorService != null) { 1176 getCamelContext().getExecutorServiceManager().shutdownNow(executorService); 1177 } 1178 if (aggregateExecutorService != null) { 1179 getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService); 1180 } 1181 } 1182 1183 protected static void setToEndpoint(Exchange exchange, Processor processor) { 1184 if (processor instanceof Producer) { 1185 Producer producer = (Producer) processor; 1186 exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); 1187 } 1188 } 1189 1190 protected AggregationStrategy getAggregationStrategy(Exchange exchange) { 1191 AggregationStrategy answer = null; 1192 1193 // prefer to use per Exchange aggregation strategy over a global strategy 1194 if (exchange != null) { 1195 Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); 1196 Map<Object, AggregationStrategy> map = CastUtils.cast(property); 1197 if (map != null) { 1198 answer = map.get(this); 1199 } 1200 } 1201 if (answer == null) { 1202 // fallback to global strategy 1203 answer = getAggregationStrategy(); 1204 } 1205 return answer; 1206 } 1207 1208 /** 1209 * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}. 1210 * 1211 * @param exchange the exchange 1212 * @param aggregationStrategy the strategy 1213 */ 1214 protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) { 1215 Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); 1216 Map<Object, AggregationStrategy> map = CastUtils.cast(property); 1217 if (map == null) { 1218 map = new ConcurrentHashMap<Object, AggregationStrategy>(); 1219 } else { 1220 // it is not safe to use the map directly as the exchange doesn't have the deep copy of it's properties 1221 // we just create a new copy if we need to change the map 1222 map = new ConcurrentHashMap<Object, AggregationStrategy>(map); 1223 } 1224 // store the strategy using this processor as the key 1225 // (so we can store multiple strategies on the same exchange) 1226 map.put(this, aggregationStrategy); 1227 exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map); 1228 } 1229 1230 /** 1231 * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange} 1232 * which must be done after use. 1233 * 1234 * @param exchange the current exchange 1235 */ 1236 protected void removeAggregationStrategyFromExchange(Exchange exchange) { 1237 Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); 1238 Map<Object, AggregationStrategy> map = CastUtils.cast(property); 1239 if (map == null) { 1240 return; 1241 } 1242 // remove the strategy using this processor as the key 1243 map.remove(this); 1244 } 1245 1246 /** 1247 * Is the multicast processor working in streaming mode? 1248 * <p/> 1249 * In streaming mode: 1250 * <ul> 1251 * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li> 1252 * <li>for parallel processing, we start aggregating responses as they get send back to the processor; 1253 * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li> 1254 * </ul> 1255 */ 1256 public boolean isStreaming() { 1257 return streaming; 1258 } 1259 1260 /** 1261 * Should the multicast processor stop processing further exchanges in case of an exception occurred? 1262 */ 1263 public boolean isStopOnException() { 1264 return stopOnException; 1265 } 1266 1267 /** 1268 * Returns the producers to multicast to 1269 */ 1270 public Collection<Processor> getProcessors() { 1271 return processors; 1272 } 1273 1274 /** 1275 * An optional timeout in millis when using parallel processing 1276 */ 1277 public long getTimeout() { 1278 return timeout; 1279 } 1280 1281 /** 1282 * Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead. 1283 */ 1284 public AggregationStrategy getAggregationStrategy() { 1285 return aggregationStrategy; 1286 } 1287 1288 public boolean isParallelProcessing() { 1289 return parallelProcessing; 1290 } 1291 1292 public boolean isParallelAggregate() { 1293 return parallelAggregate; 1294 } 1295 1296 public boolean isShareUnitOfWork() { 1297 return shareUnitOfWork; 1298 } 1299 1300 public List<Processor> next() { 1301 if (!hasNext()) { 1302 return null; 1303 } 1304 return new ArrayList<Processor>(processors); 1305 } 1306 1307 public boolean hasNext() { 1308 return processors != null && !processors.isEmpty(); 1309 } 1310}