/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor.aggregate;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Navigate;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.TimeoutMap;
import org.apache.camel.Traceable;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.spi.ShutdownPrepared;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultTimeoutMap;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An implementation of the <a
 * href="http://camel.apache.org/aggregator2.html">Aggregator</a>
 * pattern where a batch of messages are processed (up to a maximum amount or
 * until some timeout is reached) and messages for the same correlation key are
 * combined together using some kind of {@link AggregationStrategy}
 * (by default the latest message is used) to compress many message exchanges
 * into a smaller number of exchanges.
 * <p/>
 * A good example of this is stock market data; you may be receiving 30,000
 * messages/second and you may want to throttle it right down so that multiple
 * messages for the same stock are combined (or just the latest message is used
 * and older prices are discarded). Another idea is to combine line item messages
 * together into a single invoice message.
077 */ 078 public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable, ShutdownPrepared { 079 080 public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker"; 081 082 private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class); 083 084 private final Lock lock = new ReentrantLock(); 085 private final CamelContext camelContext; 086 private final Processor processor; 087 private final AggregationStrategy aggregationStrategy; 088 private final Expression correlationExpression; 089 private final ExecutorService executorService; 090 private final boolean shutdownExecutorService; 091 private ScheduledExecutorService timeoutCheckerExecutorService; 092 private boolean shutdownTimeoutCheckerExecutorService; 093 private ScheduledExecutorService recoverService; 094 // store correlation key -> exchange id in timeout map 095 private TimeoutMap<String, String> timeoutMap; 096 private ExceptionHandler exceptionHandler = new LoggingExceptionHandler(getClass()); 097 private AggregationRepository aggregationRepository = new MemoryAggregationRepository(); 098 private Map<Object, Object> closedCorrelationKeys; 099 private Set<String> batchConsumerCorrelationKeys = new LinkedHashSet<String>(); 100 private final Set<String> inProgressCompleteExchanges = new HashSet<String>(); 101 private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>(); 102 103 // keep booking about redelivery 104 private class RedeliveryData { 105 int redeliveryCounter; 106 } 107 108 // options 109 private boolean ignoreInvalidCorrelationKeys; 110 private Integer closeCorrelationKeyOnCompletion; 111 private boolean parallelProcessing; 112 113 // different ways to have completion triggered 114 private boolean eagerCheckCompletion; 115 private Predicate completionPredicate; 116 private long completionTimeout; 117 private Expression completionTimeoutExpression; 118 private long 
completionInterval; 119 private int completionSize; 120 private Expression completionSizeExpression; 121 private boolean completionFromBatchConsumer; 122 private AtomicInteger batchConsumerCounter = new AtomicInteger(); 123 private boolean discardOnCompletionTimeout; 124 private boolean forceCompletionOnStop; 125 126 private ProducerTemplate deadLetterProducerTemplate; 127 128 public AggregateProcessor(CamelContext camelContext, Processor processor, 129 Expression correlationExpression, AggregationStrategy aggregationStrategy, 130 ExecutorService executorService, boolean shutdownExecutorService) { 131 ObjectHelper.notNull(camelContext, "camelContext"); 132 ObjectHelper.notNull(processor, "processor"); 133 ObjectHelper.notNull(correlationExpression, "correlationExpression"); 134 ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy"); 135 ObjectHelper.notNull(executorService, "executorService"); 136 this.camelContext = camelContext; 137 this.processor = processor; 138 this.correlationExpression = correlationExpression; 139 this.aggregationStrategy = aggregationStrategy; 140 this.executorService = executorService; 141 this.shutdownExecutorService = shutdownExecutorService; 142 } 143 144 @Override 145 public String toString() { 146 return "AggregateProcessor[to: " + processor + "]"; 147 } 148 149 public String getTraceLabel() { 150 return "aggregate[" + correlationExpression + "]"; 151 } 152 153 public List<Processor> next() { 154 if (!hasNext()) { 155 return null; 156 } 157 List<Processor> answer = new ArrayList<Processor>(1); 158 answer.add(processor); 159 return answer; 160 } 161 162 public boolean hasNext() { 163 return processor != null; 164 } 165 166 public void process(Exchange exchange) throws Exception { 167 168 //check for the special header to force completion of all groups (and ignore the exchange otherwise) 169 boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class); 170 if 
(completeAllGroups) { 171 forceCompletionOfAllGroups(); 172 return; 173 } 174 175 // compute correlation expression 176 String key = correlationExpression.evaluate(exchange, String.class); 177 if (ObjectHelper.isEmpty(key)) { 178 // we have a bad correlation key 179 if (isIgnoreInvalidCorrelationKeys()) { 180 LOG.debug("Invalid correlation key. This Exchange will be ignored: {}", exchange); 181 return; 182 } else { 183 throw new CamelExchangeException("Invalid correlation key", exchange); 184 } 185 } 186 187 // is the correlation key closed? 188 if (closedCorrelationKeys != null && closedCorrelationKeys.containsKey(key)) { 189 throw new ClosedCorrelationKeyException(key, exchange); 190 } 191 192 // copy exchange, and do not share the unit of work 193 // the aggregated output runs in another unit of work 194 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 195 196 // when memory based then its fast using synchronized, but if the aggregation repository is IO 197 // bound such as JPA etc then concurrent aggregation per correlation key could 198 // improve performance as we can run aggregation repository get/add in parallel 199 lock.lock(); 200 try { 201 doAggregation(key, copy); 202 } finally { 203 lock.unlock(); 204 } 205 } 206 207 /** 208 * Aggregates the exchange with the given correlation key 209 * <p/> 210 * This method <b>must</b> be run synchronized as we cannot aggregate the same correlation key 211 * in parallel. 
212 * 213 * @param key the correlation key 214 * @param exchange the exchange 215 * @return the aggregated exchange 216 * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating 217 */ 218 private Exchange doAggregation(String key, Exchange exchange) throws CamelExchangeException { 219 LOG.trace("onAggregation +++ start +++ with correlation key: {}", key); 220 221 Exchange answer; 222 Exchange oldExchange = aggregationRepository.get(exchange.getContext(), key); 223 Exchange newExchange = exchange; 224 225 Integer size = 1; 226 if (oldExchange != null) { 227 size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class); 228 size++; 229 } 230 231 // check if we are complete 232 String complete = null; 233 if (isEagerCheckCompletion()) { 234 // put the current aggregated size on the exchange so its avail during completion check 235 newExchange.setProperty(Exchange.AGGREGATED_SIZE, size); 236 complete = isCompleted(key, newExchange); 237 // remove it afterwards 238 newExchange.removeProperty(Exchange.AGGREGATED_SIZE); 239 } 240 241 // prepare the exchanges for aggregation and aggregate it 242 ExchangeHelper.prepareAggregation(oldExchange, newExchange); 243 // must catch any exception from aggregation 244 try { 245 answer = onAggregation(oldExchange, exchange); 246 } catch (Throwable e) { 247 throw new CamelExchangeException("Error occurred during aggregation", exchange, e); 248 } 249 if (answer == null) { 250 throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", exchange); 251 } 252 253 // update the aggregated size 254 answer.setProperty(Exchange.AGGREGATED_SIZE, size); 255 256 // maybe we should check completion after the aggregation 257 if (!isEagerCheckCompletion()) { 258 complete = isCompleted(key, answer); 259 } 260 261 // only need to update aggregation repository if we are not complete 262 if (complete == null) { 263 LOG.trace("In progress aggregated exchange: 
{} with correlation key: {}", answer, key); 264 aggregationRepository.add(exchange.getContext(), key, answer); 265 } else { 266 // if batch consumer completion is enabled then we need to complete the group 267 if ("consumer".equals(complete)) { 268 for (String batchKey : batchConsumerCorrelationKeys) { 269 Exchange batchAnswer; 270 if (batchKey.equals(key)) { 271 // skip the current aggregated key as we have already aggregated it and have the answer 272 batchAnswer = answer; 273 } else { 274 batchAnswer = aggregationRepository.get(camelContext, batchKey); 275 } 276 277 if (batchAnswer != null) { 278 batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); 279 onCompletion(batchKey, batchAnswer, false); 280 } 281 } 282 batchConsumerCorrelationKeys.clear(); 283 } else { 284 // we are complete for this exchange 285 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); 286 onCompletion(key, answer, false); 287 } 288 } 289 290 LOG.trace("onAggregation +++ end +++ with correlation key: {}", key); 291 292 return answer; 293 } 294 295 /** 296 * Tests whether the given exchange is complete or not 297 * 298 * @param key the correlation key 299 * @param exchange the incoming exchange 300 * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion 301 */ 302 protected String isCompleted(String key, Exchange exchange) { 303 // batch consumer completion must always run first 304 if (isCompletionFromBatchConsumer()) { 305 batchConsumerCorrelationKeys.add(key); 306 batchConsumerCounter.incrementAndGet(); 307 int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class); 308 if (size > 0 && batchConsumerCounter.intValue() >= size) { 309 // batch consumer is complete then reset the counter 310 batchConsumerCounter.set(0); 311 return "consumer"; 312 } 313 } 314 315 if (getCompletionPredicate() != null) { 316 boolean answer = getCompletionPredicate().matches(exchange); 317 if (answer) { 318 return "predicate"; 
319 } 320 } 321 322 boolean sizeChecked = false; 323 if (getCompletionSizeExpression() != null) { 324 Integer value = getCompletionSizeExpression().evaluate(exchange, Integer.class); 325 if (value != null && value > 0) { 326 // mark as already checked size as expression takes precedence over static configured 327 sizeChecked = true; 328 int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); 329 if (size >= value) { 330 return "size"; 331 } 332 } 333 } 334 if (!sizeChecked && getCompletionSize() > 0) { 335 int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); 336 if (size >= getCompletionSize()) { 337 return "size"; 338 } 339 } 340 341 // timeout can be either evaluated based on an expression or from a fixed value 342 // expression takes precedence 343 boolean timeoutSet = false; 344 if (getCompletionTimeoutExpression() != null) { 345 Long value = getCompletionTimeoutExpression().evaluate(exchange, Long.class); 346 if (value != null && value > 0) { 347 if (LOG.isTraceEnabled()) { 348 LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}", 349 new Object[]{key, value, exchange}); 350 } 351 addExchangeToTimeoutMap(key, exchange, value); 352 timeoutSet = true; 353 } 354 } 355 if (!timeoutSet && getCompletionTimeout() > 0) { 356 // timeout is used so use the timeout map to keep an eye on this 357 if (LOG.isTraceEnabled()) { 358 LOG.trace("Updating correlation key {} to timeout after {} ms. 
as exchange received: {}", 359 new Object[]{key, getCompletionTimeout(), exchange}); 360 } 361 addExchangeToTimeoutMap(key, exchange, getCompletionTimeout()); 362 } 363 364 // not complete 365 return null; 366 } 367 368 protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) { 369 return aggregationStrategy.aggregate(oldExchange, newExchange); 370 } 371 372 protected void onCompletion(final String key, final Exchange exchange, boolean fromTimeout) { 373 // store the correlation key as property 374 exchange.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key); 375 // remove from repository as its completed 376 aggregationRepository.remove(exchange.getContext(), key, exchange); 377 if (!fromTimeout && timeoutMap != null) { 378 // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker) 379 timeoutMap.remove(key); 380 } 381 382 // this key has been closed so add it to the closed map 383 if (closedCorrelationKeys != null) { 384 closedCorrelationKeys.put(key, key); 385 } 386 387 if (fromTimeout) { 388 // invoke timeout if its timeout aware aggregation strategy, 389 // to allow any custom processing before discarding the exchange 390 if (aggregationStrategy instanceof TimeoutAwareAggregationStrategy) { 391 long timeout = getCompletionTimeout() > 0 ? 
getCompletionTimeout() : -1; 392 ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(exchange, -1, -1, timeout); 393 } 394 } 395 396 if (fromTimeout && isDiscardOnCompletionTimeout()) { 397 // discard due timeout 398 LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: ()", key, exchange); 399 // must confirm the discarded exchange 400 aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId()); 401 // and remove redelivery state as well 402 redeliveryState.remove(exchange.getExchangeId()); 403 } else { 404 // the aggregated exchange should be published (sent out) 405 onSubmitCompletion(key, exchange); 406 } 407 } 408 409 private void onSubmitCompletion(final Object key, final Exchange exchange) { 410 LOG.debug("Aggregation complete for correlation key {} sending aggregated exchange: {}", key, exchange); 411 412 // add this as in progress before we submit the task 413 inProgressCompleteExchanges.add(exchange.getExchangeId()); 414 415 // invoke the on completion callback 416 if (aggregationStrategy instanceof CompletionAwareAggregationStrategy) { 417 ((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange); 418 } 419 420 // send this exchange 421 executorService.submit(new Runnable() { 422 public void run() { 423 LOG.debug("Processing aggregated exchange: {}", exchange); 424 425 // add on completion task so we remember to update the inProgressCompleteExchanges 426 exchange.addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId())); 427 428 try { 429 processor.process(exchange); 430 } catch (Throwable e) { 431 exchange.setException(e); 432 } 433 434 // log exception if there was a problem 435 if (exchange.getException() != null) { 436 // if there was an exception then let the exception handler handle it 437 getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); 438 } else { 439 LOG.trace("Processing aggregated exchange: 
{} complete.", exchange); 440 } 441 } 442 }); 443 } 444 445 /** 446 * Restores the timeout map with timeout values from the aggregation repository. 447 * <p/> 448 * This is needed in case the aggregator has been stopped and started again (for example a server restart). 449 * Then the existing exchanges from the {@link AggregationRepository} must have its timeout conditions restored. 450 */ 451 protected void restoreTimeoutMapFromAggregationRepository() throws Exception { 452 // grab the timeout value for each partly aggregated exchange 453 Set<String> keys = aggregationRepository.getKeys(); 454 if (keys == null || keys.isEmpty()) { 455 return; 456 } 457 458 StopWatch watch = new StopWatch(); 459 LOG.trace("Starting restoring CompletionTimeout for {} existing exchanges from the aggregation repository...", keys.size()); 460 461 for (String key : keys) { 462 Exchange exchange = aggregationRepository.get(camelContext, key); 463 // grab the timeout value 464 long timeout = exchange.hasProperties() ? exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0; 465 if (timeout > 0) { 466 LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout); 467 addExchangeToTimeoutMap(key, exchange, timeout); 468 } 469 } 470 471 // log duration of this task so end user can see how long it takes to pre-check this upon starting 472 LOG.info("Restored {} CompletionTimeout conditions in the AggregationTimeoutChecker in {}", 473 timeoutMap.size(), TimeUtils.printDuration(watch.stop())); 474 } 475 476 /** 477 * Adds the given exchange to the timeout map, which is used by the timeout checker task to trigger timeouts. 
478 * 479 * @param key the correlation key 480 * @param exchange the exchange 481 * @param timeout the timeout value in millis 482 */ 483 private void addExchangeToTimeoutMap(String key, Exchange exchange, long timeout) { 484 // store the timeout value on the exchange as well, in case we need it later 485 exchange.setProperty(Exchange.AGGREGATED_TIMEOUT, timeout); 486 timeoutMap.put(key, exchange.getExchangeId(), timeout); 487 } 488 489 public Predicate getCompletionPredicate() { 490 return completionPredicate; 491 } 492 493 public void setCompletionPredicate(Predicate completionPredicate) { 494 this.completionPredicate = completionPredicate; 495 } 496 497 public boolean isEagerCheckCompletion() { 498 return eagerCheckCompletion; 499 } 500 501 public void setEagerCheckCompletion(boolean eagerCheckCompletion) { 502 this.eagerCheckCompletion = eagerCheckCompletion; 503 } 504 505 public long getCompletionTimeout() { 506 return completionTimeout; 507 } 508 509 public void setCompletionTimeout(long completionTimeout) { 510 this.completionTimeout = completionTimeout; 511 } 512 513 public Expression getCompletionTimeoutExpression() { 514 return completionTimeoutExpression; 515 } 516 517 public void setCompletionTimeoutExpression(Expression completionTimeoutExpression) { 518 this.completionTimeoutExpression = completionTimeoutExpression; 519 } 520 521 public long getCompletionInterval() { 522 return completionInterval; 523 } 524 525 public void setCompletionInterval(long completionInterval) { 526 this.completionInterval = completionInterval; 527 } 528 529 public int getCompletionSize() { 530 return completionSize; 531 } 532 533 public void setCompletionSize(int completionSize) { 534 this.completionSize = completionSize; 535 } 536 537 public Expression getCompletionSizeExpression() { 538 return completionSizeExpression; 539 } 540 541 public void setCompletionSizeExpression(Expression completionSizeExpression) { 542 this.completionSizeExpression = completionSizeExpression; 
543 } 544 545 public boolean isIgnoreInvalidCorrelationKeys() { 546 return ignoreInvalidCorrelationKeys; 547 } 548 549 public void setIgnoreInvalidCorrelationKeys(boolean ignoreInvalidCorrelationKeys) { 550 this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys; 551 } 552 553 public Integer getCloseCorrelationKeyOnCompletion() { 554 return closeCorrelationKeyOnCompletion; 555 } 556 557 public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) { 558 this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion; 559 } 560 561 public boolean isCompletionFromBatchConsumer() { 562 return completionFromBatchConsumer; 563 } 564 565 public void setCompletionFromBatchConsumer(boolean completionFromBatchConsumer) { 566 this.completionFromBatchConsumer = completionFromBatchConsumer; 567 } 568 569 public ExceptionHandler getExceptionHandler() { 570 return exceptionHandler; 571 } 572 573 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 574 this.exceptionHandler = exceptionHandler; 575 } 576 577 public boolean isParallelProcessing() { 578 return parallelProcessing; 579 } 580 581 public void setParallelProcessing(boolean parallelProcessing) { 582 this.parallelProcessing = parallelProcessing; 583 } 584 585 public AggregationRepository getAggregationRepository() { 586 return aggregationRepository; 587 } 588 589 public void setAggregationRepository(AggregationRepository aggregationRepository) { 590 this.aggregationRepository = aggregationRepository; 591 } 592 593 public boolean isDiscardOnCompletionTimeout() { 594 return discardOnCompletionTimeout; 595 } 596 597 public void setDiscardOnCompletionTimeout(boolean discardOnCompletionTimeout) { 598 this.discardOnCompletionTimeout = discardOnCompletionTimeout; 599 } 600 601 public void setForceCompletionOnStop(boolean forceCompletionOnStop) { 602 this.forceCompletionOnStop = forceCompletionOnStop; 603 } 604 605 public void 
setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { 606 this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; 607 } 608 609 public ScheduledExecutorService getTimeoutCheckerExecutorService() { 610 return timeoutCheckerExecutorService; 611 } 612 613 public boolean isShutdownTimeoutCheckerExecutorService() { 614 return shutdownTimeoutCheckerExecutorService; 615 } 616 617 public void setShutdownTimeoutCheckerExecutorService(boolean shutdownTimeoutCheckerExecutorService) { 618 this.shutdownTimeoutCheckerExecutorService = shutdownTimeoutCheckerExecutorService; 619 } 620 621 /** 622 * On completion task which keeps the booking of the in progress up to date 623 */ 624 private final class AggregateOnCompletion implements Synchronization { 625 private final String exchangeId; 626 627 private AggregateOnCompletion(String exchangeId) { 628 // must use the original exchange id as it could potentially change if send over SEDA etc. 629 this.exchangeId = exchangeId; 630 } 631 632 public void onFailure(Exchange exchange) { 633 LOG.trace("Aggregated exchange onFailure: {}", exchange); 634 635 // must remember to remove in progress when we failed 636 inProgressCompleteExchanges.remove(exchangeId); 637 // do not remove redelivery state as we need it when we redeliver again later 638 } 639 640 public void onComplete(Exchange exchange) { 641 LOG.trace("Aggregated exchange onComplete: {}", exchange); 642 643 // only confirm if we processed without a problem 644 try { 645 aggregationRepository.confirm(exchange.getContext(), exchangeId); 646 // and remove redelivery state as well 647 redeliveryState.remove(exchangeId); 648 } finally { 649 // must remember to remove in progress when we are complete 650 inProgressCompleteExchanges.remove(exchangeId); 651 } 652 } 653 654 @Override 655 public String toString() { 656 return "AggregateOnCompletion"; 657 } 658 } 659 660 /** 661 * Background task that looks for aggregated exchanges which is 
triggered by completion timeouts. 662 */ 663 private final class AggregationTimeoutMap extends DefaultTimeoutMap<String, String> { 664 665 private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) { 666 // do NOT use locking on the timeout map as this aggregator has its own shared lock we will use instead 667 super(executor, requestMapPollTimeMillis, false); 668 } 669 670 @Override 671 public void purge() { 672 // must acquire the shared aggregation lock to be able to purge 673 lock.lock(); 674 try { 675 super.purge(); 676 } finally { 677 lock.unlock(); 678 } 679 } 680 681 @Override 682 public boolean onEviction(String key, String exchangeId) { 683 log.debug("Completion timeout triggered for correlation key: {}", key); 684 685 boolean inProgress = inProgressCompleteExchanges.contains(exchangeId); 686 if (inProgress) { 687 LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId); 688 return true; 689 } 690 691 // get the aggregated exchange 692 Exchange answer = aggregationRepository.get(camelContext, key); 693 if (answer != null) { 694 // indicate it was completed by timeout 695 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); 696 onCompletion(key, answer, true); 697 } 698 return true; 699 } 700 } 701 702 /** 703 * Background task that triggers completion based on interval. 
704 */ 705 private final class AggregationIntervalTask implements Runnable { 706 707 public void run() { 708 // only run if CamelContext has been fully started 709 if (!camelContext.getStatus().isStarted()) { 710 LOG.trace("Completion interval task cannot start due CamelContext({}) has not been started yet", camelContext.getName()); 711 return; 712 } 713 714 LOG.trace("Starting completion interval task"); 715 716 // trigger completion for all in the repository 717 Set<String> keys = aggregationRepository.getKeys(); 718 719 if (keys != null && !keys.isEmpty()) { 720 // must acquire the shared aggregation lock to be able to trigger interval completion 721 lock.lock(); 722 try { 723 for (String key : keys) { 724 Exchange exchange = aggregationRepository.get(camelContext, key); 725 if (exchange != null) { 726 LOG.trace("Completion interval triggered for correlation key: {}", key); 727 // indicate it was completed by interval 728 exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval"); 729 onCompletion(key, exchange, false); 730 } 731 } 732 } finally { 733 lock.unlock(); 734 } 735 } 736 737 LOG.trace("Completion interval task complete"); 738 } 739 } 740 741 /** 742 * Background task that looks for aggregated exchanges to recover. 
743 */ 744 private final class RecoverTask implements Runnable { 745 private final RecoverableAggregationRepository recoverable; 746 747 private RecoverTask(RecoverableAggregationRepository recoverable) { 748 this.recoverable = recoverable; 749 } 750 751 public void run() { 752 // only run if CamelContext has been fully started 753 if (!camelContext.getStatus().isStarted()) { 754 LOG.trace("Recover check cannot start due CamelContext({}) has not been started yet", camelContext.getName()); 755 return; 756 } 757 758 LOG.trace("Starting recover check"); 759 760 // copy the current in progress before doing scan 761 final Set<String> copyOfInProgress = new LinkedHashSet<String>(inProgressCompleteExchanges); 762 763 Set<String> exchangeIds = recoverable.scan(camelContext); 764 for (String exchangeId : exchangeIds) { 765 766 // we may shutdown while doing recovery 767 if (!isRunAllowed()) { 768 LOG.info("We are shutting down so stop recovering"); 769 return; 770 } 771 772 // consider in progress if it was in progress before we did the scan, or currently after we did the scan 773 // its safer to consider it in progress than risk duplicates due both in progress + recovered 774 boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId); 775 if (inProgress) { 776 LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId); 777 } else { 778 LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId); 779 Exchange exchange = recoverable.recover(camelContext, exchangeId); 780 if (exchange != null) { 781 // get the correlation key 782 String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class); 783 // and mark it as redelivered 784 exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE); 785 786 // get the current redelivery data 787 RedeliveryData data = redeliveryState.get(exchange.getExchangeId()); 788 789 // if we are exhausted, then move to dead 
letter channel 790 if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) { 791 LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries() 792 + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri()); 793 794 // send to DLC 795 try { 796 // set redelivery counter 797 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); 798 exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); 799 deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange); 800 } catch (Throwable e) { 801 exchange.setException(e); 802 } 803 804 // handle if failed 805 if (exchange.getException() != null) { 806 getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException()); 807 } else { 808 // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again 809 recoverable.confirm(camelContext, exchangeId); 810 } 811 } else { 812 // update current redelivery state 813 if (data == null) { 814 // create new data 815 data = new RedeliveryData(); 816 redeliveryState.put(exchange.getExchangeId(), data); 817 } 818 data.redeliveryCounter++; 819 820 // set redelivery counter 821 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); 822 if (recoverable.getMaximumRedeliveries() > 0) { 823 exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries()); 824 } 825 826 LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId); 827 828 // not exhaust so resubmit the recovered exchange 829 onSubmitCompletion(key, exchange); 830 } 831 } 832 } 833 } 834 835 LOG.trace("Recover check complete"); 836 } 837 } 838 839 @Override 840 protected void doStart() throws Exception { 841 if 
(getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null 842 && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null 843 && getCompletionSizeExpression() == null) { 844 throw new IllegalStateException("At least one of the completions options" 845 + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set"); 846 } 847 848 if (getCloseCorrelationKeyOnCompletion() != null) { 849 if (getCloseCorrelationKeyOnCompletion() > 0) { 850 LOG.info("Using ClosedCorrelationKeys with a LRUCache with a capacity of " + getCloseCorrelationKeyOnCompletion()); 851 closedCorrelationKeys = new LRUCache<Object, Object>(getCloseCorrelationKeyOnCompletion()); 852 } else { 853 LOG.info("Using ClosedCorrelationKeys with unbounded capacity"); 854 closedCorrelationKeys = new HashMap<Object, Object>(); 855 } 856 } 857 858 ServiceHelper.startServices(processor, aggregationRepository); 859 860 // should we use recover checker 861 if (aggregationRepository instanceof RecoverableAggregationRepository) { 862 RecoverableAggregationRepository recoverable = (RecoverableAggregationRepository) aggregationRepository; 863 if (recoverable.isUseRecovery()) { 864 long interval = recoverable.getRecoveryIntervalInMillis(); 865 if (interval <= 0) { 866 throw new IllegalArgumentException("AggregationRepository has recovery enabled and the RecoveryInterval option must be a positive number, was: " + interval); 867 } 868 869 // create a background recover thread to check every interval 870 recoverService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateRecoverChecker", 1); 871 Runnable recoverTask = new RecoverTask(recoverable); 872 LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every " + interval + " millis."); 873 // use fixed delay so there is X interval between each run 874 
recoverService.scheduleWithFixedDelay(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS); 875 876 if (recoverable.getDeadLetterUri() != null) { 877 int max = recoverable.getMaximumRedeliveries(); 878 if (max <= 0) { 879 throw new IllegalArgumentException("Option maximumRedeliveries must be a positive number, was: " + max); 880 } 881 LOG.info("After " + max + " failed redelivery attempts Exchanges will be moved to deadLetterUri: " + recoverable.getDeadLetterUri()); 882 883 // dead letter uri must be a valid endpoint 884 Endpoint endpoint = camelContext.getEndpoint(recoverable.getDeadLetterUri()); 885 if (endpoint == null) { 886 throw new NoSuchEndpointException(recoverable.getDeadLetterUri()); 887 } 888 deadLetterProducerTemplate = camelContext.createProducerTemplate(); 889 } 890 } 891 } 892 893 if (getCompletionInterval() > 0 && getCompletionTimeout() > 0) { 894 throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both."); 895 } 896 if (getCompletionInterval() > 0) { 897 LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis."); 898 if (getTimeoutCheckerExecutorService() == null) { 899 setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1)); 900 shutdownTimeoutCheckerExecutorService = true; 901 } 902 // trigger completion based on interval 903 getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS); 904 } 905 906 // start timeout service if its in use 907 if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) { 908 LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout() + " millis of inactivity."); 909 if (getTimeoutCheckerExecutorService() == null) { 910 setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, 
AGGREGATE_TIMEOUT_CHECKER, 1)); 911 shutdownTimeoutCheckerExecutorService = true; 912 } 913 // check for timed out aggregated messages once every second 914 timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L); 915 // fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we 916 // need to re-establish the timeout map so timeout can trigger 917 restoreTimeoutMapFromAggregationRepository(); 918 ServiceHelper.startService(timeoutMap); 919 } 920 } 921 922 @Override 923 protected void doStop() throws Exception { 924 // note: we cannot do doForceCompletionOnStop from this doStop method 925 // as this is handled in the prepareShutdown method which is also invoked when stopping a route 926 // and is better suited for preparing to shutdown than this doStop method is 927 928 if (recoverService != null) { 929 camelContext.getExecutorServiceManager().shutdownNow(recoverService); 930 } 931 ServiceHelper.stopServices(timeoutMap, processor, deadLetterProducerTemplate); 932 933 if (closedCorrelationKeys != null) { 934 // it may be a service so stop it as well 935 ServiceHelper.stopService(closedCorrelationKeys); 936 closedCorrelationKeys.clear(); 937 } 938 batchConsumerCorrelationKeys.clear(); 939 redeliveryState.clear(); 940 } 941 942 @Override 943 public void prepareShutdown(boolean forced) { 944 // we are shutting down, so force completion if this option was enabled 945 // but only do this when forced=false, as that is when we have chance to 946 // send out new messages to be routed by Camel. 
When forced=true, then 947 // we have to shutdown in a hurry 948 if (!forced && forceCompletionOnStop) { 949 doForceCompletionOnStop(); 950 } 951 } 952 953 private void doForceCompletionOnStop() { 954 int expected = forceCompletionOfAllGroups(); 955 956 StopWatch watch = new StopWatch(); 957 while (inProgressCompleteExchanges.size() > 0) { 958 LOG.trace("Waiting for {} inflight exchanges to complete", inProgressCompleteExchanges.size()); 959 try { 960 Thread.sleep(100); 961 } catch (InterruptedException e) { 962 // break out as we got interrupted such as the JVM terminating 963 LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", inProgressCompleteExchanges.size()); 964 break; 965 } 966 } 967 968 if (expected > 0) { 969 LOG.info("Forcing completion of all groups with {} exchanges completed in {}", expected, TimeUtils.printDuration(watch.stop())); 970 } 971 } 972 973 @Override 974 protected void doShutdown() throws Exception { 975 // shutdown aggregation repository 976 ServiceHelper.stopService(aggregationRepository); 977 978 // cleanup when shutting down 979 inProgressCompleteExchanges.clear(); 980 981 if (shutdownExecutorService) { 982 camelContext.getExecutorServiceManager().shutdownNow(executorService); 983 } 984 if (shutdownTimeoutCheckerExecutorService) { 985 camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService); 986 timeoutCheckerExecutorService = null; 987 } 988 989 super.doShutdown(); 990 } 991 992 public int forceCompletionOfAllGroups() { 993 994 // only run if CamelContext has been fully started or is stopping 995 boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping(); 996 if (!allow) { 997 LOG.warn("Cannot start force completion of all groups because CamelContext({}) has not been started", camelContext.getName()); 998 return 0; 999 } 1000 1001 LOG.trace("Starting force completion of all groups task"); 1002 1003 // trigger completion for all in the repository 1004 
Set<String> keys = aggregationRepository.getKeys(); 1005 1006 int total = 0; 1007 if (keys != null && !keys.isEmpty()) { 1008 // must acquire the shared aggregation lock to be able to trigger force completion 1009 lock.lock(); 1010 total = keys.size(); 1011 try { 1012 for (String key : keys) { 1013 Exchange exchange = aggregationRepository.get(camelContext, key); 1014 if (exchange != null) { 1015 LOG.trace("Force completion triggered for correlation key: {}", key); 1016 // indicate it was completed by a force completion request 1017 exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion"); 1018 onCompletion(key, exchange, false); 1019 } 1020 } 1021 } finally { 1022 lock.unlock(); 1023 } 1024 } 1025 LOG.trace("Completed force completion of all groups task"); 1026 1027 if (total > 0) { 1028 LOG.debug("Forcing completion of all groups with {} exchanges", total); 1029 } 1030 return total; 1031 } 1032 1033 }