001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.List; 024 import java.util.Map; 025 import java.util.concurrent.Callable; 026 import java.util.concurrent.CompletionService; 027 import java.util.concurrent.ConcurrentHashMap; 028 import java.util.concurrent.ConcurrentMap; 029 import java.util.concurrent.CountDownLatch; 030 import java.util.concurrent.ExecutionException; 031 import java.util.concurrent.ExecutorCompletionService; 032 import java.util.concurrent.ExecutorService; 033 import java.util.concurrent.Future; 034 import java.util.concurrent.TimeUnit; 035 import java.util.concurrent.atomic.AtomicBoolean; 036 import java.util.concurrent.atomic.AtomicInteger; 037 038 import org.apache.camel.AsyncCallback; 039 import org.apache.camel.AsyncProcessor; 040 import org.apache.camel.CamelContext; 041 import org.apache.camel.CamelExchangeException; 042 import org.apache.camel.Endpoint; 043 import org.apache.camel.ErrorHandlerFactory; 044 import org.apache.camel.Exchange; 045 import org.apache.camel.Navigate; 046 import org.apache.camel.Processor; 047 import org.apache.camel.Producer; 048 import org.apache.camel.Traceable; 049 import org.apache.camel.processor.aggregate.AggregationStrategy; 050 import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy; 051 import org.apache.camel.spi.RouteContext; 052 import org.apache.camel.spi.TracedRouteNodes; 053 import org.apache.camel.spi.UnitOfWork; 054 import org.apache.camel.support.ServiceSupport; 055 import org.apache.camel.util.AsyncProcessorConverterHelper; 056 import org.apache.camel.util.AsyncProcessorHelper; 057 import org.apache.camel.util.CastUtils; 058 import org.apache.camel.util.EventHelper; 059 import org.apache.camel.util.ExchangeHelper; 060 import org.apache.camel.util.KeyValueHolder; 061 import org.apache.camel.util.ObjectHelper; 062 import org.apache.camel.util.ServiceHelper; 063 import org.apache.camel.util.StopWatch; 064 import org.apache.camel.util.concurrent.AtomicException; 065 import org.apache.camel.util.concurrent.AtomicExchange; 066 import org.apache.camel.util.concurrent.SubmitOrderedCompletionService; 067 import org.slf4j.Logger; 068 import org.slf4j.LoggerFactory; 069 070 import static org.apache.camel.util.ObjectHelper.notNull; 071 072 073 /** 074 * Implements the Multicast pattern to send a message exchange to a number of 075 * endpoints, each endpoint receiving a copy of the message exchange. 076 * 077 * @version 078 * @see Pipeline 079 */ 080 public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable { 081 082 private static final transient Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class); 083 084 /** 085 * Class that represent each step in the multicast route to do 086 */ 087 static final class DefaultProcessorExchangePair implements ProcessorExchangePair { 088 private final int index; 089 private final Processor processor; 090 private final Processor prepared; 091 private final Exchange exchange; 092 093 private DefaultProcessorExchangePair(int index, Processor processor, Processor prepared, Exchange exchange) { 094 this.index = index; 095 this.processor = processor; 096 this.prepared = prepared; 097 this.exchange = exchange; 098 } 099 100 public int getIndex() { 101 return index; 102 } 103 104 public Exchange getExchange() { 105 return exchange; 106 } 107 108 public Producer getProducer() { 109 if (processor instanceof Producer) { 110 return (Producer) processor; 111 } 112 return null; 113 } 114 115 public Processor getProcessor() { 116 return prepared; 117 } 118 119 public void begin() { 120 // noop 121 } 122 123 public void done() { 124 // noop 125 } 126 127 } 128 129 /** 130 * Class that represents prepared fine grained error handlers when processing multicasted/splitted exchanges 131 * <p/> 132 * See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods. 133 */ 134 static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> { 135 136 public PreparedErrorHandler(RouteContext key, Processor value) { 137 super(key, value); 138 } 139 140 } 141 142 protected final Processor onPrepare; 143 private final CamelContext camelContext; 144 private Collection<Processor> processors; 145 private final AggregationStrategy aggregationStrategy; 146 private final boolean parallelProcessing; 147 private final boolean streaming; 148 private final boolean stopOnException; 149 private final ExecutorService executorService; 150 private final boolean shutdownExecutorService; 151 private ExecutorService aggregateExecutorService; 152 private final long timeout; 153 private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>(); 154 private final boolean shareUnitOfWork; 155 156 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) { 157 this(camelContext, processors, null); 158 } 159 160 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) { 161 this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false); 162 } 163 164 public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, 165 boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, 166 boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) { 167 notNull(camelContext, "camelContext"); 168 this.camelContext = camelContext; 169 this.processors = processors; 170 this.aggregationStrategy = aggregationStrategy; 171 this.executorService = executorService; 172 this.shutdownExecutorService = shutdownExecutorService; 173 this.streaming = streaming; 174 this.stopOnException = stopOnException; 175 // must enable parallel if executor service is provided 176 this.parallelProcessing = parallelProcessing || executorService != null; 177 this.timeout = timeout; 178 this.onPrepare = onPrepare; 179 this.shareUnitOfWork = shareUnitOfWork; 180 } 181 182 @Override 183 public String toString() { 184 return "Multicast[" + getProcessors() + "]"; 185 } 186 187 public String getTraceLabel() { 188 return "multicast"; 189 } 190 191 public CamelContext getCamelContext() { 192 return camelContext; 193 } 194 195 public void process(Exchange exchange) throws Exception { 196 AsyncProcessorHelper.process(this, exchange); 197 } 198 199 public boolean process(Exchange exchange, AsyncCallback callback) { 200 final AtomicExchange result = new AtomicExchange(); 201 final Iterable<ProcessorExchangePair> pairs; 202 203 try { 204 boolean sync = true; 205 206 pairs = createProcessorExchangePairs(exchange); 207 208 if (isParallelProcessing()) { 209 // ensure an executor is set when running in parallel 210 ObjectHelper.notNull(executorService, "executorService", this); 211 doProcessParallel(exchange, result, pairs, isStreaming(), callback); 212 } else { 213 sync = doProcessSequential(exchange, result, pairs, callback); 214 } 215 216 if (!sync) { 217 // the remainder of the multicast will be completed async 218 // so we break out now, then the callback will be invoked which then continue routing from where we left here 219 return false; 220 } 221 } catch (Throwable e) { 222 exchange.setException(e); 223 // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted 224 // and do the done work 225 doDone(exchange, null, callback, true, false); 226 return true; 227 } 228 229 // multicasting was processed successfully 230 // and do the done work 231 Exchange subExchange = result.get() != null ? result.get() : null; 232 doDone(exchange, subExchange, callback, true, true); 233 return true; 234 } 235 236 protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs, 237 final boolean streaming, final AsyncCallback callback) throws Exception { 238 239 ObjectHelper.notNull(executorService, "ExecutorService", this); 240 ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this); 241 242 final CompletionService<Exchange> completion; 243 if (streaming) { 244 // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence) 245 completion = new ExecutorCompletionService<Exchange>(executorService); 246 } else { 247 // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence) 248 completion = new SubmitOrderedCompletionService<Exchange>(executorService); 249 } 250 251 final AtomicInteger total = new AtomicInteger(0); 252 final Iterator<ProcessorExchangePair> it = pairs.iterator(); 253 254 if (it.hasNext()) { 255 // when parallel then aggregate on the fly 256 final AtomicBoolean running = new AtomicBoolean(true); 257 final AtomicBoolean allTasksSubmitted = new AtomicBoolean(); 258 final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1); 259 final AtomicException executionException = new AtomicException(); 260 261 // issue task to execute in separate thread so it can aggregate on-the-fly 262 // while we submit new tasks, and those tasks complete concurrently 263 // this allows us to optimize work and reduce memory consumption 264 final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running, 265 aggregationOnTheFlyDone, allTasksSubmitted, executionException); 266 final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean(); 267 268 LOG.trace("Starting to submit parallel tasks"); 269 270 while (it.hasNext()) { 271 final ProcessorExchangePair pair = it.next(); 272 final Exchange subExchange = pair.getExchange(); 273 updateNewExchange(subExchange, total.intValue(), pairs, it); 274 275 completion.submit(new Callable<Exchange>() { 276 public Exchange call() throws Exception { 277 // only start the aggregation task when the task is being executed to avoid staring 278 // the aggregation task to early and pile up too many threads 279 if (aggregationTaskSubmitted.compareAndSet(false, true)) { 280 // but only submit the task once 281 aggregateExecutorService.submit(aggregateOnTheFlyTask); 282 } 283 284 if (!running.get()) { 285 // do not start processing the task if we are not running 286 return subExchange; 287 } 288 289 try { 290 doProcessParallel(pair); 291 } catch (Throwable e) { 292 subExchange.setException(e); 293 } 294 295 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 296 Integer number = getExchangeIndex(subExchange); 297 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); 298 if (stopOnException && !continueProcessing) { 299 // signal to stop running 300 running.set(false); 301 // throw caused exception 302 if (subExchange.getException() != null) { 303 // wrap in exception to explain where it failed 304 CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException()); 305 subExchange.setException(cause); 306 } 307 } 308 309 LOG.trace("Parallel processing complete for exchange: {}", subExchange); 310 return subExchange; 311 } 312 }); 313 314 total.incrementAndGet(); 315 } 316 317 // signal all tasks has been submitted 318 LOG.trace("Signaling that all {} tasks has been submitted.", total.get()); 319 allTasksSubmitted.set(true); 320 321 // its to hard to do parallel async routing so we let the caller thread be synchronously 322 // and have it pickup the replies and do the aggregation (eg we use a latch to wait) 323 // wait for aggregation to be done 324 LOG.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", total.get(), original.getExchangeId()); 325 aggregationOnTheFlyDone.await(); 326 327 // did we fail for whatever reason, if so throw that caused exception 328 if (executionException.get() != null) { 329 if (LOG.isDebugEnabled()) { 330 LOG.debug("Parallel processing failed due {}", executionException.get().getMessage()); 331 } 332 throw executionException.get(); 333 } 334 } 335 336 // no everything is okay so we are done 337 LOG.debug("Done parallel processing {} exchanges", total); 338 } 339 340 /** 341 * Task to aggregate on-the-fly for completed tasks when using parallel processing. 342 * <p/> 343 * This ensures lower memory consumption as we do not need to keep all completed tasks in memory 344 * before we perform aggregation. Instead this separate thread will run and aggregate when new 345 * completed tasks is done. 346 * <p/> 347 * The logic is fairly complex as this implementation has to keep track how far it got, and also 348 * signal back to the <i>main</t> thread when its done, so the <i>main</t> thread can continue 349 * processing when the entire splitting is done. 350 */ 351 private final class AggregateOnTheFlyTask implements Runnable { 352 353 private final AtomicExchange result; 354 private final Exchange original; 355 private final AtomicInteger total; 356 private final CompletionService<Exchange> completion; 357 private final AtomicBoolean running; 358 private final CountDownLatch aggregationOnTheFlyDone; 359 private final AtomicBoolean allTasksSubmitted; 360 private final AtomicException executionException; 361 362 private AggregateOnTheFlyTask(AtomicExchange result, Exchange original, AtomicInteger total, 363 CompletionService<Exchange> completion, AtomicBoolean running, 364 CountDownLatch aggregationOnTheFlyDone, AtomicBoolean allTasksSubmitted, 365 AtomicException executionException) { 366 this.result = result; 367 this.original = original; 368 this.total = total; 369 this.completion = completion; 370 this.running = running; 371 this.aggregationOnTheFlyDone = aggregationOnTheFlyDone; 372 this.allTasksSubmitted = allTasksSubmitted; 373 this.executionException = executionException; 374 } 375 376 public void run() { 377 LOG.trace("Aggregate on the fly task started for exchangeId: {}", original.getExchangeId()); 378 379 try { 380 aggregateOnTheFly(); 381 } catch (Throwable e) { 382 if (e instanceof Exception) { 383 executionException.set((Exception) e); 384 } else { 385 executionException.set(ObjectHelper.wrapRuntimeCamelException(e)); 386 } 387 } finally { 388 // must signal we are done so the latch can open and let the other thread continue processing 389 LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId()); 390 LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId()); 391 aggregationOnTheFlyDone.countDown(); 392 } 393 } 394 395 private void aggregateOnTheFly() throws InterruptedException, ExecutionException { 396 boolean timedOut = false; 397 boolean stoppedOnException = false; 398 final StopWatch watch = new StopWatch(); 399 int aggregated = 0; 400 boolean done = false; 401 // not a for loop as on the fly may still run 402 while (!done) { 403 // check if we have already aggregate everything 404 if (allTasksSubmitted.get() && aggregated >= total.get()) { 405 LOG.debug("Done aggregating {} exchanges on the fly.", aggregated); 406 break; 407 } 408 409 Future<Exchange> future; 410 if (timedOut) { 411 // we are timed out but try to grab if some tasks has been completed 412 // poll will return null if no tasks is present 413 future = completion.poll(); 414 LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future); 415 } else if (timeout > 0) { 416 long left = timeout - watch.taken(); 417 if (left < 0) { 418 left = 0; 419 } 420 LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left); 421 future = completion.poll(left, TimeUnit.MILLISECONDS); 422 } else { 423 LOG.trace("Polling completion task #{}", aggregated); 424 // we must not block so poll every second 425 future = completion.poll(1, TimeUnit.SECONDS); 426 if (future == null) { 427 // and continue loop which will recheck if we are done 428 continue; 429 } 430 } 431 432 if (future == null && timedOut) { 433 // we are timed out and no more tasks complete so break out 434 break; 435 } else if (future == null) { 436 // timeout occurred 437 AggregationStrategy strategy = getAggregationStrategy(null); 438 if (strategy instanceof TimeoutAwareAggregationStrategy) { 439 // notify the strategy we timed out 440 Exchange oldExchange = result.get(); 441 if (oldExchange == null) { 442 // if they all timed out the result may not have been set yet, so use the original exchange 443 oldExchange = original; 444 } 445 ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated, total.intValue(), timeout); 446 } else { 447 // log a WARN we timed out since it will not be aggregated and the Exchange will be lost 448 LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated); 449 } 450 LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated); 451 timedOut = true; 452 453 // mark that index as timed out, which allows us to try to retrieve 454 // any already completed tasks in the next loop 455 if (completion instanceof SubmitOrderedCompletionService) { 456 ((SubmitOrderedCompletionService<?>) completion).timeoutTask(); 457 } 458 } else { 459 // there is a result to aggregate 460 Exchange subExchange = future.get(); 461 462 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 463 Integer number = getExchangeIndex(subExchange); 464 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); 465 if (stopOnException && !continueProcessing) { 466 // we want to stop on exception and an exception or failure occurred 467 // this is similar to what the pipeline does, so we should do the same to not surprise end users 468 // so we should set the failed exchange as the result and break out 469 result.set(subExchange); 470 stoppedOnException = true; 471 break; 472 } 473 474 // we got a result so aggregate it 475 AggregationStrategy strategy = getAggregationStrategy(subExchange); 476 doAggregate(strategy, result, subExchange); 477 } 478 479 aggregated++; 480 } 481 482 if (timedOut || stoppedOnException) { 483 if (timedOut) { 484 LOG.debug("Cancelling tasks due timeout after {} millis.", timeout); 485 } 486 if (stoppedOnException) { 487 LOG.debug("Cancelling tasks due stopOnException."); 488 } 489 // cancel tasks as we timed out (its safe to cancel done tasks) 490 running.set(false); 491 } 492 } 493 } 494 495 protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception { 496 AtomicInteger total = new AtomicInteger(); 497 Iterator<ProcessorExchangePair> it = pairs.iterator(); 498 499 while (it.hasNext()) { 500 ProcessorExchangePair pair = it.next(); 501 Exchange subExchange = pair.getExchange(); 502 updateNewExchange(subExchange, total.get(), pairs, it); 503 504 boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total); 505 if (!sync) { 506 if (LOG.isTraceEnabled()) { 507 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", pair.getExchange().getExchangeId()); 508 } 509 // the remainder of the multicast will be completed async 510 // so we break out now, then the callback will be invoked which then continue routing from where we left here 511 return false; 512 } 513 514 if (LOG.isTraceEnabled()) { 515 LOG.trace("Processing exchangeId: {} is continued being processed synchronously", pair.getExchange().getExchangeId()); 516 } 517 518 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 519 // remember to test for stop on exception and aggregate before copying back results 520 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); 521 if (stopOnException && !continueProcessing) { 522 if (subExchange.getException() != null) { 523 // wrap in exception to explain where it failed 524 CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException()); 525 subExchange.setException(cause); 526 } 527 // we want to stop on exception, and the exception was handled by the error handler 528 // this is similar to what the pipeline does, so we should do the same to not surprise end users 529 // so we should set the failed exchange as the result and be done 530 result.set(subExchange); 531 return true; 532 } 533 534 LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange); 535 536 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 537 total.incrementAndGet(); 538 } 539 540 LOG.debug("Done sequential processing {} exchanges", total); 541 542 return true; 543 } 544 545 private boolean doProcessSequential(final Exchange original, final AtomicExchange result, 546 final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it, 547 final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) { 548 boolean sync = true; 549 550 final Exchange exchange = pair.getExchange(); 551 Processor processor = pair.getProcessor(); 552 final Producer producer = pair.getProducer(); 553 554 TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null; 555 556 // compute time taken if sending to another endpoint 557 final StopWatch watch = producer != null ? new StopWatch() : null; 558 559 try { 560 // prepare tracing starting from a new block 561 if (traced != null) { 562 traced.pushBlock(); 563 } 564 565 if (producer != null) { 566 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); 567 } 568 // let the prepared process it, remember to begin the exchange pair 569 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); 570 pair.begin(); 571 sync = AsyncProcessorHelper.process(async, exchange, new AsyncCallback() { 572 public void done(boolean doneSync) { 573 // we are done with the exchange pair 574 pair.done(); 575 576 // okay we are done, so notify the exchange was sent 577 if (producer != null) { 578 long timeTaken = watch.stop(); 579 Endpoint endpoint = producer.getEndpoint(); 580 // emit event that the exchange was sent to the endpoint 581 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 582 } 583 584 // we only have to handle async completion of the routing slip 585 if (doneSync) { 586 return; 587 } 588 589 // continue processing the multicast asynchronously 590 Exchange subExchange = exchange; 591 592 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 593 // remember to test for stop on exception and aggregate before copying back results 594 boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); 595 if (stopOnException && !continueProcessing) { 596 if (subExchange.getException() != null) { 597 // wrap in exception to explain where it failed 598 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException())); 599 } else { 600 // we want to stop on exception, and the exception was handled by the error handler 601 // this is similar to what the pipeline does, so we should do the same to not surprise end users 602 // so we should set the failed exchange as the result and be done 603 result.set(subExchange); 604 } 605 // and do the done work 606 doDone(original, subExchange, callback, false, true); 607 return; 608 } 609 610 try { 611 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 612 } catch (Throwable e) { 613 // wrap in exception to explain where it failed 614 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); 615 // and do the done work 616 doDone(original, subExchange, callback, false, true); 617 return; 618 } 619 620 total.incrementAndGet(); 621 622 // maybe there are more processors to multicast 623 while (it.hasNext()) { 624 625 // prepare and run the next 626 ProcessorExchangePair pair = it.next(); 627 subExchange = pair.getExchange(); 628 updateNewExchange(subExchange, total.get(), pairs, it); 629 boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total); 630 631 if (!sync) { 632 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId()); 633 return; 634 } 635 636 // Decide whether to continue with the multicast or not; similar logic to the Pipeline 637 // remember to test for stop on exception and aggregate before copying back results 638 continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); 639 if (stopOnException && !continueProcessing) { 640 if (subExchange.getException() != null) { 641 // wrap in exception to explain where it failed 642 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException())); 643 } else { 644 // we want to stop on exception, and the exception was handled by the error handler 645 // this is similar to what the pipeline does, so we should do the same to not surprise end users 646 // so we should set the failed exchange as the result and be done 647 result.set(subExchange); 648 } 649 // and do the done work 650 doDone(original, subExchange, callback, false, true); 651 return; 652 } 653 654 // must catch any exceptions from aggregation 655 try { 656 doAggregate(getAggregationStrategy(subExchange), result, subExchange); 657 } catch (Throwable e) { 658 // wrap in exception to explain where it failed 659 subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); 660 // and do the done work 661 doDone(original, subExchange, callback, false, true); 662 return; 663 } 664 665 total.incrementAndGet(); 666 } 667 668 // do the done work 669 subExchange = result.get() != null ? result.get() : null; 670 doDone(original, subExchange, callback, false, true); 671 } 672 }); 673 } finally { 674 // pop the block so by next round we have the same staring point and thus the tracing looks accurate 675 if (traced != null) { 676 traced.popBlock(); 677 } 678 } 679 680 return sync; 681 } 682 683 private void doProcessParallel(final ProcessorExchangePair pair) throws Exception { 684 final Exchange exchange = pair.getExchange(); 685 Processor processor = pair.getProcessor(); 686 Producer producer = pair.getProducer(); 687 688 TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null; 689 690 // compute time taken if sending to another endpoint 691 StopWatch watch = null; 692 if (producer != null) { 693 watch = new StopWatch(); 694 } 695 696 try { 697 // prepare tracing starting from a new block 698 if (traced != null) { 699 traced.pushBlock(); 700 } 701 702 if (producer != null) { 703 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); 704 } 705 // let the prepared process it, remember to begin the exchange pair 706 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); 707 pair.begin(); 708 // we invoke it synchronously as parallel async routing is too hard 709 AsyncProcessorHelper.process(async, exchange); 710 } finally { 711 pair.done(); 712 // pop the block so by next round we have the same staring point and thus the tracing looks accurate 713 if (traced != null) { 714 traced.popBlock(); 715 } 716 if (producer != null) { 717 long timeTaken = watch.stop(); 718 Endpoint endpoint = producer.getEndpoint(); 719 // emit event that the exchange was sent to the endpoint 720 // this is okay to do here in the finally block, as the processing is not using the async routing engine 721 //( we invoke it synchronously as parallel async routing is too hard) 722 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); 723 } 724 } 725 } 726 727 /** 728 * Common work which must be done when we are done multicasting. 729 * <p/> 730 * This logic applies for both running synchronous and asynchronous as there are multiple exist points 731 * when using the asynchronous routing engine. And therefore we want the logic in one method instead 732 * of being scattered. 733 * 734 * @param original the original exchange 735 * @param subExchange the current sub exchange, can be <tt>null</tt> for the synchronous part 736 * @param callback the callback 737 * @param doneSync the <tt>doneSync</tt> parameter to call on callback 738 * @param exhaust whether or not error handling is exhausted 739 */ 740 protected void doDone(Exchange original, Exchange subExchange, AsyncCallback callback, boolean doneSync, boolean exhaust) { 741 // cleanup any per exchange aggregation strategy 742 removeAggregationStrategyFromExchange(original); 743 if (original.getException() != null || subExchange != null && subExchange.getException() != null) { 744 // multicast uses error handling on its output processors and they have tried to redeliver 745 // so we shall signal back to the other error handlers that we are exhausted and they should not 746 // also try to redeliver as we will then do that twice 747 original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); 748 } 749 if (subExchange != null) { 750 // and copy the current result to original so it will contain this result of this eip 751 ExchangeHelper.copyResults(original, subExchange); 752 } 753 callback.done(doneSync); 754 } 755 756 /** 757 * Aggregate the {@link Exchange} with the current result 758 * 759 * @param strategy the aggregation strategy to use 760 * @param result the current result 761 * @param exchange the exchange to be added to the result 762 */ 763 protected synchronized void doAggregate(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) { 764 if (strategy != null) { 765 // prepare the exchanges for aggregation 766 Exchange oldExchange = result.get(); 767 ExchangeHelper.prepareAggregation(oldExchange, exchange); 768 result.set(strategy.aggregate(oldExchange, exchange)); 769 } 770 } 771 772 protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, 773 Iterator<ProcessorExchangePair> it) { 774 exchange.setProperty(Exchange.MULTICAST_INDEX, index); 775 if (it.hasNext()) { 776 exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE); 777 } else { 778 exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE); 779 } 780 } 781 782 protected Integer getExchangeIndex(Exchange exchange) { 783 return exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class); 784 } 785 786 protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { 787 List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size()); 788 789 int index = 0; 790 for (Processor processor : processors) { 791 // copy exchange, and do not share the unit of work 792 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 793 794 // if we share unit of work, we need to prepare the child exchange 795 if (isShareUnitOfWork()) { 796 prepareSharedUnitOfWork(copy, exchange); 797 } 798 799 // and add the pair 800 RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; 801 result.add(createProcessorExchangePair(index++, processor, copy, routeContext)); 802 } 803 804 if (exchange.getException() != null) { 805 // force any exceptions occurred during creation of exchange paris to be thrown 806 // before returning the answer; 807 throw exchange.getException(); 808 } 809 810 return result; 811 } 812 813 /** 814 * Creates the {@link ProcessorExchangePair} which holds the processor and exchange to be send out. 815 * <p/> 816 * You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they 817 * need to be specially prepared before use. 818 * 819 * @param index the index 820 * @param processor the processor 821 * @param exchange the exchange 822 * @param routeContext the route context 823 * @return prepared for use 824 */ 825 protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange, 826 RouteContext routeContext) { 827 Processor prepared = processor; 828 829 // set property which endpoint we send to 830 setToEndpoint(exchange, prepared); 831 832 // rework error handling to support fine grained error handling 833 prepared = createErrorHandler(routeContext, exchange, prepared); 834 835 // invoke on prepare on the exchange if specified 836 if (onPrepare != null) { 837 try { 838 onPrepare.process(exchange); 839 } catch (Exception e) { 840 exchange.setException(e); 841 } 842 } 843 return new DefaultProcessorExchangePair(index, processor, prepared, exchange); 844 } 845 846 protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) { 847 Processor answer; 848 849 if (routeContext != null) { 850 // wrap the producer in error handler so we have fine grained error handling on 851 // the output side instead of the input side 852 // this is needed to support redelivery on that output alone and not doing redelivery 853 // for the entire multicast block again which will start from scratch again 854 855 // create key for cache 856 final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor); 857 858 // lookup cached first to reuse and preserve memory 859 answer = errorHandlers.get(key); 860 if (answer != null) { 861 LOG.trace("Using existing error handler for: {}", processor); 862 return answer; 863 } 864 865 LOG.trace("Creating error handler for: {}", processor); 866 ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder(); 867 // create error handler (create error handler directly to keep it light weight, 868 // instead of using ProcessorDefinition.wrapInErrorHandler) 869 try { 870 processor = builder.createErrorHandler(routeContext, processor); 871 872 // and wrap in unit of work processor so the copy exchange also can run under UoW 873 answer = createUnitOfWorkProcessor(routeContext, processor, exchange); 874 875 // must start the error handler 876 ServiceHelper.startServices(answer); 877 } catch (Exception e) { 878 throw ObjectHelper.wrapRuntimeCamelException(e); 879 } 880 // here we don't cache the ChildUnitOfWorkProcessor 881 // As the UnitOfWorkProcess will be delegate to the Parent 882 if (!(answer instanceof ChildUnitOfWorkProcessor)) { 883 // add to cache 884 errorHandlers.putIfAbsent(key, answer); 885 } 886 } else { 887 // and wrap in unit of work processor so the copy exchange also can run under UoW 888 answer = createUnitOfWorkProcessor(routeContext, processor, exchange); 889 } 890 891 return answer; 892 } 893 894 /** 895 * Strategy to create the {@link UnitOfWorkProcessor} to be used for the sub route 896 * 897 * @param routeContext the route context 898 * @param processor the processor wrapped in this unit of work processor 899 * @param exchange the exchange 900 * @return the unit of work processor 901 */ 902 protected UnitOfWorkProcessor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) { 903 UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); 904 if (parent != null) { 905 return new ChildUnitOfWorkProcessor(parent, routeContext, processor); 906 } else { 907 return new UnitOfWorkProcessor(routeContext, processor); 908 } 909 } 910 911 /** 912 * Prepares the exchange for participating in a shared unit of work 913 * <p/> 914 * This ensures a child exchange can access its parent {@link UnitOfWork} when it participate 915 * in a shared unit of work. 916 * 917 * @param childExchange the child exchange 918 * @param parentExchange the parent exchange 919 */ 920 protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange parentExchange) { 921 childExchange.setProperty(Exchange.PARENT_UNIT_OF_WORK, parentExchange.getUnitOfWork()); 922 } 923 924 protected void doStart() throws Exception { 925 if (isParallelProcessing() && executorService == null) { 926 throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set"); 927 } 928 if (timeout > 0 && !isParallelProcessing()) { 929 throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled"); 930 } 931 if (isParallelProcessing() && aggregateExecutorService == null) { 932 // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread 933 // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run 934 // and signal completion during processing, which would lead to what would appear as a dead-lock or a slow processing 935 String name = getClass().getSimpleName() + "-AggregateTask"; 936 aggregateExecutorService = createAggregateExecutorService(name); 937 } 938 ServiceHelper.startServices(processors); 939 } 940 941 /** 942 * Strategy to create the thread pool for the aggregator background task which waits for and aggregates 943 * completed tasks when running in parallel mode. 944 * 945 * @param name the suggested name for the background thread 946 * @return the thread pool 947 */ 948 protected synchronized ExecutorService createAggregateExecutorService(String name) { 949 // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in 950 return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name); 951 } 952 953 @Override 954 protected void doStop() throws Exception { 955 ServiceHelper.stopServices(processors, errorHandlers); 956 } 957 958 @Override 959 protected void doShutdown() throws Exception { 960 ServiceHelper.stopAndShutdownServices(processors, errorHandlers); 961 // only clear error handlers when shutting down 962 errorHandlers.clear(); 963 964 if (shutdownExecutorService && executorService != null) { 965 getCamelContext().getExecutorServiceManager().shutdownNow(executorService); 966 } 967 if (aggregateExecutorService != null) { 968 getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService); 969 } 970 } 971 972 protected static void setToEndpoint(Exchange exchange, Processor processor) { 973 if (processor instanceof Producer) { 974 Producer producer = (Producer) processor; 975 exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); 976 } 977 } 978 979 protected AggregationStrategy getAggregationStrategy(Exchange exchange) { 980 AggregationStrategy answer = null; 981 982 // prefer to use per Exchange aggregation strategy over a global strategy 983 if (exchange != null) { 984 Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); 985 Map<Object, AggregationStrategy> map = CastUtils.cast(property); 986 if (map != null) { 987 answer = map.get(this); 988 } 989 } 990 if (answer == null) { 991 // fallback to global strategy 992 answer = getAggregationStrategy(); 993 } 994 return answer; 995 } 996 997 /** 998 * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}. 999 * 1000 * @param exchange the exchange 1001 * @param aggregationStrategy the strategy 1002 */ 1003 protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) { 1004 Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); 1005 Map<Object, AggregationStrategy> map = CastUtils.cast(property); 1006 if (map == null) { 1007 map = new HashMap<Object, AggregationStrategy>(); 1008 } else { 1009 // it is not safe to use the map directly as the exchange doesn't have the deep copy of it's properties 1010 // we just create a new copy if we need to change the map 1011 map = new HashMap<Object, AggregationStrategy>(map); 1012 } 1013 // store the strategy using this processor as the key 1014 // (so we can store multiple strategies on the same exchange) 1015 map.put(this, aggregationStrategy); 1016 exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map); 1017 } 1018 1019 /** 1020 * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange} 1021 * which must be done after use. 1022 * 1023 * @param exchange the current exchange 1024 */ 1025 protected void removeAggregationStrategyFromExchange(Exchange exchange) { 1026 Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); 1027 Map<Object, AggregationStrategy> map = CastUtils.cast(property); 1028 if (map == null) { 1029 return; 1030 } 1031 // remove the strategy using this processor as the key 1032 map.remove(this); 1033 } 1034 1035 /** 1036 * Is the multicast processor working in streaming mode? 1037 * <p/> 1038 * In streaming mode: 1039 * <ul> 1040 * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li> 1041 * <li>for parallel processing, we start aggregating responses as they get send back to the processor; 1042 * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li> 1043 * </ul> 1044 */ 1045 public boolean isStreaming() { 1046 return streaming; 1047 } 1048 1049 /** 1050 * Should the multicast processor stop processing further exchanges in case of an exception occurred? 1051 */ 1052 public boolean isStopOnException() { 1053 return stopOnException; 1054 } 1055 1056 /** 1057 * Returns the producers to multicast to 1058 */ 1059 public Collection<Processor> getProcessors() { 1060 return processors; 1061 } 1062 1063 /** 1064 * An optional timeout in millis when using parallel processing 1065 */ 1066 public long getTimeout() { 1067 return timeout; 1068 } 1069 1070 /** 1071 * Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead. 1072 */ 1073 public AggregationStrategy getAggregationStrategy() { 1074 return aggregationStrategy; 1075 } 1076 1077 public boolean isParallelProcessing() { 1078 return parallelProcessing; 1079 } 1080 1081 public boolean isShareUnitOfWork() { 1082 return shareUnitOfWork; 1083 } 1084 1085 public List<Processor> next() { 1086 if (!hasNext()) { 1087 return null; 1088 } 1089 return new ArrayList<Processor>(processors); 1090 } 1091 1092 public boolean hasNext() { 1093 return processors != null && !processors.isEmpty(); 1094 } 1095 }