001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.io.Closeable; 020import java.util.ArrayList; 021import java.util.Collection; 022import java.util.Iterator; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.Callable; 026import java.util.concurrent.CompletionService; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.CountDownLatch; 030import java.util.concurrent.ExecutionException; 031import java.util.concurrent.ExecutorCompletionService; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Future; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicBoolean; 036import java.util.concurrent.atomic.AtomicInteger; 037 038import org.apache.camel.AsyncCallback; 039import org.apache.camel.AsyncProcessor; 040import org.apache.camel.CamelContext; 041import org.apache.camel.CamelContextAware; 042import org.apache.camel.CamelExchangeException; 043import org.apache.camel.Endpoint; 044import org.apache.camel.ErrorHandlerFactory; 045import org.apache.camel.Exchange; 046import 
org.apache.camel.Navigate; 047import org.apache.camel.Processor; 048import org.apache.camel.Producer; 049import org.apache.camel.StreamCache; 050import org.apache.camel.Traceable; 051import org.apache.camel.processor.aggregate.AggregationStrategy; 052import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy; 053import org.apache.camel.processor.aggregate.DelegateAggregationStrategy; 054import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy; 055import org.apache.camel.spi.IdAware; 056import org.apache.camel.spi.RouteContext; 057import org.apache.camel.spi.TracedRouteNodes; 058import org.apache.camel.spi.UnitOfWork; 059import org.apache.camel.support.ServiceSupport; 060import org.apache.camel.util.AsyncProcessorConverterHelper; 061import org.apache.camel.util.AsyncProcessorHelper; 062import org.apache.camel.util.CastUtils; 063import org.apache.camel.util.EventHelper; 064import org.apache.camel.util.ExchangeHelper; 065import org.apache.camel.util.IOHelper; 066import org.apache.camel.util.KeyValueHolder; 067import org.apache.camel.util.ObjectHelper; 068import org.apache.camel.util.ServiceHelper; 069import org.apache.camel.util.StopWatch; 070import org.apache.camel.util.concurrent.AtomicException; 071import org.apache.camel.util.concurrent.AtomicExchange; 072import org.apache.camel.util.concurrent.SubmitOrderedCompletionService; 073import org.slf4j.Logger; 074import org.slf4j.LoggerFactory; 075 076import static org.apache.camel.util.ObjectHelper.notNull; 077 078 079/** 080 * Implements the Multicast pattern to send a message exchange to a number of 081 * endpoints, each endpoint receiving a copy of the message exchange. 
082 * 083 * @version 084 * @see Pipeline 085 */ 086public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware { 087 088 private static final Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class); 089 090 /** 091 * Class that represent each step in the multicast route to do 092 */ 093 static final class DefaultProcessorExchangePair implements ProcessorExchangePair { 094 private final int index; 095 private final Processor processor; 096 private final Processor prepared; 097 private final Exchange exchange; 098 099 private DefaultProcessorExchangePair(int index, Processor processor, Processor prepared, Exchange exchange) { 100 this.index = index; 101 this.processor = processor; 102 this.prepared = prepared; 103 this.exchange = exchange; 104 } 105 106 public int getIndex() { 107 return index; 108 } 109 110 public Exchange getExchange() { 111 return exchange; 112 } 113 114 public Producer getProducer() { 115 if (processor instanceof Producer) { 116 return (Producer) processor; 117 } 118 return null; 119 } 120 121 public Processor getProcessor() { 122 return prepared; 123 } 124 125 public void begin() { 126 // noop 127 } 128 129 public void done() { 130 // noop 131 } 132 133 } 134 135 /** 136 * Class that represents prepared fine grained error handlers when processing multicasted/splitted exchanges 137 * <p/> 138 * See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods. 
139 */ 140 static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> { 141 142 PreparedErrorHandler(RouteContext key, Processor value) { 143 super(key, value); 144 } 145 146 } 147 148 protected final Processor onPrepare; 149 private final CamelContext camelContext; 150 private String id; 151 private Collection<Processor> processors; 152 private final AggregationStrategy aggregationStrategy; 153 private final boolean parallelProcessing; 154 private final boolean streaming; 155 private final boolean parallelAggregate; 156 private final boolean stopOnAggregateException; 157 private final boolean stopOnException; 158 private final ExecutorService executorService; 159 private final boolean shutdownExecutorService; 160 private ExecutorService aggregateExecutorService; 161 private final long timeout; 162 private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>(); 163 private final boolean shareUnitOfWork; 164 165 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) { 166 this(camelContext, processors, null); 167 } 168 169 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) { 170 this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false, false); 171 } 172 173 @Deprecated 174 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, 175 boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, 176 boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) { 177 this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, 178 streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false); 179 } 180 181 public 
MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, 182 ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, 183 boolean shareUnitOfWork, boolean parallelAggregate) { 184 this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, 185 shareUnitOfWork, false, false); 186 } 187 188 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, 189 boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, 190 boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, 191 boolean parallelAggregate, boolean stopOnAggregateException) { 192 notNull(camelContext, "camelContext"); 193 this.camelContext = camelContext; 194 this.processors = processors; 195 this.aggregationStrategy = aggregationStrategy; 196 this.executorService = executorService; 197 this.shutdownExecutorService = shutdownExecutorService; 198 this.streaming = streaming; 199 this.stopOnException = stopOnException; 200 // must enable parallel if executor service is provided 201 this.parallelProcessing = parallelProcessing || executorService != null; 202 this.timeout = timeout; 203 this.onPrepare = onPrepare; 204 this.shareUnitOfWork = shareUnitOfWork; 205 this.parallelAggregate = parallelAggregate; 206 this.stopOnAggregateException = stopOnAggregateException; 207 } 208 209 @Override 210 public String toString() { 211 return "Multicast[" + getProcessors() + "]"; 212 } 213 214 public String getId() { 215 return id; 216 } 217 218 public void setId(String id) { 219 this.id = id; 220 } 221 222 public String getTraceLabel() { 223 return "multicast"; 224 } 225 226 public CamelContext 
getCamelContext() { 227 return camelContext; 228 } 229 230 public void process(Exchange exchange) throws Exception { 231 AsyncProcessorHelper.process(this, exchange); 232 } 233 234 public boolean process(Exchange exchange, AsyncCallback callback) { 235 final AtomicExchange result = new AtomicExchange(); 236 Iterable<ProcessorExchangePair> pairs = null; 237 238 try { 239 boolean sync = true; 240 241 pairs = createProcessorExchangePairs(exchange); 242 243 if (isParallelProcessing()) { 244 // ensure an executor is set when running in parallel 245 ObjectHelper.notNull(executorService, "executorService", this); 246 doProcessParallel(exchange, result, pairs, isStreaming(), callback); 247 } else { 248 sync = doProcessSequential(exchange, result, pairs, callback); 249 } 250 251 if (!sync) { 252 // the remainder of the multicast will be completed async 253 // so we break out now, then the callback will be invoked which then continue routing from where we left here 254 return false; 255 } 256 } catch (Throwable e) { 257 exchange.setException(e); 258 // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted 259 // and do the done work 260 doDone(exchange, null, pairs, callback, true, false); 261 return true; 262 } 263 264 // multicasting was processed successfully 265 // and do the done work 266 Exchange subExchange = result.get() != null ? 
result.get() : null; 267 doDone(exchange, subExchange, pairs, callback, true, true); 268 return true; 269 } 270 271 protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs, 272 final boolean streaming, final AsyncCallback callback) throws Exception { 273 274 ObjectHelper.notNull(executorService, "ExecutorService", this); 275 ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this); 276 277 final CompletionService<Exchange> completion; 278 if (streaming) { 279 // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence) 280 completion = new ExecutorCompletionService<Exchange>(executorService); 281 } else { 282 // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence) 283 completion = new SubmitOrderedCompletionService<Exchange>(executorService); 284 } 285 286 final AtomicInteger total = new AtomicInteger(0); 287 final Iterator<ProcessorExchangePair> it = pairs.iterator(); 288 289 if (it.hasNext()) { 290 // when parallel then aggregate on the fly 291 final AtomicBoolean running = new AtomicBoolean(true); 292 final AtomicBoolean allTasksSubmitted = new AtomicBoolean(); 293 final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1); 294 final AtomicException executionException = new AtomicException(); 295 296 // issue task to execute in separate thread so it can aggregate on-the-fly 297 // while we submit new tasks, and those tasks complete concurrently 298 // this allows us to optimize work and reduce memory consumption 299 final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running, 300 aggregationOnTheFlyDone, allTasksSubmitted, executionException); 301 final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean(); 302 303 LOG.trace("Starting to submit parallel tasks"); 304 305 try { 306 while 
(it.hasNext()) { 307 final ProcessorExchangePair pair = it.next(); 308 // in case the iterator returns null then continue to next 309 if (pair == null) { 310 continue; 311 } 312 313 final Exchange subExchange = pair.getExchange(); 314 updateNewExchange(subExchange, total.intValue(), pairs, it); 315 316 completion.submit(new Callable<Exchange>() { 317 public Exchange call() throws Exception { 318 // start the aggregation task at this stage only in order not to pile up too many threads 319 if (aggregationTaskSubmitted.compareAndSet(false, true)) { 320 // but only submit the aggregation task once 321 aggregateExecutorService.submit(aggregateOnTheFlyTask); 322 } 323 324 if (!running.get()) { 325 // do not start processing the task if we are not running 326 return subExchange; 327 } 328 329 try { 330 doProcessParallel(pair); 331 } catch (Throwable e) { 332 subExchange.setException(e); 333 } 334 335 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 336 Integer number = getExchangeIndex(subExchange); 337 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); 338 if (stopOnException && !continueProcessing) { 339 // signal to stop running 340 running.set(false); 341 // throw caused exception 342 if (subExchange.getException() != null) { 343 // wrap in exception to explain where it failed 344 CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException()); 345 subExchange.setException(cause); 346 } 347 } 348 349 LOG.trace("Parallel processing complete for exchange: {}", subExchange); 350 return subExchange; 351 } 352 }); 353 354 total.incrementAndGet(); 355 } 356 } catch (Throwable e) { 357 // The methods it.hasNext and it.next can throw RuntimeExceptions when custom iterators are implemented. 
358 // We have to catch the exception here otherwise the aggregator threads would pile up. 359 if (e instanceof Exception) { 360 executionException.set((Exception) e); 361 } else { 362 executionException.set(ObjectHelper.wrapRuntimeCamelException(e)); 363 } 364 // and because of the exception we must signal we are done so the latch can open and let the other thread continue processing 365 LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId()); 366 LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId()); 367 aggregationOnTheFlyDone.countDown(); 368 } 369 370 // signal all tasks has been submitted 371 LOG.trace("Signaling that all {} tasks has been submitted.", total.get()); 372 allTasksSubmitted.set(true); 373 374 // its to hard to do parallel async routing so we let the caller thread be synchronously 375 // and have it pickup the replies and do the aggregation (eg we use a latch to wait) 376 // wait for aggregation to be done 377 LOG.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", total.get(), original.getExchangeId()); 378 aggregationOnTheFlyDone.await(); 379 380 // did we fail for whatever reason, if so throw that caused exception 381 if (executionException.get() != null) { 382 if (LOG.isDebugEnabled()) { 383 LOG.debug("Parallel processing failed due {}", executionException.get().getMessage()); 384 } 385 throw executionException.get(); 386 } 387 } 388 389 // no everything is okay so we are done 390 LOG.debug("Done parallel processing {} exchanges", total); 391 } 392 393 /** 394 * Boss worker to control aggregate on-the-fly for completed tasks when using parallel processing. 395 * <p/> 396 * This ensures lower memory consumption as we do not need to keep all completed tasks in memory 397 * before we perform aggregation. Instead this separate thread will run and aggregate when new 398 * completed tasks is done. 
* <p/>
     * The logic is fairly complex as this implementation has to keep track how far it got, and also
     * signal back to the <i>main</i> thread when its done, so the <i>main</i> thread can continue
     * processing when the entire splitting is done.
     */
    private final class AggregateOnTheFlyTask implements Runnable {

        // holder for the aggregated result
        private final AtomicExchange result;
        private final Exchange original;
        // number of tasks submitted so far; only compared against once allTasksSubmitted is set
        private final AtomicInteger total;
        private final CompletionService<Exchange> completion;
        // set to false by this task when it stops due to a timeout or stopOnException
        private final AtomicBoolean running;
        // counted down when this task ends, whether it completed normally or failed
        private final CountDownLatch aggregationOnTheFlyDone;
        private final AtomicBoolean allTasksSubmitted;
        // receives any failure thrown while aggregating
        private final AtomicException executionException;

        private AggregateOnTheFlyTask(AtomicExchange result, Exchange original, AtomicInteger total,
                                      CompletionService<Exchange> completion, AtomicBoolean running,
                                      CountDownLatch aggregationOnTheFlyDone, AtomicBoolean allTasksSubmitted,
                                      AtomicException executionException) {
            this.result = result;
            this.original = original;
            this.total = total;
            this.completion = completion;
            this.running = running;
            this.aggregationOnTheFlyDone = aggregationOnTheFlyDone;
            this.allTasksSubmitted = allTasksSubmitted;
            this.executionException = executionException;
        }

        /**
         * Runs the on-the-fly aggregation, records any failure in the execution exception holder
         * and always counts down the latch at the end.
         */
        public void run() {
            LOG.trace("Aggregate on the fly task started for exchangeId: {}", original.getExchangeId());

            try {
                aggregateOnTheFly();
            } catch (Throwable e) {
                // the holder stores an Exception, so anything else is wrapped first
                if (e instanceof Exception) {
                    executionException.set((Exception) e);
                } else {
                    executionException.set(ObjectHelper.wrapRuntimeCamelException(e));
                }
            } finally {
                // must signal we are done so the latch can open and let the other thread continue processing
                LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId());
                LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId());
                aggregationOnTheFlyDone.countDown();
            }
        }

        /**
         * Polls the completion service for completed tasks and aggregates each one, until all submitted
         * tasks are accounted for or the loop is left early because of stopOnException.
         * <p/>
         * After a timeout the loop keeps running, but only grabs tasks which have already completed.
         *
         * @throws InterruptedException if interrupted while polling
         * @throws ExecutionException   if a completed task failed with an exception
         */
        private void aggregateOnTheFly() throws InterruptedException, ExecutionException {
            final AtomicBoolean timedOut = new AtomicBoolean();
            boolean stoppedOnException = false;
            final StopWatch watch = new StopWatch();
            final AtomicInteger aggregated = new AtomicInteger();
            // never set to true below; the loop is only left via the break statements
            boolean done = false;
            // not a for loop as on the fly may still run
            while (!done) {
                // check if we have already aggregate everything
                if (allTasksSubmitted.get() && aggregated.intValue() >= total.get()) {
                    LOG.debug("Done aggregating {} exchanges on the fly.", aggregated);
                    break;
                }

                Future<Exchange> future;
                if (timedOut.get()) {
                    // we are timed out but try to grab if some tasks has been completed
                    // poll will return null if no tasks is present
                    future = completion.poll();
                    LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future);
                } else if (timeout > 0) {
                    // wait at most for what is left of the configured timeout
                    long left = timeout - watch.taken();
                    if (left < 0) {
                        left = 0;
                    }
                    LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left);
                    future = completion.poll(left, TimeUnit.MILLISECONDS);
                } else {
                    LOG.trace("Polling completion task #{}", aggregated);
                    // we must not block so poll every second
                    future = completion.poll(1, TimeUnit.SECONDS);
                    if (future == null) {
                        // and continue loop which will recheck if we are done
                        continue;
                    }
                }

                if (future == null) {
                    // nothing was available: handle it as a timeout (the task sets timedOut and increments aggregated)
                    ParallelAggregateTimeoutTask task = new ParallelAggregateTimeoutTask(original, result, completion, aggregated, total, timedOut);
                    if (parallelAggregate) {
                        aggregateExecutorService.submit(task);
                    } else {
                        // in non parallel mode then just run the task
                        task.run();
                    }
                } else {
                    // there is a result to aggregate
                    Exchange subExchange = future.get();

                    // Decide whether to continue with the multicast or not; similar logic to the Pipeline
                    Integer number = getExchangeIndex(subExchange);
                    boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG);
                    if (stopOnException && !continueProcessing) {
                        // we want to stop on exception and an exception or failure occurred
                        // this is similar to what the pipeline does, so we should do the same to not surprise end users
                        // so we should set the failed exchange as the result and break out
                        result.set(subExchange);
                        stoppedOnException = true;
                        break;
                    }

                    // we got a result so aggregate it (the task increments aggregated when it has run)
                    ParallelAggregateTask task = new ParallelAggregateTask(result, subExchange, aggregated);
                    if (parallelAggregate) {
                        aggregateExecutorService.submit(task);
                    } else {
                        // in non parallel mode then just run the task
                        task.run();
                    }
                }
            }

            if (timedOut.get() || stoppedOnException) {
                if (timedOut.get()) {
                    LOG.debug("Cancelling tasks due timeout after {} millis.", timeout);
                }
                if (stoppedOnException) {
                    LOG.debug("Cancelling tasks due stopOnException.");
                }
                // cancel tasks as we timed out (its safe to cancel done tasks)
                // tasks which have not started yet check this flag and skip their processing
                running.set(false);
            }
        }
    }

    /**
     * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing.
535 */ 536 private final class ParallelAggregateTask implements Runnable { 537 538 private final AtomicExchange result; 539 private final Exchange subExchange; 540 private final AtomicInteger aggregated; 541 542 private ParallelAggregateTask(AtomicExchange result, Exchange subExchange, AtomicInteger aggregated) { 543 this.result = result; 544 this.subExchange = subExchange; 545 this.aggregated = aggregated; 546 } 547 548 @Override 549 public void run() { 550 try { 551 if (parallelAggregate) { 552 doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); 553 } else { 554 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 555 } 556 } catch (Throwable e) { 557 if (isStopOnAggregateException()) { 558 throw e; 559 } else { 560 // wrap in exception to explain where it failed 561 CamelExchangeException cex = new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e); 562 subExchange.setException(cex); 563 LOG.debug(cex.getMessage(), cex); 564 } 565 } finally { 566 aggregated.incrementAndGet(); 567 } 568 } 569 } 570 571 /** 572 * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing. 
573 */ 574 private final class ParallelAggregateTimeoutTask implements Runnable { 575 576 private final Exchange original; 577 private final AtomicExchange result; 578 private final CompletionService<Exchange> completion; 579 private final AtomicInteger aggregated; 580 private final AtomicInteger total; 581 private final AtomicBoolean timedOut; 582 583 private ParallelAggregateTimeoutTask(Exchange original, AtomicExchange result, CompletionService<Exchange> completion, 584 AtomicInteger aggregated, AtomicInteger total, AtomicBoolean timedOut) { 585 this.original = original; 586 this.result = result; 587 this.completion = completion; 588 this.aggregated = aggregated; 589 this.total = total; 590 this.timedOut = timedOut; 591 } 592 593 @Override 594 public void run() { 595 AggregationStrategy strategy = getAggregationStrategy(null); 596 if (strategy instanceof DelegateAggregationStrategy) { 597 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 598 } 599 if (strategy instanceof TimeoutAwareAggregationStrategy) { 600 // notify the strategy we timed out 601 Exchange oldExchange = result.get(); 602 if (oldExchange == null) { 603 // if they all timed out the result may not have been set yet, so use the original exchange 604 oldExchange = original; 605 } 606 ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout); 607 } else { 608 // log a WARN we timed out since it will not be aggregated and the Exchange will be lost 609 LOG.warn("Parallel processing timed out after {} millis for number {}. 
This task will be cancelled and will not be aggregated.", timeout, aggregated.intValue()); 610 } 611 LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue()); 612 timedOut.set(true); 613 614 // mark that index as timed out, which allows us to try to retrieve 615 // any already completed tasks in the next loop 616 if (completion instanceof SubmitOrderedCompletionService) { 617 ((SubmitOrderedCompletionService<?>) completion).timeoutTask(); 618 } 619 620 // we timed out so increment the counter 621 aggregated.incrementAndGet(); 622 } 623 } 624 625 protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception { 626 AtomicInteger total = new AtomicInteger(); 627 Iterator<ProcessorExchangePair> it = pairs.iterator(); 628 629 while (it.hasNext()) { 630 ProcessorExchangePair pair = it.next(); 631 // in case the iterator returns null then continue to next 632 if (pair == null) { 633 continue; 634 } 635 Exchange subExchange = pair.getExchange(); 636 updateNewExchange(subExchange, total.get(), pairs, it); 637 638 boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total); 639 if (!sync) { 640 if (LOG.isTraceEnabled()) { 641 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", pair.getExchange().getExchangeId()); 642 } 643 // the remainder of the multicast will be completed async 644 // so we break out now, then the callback will be invoked which then continue routing from where we left here 645 return false; 646 } 647 648 if (LOG.isTraceEnabled()) { 649 LOG.trace("Processing exchangeId: {} is continued being processed synchronously", pair.getExchange().getExchangeId()); 650 } 651 652 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 653 // remember to test for stop on exception and aggregate before copying back results 654 boolean 
continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); 655 if (stopOnException && !continueProcessing) { 656 if (subExchange.getException() != null) { 657 // wrap in exception to explain where it failed 658 CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException()); 659 subExchange.setException(cause); 660 } 661 // we want to stop on exception, and the exception was handled by the error handler 662 // this is similar to what the pipeline does, so we should do the same to not surprise end users 663 // so we should set the failed exchange as the result and be done 664 result.set(subExchange); 665 return true; 666 } 667 668 LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange); 669 670 if (parallelAggregate) { 671 doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); 672 } else { 673 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 674 } 675 676 total.incrementAndGet(); 677 } 678 679 LOG.debug("Done sequential processing {} exchanges", total); 680 681 return true; 682 } 683 684 private boolean doProcessSequential(final Exchange original, final AtomicExchange result, 685 final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it, 686 final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) { 687 boolean sync = true; 688 689 final Exchange exchange = pair.getExchange(); 690 Processor processor = pair.getProcessor(); 691 final Producer producer = pair.getProducer(); 692 693 TracedRouteNodes traced = exchange.getUnitOfWork() != null ? 
exchange.getUnitOfWork().getTracedRouteNodes() : null; 694 695 try { 696 // prepare tracing starting from a new block 697 if (traced != null) { 698 traced.pushBlock(); 699 } 700 701 StopWatch sw = null; 702 if (producer != null) { 703 boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); 704 if (sending) { 705 sw = new StopWatch(); 706 } 707 } 708 709 // compute time taken if sending to another endpoint 710 final StopWatch watch = sw; 711 712 // let the prepared process it, remember to begin the exchange pair 713 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); 714 pair.begin(); 715 sync = async.process(exchange, new AsyncCallback() { 716 public void done(boolean doneSync) { 717 // we are done with the exchange pair 718 pair.done(); 719 720 // okay we are done, so notify the exchange was sent 721 if (producer != null && watch != null) { 722 long timeTaken = watch.taken(); 723 Endpoint endpoint = producer.getEndpoint(); 724 // emit event that the exchange was sent to the endpoint 725 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 726 } 727 728 // we only have to handle async completion of the routing slip 729 if (doneSync) { 730 return; 731 } 732 733 // continue processing the multicast asynchronously 734 Exchange subExchange = exchange; 735 736 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 737 // remember to test for stop on exception and aggregate before copying back results 738 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); 739 if (stopOnException && !continueProcessing) { 740 if (subExchange.getException() != null) { 741 // wrap in exception to explain where it failed 742 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, 
subExchange.getException())); 743 } else { 744 // we want to stop on exception, and the exception was handled by the error handler 745 // this is similar to what the pipeline does, so we should do the same to not surprise end users 746 // so we should set the failed exchange as the result and be done 747 result.set(subExchange); 748 } 749 // and do the done work 750 doDone(original, subExchange, pairs, callback, false, true); 751 return; 752 } 753 754 try { 755 if (parallelAggregate) { 756 doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); 757 } else { 758 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 759 } 760 } catch (Throwable e) { 761 // wrap in exception to explain where it failed 762 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); 763 // and do the done work 764 doDone(original, subExchange, pairs, callback, false, true); 765 return; 766 } 767 768 total.incrementAndGet(); 769 770 // maybe there are more processors to multicast 771 while (it.hasNext()) { 772 773 // prepare and run the next 774 ProcessorExchangePair pair = it.next(); 775 subExchange = pair.getExchange(); 776 updateNewExchange(subExchange, total.get(), pairs, it); 777 boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total); 778 779 if (!sync) { 780 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId()); 781 return; 782 } 783 784 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 785 // remember to test for stop on exception and aggregate before copying back results 786 continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); 787 if (stopOnException && !continueProcessing) { 788 if (subExchange.getException() != null) { 789 // wrap in exception to explain where it failed 790 
subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException())); 791 } else { 792 // we want to stop on exception, and the exception was handled by the error handler 793 // this is similar to what the pipeline does, so we should do the same to not surprise end users 794 // so we should set the failed exchange as the result and be done 795 result.set(subExchange); 796 } 797 // and do the done work 798 doDone(original, subExchange, pairs, callback, false, true); 799 return; 800 } 801 802 // must catch any exceptions from aggregation 803 try { 804 if (parallelAggregate) { 805 doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); 806 } else { 807 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 808 } 809 } catch (Throwable e) { 810 // wrap in exception to explain where it failed 811 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); 812 // and do the done work 813 doDone(original, subExchange, pairs, callback, false, true); 814 return; 815 } 816 817 total.incrementAndGet(); 818 } 819 820 // do the done work 821 subExchange = result.get() != null ? result.get() : null; 822 doDone(original, subExchange, pairs, callback, false, true); 823 } 824 }); 825 } finally { 826 // pop the block so by next round we have the same staring point and thus the tracing looks accurate 827 if (traced != null) { 828 traced.popBlock(); 829 } 830 } 831 832 return sync; 833 } 834 835 private void doProcessParallel(final ProcessorExchangePair pair) throws Exception { 836 final Exchange exchange = pair.getExchange(); 837 Processor processor = pair.getProcessor(); 838 Producer producer = pair.getProducer(); 839 840 TracedRouteNodes traced = exchange.getUnitOfWork() != null ? 
exchange.getUnitOfWork().getTracedRouteNodes() : null; 841 842 // compute time taken if sending to another endpoint 843 StopWatch watch = null; 844 try { 845 // prepare tracing starting from a new block 846 if (traced != null) { 847 traced.pushBlock(); 848 } 849 850 if (producer != null) { 851 boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); 852 if (sending) { 853 watch = new StopWatch(); 854 } 855 } 856 // let the prepared process it, remember to begin the exchange pair 857 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); 858 pair.begin(); 859 // we invoke it synchronously as parallel async routing is too hard 860 AsyncProcessorHelper.process(async, exchange); 861 } finally { 862 pair.done(); 863 // pop the block so by next round we have the same staring point and thus the tracing looks accurate 864 if (traced != null) { 865 traced.popBlock(); 866 } 867 if (producer != null && watch != null) { 868 Endpoint endpoint = producer.getEndpoint(); 869 long timeTaken = watch.taken(); 870 // emit event that the exchange was sent to the endpoint 871 // this is okay to do here in the finally block, as the processing is not using the async routing engine 872 //( we invoke it synchronously as parallel async routing is too hard) 873 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 874 } 875 } 876 } 877 878 /** 879 * Common work which must be done when we are done multicasting. 880 * <p/> 881 * This logic applies for both running synchronous and asynchronous as there are multiple exist points 882 * when using the asynchronous routing engine. And therefore we want the logic in one method instead 883 * of being scattered. 
884 * 885 * @param original the original exchange 886 * @param subExchange the current sub exchange, can be <tt>null</tt> for the synchronous part 887 * @param pairs the pairs with the exchanges to process 888 * @param callback the callback 889 * @param doneSync the <tt>doneSync</tt> parameter to call on callback 890 * @param forceExhaust whether or not error handling is exhausted 891 */ 892 protected void doDone(Exchange original, Exchange subExchange, final Iterable<ProcessorExchangePair> pairs, 893 AsyncCallback callback, boolean doneSync, boolean forceExhaust) { 894 895 // we are done so close the pairs iterator 896 if (pairs instanceof Closeable) { 897 IOHelper.close((Closeable) pairs, "pairs", LOG); 898 } 899 900 AggregationStrategy strategy = getAggregationStrategy(subExchange); 901 if (strategy instanceof DelegateAggregationStrategy) { 902 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 903 } 904 // invoke the on completion callback 905 if (strategy instanceof CompletionAwareAggregationStrategy) { 906 ((CompletionAwareAggregationStrategy) strategy).onCompletion(subExchange); 907 } 908 909 // cleanup any per exchange aggregation strategy 910 removeAggregationStrategyFromExchange(original); 911 912 // we need to know if there was an exception, and if the stopOnException option was enabled 913 // also we would need to know if any error handler has attempted redelivery and exhausted 914 boolean stoppedOnException = false; 915 boolean exception = false; 916 boolean exhaust = forceExhaust || subExchange != null && (subExchange.getException() != null || ExchangeHelper.isRedeliveryExhausted(subExchange)); 917 if (original.getException() != null || subExchange != null && subExchange.getException() != null) { 918 // there was an exception and we stopped 919 stoppedOnException = isStopOnException(); 920 exception = true; 921 } 922 923 // must copy results at this point 924 if (subExchange != null) { 925 if (stoppedOnException) { 926 // if we stopped 
due an exception then only propagate the exception 927 original.setException(subExchange.getException()); 928 } else { 929 // copy the current result to original so it will contain this result of this eip 930 ExchangeHelper.copyResults(original, subExchange); 931 } 932 } 933 934 // .. and then if there was an exception we need to configure the redelivery exhaust 935 // for example the noErrorHandler will not cause redelivery exhaust so if this error 936 // handled has been in use, then the exhaust would be false (if not forced) 937 if (exception) { 938 // multicast uses error handling on its output processors and they have tried to redeliver 939 // so we shall signal back to the other error handlers that we are exhausted and they should not 940 // also try to redeliver as we will then do that twice 941 original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); 942 } 943 944 callback.done(doneSync); 945 } 946 947 /** 948 * Aggregate the {@link Exchange} with the current result. 949 * This method is synchronized and is called directly when parallelAggregate is disabled (by default). 950 * 951 * @param strategy the aggregation strategy to use 952 * @param result the current result 953 * @param exchange the exchange to be added to the result 954 * @see #doAggregateInternal(org.apache.camel.processor.aggregate.AggregationStrategy, org.apache.camel.util.concurrent.AtomicExchange, org.apache.camel.Exchange) 955 */ 956 protected synchronized void doAggregate(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) { 957 doAggregateInternal(strategy, result, exchange); 958 } 959 960 /** 961 * Aggregate the {@link Exchange} with the current result. 962 * This method is unsynchronized and is called directly when parallelAggregate is enabled. 
963 * In all other cases, this method is called from the doAggregate which is a synchronized method 964 * 965 * @param strategy the aggregation strategy to use 966 * @param result the current result 967 * @param exchange the exchange to be added to the result 968 * @see #doAggregate(org.apache.camel.processor.aggregate.AggregationStrategy, org.apache.camel.util.concurrent.AtomicExchange, org.apache.camel.Exchange) 969 */ 970 protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) { 971 if (strategy != null) { 972 // prepare the exchanges for aggregation 973 Exchange oldExchange = result.get(); 974 ExchangeHelper.prepareAggregation(oldExchange, exchange); 975 result.set(strategy.aggregate(oldExchange, exchange)); 976 } 977 } 978 979 protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, 980 Iterator<ProcessorExchangePair> it) { 981 exchange.setProperty(Exchange.MULTICAST_INDEX, index); 982 if (it.hasNext()) { 983 exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE); 984 } else { 985 exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE); 986 } 987 } 988 989 protected Integer getExchangeIndex(Exchange exchange) { 990 return exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class); 991 } 992 993 protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { 994 List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size()); 995 996 StreamCache streamCache = null; 997 if (isParallelProcessing() && exchange.getIn().getBody() instanceof StreamCache) { 998 // in parallel processing case, the stream must be copied, therefore get the stream 999 streamCache = (StreamCache) exchange.getIn().getBody(); 1000 } 1001 1002 int index = 0; 1003 for (Processor processor : processors) { 1004 // copy exchange, and do not share the unit of work 1005 Exchange copy = 
ExchangeHelper.createCorrelatedCopy(exchange, false); 1006 1007 if (streamCache != null) { 1008 if (index > 0) { 1009 // copy it otherwise parallel processing is not possible, 1010 // because streams can only be read once 1011 StreamCache copiedStreamCache = streamCache.copy(copy); 1012 if (copiedStreamCache != null) { 1013 copy.getIn().setBody(copiedStreamCache); 1014 } 1015 } 1016 } 1017 1018 // If the multi-cast processor has an aggregation strategy 1019 // then the StreamCache created by the child routes must not be 1020 // closed by the unit of work of the child route, but by the unit of 1021 // work of the parent route or grand parent route or grand grand parent route ...(in case of nesting). 1022 // Set therefore the unit of work of the parent route as stream cache unit of work, 1023 // if it is not already set. 1024 if (copy.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) { 1025 copy.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork()); 1026 } 1027 // if we share unit of work, we need to prepare the child exchange 1028 if (isShareUnitOfWork()) { 1029 prepareSharedUnitOfWork(copy, exchange); 1030 } 1031 1032 // and add the pair 1033 RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; 1034 result.add(createProcessorExchangePair(index++, processor, copy, routeContext)); 1035 } 1036 1037 if (exchange.getException() != null) { 1038 // force any exceptions occurred during creation of exchange paris to be thrown 1039 // before returning the answer; 1040 throw exchange.getException(); 1041 } 1042 1043 return result; 1044 } 1045 1046 /** 1047 * Creates the {@link ProcessorExchangePair} which holds the processor and exchange to be send out. 1048 * <p/> 1049 * You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they 1050 * need to be specially prepared before use. 
1051 * 1052 * @param index the index 1053 * @param processor the processor 1054 * @param exchange the exchange 1055 * @param routeContext the route context 1056 * @return prepared for use 1057 */ 1058 protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange, 1059 RouteContext routeContext) { 1060 Processor prepared = processor; 1061 1062 // set property which endpoint we send to 1063 setToEndpoint(exchange, prepared); 1064 1065 // rework error handling to support fine grained error handling 1066 prepared = createErrorHandler(routeContext, exchange, prepared); 1067 1068 // invoke on prepare on the exchange if specified 1069 if (onPrepare != null) { 1070 try { 1071 onPrepare.process(exchange); 1072 } catch (Exception e) { 1073 exchange.setException(e); 1074 } 1075 } 1076 return new DefaultProcessorExchangePair(index, processor, prepared, exchange); 1077 } 1078 1079 protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) { 1080 Processor answer; 1081 1082 boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class); 1083 1084 // do not wrap in error handler if we are inside a try block 1085 if (!tryBlock && routeContext != null) { 1086 // wrap the producer in error handler so we have fine grained error handling on 1087 // the output side instead of the input side 1088 // this is needed to support redelivery on that output alone and not doing redelivery 1089 // for the entire multicast block again which will start from scratch again 1090 1091 // create key for cache 1092 final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor); 1093 1094 // lookup cached first to reuse and preserve memory 1095 answer = errorHandlers.get(key); 1096 if (answer != null) { 1097 LOG.trace("Using existing error handler for: {}", processor); 1098 return answer; 1099 } 1100 1101 LOG.trace("Creating error handler for: {}", processor); 1102 
ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder(); 1103 // create error handler (create error handler directly to keep it light weight, 1104 // instead of using ProcessorDefinition.wrapInErrorHandler) 1105 try { 1106 processor = builder.createErrorHandler(routeContext, processor); 1107 1108 // and wrap in unit of work processor so the copy exchange also can run under UoW 1109 answer = createUnitOfWorkProcessor(routeContext, processor, exchange); 1110 1111 boolean child = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null; 1112 1113 // must start the error handler 1114 ServiceHelper.startServices(answer); 1115 1116 // here we don't cache the child unit of work 1117 if (!child) { 1118 // add to cache 1119 errorHandlers.putIfAbsent(key, answer); 1120 } 1121 1122 } catch (Exception e) { 1123 throw ObjectHelper.wrapRuntimeCamelException(e); 1124 } 1125 } else { 1126 // and wrap in unit of work processor so the copy exchange also can run under UoW 1127 answer = createUnitOfWorkProcessor(routeContext, processor, exchange); 1128 } 1129 1130 return answer; 1131 } 1132 1133 /** 1134 * Strategy to create the unit of work to be used for the sub route 1135 * 1136 * @param routeContext the route context 1137 * @param processor the processor 1138 * @param exchange the exchange 1139 * @return the unit of work processor 1140 */ 1141 protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) { 1142 CamelInternalProcessor internal = new CamelInternalProcessor(processor); 1143 1144 // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW 1145 UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); 1146 if (parent != null) { 1147 internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeContext, parent)); 1148 } else { 1149 internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); 1150 } 1151 1152 return internal; 1153 } 1154 1155 /** 1156 * Prepares the exchange for participating in a shared unit of work 1157 * <p/> 1158 * This ensures a child exchange can access its parent {@link UnitOfWork} when it participate 1159 * in a shared unit of work. 1160 * 1161 * @param childExchange the child exchange 1162 * @param parentExchange the parent exchange 1163 */ 1164 protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange parentExchange) { 1165 childExchange.setProperty(Exchange.PARENT_UNIT_OF_WORK, parentExchange.getUnitOfWork()); 1166 } 1167 1168 protected void doStart() throws Exception { 1169 if (isParallelProcessing() && executorService == null) { 1170 throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set"); 1171 } 1172 if (timeout > 0 && !isParallelProcessing()) { 1173 throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled"); 1174 } 1175 if (isParallelProcessing() && aggregateExecutorService == null) { 1176 // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread 1177 // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run 1178 // and signal completion during processing, which would lead to what would appear as a dead-lock or a slow processing 1179 String name = getClass().getSimpleName() + "-AggregateTask"; 1180 aggregateExecutorService = createAggregateExecutorService(name); 1181 } 1182 if (aggregationStrategy instanceof CamelContextAware) { 1183 ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext); 1184 } 1185 1186 ServiceHelper.startServices(aggregationStrategy, processors); 1187 } 1188 1189 /** 1190 * Strategy to create the thread pool for the aggregator background task which waits for and aggregates 1191 * completed tasks when running in parallel mode. 
1192 * 1193 * @param name the suggested name for the background thread 1194 * @return the thread pool 1195 */ 1196 protected synchronized ExecutorService createAggregateExecutorService(String name) { 1197 // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in 1198 return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name); 1199 } 1200 1201 @Override 1202 protected void doStop() throws Exception { 1203 ServiceHelper.stopServices(processors, errorHandlers, aggregationStrategy); 1204 } 1205 1206 @Override 1207 protected void doShutdown() throws Exception { 1208 ServiceHelper.stopAndShutdownServices(processors, errorHandlers, aggregationStrategy); 1209 // only clear error handlers when shutting down 1210 errorHandlers.clear(); 1211 1212 if (shutdownExecutorService && executorService != null) { 1213 getCamelContext().getExecutorServiceManager().shutdownNow(executorService); 1214 } 1215 if (aggregateExecutorService != null) { 1216 getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService); 1217 } 1218 } 1219 1220 protected static void setToEndpoint(Exchange exchange, Processor processor) { 1221 if (processor instanceof Producer) { 1222 Producer producer = (Producer) processor; 1223 exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); 1224 } 1225 } 1226 1227 protected AggregationStrategy getAggregationStrategy(Exchange exchange) { 1228 AggregationStrategy answer = null; 1229 1230 // prefer to use per Exchange aggregation strategy over a global strategy 1231 if (exchange != null) { 1232 Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); 1233 Map<Object, AggregationStrategy> map = CastUtils.cast(property); 1234 if (map != null) { 1235 answer = map.get(this); 1236 } 1237 } 1238 if (answer == null) { 1239 // fallback to global strategy 1240 answer = getAggregationStrategy(); 1241 } 1242 return answer; 
1243 } 1244 1245 /** 1246 * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}. 1247 * 1248 * @param exchange the exchange 1249 * @param aggregationStrategy the strategy 1250 */ 1251 protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) { 1252 Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); 1253 Map<Object, AggregationStrategy> map = CastUtils.cast(property); 1254 if (map == null) { 1255 map = new ConcurrentHashMap<Object, AggregationStrategy>(); 1256 } else { 1257 // it is not safe to use the map directly as the exchange doesn't have the deep copy of it's properties 1258 // we just create a new copy if we need to change the map 1259 map = new ConcurrentHashMap<Object, AggregationStrategy>(map); 1260 } 1261 // store the strategy using this processor as the key 1262 // (so we can store multiple strategies on the same exchange) 1263 map.put(this, aggregationStrategy); 1264 exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map); 1265 } 1266 1267 /** 1268 * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange} 1269 * which must be done after use. 1270 * 1271 * @param exchange the current exchange 1272 */ 1273 protected void removeAggregationStrategyFromExchange(Exchange exchange) { 1274 Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); 1275 Map<Object, AggregationStrategy> map = CastUtils.cast(property); 1276 if (map == null) { 1277 return; 1278 } 1279 // remove the strategy using this processor as the key 1280 map.remove(this); 1281 } 1282 1283 /** 1284 * Is the multicast processor working in streaming mode? 
1285 * <p/> 1286 * In streaming mode: 1287 * <ul> 1288 * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li> 1289 * <li>for parallel processing, we start aggregating responses as they get send back to the processor; 1290 * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li> 1291 * </ul> 1292 */ 1293 public boolean isStreaming() { 1294 return streaming; 1295 } 1296 1297 /** 1298 * Should the multicast processor stop processing further exchanges in case of an exception occurred? 1299 */ 1300 public boolean isStopOnException() { 1301 return stopOnException; 1302 } 1303 1304 /** 1305 * Returns the producers to multicast to 1306 */ 1307 public Collection<Processor> getProcessors() { 1308 return processors; 1309 } 1310 1311 /** 1312 * An optional timeout in millis when using parallel processing 1313 */ 1314 public long getTimeout() { 1315 return timeout; 1316 } 1317 1318 /** 1319 * Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead. 1320 */ 1321 public AggregationStrategy getAggregationStrategy() { 1322 return aggregationStrategy; 1323 } 1324 1325 public boolean isParallelProcessing() { 1326 return parallelProcessing; 1327 } 1328 1329 public boolean isParallelAggregate() { 1330 return parallelAggregate; 1331 } 1332 1333 public boolean isStopOnAggregateException() { 1334 return stopOnAggregateException; 1335 } 1336 1337 public boolean isShareUnitOfWork() { 1338 return shareUnitOfWork; 1339 } 1340 1341 public List<Processor> next() { 1342 if (!hasNext()) { 1343 return null; 1344 } 1345 return new ArrayList<Processor>(processors); 1346 } 1347 1348 public boolean hasNext() { 1349 return processors != null && !processors.isEmpty(); 1350 } 1351}