001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor.aggregate; 018 019import java.util.ArrayList; 020import java.util.Collections; 021import java.util.LinkedHashSet; 022import java.util.List; 023import java.util.Map; 024import java.util.Set; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentSkipListSet; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.ScheduledExecutorService; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicInteger; 031import java.util.concurrent.atomic.AtomicLong; 032import java.util.concurrent.locks.Lock; 033import java.util.concurrent.locks.ReentrantLock; 034 035import org.apache.camel.AsyncCallback; 036import org.apache.camel.AsyncProcessor; 037import org.apache.camel.CamelContext; 038import org.apache.camel.CamelExchangeException; 039import org.apache.camel.Endpoint; 040import org.apache.camel.Exchange; 041import org.apache.camel.Expression; 042import org.apache.camel.Navigate; 043import org.apache.camel.NoSuchEndpointException; 044import org.apache.camel.Predicate; 045import org.apache.camel.Processor; 046import org.apache.camel.ProducerTemplate; 047import org.apache.camel.ShutdownRunningTask; 048import org.apache.camel.TimeoutMap; 049import org.apache.camel.Traceable; 050import org.apache.camel.spi.AggregationRepository; 051import org.apache.camel.spi.ExceptionHandler; 052import org.apache.camel.spi.IdAware; 053import org.apache.camel.spi.OptimisticLockingAggregationRepository; 054import org.apache.camel.spi.RecoverableAggregationRepository; 055import org.apache.camel.spi.ShutdownAware; 056import org.apache.camel.spi.ShutdownPrepared; 057import org.apache.camel.spi.Synchronization; 058import org.apache.camel.support.DefaultTimeoutMap; 059import org.apache.camel.support.LoggingExceptionHandler; 060import org.apache.camel.support.ServiceSupport; 061import org.apache.camel.util.AsyncProcessorHelper; 062import org.apache.camel.util.ExchangeHelper; 063import org.apache.camel.util.LRUCache; 064import org.apache.camel.util.ObjectHelper; 065import org.apache.camel.util.ServiceHelper; 066import org.apache.camel.util.StopWatch; 067import org.apache.camel.util.TimeUtils; 068import org.slf4j.Logger; 069import org.slf4j.LoggerFactory; 070 071/** 072 * An implementation of the <a 073 * href="http://camel.apache.org/aggregator2.html">Aggregator</a> 074 * pattern where a batch of messages are processed (up to a maximum amount or 075 * until some timeout is reached) and messages for the same correlation key are 076 * combined together using some kind of {@link AggregationStrategy} 077 * (by default the latest message is used) to compress many message exchanges 078 * into a smaller number of exchanges. 079 * <p/> 080 * A good example of this is stock market data; you may be receiving 30,000 081 * messages/second and you may want to throttle it right down so that multiple 082 * messages for the same stock are combined (or just the latest message is used 083 * and older prices are discarded). Another idea is to combine line item messages 084 * together into a single invoice message. 085 */ 086public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, ShutdownAware, IdAware { 087 088 public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker"; 089 090 private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class); 091 092 private final Lock lock = new ReentrantLock(); 093 private final CamelContext camelContext; 094 private final Processor processor; 095 private String id; 096 private AggregationStrategy aggregationStrategy; 097 private boolean preCompletion; 098 private Expression correlationExpression; 099 private AggregateController aggregateController; 100 private final ExecutorService executorService; 101 private final boolean shutdownExecutorService; 102 private OptimisticLockRetryPolicy optimisticLockRetryPolicy = new OptimisticLockRetryPolicy(); 103 private ScheduledExecutorService timeoutCheckerExecutorService; 104 private boolean shutdownTimeoutCheckerExecutorService; 105 private ScheduledExecutorService recoverService; 106 // store correlation key -> exchange id in timeout map 107 private TimeoutMap<String, String> timeoutMap; 108 private ExceptionHandler exceptionHandler; 109 private AggregationRepository aggregationRepository; 110 private Map<String, String> closedCorrelationKeys; 111 private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<String>(); 112 private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); 113 private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>(); 114 115 private final AggregateProcessorStatistics statistics = new Statistics(); 116 private final AtomicLong totalIn = new AtomicLong(); 117 private final AtomicLong totalCompleted = new AtomicLong(); 118 private final AtomicLong completedBySize = new AtomicLong(); 119 private final AtomicLong completedByStrategy = new AtomicLong(); 120 private final AtomicLong completedByInterval = new AtomicLong(); 121 private final AtomicLong completedByTimeout = new AtomicLong(); 122 private final AtomicLong completedByPredicate = new AtomicLong(); 123 private final AtomicLong completedByBatchConsumer = new AtomicLong(); 124 private final AtomicLong completedByForce = new AtomicLong(); 125 126 // keep booking about redelivery 127 private class RedeliveryData { 128 int redeliveryCounter; 129 } 130 131 private class Statistics implements AggregateProcessorStatistics { 132 133 private boolean statisticsEnabled = true; 134 135 public long getTotalIn() { 136 return totalIn.get(); 137 } 138 139 public long getTotalCompleted() { 140 return totalCompleted.get(); 141 } 142 143 public long getCompletedBySize() { 144 return completedBySize.get(); 145 } 146 147 public long getCompletedByStrategy() { 148 return completedByStrategy.get(); 149 } 150 151 public long getCompletedByInterval() { 152 return completedByInterval.get(); 153 } 154 155 public long getCompletedByTimeout() { 156 return completedByTimeout.get(); 157 } 158 159 public long getCompletedByPredicate() { 160 return completedByPredicate.get(); 161 } 162 163 public long getCompletedByBatchConsumer() { 164 return completedByBatchConsumer.get(); 165 } 166 167 public long getCompletedByForce() { 168 return completedByForce.get(); 169 } 170 171 public void reset() { 172 totalIn.set(0); 173 totalCompleted.set(0); 174 completedBySize.set(0); 175 completedByStrategy.set(0); 176 completedByTimeout.set(0); 177 completedByPredicate.set(0); 178 completedByBatchConsumer.set(0); 179 completedByForce.set(0); 180 } 181 182 public boolean isStatisticsEnabled() { 183 return statisticsEnabled; 184 } 185 186 public void setStatisticsEnabled(boolean statisticsEnabled) { 187 this.statisticsEnabled = statisticsEnabled; 188 } 189 } 190 191 // options 192 private boolean ignoreInvalidCorrelationKeys; 193 private Integer closeCorrelationKeyOnCompletion; 194 private boolean parallelProcessing; 195 private boolean optimisticLocking; 196 197 // different ways to have completion triggered 198 private boolean eagerCheckCompletion; 199 private Predicate completionPredicate; 200 private long completionTimeout; 201 private Expression completionTimeoutExpression; 202 private long completionInterval; 203 private int completionSize; 204 private Expression completionSizeExpression; 205 private boolean completionFromBatchConsumer; 206 private AtomicInteger batchConsumerCounter = new AtomicInteger(); 207 private boolean discardOnCompletionTimeout; 208 private boolean forceCompletionOnStop; 209 private boolean completeAllOnStop; 210 211 private ProducerTemplate deadLetterProducerTemplate; 212 213 public AggregateProcessor(CamelContext camelContext, Processor processor, 214 Expression correlationExpression, AggregationStrategy aggregationStrategy, 215 ExecutorService executorService, boolean shutdownExecutorService) { 216 ObjectHelper.notNull(camelContext, "camelContext"); 217 ObjectHelper.notNull(processor, "processor"); 218 ObjectHelper.notNull(correlationExpression, "correlationExpression"); 219 ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy"); 220 ObjectHelper.notNull(executorService, "executorService"); 221 this.camelContext = camelContext; 222 this.processor = processor; 223 this.correlationExpression = correlationExpression; 224 this.aggregationStrategy = aggregationStrategy; 225 this.executorService = executorService; 226 this.shutdownExecutorService = shutdownExecutorService; 227 this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass()); 228 } 229 230 @Override 231 public String toString() { 232 return "AggregateProcessor[to: " + processor + "]"; 233 } 234 235 public String getTraceLabel() { 236 return "aggregate[" + correlationExpression + "]"; 237 } 238 239 public List<Processor> next() { 240 if (!hasNext()) { 241 return null; 242 } 243 List<Processor> answer = new ArrayList<Processor>(1); 244 answer.add(processor); 245 return answer; 246 } 247 248 public boolean hasNext() { 249 return processor != null; 250 } 251 252 public String getId() { 253 return id; 254 } 255 256 public void setId(String id) { 257 this.id = id; 258 } 259 260 public void process(Exchange exchange) throws Exception { 261 AsyncProcessorHelper.process(this, exchange); 262 } 263 264 public boolean process(Exchange exchange, AsyncCallback callback) { 265 try { 266 doProcess(exchange); 267 } catch (Throwable e) { 268 exchange.setException(e); 269 } 270 callback.done(true); 271 return true; 272 } 273 274 protected void doProcess(Exchange exchange) throws Exception { 275 276 if (getStatistics().isStatisticsEnabled()) { 277 totalIn.incrementAndGet(); 278 } 279 280 //check for the special header to force completion of all groups (and ignore the exchange otherwise) 281 boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class); 282 if (completeAllGroups) { 283 forceCompletionOfAllGroups(); 284 return; 285 } 286 287 // compute correlation expression 288 String key = correlationExpression.evaluate(exchange, String.class); 289 if (ObjectHelper.isEmpty(key)) { 290 // we have a bad correlation key 291 if (isIgnoreInvalidCorrelationKeys()) { 292 LOG.debug("Invalid correlation key. This Exchange will be ignored: {}", exchange); 293 return; 294 } else { 295 throw new CamelExchangeException("Invalid correlation key", exchange); 296 } 297 } 298 299 // is the correlation key closed? 300 if (closedCorrelationKeys != null && closedCorrelationKeys.containsKey(key)) { 301 throw new ClosedCorrelationKeyException(key, exchange); 302 } 303 304 // when optimist locking is enabled we keep trying until we succeed 305 if (optimisticLocking) { 306 List<Exchange> aggregated = null; 307 boolean exhaustedRetries = true; 308 int attempt = 0; 309 do { 310 attempt++; 311 // copy exchange, and do not share the unit of work 312 // the aggregated output runs in another unit of work 313 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 314 try { 315 aggregated = doAggregation(key, copy); 316 exhaustedRetries = false; 317 break; 318 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { 319 LOG.trace("On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException while trying to add() key: {} and exchange: {}", 320 new Object[]{attempt, aggregationRepository, key, copy, e}); 321 optimisticLockRetryPolicy.doDelay(attempt); 322 } 323 } while (optimisticLockRetryPolicy.shouldRetry(attempt)); 324 325 if (exhaustedRetries) { 326 throw new CamelExchangeException("Exhausted optimistic locking retry attempts, tried " + attempt + " times", exchange, 327 new OptimisticLockingAggregationRepository.OptimisticLockingException()); 328 } else if (aggregated != null) { 329 // we are completed so submit to completion 330 for (Exchange agg : aggregated) { 331 onSubmitCompletion(key, agg); 332 } 333 } 334 } else { 335 // copy exchange, and do not share the unit of work 336 // the aggregated output runs in another unit of work 337 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 338 339 // when memory based then its fast using synchronized, but if the aggregation repository is IO 340 // bound such as JPA etc then concurrent aggregation per correlation key could 341 // improve performance as we can run aggregation repository get/add in parallel 342 List<Exchange> aggregated = null; 343 lock.lock(); 344 try { 345 aggregated = doAggregation(key, copy); 346 } finally { 347 lock.unlock(); 348 } 349 350 // we are completed so do that work outside the lock 351 if (aggregated != null) { 352 for (Exchange agg : aggregated) { 353 onSubmitCompletion(key, agg); 354 } 355 } 356 } 357 358 // check for the special header to force completion of all groups (inclusive of the message) 359 boolean completeAllGroupsInclusive = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class); 360 if (completeAllGroupsInclusive) { 361 forceCompletionOfAllGroups(); 362 } 363 } 364 365 /** 366 * Aggregates the exchange with the given correlation key 367 * <p/> 368 * This method <b>must</b> be run synchronized as we cannot aggregate the same correlation key 369 * in parallel. 370 * <p/> 371 * The returned {@link Exchange} should be send downstream using the {@link #onSubmitCompletion(String, org.apache.camel.Exchange)} 372 * method which sends out the aggregated and completed {@link Exchange}. 373 * 374 * @param key the correlation key 375 * @param newExchange the exchange 376 * @return the aggregated exchange(s) which is complete, or <tt>null</tt> if not yet complete 377 * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating 378 */ 379 private List<Exchange> doAggregation(String key, Exchange newExchange) throws CamelExchangeException { 380 LOG.trace("onAggregation +++ start +++ with correlation key: {}", key); 381 382 List<Exchange> list = new ArrayList<Exchange>(); 383 String complete = null; 384 385 Exchange answer; 386 Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key); 387 Exchange oldExchange = originalExchange; 388 389 Integer size = 1; 390 if (oldExchange != null) { 391 // hack to support legacy AggregationStrategy's that modify and return the oldExchange, these will not 392 // working when using an identify based approach for optimistic locking like the MemoryAggregationRepository. 393 if (optimisticLocking && aggregationRepository instanceof MemoryAggregationRepository) { 394 oldExchange = originalExchange.copy(); 395 } 396 size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class); 397 size++; 398 } 399 400 // prepare the exchanges for aggregation 401 ExchangeHelper.prepareAggregation(oldExchange, newExchange); 402 403 // check if we are pre complete 404 if (preCompletion) { 405 try { 406 // put the current aggregated size on the exchange so its avail during completion check 407 newExchange.setProperty(Exchange.AGGREGATED_SIZE, size); 408 complete = isPreCompleted(key, oldExchange, newExchange); 409 // make sure to track timeouts if not complete 410 if (complete == null) { 411 trackTimeout(key, newExchange); 412 } 413 // remove it afterwards 414 newExchange.removeProperty(Exchange.AGGREGATED_SIZE); 415 } catch (Throwable e) { 416 // must catch any exception from aggregation 417 throw new CamelExchangeException("Error occurred during preComplete", newExchange, e); 418 } 419 } else if (isEagerCheckCompletion()) { 420 // put the current aggregated size on the exchange so its avail during completion check 421 newExchange.setProperty(Exchange.AGGREGATED_SIZE, size); 422 complete = isCompleted(key, newExchange); 423 // make sure to track timeouts if not complete 424 if (complete == null) { 425 trackTimeout(key, newExchange); 426 } 427 // remove it afterwards 428 newExchange.removeProperty(Exchange.AGGREGATED_SIZE); 429 } 430 431 if (preCompletion && complete != null) { 432 // need to pre complete the current group before we aggregate 433 doAggregationComplete(complete, list, key, originalExchange, oldExchange); 434 // as we complete the current group eager, we should indicate the new group is not complete 435 complete = null; 436 // and clear old/original exchange as we start on a new group 437 oldExchange = null; 438 originalExchange = null; 439 // and reset the size to 1 440 size = 1; 441 // make sure to track timeout as we just restart the correlation group when we are in pre completion mode 442 trackTimeout(key, newExchange); 443 } 444 445 // aggregate the exchanges 446 try { 447 answer = onAggregation(oldExchange, newExchange); 448 } catch (Throwable e) { 449 // must catch any exception from aggregation 450 throw new CamelExchangeException("Error occurred during aggregation", newExchange, e); 451 } 452 if (answer == null) { 453 throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", newExchange); 454 } 455 456 // update the aggregated size 457 answer.setProperty(Exchange.AGGREGATED_SIZE, size); 458 459 // maybe we should check completion after the aggregation 460 if (!preCompletion && !isEagerCheckCompletion()) { 461 complete = isCompleted(key, answer); 462 // make sure to track timeouts if not complete 463 if (complete == null) { 464 trackTimeout(key, newExchange); 465 } 466 } 467 468 if (complete == null) { 469 // only need to update aggregation repository if we are not complete 470 doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer); 471 } else { 472 // if we are complete then add the answer to the list 473 doAggregationComplete(complete, list, key, originalExchange, answer); 474 } 475 476 LOG.trace("onAggregation +++ end +++ with correlation key: {}", key); 477 return list; 478 } 479 480 protected void doAggregationComplete(String complete, List<Exchange> list, String key, Exchange originalExchange, Exchange answer) { 481 if ("consumer".equals(complete)) { 482 for (String batchKey : batchConsumerCorrelationKeys) { 483 Exchange batchAnswer; 484 if (batchKey.equals(key)) { 485 // skip the current aggregated key as we have already aggregated it and have the answer 486 batchAnswer = answer; 487 } else { 488 batchAnswer = aggregationRepository.get(camelContext, batchKey); 489 } 490 491 if (batchAnswer != null) { 492 batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); 493 onCompletion(batchKey, originalExchange, batchAnswer, false); 494 list.add(batchAnswer); 495 } 496 } 497 batchConsumerCorrelationKeys.clear(); 498 // we have already submitted to completion, so answer should be null 499 answer = null; 500 } else if (answer != null) { 501 // we are complete for this exchange 502 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); 503 answer = onCompletion(key, originalExchange, answer, false); 504 } 505 506 if (answer != null) { 507 list.add(answer); 508 } 509 } 510 511 protected void doAggregationRepositoryAdd(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) { 512 LOG.trace("In progress aggregated oldExchange: {}, newExchange: {} with correlation key: {}", new Object[]{oldExchange, newExchange, key}); 513 if (optimisticLocking) { 514 try { 515 ((OptimisticLockingAggregationRepository)aggregationRepository).add(camelContext, key, oldExchange, newExchange); 516 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { 517 onOptimisticLockingFailure(oldExchange, newExchange); 518 throw e; 519 } 520 } else { 521 aggregationRepository.add(camelContext, key, newExchange); 522 } 523 } 524 525 protected void onOptimisticLockingFailure(Exchange oldExchange, Exchange newExchange) { 526 AggregationStrategy strategy = aggregationStrategy; 527 if (strategy instanceof DelegateAggregationStrategy) { 528 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 529 } 530 if (strategy instanceof OptimisticLockingAwareAggregationStrategy) { 531 LOG.trace("onOptimisticLockFailure with AggregationStrategy: {}, oldExchange: {}, newExchange: {}", 532 new Object[]{strategy, oldExchange, newExchange}); 533 ((OptimisticLockingAwareAggregationStrategy)strategy).onOptimisticLockFailure(oldExchange, newExchange); 534 } 535 } 536 537 /** 538 * Tests whether the given exchanges is pre-complete or not 539 * 540 * @param key the correlation key 541 * @param oldExchange the existing exchange 542 * @param newExchange the incoming exchange 543 * @return <tt>null</tt> if not pre-completed, otherwise a String with the type that triggered the pre-completion 544 */ 545 protected String isPreCompleted(String key, Exchange oldExchange, Exchange newExchange) { 546 boolean complete = false; 547 AggregationStrategy strategy = aggregationStrategy; 548 if (strategy instanceof DelegateAggregationStrategy) { 549 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 550 } 551 if (strategy instanceof PreCompletionAwareAggregationStrategy) { 552 complete = ((PreCompletionAwareAggregationStrategy) strategy).preComplete(oldExchange, newExchange); 553 } 554 return complete ? "strategy" : null; 555 } 556 557 /** 558 * Tests whether the given exchange is complete or not 559 * 560 * @param key the correlation key 561 * @param exchange the incoming exchange 562 * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion 563 */ 564 protected String isCompleted(String key, Exchange exchange) { 565 // batch consumer completion must always run first 566 if (isCompletionFromBatchConsumer()) { 567 batchConsumerCorrelationKeys.add(key); 568 batchConsumerCounter.incrementAndGet(); 569 int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class); 570 if (size > 0 && batchConsumerCounter.intValue() >= size) { 571 // batch consumer is complete then reset the counter 572 batchConsumerCounter.set(0); 573 return "consumer"; 574 } 575 } 576 577 if (exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, boolean.class)) { 578 return "strategy"; 579 } 580 581 if (getCompletionPredicate() != null) { 582 boolean answer = getCompletionPredicate().matches(exchange); 583 if (answer) { 584 return "predicate"; 585 } 586 } 587 588 boolean sizeChecked = false; 589 if (getCompletionSizeExpression() != null) { 590 Integer value = getCompletionSizeExpression().evaluate(exchange, Integer.class); 591 if (value != null && value > 0) { 592 // mark as already checked size as expression takes precedence over static configured 593 sizeChecked = true; 594 int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); 595 if (size >= value) { 596 return "size"; 597 } 598 } 599 } 600 if (!sizeChecked && getCompletionSize() > 0) { 601 int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); 602 if (size >= getCompletionSize()) { 603 return "size"; 604 } 605 } 606 607 // not complete 608 return null; 609 } 610 611 protected void trackTimeout(String key, Exchange exchange) { 612 // timeout can be either evaluated based on an expression or from a fixed value 613 // expression takes precedence 614 boolean timeoutSet = false; 615 if (getCompletionTimeoutExpression() != null) { 616 Long value = getCompletionTimeoutExpression().evaluate(exchange, Long.class); 617 if (value != null && value > 0) { 618 if (LOG.isTraceEnabled()) { 619 LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}", 620 new Object[]{key, value, exchange}); 621 } 622 addExchangeToTimeoutMap(key, exchange, value); 623 timeoutSet = true; 624 } 625 } 626 if (!timeoutSet && getCompletionTimeout() > 0) { 627 // timeout is used so use the timeout map to keep an eye on this 628 if (LOG.isTraceEnabled()) { 629 LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}", 630 new Object[]{key, getCompletionTimeout(), exchange}); 631 } 632 addExchangeToTimeoutMap(key, exchange, getCompletionTimeout()); 633 } 634 } 635 636 protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) { 637 return aggregationStrategy.aggregate(oldExchange, newExchange); 638 } 639 640 protected boolean onPreCompletionAggregation(Exchange oldExchange, Exchange newExchange) { 641 AggregationStrategy strategy = aggregationStrategy; 642 if (strategy instanceof DelegateAggregationStrategy) { 643 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 644 } 645 if (strategy instanceof PreCompletionAwareAggregationStrategy) { 646 return ((PreCompletionAwareAggregationStrategy) strategy).preComplete(oldExchange, newExchange); 647 } 648 return false; 649 } 650 651 protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) { 652 // store the correlation key as property before we remove so the repository has that information 653 if (original != null) { 654 original.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key); 655 } 656 aggregated.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key); 657 658 // only remove if we have previous added (as we could potentially complete with only 1 exchange) 659 // (if we have previous added then we have that as the original exchange) 660 if (original != null) { 661 // remove from repository as its completed, we do this first as to trigger any OptimisticLockingException's 662 aggregationRepository.remove(aggregated.getContext(), key, original); 663 } 664 665 if (!fromTimeout && timeoutMap != null) { 666 // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker) 667 LOG.trace("Removing correlation key {} from timeout", key); 668 timeoutMap.remove(key); 669 } 670 671 // this key has been closed so add it to the closed map 672 if (closedCorrelationKeys != null) { 673 closedCorrelationKeys.put(key, key); 674 } 675 676 if (fromTimeout) { 677 // invoke timeout if its timeout aware aggregation strategy, 678 // to allow any custom processing before discarding the exchange 679 AggregationStrategy strategy = aggregationStrategy; 680 if (strategy instanceof DelegateAggregationStrategy) { 681 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 682 } 683 if (strategy instanceof TimeoutAwareAggregationStrategy) { 684 long timeout = getCompletionTimeout() > 0 ? getCompletionTimeout() : -1; 685 ((TimeoutAwareAggregationStrategy) strategy).timeout(aggregated, -1, -1, timeout); 686 } 687 } 688 689 Exchange answer; 690 if (fromTimeout && isDiscardOnCompletionTimeout()) { 691 // discard due timeout 692 LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated); 693 // must confirm the discarded exchange 694 aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId()); 695 // and remove redelivery state as well 696 redeliveryState.remove(aggregated.getExchangeId()); 697 // the completion was from timeout and we should just discard it 698 answer = null; 699 } else { 700 // the aggregated exchange should be published (sent out) 701 answer = aggregated; 702 } 703 704 return answer; 705 } 706 707 private void onSubmitCompletion(final String key, final Exchange exchange) { 708 LOG.debug("Aggregation complete for correlation key {} sending aggregated exchange: {}", key, exchange); 709 710 // add this as in progress before we submit the task 711 inProgressCompleteExchanges.add(exchange.getExchangeId()); 712 713 // invoke the on completion callback 714 AggregationStrategy target = aggregationStrategy; 715 if (target instanceof DelegateAggregationStrategy) { 716 target = ((DelegateAggregationStrategy) target).getDelegate(); 717 } 718 if (target instanceof CompletionAwareAggregationStrategy) { 719 ((CompletionAwareAggregationStrategy) target).onCompletion(exchange); 720 } 721 722 if (getStatistics().isStatisticsEnabled()) { 723 totalCompleted.incrementAndGet(); 724 725 String completedBy = exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class); 726 if ("interval".equals(completedBy)) { 727 completedByInterval.incrementAndGet(); 728 } else if ("timeout".equals(completedBy)) { 729 completedByTimeout.incrementAndGet(); 730 } else if ("force".equals(completedBy)) { 731 completedByForce.incrementAndGet(); 732 } else if ("consumer".equals(completedBy)) { 733 completedByBatchConsumer.incrementAndGet(); 734 } else if ("predicate".equals(completedBy)) { 735 completedByPredicate.incrementAndGet(); 736 } else if ("size".equals(completedBy)) { 737 completedBySize.incrementAndGet(); 738 } else if ("strategy".equals(completedBy)) { 739 completedByStrategy.incrementAndGet(); 740 } 741 } 742 743 // send this exchange 744 executorService.submit(new Runnable() { 745 public void run() { 746 LOG.debug("Processing aggregated exchange: {}", exchange); 747 748 // add on completion task so we remember to update the inProgressCompleteExchanges 749 exchange.addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId())); 750 751 try { 752 processor.process(exchange); 753 } catch (Throwable e) { 754 exchange.setException(e); 755 } 756 757 // log exception if there was a problem 758 if (exchange.getException() != null) { 759 // if there was an exception then let the exception handler handle it 760 getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); 761 } else { 762 LOG.trace("Processing aggregated exchange: {} complete.", exchange); 763 } 764 } 765 }); 766 } 767 768 /** 769 * Restores the timeout map with timeout values from the aggregation repository. 770 * <p/> 771 * This is needed in case the aggregator has been stopped and started again (for example a server restart). 772 * Then the existing exchanges from the {@link AggregationRepository} must have their timeout conditions restored. 773 */ 774 protected void restoreTimeoutMapFromAggregationRepository() throws Exception { 775 // grab the timeout value for each partly aggregated exchange 776 Set<String> keys = aggregationRepository.getKeys(); 777 if (keys == null || keys.isEmpty()) { 778 return; 779 } 780 781 StopWatch watch = new StopWatch(); 782 LOG.trace("Starting restoring CompletionTimeout for {} existing exchanges from the aggregation repository...", keys.size()); 783 784 for (String key : keys) { 785 Exchange exchange = aggregationRepository.get(camelContext, key); 786 // grab the timeout value 787 long timeout = exchange.hasProperties() ? exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0; 788 if (timeout > 0) { 789 LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout); 790 addExchangeToTimeoutMap(key, exchange, timeout); 791 } 792 } 793 794 // log duration of this task so end user can see how long it takes to pre-check this upon starting 795 LOG.info("Restored {} CompletionTimeout conditions in the AggregationTimeoutChecker in {}", 796 timeoutMap.size(), TimeUtils.printDuration(watch.stop())); 797 } 798 799 /** 800 * Adds the given exchange to the timeout map, which is used by the timeout checker task to trigger timeouts. 801 * 802 * @param key the correlation key 803 * @param exchange the exchange 804 * @param timeout the timeout value in millis 805 */ 806 private void addExchangeToTimeoutMap(String key, Exchange exchange, long timeout) { 807 // store the timeout value on the exchange as well, in case we need it later 808 exchange.setProperty(Exchange.AGGREGATED_TIMEOUT, timeout); 809 timeoutMap.put(key, exchange.getExchangeId(), timeout); 810 } 811 812 /** 813 * Current number of closed correlation keys in the memory cache 814 */ 815 public int getClosedCorrelationKeysCacheSize() { 816 if (closedCorrelationKeys != null) { 817 return closedCorrelationKeys.size(); 818 } else { 819 return 0; 820 } 821 } 822 823 /** 824 * Clear all the closed correlation keys stored in the cache 825 */ 826 public void clearClosedCorrelationKeysCache() { 827 if (closedCorrelationKeys != null) { 828 closedCorrelationKeys.clear(); 829 } 830 } 831 832 public AggregateProcessorStatistics getStatistics() { 833 return statistics; 834 } 835 836 public int getInProgressCompleteExchanges() { 837 return inProgressCompleteExchanges.size(); 838 } 839 840 public Predicate getCompletionPredicate() { 841 return completionPredicate; 842 } 843 844 public void setCompletionPredicate(Predicate completionPredicate) { 845 this.completionPredicate = completionPredicate; 846 } 847 848 public boolean isEagerCheckCompletion() { 849 return eagerCheckCompletion; 850 } 851 852 public void setEagerCheckCompletion(boolean eagerCheckCompletion) { 853 this.eagerCheckCompletion = eagerCheckCompletion; 854 } 855 856 public long getCompletionTimeout() { 857 return completionTimeout; 858 } 859 860 public void setCompletionTimeout(long completionTimeout) { 861 this.completionTimeout = completionTimeout; 862 } 863 864 public Expression getCompletionTimeoutExpression() { 865 return completionTimeoutExpression; 866 } 867 868 public void setCompletionTimeoutExpression(Expression completionTimeoutExpression) { 869 this.completionTimeoutExpression = completionTimeoutExpression; 870 } 871 872 public long getCompletionInterval() { 873 return completionInterval; 874 } 875 876 public void setCompletionInterval(long completionInterval) { 877 this.completionInterval = completionInterval; 878 } 879 880 public int getCompletionSize() { 881 return completionSize; 882 } 883 884 public void setCompletionSize(int completionSize) { 885 this.completionSize = completionSize; 886 } 887 888 public Expression getCompletionSizeExpression() { 889 return completionSizeExpression; 890 } 891 892 public void setCompletionSizeExpression(Expression completionSizeExpression) { 893 this.completionSizeExpression = completionSizeExpression; 894 } 895 896 public boolean isIgnoreInvalidCorrelationKeys() { 897 return ignoreInvalidCorrelationKeys; 898 } 899 900 public void setIgnoreInvalidCorrelationKeys(boolean ignoreInvalidCorrelationKeys) { 901 this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys; 902 } 903 904 public Integer getCloseCorrelationKeyOnCompletion() { 905 return closeCorrelationKeyOnCompletion; 906 } 907 908 public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) { 909 this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion; 910 } 911 912 public boolean isCompletionFromBatchConsumer() { 913 return completionFromBatchConsumer; 914 } 915 916 public void setCompletionFromBatchConsumer(boolean completionFromBatchConsumer) { 917 this.completionFromBatchConsumer = completionFromBatchConsumer; 918 } 919 920 public boolean isCompleteAllOnStop() { 921 return completeAllOnStop; 922 } 923 924 public ExceptionHandler getExceptionHandler() { 925 return exceptionHandler; 926 } 927 928 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 929 this.exceptionHandler = exceptionHandler; 930 } 931 932 public boolean isParallelProcessing() { 933 return parallelProcessing; 934 } 935 936 public void setParallelProcessing(boolean parallelProcessing) { 937 this.parallelProcessing = parallelProcessing; 938 } 939 940 public boolean isOptimisticLocking() { 941 return optimisticLocking; 942 } 943 944 public void setOptimisticLocking(boolean optimisticLocking) { 945 this.optimisticLocking = optimisticLocking; 946 } 947 948 public AggregationRepository getAggregationRepository() { 949 return aggregationRepository; 950 } 951 952 public void setAggregationRepository(AggregationRepository aggregationRepository) { 953 this.aggregationRepository = aggregationRepository; 954 } 955 956 public boolean isDiscardOnCompletionTimeout() { 957 return discardOnCompletionTimeout; 958 } 959 960 public void setDiscardOnCompletionTimeout(boolean discardOnCompletionTimeout) { 961 this.discardOnCompletionTimeout = discardOnCompletionTimeout; 962 } 963 964 public void setForceCompletionOnStop(boolean forceCompletionOnStop) { 965 this.forceCompletionOnStop = forceCompletionOnStop; 966 } 967 968 public void setCompleteAllOnStop(boolean completeAllOnStop) { 969 this.completeAllOnStop = completeAllOnStop; 970 } 971 972 public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { 973 this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; 974 } 975 976 public ScheduledExecutorService getTimeoutCheckerExecutorService() { 977 return timeoutCheckerExecutorService; 978 } 979 980 public boolean isShutdownTimeoutCheckerExecutorService() { 981 return shutdownTimeoutCheckerExecutorService; 982 } 983 984 public void setShutdownTimeoutCheckerExecutorService(boolean shutdownTimeoutCheckerExecutorService) { 985 this.shutdownTimeoutCheckerExecutorService = shutdownTimeoutCheckerExecutorService; 986 } 987 988 public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) { 989 this.optimisticLockRetryPolicy = optimisticLockRetryPolicy; 990 } 991 992 public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() { 993 return optimisticLockRetryPolicy; 994 } 995 996 public AggregationStrategy getAggregationStrategy() { 997 return aggregationStrategy; 998 } 999 1000 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 1001 this.aggregationStrategy = aggregationStrategy; 1002 } 1003 1004 public Expression getCorrelationExpression() { 1005 return correlationExpression; 1006 } 1007 1008 public void setCorrelationExpression(Expression correlationExpression) { 1009 this.correlationExpression = correlationExpression; 1010 } 1011 1012 public AggregateController getAggregateController() { 1013 return aggregateController; 1014 } 1015 1016 public void setAggregateController(AggregateController aggregateController) { 1017 this.aggregateController = aggregateController; 1018 } 1019 1020 /** 1021 * On completion task which keeps the booking of the in progress up to date 1022 */ 1023 private final class AggregateOnCompletion implements Synchronization { 1024 private final String exchangeId; 1025 1026 private AggregateOnCompletion(String exchangeId) { 1027 // must use the original exchange id as it could potentially change if send over SEDA etc. 1028 this.exchangeId = exchangeId; 1029 } 1030 1031 public void onFailure(Exchange exchange) { 1032 LOG.trace("Aggregated exchange onFailure: {}", exchange); 1033 1034 // must remember to remove in progress when we failed 1035 inProgressCompleteExchanges.remove(exchangeId); 1036 // do not remove redelivery state as we need it when we redeliver again later 1037 } 1038 1039 public void onComplete(Exchange exchange) { 1040 LOG.trace("Aggregated exchange onComplete: {}", exchange); 1041 1042 // only confirm if we processed without a problem 1043 try { 1044 aggregationRepository.confirm(exchange.getContext(), exchangeId); 1045 // and remove redelivery state as well 1046 redeliveryState.remove(exchangeId); 1047 } finally { 1048 // must remember to remove in progress when we are complete 1049 inProgressCompleteExchanges.remove(exchangeId); 1050 } 1051 } 1052 1053 @Override 1054 public String toString() { 1055 return "AggregateOnCompletion"; 1056 } 1057 } 1058 1059 /** 1060 * Background task that looks for aggregated exchanges which is triggered by completion timeouts. 1061 */ 1062 private final class AggregationTimeoutMap extends DefaultTimeoutMap<String, String> { 1063 1064 private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) { 1065 // do NOT use locking on the timeout map as this aggregator has its own shared lock we will use instead 1066 super(executor, requestMapPollTimeMillis, optimisticLocking); 1067 } 1068 1069 @Override 1070 public void purge() { 1071 // must acquire the shared aggregation lock to be able to purge 1072 if (!optimisticLocking) { 1073 lock.lock(); 1074 } 1075 try { 1076 super.purge(); 1077 } finally { 1078 if (!optimisticLocking) { 1079 lock.unlock(); 1080 } 1081 } 1082 } 1083 1084 @Override 1085 public boolean onEviction(String key, String exchangeId) { 1086 log.debug("Completion timeout triggered for correlation key: {}", key); 1087 1088 boolean inProgress = inProgressCompleteExchanges.contains(exchangeId); 1089 if (inProgress) { 1090 LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId); 1091 return true; 1092 } 1093 1094 // get the aggregated exchange 1095 boolean evictionStolen = false; 1096 Exchange answer = aggregationRepository.get(camelContext, key); 1097 if (answer == null) { 1098 evictionStolen = true; 1099 } else { 1100 // indicate it was completed by timeout 1101 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); 1102 try { 1103 answer = onCompletion(key, answer, answer, true); 1104 if (answer != null) { 1105 onSubmitCompletion(key, answer); 1106 } 1107 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { 1108 evictionStolen = true; 1109 } 1110 } 1111 1112 if (optimisticLocking && evictionStolen) { 1113 LOG.debug("Another Camel instance has already successfully correlated or processed this timeout eviction " 1114 + "for exchange with id: {} and correlation id: {}", exchangeId, key); 1115 } 1116 return true; 1117 } 1118 } 1119 1120 /** 1121 * Background task that triggers completion based on interval. 1122 */ 1123 private final class AggregationIntervalTask implements Runnable { 1124 1125 public void run() { 1126 // only run if CamelContext has been fully started 1127 if (!camelContext.getStatus().isStarted()) { 1128 LOG.trace("Completion interval task cannot start due CamelContext({}) has not been started yet", camelContext.getName()); 1129 return; 1130 } 1131 1132 LOG.trace("Starting completion interval task"); 1133 1134 // trigger completion for all in the repository 1135 Set<String> keys = aggregationRepository.getKeys(); 1136 1137 if (keys != null && !keys.isEmpty()) { 1138 // must acquire the shared aggregation lock to be able to trigger interval completion 1139 if (!optimisticLocking) { 1140 lock.lock(); 1141 } 1142 try { 1143 for (String key : keys) { 1144 boolean stolenInterval = false; 1145 Exchange exchange = aggregationRepository.get(camelContext, key); 1146 if (exchange == null) { 1147 stolenInterval = true; 1148 } else { 1149 LOG.trace("Completion interval triggered for correlation key: {}", key); 1150 // indicate it was completed by interval 1151 exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval"); 1152 try { 1153 Exchange answer = onCompletion(key, exchange, exchange, false); 1154 if (answer != null) { 1155 onSubmitCompletion(key, answer); 1156 } 1157 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { 1158 stolenInterval = true; 1159 } 1160 } 1161 if (optimisticLocking && stolenInterval) { 1162 LOG.debug("Another Camel instance has already processed this interval aggregation for exchange with correlation id: {}", key); 1163 } 1164 } 1165 } finally { 1166 if (!optimisticLocking) { 1167 lock.unlock(); 1168 } 1169 } 1170 } 1171 1172 LOG.trace("Completion interval task complete"); 1173 } 1174 } 1175 1176 /** 1177 * Background task that looks for aggregated exchanges to recover. 1178 */ 1179 private final class RecoverTask implements Runnable { 1180 private final RecoverableAggregationRepository recoverable; 1181 1182 private RecoverTask(RecoverableAggregationRepository recoverable) { 1183 this.recoverable = recoverable; 1184 } 1185 1186 public void run() { 1187 // only run if CamelContext has been fully started 1188 if (!camelContext.getStatus().isStarted()) { 1189 LOG.trace("Recover check cannot start due CamelContext({}) has not been started yet", camelContext.getName()); 1190 return; 1191 } 1192 1193 LOG.trace("Starting recover check"); 1194 1195 // copy the current in progress before doing scan 1196 final Set<String> copyOfInProgress = new LinkedHashSet<String>(inProgressCompleteExchanges); 1197 1198 Set<String> exchangeIds = recoverable.scan(camelContext); 1199 for (String exchangeId : exchangeIds) { 1200 1201 // we may shutdown while doing recovery 1202 if (!isRunAllowed()) { 1203 LOG.info("We are shutting down so stop recovering"); 1204 return; 1205 } 1206 1207 // consider in progress if it was in progress before we did the scan, or currently after we did the scan 1208 // its safer to consider it in progress than risk duplicates due both in progress + recovered 1209 boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId); 1210 if (inProgress) { 1211 LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId); 1212 } else { 1213 LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId); 1214 Exchange exchange = recoverable.recover(camelContext, exchangeId); 1215 if (exchange != null) { 1216 // get the correlation key 1217 String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class); 1218 // and mark it as redelivered 1219 exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE); 1220 1221 // get the current redelivery data 1222 RedeliveryData data = redeliveryState.get(exchange.getExchangeId()); 1223 1224 // if we are exhausted, then move to dead letter channel 1225 if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) { 1226 LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries() 1227 + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri()); 1228 1229 // send to DLC 1230 try { 1231 // set redelivery counter 1232 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); 1233 exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); 1234 deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange); 1235 } catch (Throwable e) { 1236 exchange.setException(e); 1237 } 1238 1239 // handle if failed 1240 if (exchange.getException() != null) { 1241 getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException()); 1242 } else { 1243 // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again 1244 recoverable.confirm(camelContext, exchangeId); 1245 } 1246 } else { 1247 // update current redelivery state 1248 if (data == null) { 1249 // create new data 1250 data = new RedeliveryData(); 1251 redeliveryState.put(exchange.getExchangeId(), data); 1252 } 1253 data.redeliveryCounter++; 1254 1255 // set redelivery counter 1256 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); 1257 if (recoverable.getMaximumRedeliveries() > 0) { 1258 exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries()); 1259 } 1260 1261 LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId); 1262 1263 // not exhaust so resubmit the recovered exchange 1264 onSubmitCompletion(key, exchange); 1265 } 1266 } 1267 } 1268 } 1269 1270 LOG.trace("Recover check complete"); 1271 } 1272 } 1273 1274 @Override 1275 protected void doStart() throws Exception { 1276 AggregationStrategy strategy = aggregationStrategy; 1277 if (strategy instanceof DelegateAggregationStrategy) { 1278 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 1279 } 1280 if (strategy instanceof PreCompletionAwareAggregationStrategy) { 1281 preCompletion = true; 1282 LOG.info("PreCompletionAwareAggregationStrategy detected. Aggregator {} is in pre-completion mode.", getId()); 1283 } 1284 1285 if (!preCompletion) { 1286 // if not in pre completion mode then check we configured the completion required 1287 if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null 1288 && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null 1289 && getCompletionSizeExpression() == null) { 1290 throw new IllegalStateException("At least one of the completions options" 1291 + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set"); 1292 } 1293 } 1294 1295 if (getCloseCorrelationKeyOnCompletion() != null) { 1296 if (getCloseCorrelationKeyOnCompletion() > 0) { 1297 LOG.info("Using ClosedCorrelationKeys with a LRUCache with a capacity of " + getCloseCorrelationKeyOnCompletion()); 1298 closedCorrelationKeys = new LRUCache<String, String>(getCloseCorrelationKeyOnCompletion()); 1299 } else { 1300 LOG.info("Using ClosedCorrelationKeys with unbounded capacity"); 1301 closedCorrelationKeys = new ConcurrentHashMap<String, String>(); 1302 } 1303 } 1304 1305 if (aggregationRepository == null) { 1306 aggregationRepository = new MemoryAggregationRepository(optimisticLocking); 1307 LOG.info("Defaulting to MemoryAggregationRepository"); 1308 } 1309 1310 if (optimisticLocking) { 1311 if (!(aggregationRepository instanceof OptimisticLockingAggregationRepository)) { 1312 throw new IllegalArgumentException("Optimistic locking cannot be enabled without using an AggregationRepository that implements OptimisticLockingAggregationRepository"); 1313 } 1314 LOG.info("Optimistic locking is enabled"); 1315 } 1316 1317 ServiceHelper.startServices(aggregationStrategy, processor, aggregationRepository); 1318 1319 // should we use recover checker 1320 if (aggregationRepository instanceof RecoverableAggregationRepository) { 1321 RecoverableAggregationRepository recoverable = (RecoverableAggregationRepository) aggregationRepository; 1322 if (recoverable.isUseRecovery()) { 1323 long interval = recoverable.getRecoveryIntervalInMillis(); 1324 if (interval <= 0) { 1325 throw new IllegalArgumentException("AggregationRepository has recovery enabled and the RecoveryInterval option must be a positive number, was: " + interval); 1326 } 1327 1328 // create a background recover thread to check every interval 1329 recoverService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateRecoverChecker", 1); 1330 Runnable recoverTask = new RecoverTask(recoverable); 1331 LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every " + interval + " millis."); 1332 // use fixed delay so there is X interval between each run 1333 recoverService.scheduleWithFixedDelay(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS); 1334 1335 if (recoverable.getDeadLetterUri() != null) { 1336 int max = recoverable.getMaximumRedeliveries(); 1337 if (max <= 0) { 1338 throw new IllegalArgumentException("Option maximumRedeliveries must be a positive number, was: " + max); 1339 } 1340 LOG.info("After " + max + " failed redelivery attempts Exchanges will be moved to deadLetterUri: " + recoverable.getDeadLetterUri()); 1341 1342 // dead letter uri must be a valid endpoint 1343 Endpoint endpoint = camelContext.getEndpoint(recoverable.getDeadLetterUri()); 1344 if (endpoint == null) { 1345 throw new NoSuchEndpointException(recoverable.getDeadLetterUri()); 1346 } 1347 deadLetterProducerTemplate = camelContext.createProducerTemplate(); 1348 } 1349 } 1350 } 1351 1352 if (getCompletionInterval() > 0 && getCompletionTimeout() > 0) { 1353 throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both."); 1354 } 1355 if (getCompletionInterval() > 0) { 1356 LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis."); 1357 if (getTimeoutCheckerExecutorService() == null) { 1358 setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1)); 1359 shutdownTimeoutCheckerExecutorService = true; 1360 } 1361 // trigger completion based on interval 1362 getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS); 1363 } 1364 1365 // start timeout service if its in use 1366 if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) { 1367 LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout() + " millis of inactivity."); 1368 if (getTimeoutCheckerExecutorService() == null) { 1369 setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1)); 1370 shutdownTimeoutCheckerExecutorService = true; 1371 } 1372 // check for timed out aggregated messages once every second 1373 timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L); 1374 // fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we 1375 // need to re-establish the timeout map so timeout can trigger 1376 restoreTimeoutMapFromAggregationRepository(); 1377 ServiceHelper.startService(timeoutMap); 1378 } 1379 1380 if (aggregateController == null) { 1381 aggregateController = new DefaultAggregateController(); 1382 } 1383 aggregateController.onStart(this); 1384 } 1385 1386 @Override 1387 protected void doStop() throws Exception { 1388 // note: we cannot do doForceCompletionOnStop from this doStop method 1389 // as this is handled in the prepareShutdown method which is also invoked when stopping a route 1390 // and is better suited for preparing to shutdown than this doStop method is 1391 1392 if (aggregateController != null) { 1393 aggregateController.onStop(this); 1394 } 1395 1396 if (recoverService != null) { 1397 camelContext.getExecutorServiceManager().shutdown(recoverService); 1398 } 1399 ServiceHelper.stopServices(timeoutMap, processor, deadLetterProducerTemplate); 1400 1401 if (closedCorrelationKeys != null) { 1402 // it may be a service so stop it as well 1403 ServiceHelper.stopService(closedCorrelationKeys); 1404 closedCorrelationKeys.clear(); 1405 } 1406 batchConsumerCorrelationKeys.clear(); 1407 redeliveryState.clear(); 1408 } 1409 1410 @Override 1411 public void prepareShutdown(boolean suspendOnly, boolean forced) { 1412 // we are shutting down, so force completion if this option was enabled 1413 // but only do this when forced=false, as that is when we have chance to 1414 // send out new messages to be routed by Camel. When forced=true, then 1415 // we have to shutdown in a hurry 1416 if (!forced && forceCompletionOnStop) { 1417 doForceCompletionOnStop(); 1418 } 1419 } 1420 1421 @Override 1422 public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { 1423 // not in use 1424 return true; 1425 } 1426 1427 @Override 1428 public int getPendingExchangesSize() { 1429 if (completeAllOnStop) { 1430 // we want to regard all pending exchanges in the repo as inflight 1431 Set<String> keys = getAggregationRepository().getKeys(); 1432 return keys != null ? keys.size() : 0; 1433 } else { 1434 return 0; 1435 } 1436 } 1437 1438 private void doForceCompletionOnStop() { 1439 int expected = forceCompletionOfAllGroups(); 1440 1441 StopWatch watch = new StopWatch(); 1442 while (inProgressCompleteExchanges.size() > 0) { 1443 LOG.trace("Waiting for {} inflight exchanges to complete", getInProgressCompleteExchanges()); 1444 try { 1445 Thread.sleep(100); 1446 } catch (InterruptedException e) { 1447 // break out as we got interrupted such as the JVM terminating 1448 LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", getInProgressCompleteExchanges()); 1449 break; 1450 } 1451 } 1452 1453 if (expected > 0) { 1454 LOG.info("Forcing completion of all groups with {} exchanges completed in {}", expected, TimeUtils.printDuration(watch.stop())); 1455 } 1456 } 1457 1458 @Override 1459 protected void doShutdown() throws Exception { 1460 // shutdown aggregation repository and the strategy 1461 ServiceHelper.stopAndShutdownServices(aggregationRepository, aggregationStrategy); 1462 1463 // cleanup when shutting down 1464 inProgressCompleteExchanges.clear(); 1465 1466 if (shutdownExecutorService) { 1467 camelContext.getExecutorServiceManager().shutdownNow(executorService); 1468 } 1469 if (shutdownTimeoutCheckerExecutorService) { 1470 camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService); 1471 timeoutCheckerExecutorService = null; 1472 } 1473 1474 super.doShutdown(); 1475 } 1476 1477 public int forceCompletionOfGroup(String key) { 1478 // must acquire the shared aggregation lock to be able to trigger force completion 1479 int total = 0; 1480 1481 if (!optimisticLocking) { 1482 lock.lock(); 1483 } 1484 try { 1485 Exchange exchange = aggregationRepository.get(camelContext, key); 1486 if (exchange != null) { 1487 total = 1; 1488 LOG.trace("Force completion triggered for correlation key: {}", key); 1489 // indicate it was completed by a force completion request 1490 exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force"); 1491 Exchange answer = onCompletion(key, exchange, exchange, false); 1492 if (answer != null) { 1493 onSubmitCompletion(key, answer); 1494 } 1495 } 1496 } finally { 1497 if (!optimisticLocking) { 1498 lock.unlock(); 1499 } 1500 } 1501 LOG.trace("Completed force completion of group {}", key); 1502 1503 if (total > 0) { 1504 LOG.debug("Forcing completion of group {} with {} exchanges", key, total); 1505 } 1506 return total; 1507 } 1508 1509 public int forceCompletionOfAllGroups() { 1510 1511 // only run if CamelContext has been fully started or is stopping 1512 boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping(); 1513 if (!allow) { 1514 LOG.warn("Cannot start force completion of all groups because CamelContext({}) has not been started", camelContext.getName()); 1515 return 0; 1516 } 1517 1518 LOG.trace("Starting force completion of all groups task"); 1519 1520 // trigger completion for all in the repository 1521 Set<String> keys = aggregationRepository.getKeys(); 1522 1523 int total = 0; 1524 if (keys != null && !keys.isEmpty()) { 1525 // must acquire the shared aggregation lock to be able to trigger force completion 1526 if (!optimisticLocking) { 1527 lock.lock(); 1528 } 1529 total = keys.size(); 1530 try { 1531 for (String key : keys) { 1532 Exchange exchange = aggregationRepository.get(camelContext, key); 1533 if (exchange != null) { 1534 LOG.trace("Force completion triggered for correlation key: {}", key); 1535 // indicate it was completed by a force completion request 1536 exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force"); 1537 Exchange answer = onCompletion(key, exchange, exchange, false); 1538 if (answer != null) { 1539 onSubmitCompletion(key, answer); 1540 } 1541 } 1542 } 1543 } finally { 1544 if (!optimisticLocking) { 1545 lock.unlock(); 1546 } 1547 } 1548 } 1549 LOG.trace("Completed force completion of all groups task"); 1550 1551 if (total > 0) { 1552 LOG.debug("Forcing completion of all groups with {} exchanges", total); 1553 } 1554 return total; 1555 } 1556 1557}