001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor.aggregate; 018 019import java.util.ArrayList; 020import java.util.Collections; 021import java.util.LinkedHashSet; 022import java.util.List; 023import java.util.Map; 024import java.util.Set; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.ConcurrentSkipListSet; 027import java.util.concurrent.ExecutorService; 028import java.util.concurrent.ScheduledExecutorService; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.atomic.AtomicBoolean; 031import java.util.concurrent.atomic.AtomicInteger; 032import java.util.concurrent.atomic.AtomicLong; 033import java.util.concurrent.locks.Lock; 034import java.util.concurrent.locks.ReentrantLock; 035 036import org.apache.camel.AsyncCallback; 037import org.apache.camel.AsyncProcessor; 038import org.apache.camel.CamelContext; 039import org.apache.camel.CamelContextAware; 040import org.apache.camel.CamelExchangeException; 041import org.apache.camel.Endpoint; 042import org.apache.camel.Exchange; 043import org.apache.camel.Expression; 044import org.apache.camel.Navigate; 045import 
org.apache.camel.NoSuchEndpointException; 046import org.apache.camel.Predicate; 047import org.apache.camel.Processor; 048import org.apache.camel.ProducerTemplate; 049import org.apache.camel.ShutdownRunningTask; 050import org.apache.camel.TimeoutMap; 051import org.apache.camel.Traceable; 052import org.apache.camel.spi.AggregationRepository; 053import org.apache.camel.spi.ExceptionHandler; 054import org.apache.camel.spi.IdAware; 055import org.apache.camel.spi.OptimisticLockingAggregationRepository; 056import org.apache.camel.spi.RecoverableAggregationRepository; 057import org.apache.camel.spi.ShutdownAware; 058import org.apache.camel.spi.ShutdownPrepared; 059import org.apache.camel.spi.Synchronization; 060import org.apache.camel.support.DefaultTimeoutMap; 061import org.apache.camel.support.LoggingExceptionHandler; 062import org.apache.camel.support.ServiceSupport; 063import org.apache.camel.util.AsyncProcessorHelper; 064import org.apache.camel.util.ExchangeHelper; 065import org.apache.camel.util.LRUCacheFactory; 066import org.apache.camel.util.ObjectHelper; 067import org.apache.camel.util.ServiceHelper; 068import org.apache.camel.util.StopWatch; 069import org.apache.camel.util.TimeUtils; 070import org.slf4j.Logger; 071import org.slf4j.LoggerFactory; 072 073/** 074 * An implementation of the <a 075 * href="http://camel.apache.org/aggregator2.html">Aggregator</a> 076 * pattern where a batch of messages are processed (up to a maximum amount or 077 * until some timeout is reached) and messages for the same correlation key are 078 * combined together using some kind of {@link AggregationStrategy} 079 * (by default the latest message is used) to compress many message exchanges 080 * into a smaller number of exchanges. 
081 * <p/> 082 * A good example of this is stock market data; you may be receiving 30,000 083 * messages/second and you may want to throttle it right down so that multiple 084 * messages for the same stock are combined (or just the latest message is used 085 * and older prices are discarded). Another idea is to combine line item messages 086 * together into a single invoice message. 087 */ 088public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, ShutdownAware, IdAware { 089 090 public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker"; 091 092 private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class); 093 094 private final Lock lock = new ReentrantLock(); 095 private final AtomicBoolean aggregateRepositoryWarned = new AtomicBoolean(); 096 private final CamelContext camelContext; 097 private final Processor processor; 098 private String id; 099 private AggregationStrategy aggregationStrategy; 100 private boolean preCompletion; 101 private Expression correlationExpression; 102 private AggregateController aggregateController; 103 private final ExecutorService executorService; 104 private final boolean shutdownExecutorService; 105 private OptimisticLockRetryPolicy optimisticLockRetryPolicy = new OptimisticLockRetryPolicy(); 106 private ScheduledExecutorService timeoutCheckerExecutorService; 107 private boolean shutdownTimeoutCheckerExecutorService; 108 private ScheduledExecutorService recoverService; 109 // store correlation key -> exchange id in timeout map 110 private TimeoutMap<String, String> timeoutMap; 111 private ExceptionHandler exceptionHandler; 112 private AggregationRepository aggregationRepository; 113 private Map<String, String> closedCorrelationKeys; 114 private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<String>(); 115 private final Set<String> inProgressCompleteExchanges = 
Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()); 116 private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>(); 117 118 private final AggregateProcessorStatistics statistics = new Statistics(); 119 private final AtomicLong totalIn = new AtomicLong(); 120 private final AtomicLong totalCompleted = new AtomicLong(); 121 private final AtomicLong completedBySize = new AtomicLong(); 122 private final AtomicLong completedByStrategy = new AtomicLong(); 123 private final AtomicLong completedByInterval = new AtomicLong(); 124 private final AtomicLong completedByTimeout = new AtomicLong(); 125 private final AtomicLong completedByPredicate = new AtomicLong(); 126 private final AtomicLong completedByBatchConsumer = new AtomicLong(); 127 private final AtomicLong completedByForce = new AtomicLong(); 128 129 // keep booking about redelivery 130 private class RedeliveryData { 131 int redeliveryCounter; 132 } 133 134 private class Statistics implements AggregateProcessorStatistics { 135 136 private boolean statisticsEnabled = true; 137 138 public long getTotalIn() { 139 return totalIn.get(); 140 } 141 142 public long getTotalCompleted() { 143 return totalCompleted.get(); 144 } 145 146 public long getCompletedBySize() { 147 return completedBySize.get(); 148 } 149 150 public long getCompletedByStrategy() { 151 return completedByStrategy.get(); 152 } 153 154 public long getCompletedByInterval() { 155 return completedByInterval.get(); 156 } 157 158 public long getCompletedByTimeout() { 159 return completedByTimeout.get(); 160 } 161 162 public long getCompletedByPredicate() { 163 return completedByPredicate.get(); 164 } 165 166 public long getCompletedByBatchConsumer() { 167 return completedByBatchConsumer.get(); 168 } 169 170 public long getCompletedByForce() { 171 return completedByForce.get(); 172 } 173 174 public void reset() { 175 totalIn.set(0); 176 totalCompleted.set(0); 177 completedBySize.set(0); 178 
completedByStrategy.set(0); 179 completedByTimeout.set(0); 180 completedByPredicate.set(0); 181 completedByBatchConsumer.set(0); 182 completedByForce.set(0); 183 } 184 185 public boolean isStatisticsEnabled() { 186 return statisticsEnabled; 187 } 188 189 public void setStatisticsEnabled(boolean statisticsEnabled) { 190 this.statisticsEnabled = statisticsEnabled; 191 } 192 } 193 194 // options 195 private boolean ignoreInvalidCorrelationKeys; 196 private Integer closeCorrelationKeyOnCompletion; 197 private boolean parallelProcessing; 198 private boolean optimisticLocking; 199 200 // different ways to have completion triggered 201 private boolean eagerCheckCompletion; 202 private Predicate completionPredicate; 203 private long completionTimeout; 204 private Expression completionTimeoutExpression; 205 private long completionInterval; 206 private int completionSize; 207 private Expression completionSizeExpression; 208 private boolean completionFromBatchConsumer; 209 private boolean completionOnNewCorrelationGroup; 210 private AtomicInteger batchConsumerCounter = new AtomicInteger(); 211 private boolean discardOnCompletionTimeout; 212 private boolean forceCompletionOnStop; 213 private boolean completeAllOnStop; 214 private long completionTimeoutCheckerInterval = 1000; 215 216 private ProducerTemplate deadLetterProducerTemplate; 217 218 public AggregateProcessor(CamelContext camelContext, Processor processor, 219 Expression correlationExpression, AggregationStrategy aggregationStrategy, 220 ExecutorService executorService, boolean shutdownExecutorService) { 221 ObjectHelper.notNull(camelContext, "camelContext"); 222 ObjectHelper.notNull(processor, "processor"); 223 ObjectHelper.notNull(correlationExpression, "correlationExpression"); 224 ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy"); 225 ObjectHelper.notNull(executorService, "executorService"); 226 this.camelContext = camelContext; 227 this.processor = processor; 228 this.correlationExpression = 
correlationExpression; 229 this.aggregationStrategy = aggregationStrategy; 230 this.executorService = executorService; 231 this.shutdownExecutorService = shutdownExecutorService; 232 this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass()); 233 } 234 235 @Override 236 public String toString() { 237 return "AggregateProcessor[to: " + processor + "]"; 238 } 239 240 public String getTraceLabel() { 241 return "aggregate[" + correlationExpression + "]"; 242 } 243 244 public List<Processor> next() { 245 if (!hasNext()) { 246 return null; 247 } 248 List<Processor> answer = new ArrayList<Processor>(1); 249 answer.add(processor); 250 return answer; 251 } 252 253 public boolean hasNext() { 254 return processor != null; 255 } 256 257 public String getId() { 258 return id; 259 } 260 261 public void setId(String id) { 262 this.id = id; 263 } 264 265 public void process(Exchange exchange) throws Exception { 266 AsyncProcessorHelper.process(this, exchange); 267 } 268 269 public boolean process(Exchange exchange, AsyncCallback callback) { 270 try { 271 doProcess(exchange); 272 } catch (Throwable e) { 273 exchange.setException(e); 274 } 275 callback.done(true); 276 return true; 277 } 278 279 protected void doProcess(Exchange exchange) throws Exception { 280 281 if (getStatistics().isStatisticsEnabled()) { 282 totalIn.incrementAndGet(); 283 } 284 285 //check for the special header to force completion of all groups (and ignore the exchange otherwise) 286 boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class); 287 if (completeAllGroups) { 288 forceCompletionOfAllGroups(); 289 return; 290 } 291 292 // compute correlation expression 293 String key = correlationExpression.evaluate(exchange, String.class); 294 if (ObjectHelper.isEmpty(key)) { 295 // we have a bad correlation key 296 if (isIgnoreInvalidCorrelationKeys()) { 297 LOG.debug("Invalid correlation key. 
This Exchange will be ignored: {}", exchange); 298 return; 299 } else { 300 throw new CamelExchangeException("Invalid correlation key", exchange); 301 } 302 } 303 304 // is the correlation key closed? 305 if (closedCorrelationKeys != null && closedCorrelationKeys.containsKey(key)) { 306 throw new ClosedCorrelationKeyException(key, exchange); 307 } 308 309 // when optimist locking is enabled we keep trying until we succeed 310 if (optimisticLocking) { 311 List<Exchange> aggregated = null; 312 boolean exhaustedRetries = true; 313 int attempt = 0; 314 do { 315 attempt++; 316 // copy exchange, and do not share the unit of work 317 // the aggregated output runs in another unit of work 318 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 319 try { 320 aggregated = doAggregation(key, copy); 321 exhaustedRetries = false; 322 break; 323 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { 324 LOG.trace("On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException while trying to add() key: {} and exchange: {}", 325 new Object[]{attempt, aggregationRepository, key, copy, e}); 326 optimisticLockRetryPolicy.doDelay(attempt); 327 } 328 } while (optimisticLockRetryPolicy.shouldRetry(attempt)); 329 330 if (exhaustedRetries) { 331 throw new CamelExchangeException("Exhausted optimistic locking retry attempts, tried " + attempt + " times", exchange, 332 new OptimisticLockingAggregationRepository.OptimisticLockingException()); 333 } else if (aggregated != null) { 334 // we are completed so submit to completion 335 for (Exchange agg : aggregated) { 336 onSubmitCompletion(key, agg); 337 } 338 } 339 } else { 340 // copy exchange, and do not share the unit of work 341 // the aggregated output runs in another unit of work 342 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); 343 344 // when memory based then its fast using synchronized, but if the aggregation repository is IO 345 // bound such 
as JPA etc then concurrent aggregation per correlation key could 346 // improve performance as we can run aggregation repository get/add in parallel 347 List<Exchange> aggregated = null; 348 lock.lock(); 349 try { 350 aggregated = doAggregation(key, copy); 351 } finally { 352 lock.unlock(); 353 } 354 // we are completed so do that work outside the lock 355 if (aggregated != null) { 356 for (Exchange agg : aggregated) { 357 onSubmitCompletion(key, agg); 358 } 359 } 360 } 361 362 // check for the special header to force completion of all groups (inclusive of the message) 363 boolean completeAllGroupsInclusive = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class); 364 if (completeAllGroupsInclusive) { 365 forceCompletionOfAllGroups(); 366 } 367 } 368 369 /** 370 * Aggregates the exchange with the given correlation key 371 * <p/> 372 * This method <b>must</b> be run synchronized as we cannot aggregate the same correlation key 373 * in parallel. 374 * <p/> 375 * The returned {@link Exchange} should be send downstream using the {@link #onSubmitCompletion(String, org.apache.camel.Exchange)} 376 * method which sends out the aggregated and completed {@link Exchange}. 
     *
     * @param key the correlation key
     * @param newExchange the exchange
     * @return the aggregated exchange(s) which are complete; the list is never <tt>null</tt> and is empty
     *         if the group is not yet complete
     * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating
     */
    private List<Exchange> doAggregation(String key, Exchange newExchange) throws CamelExchangeException {
        LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);

        List<Exchange> list = new ArrayList<Exchange>();
        // name of the trigger which completed the group, or null while the group is not complete
        String complete = null;

        Exchange answer;
        // originalExchange stays as the instance loaded from the repository (used for add/remove),
        // while oldExchange is what gets handed to the aggregation strategy
        Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key);
        Exchange oldExchange = originalExchange;

        Integer size = 1;
        if (oldExchange != null) {
            // hack to support legacy AggregationStrategy's that modify and return the oldExchange, these will not
            // work when using an identity based approach for optimistic locking like the MemoryAggregationRepository.
            if (optimisticLocking && aggregationRepository instanceof MemoryAggregationRepository) {
                oldExchange = originalExchange.copy();
            }
            size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class);
            size++;
        }

        // prepare the exchanges for aggregation
        ExchangeHelper.prepareAggregation(oldExchange, newExchange);

        // check if we are pre complete
        if (preCompletion) {
            try {
                // put the current aggregated size on the exchange so its avail during completion check
                newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
                complete = isPreCompleted(key, oldExchange, newExchange);
                // make sure to track timeouts if not complete
                if (complete == null) {
                    trackTimeout(key, newExchange);
                }
                // remove it afterwards
                newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
            } catch (Throwable e) {
                // must catch any exception from aggregation
                throw new CamelExchangeException("Error occurred during preComplete", newExchange, e);
            }
        } else if (isEagerCheckCompletion()) {
            // put the current aggregated size on the exchange so its avail during completion check
            newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
            complete = isCompleted(key, newExchange);
            // make sure to track timeouts if not complete
            if (complete == null) {
                trackTimeout(key, newExchange);
            }
            // remove it afterwards
            newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
        }

        if (preCompletion && complete != null) {
            // need to pre complete the current group before we aggregate
            doAggregationComplete(complete, list, key, originalExchange, oldExchange);
            // as we complete the current group eager, we should indicate the new group is not complete
            complete = null;
            // and clear old/original exchange as we start on a new group
            oldExchange = null;
            originalExchange = null;
            // and reset the size to 1
            size = 1;
            // make sure to track timeout as we just restart the correlation group when we are in pre completion mode
            trackTimeout(key, newExchange);
        }

        // aggregate the exchanges
        try {
            answer = onAggregation(oldExchange, newExchange);
        } catch (Throwable e) {
            // must catch any exception from aggregation
            throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
        }
        if (answer == null) {
            throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", newExchange);
        }

        // check for the special exchange property to force completion of all groups
        boolean completeAllGroups = answer.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
        if (completeAllGroups) {
            // remove the exchange property so we do not complete again
            answer.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
            forceCompletionOfAllGroups();
        } else if (isCompletionOnNewCorrelationGroup() && originalExchange == null) {
            // its a new group so force complete of all existing groups
            forceCompletionOfAllGroups();
        }

        // special for some repository implementations
        if (aggregationRepository instanceof RecoverableAggregationRepository) {
            boolean valid = oldExchange == null || answer.getExchangeId().equals(oldExchange.getExchangeId());
            // warn only once per processor instance
            if (!valid && aggregateRepositoryWarned.compareAndSet(false, true)) {
                LOG.warn("AggregationStrategy should return the oldExchange instance instead of the newExchange whenever possible"
                    + " as otherwise this can lead to unexpected behavior with some RecoverableAggregationRepository implementations");
            }
        }

        // update the aggregated size
        answer.setProperty(Exchange.AGGREGATED_SIZE, size);

        // maybe we should check completion after the aggregation
        if (!preCompletion && !isEagerCheckCompletion()) {
            complete = isCompleted(key, answer);
            // make sure to track timeouts if not complete
            if (complete == null) {
                trackTimeout(key, newExchange);
            }
        }

        if (complete == null) {
            // only need to update aggregation repository if we are not complete
            doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
        } else {
            // if we are complete then add the answer to the list
            doAggregationComplete(complete, list, key, originalExchange, answer);
        }

        LOG.trace("onAggregation +++ end +++ with correlation key: {}", key);
        return list;
    }

    /**
     * Completes a correlation group and collects the exchange(s) to publish into the given list.
     * <p/>
     * When <tt>complete</tt> is <tt>"consumer"</tt> every key in the batch consumer key set is completed:
     * the current key uses the given <tt>answer</tt>, all other keys are loaded from the aggregation
     * repository, and the key set is cleared afterwards. For any other trigger the <tt>answer</tt> (if not
     * <tt>null</tt>) is marked with the trigger and passed to
     * {@link #onCompletion(String, Exchange, Exchange, boolean)}, whose non-null result is added to the list.
     *
     * @param complete         the name of the trigger which completed the group
     * @param list             the list to add the completed exchange(s) to
     * @param key              the correlation key
     * @param originalExchange the exchange loaded from the repository for the key, may be <tt>null</tt>
     * @param answer           the aggregated exchange, may be <tt>null</tt>
     */
    protected void doAggregationComplete(String complete, List<Exchange> list, String key, Exchange originalExchange, Exchange answer) {
        if ("consumer".equals(complete)) {
            for (String batchKey : batchConsumerCorrelationKeys) {
                Exchange batchAnswer;
                if (batchKey.equals(key)) {
                    // skip the current aggregated key as we have already aggregated it and have the answer
                    batchAnswer = answer;
                } else {
                    batchAnswer = aggregationRepository.get(camelContext, batchKey);
                }

                if (batchAnswer != null) {
                    batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
                    onCompletion(batchKey, originalExchange, batchAnswer, false);
                    list.add(batchAnswer);
                }
            }
            batchConsumerCorrelationKeys.clear();
            // we have already submitted to completion, so answer should be null
            answer = null;
        } else if (answer != null) {
            // we are complete for this exchange
            answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
            answer = onCompletion(key, originalExchange, answer, false);
        }

        if (answer != null) {
            list.add(answer);
        }
    }

    /**
     * Stores the in-progress aggregated exchange in the aggregation repository.
     * <p/>
     * With optimistic locking enabled the repository is used as an
     * {@link OptimisticLockingAggregationRepository}; if it throws an optimistic locking exception then
     * {@link #onOptimisticLockingFailure(Exchange, Exchange)} is invoked and the exception is rethrown.
     *
     * @param camelContext the camel context
     * @param key          the correlation key
     * @param oldExchange  the exchange previously loaded from the repository, may be <tt>null</tt>
     * @param newExchange  the aggregated exchange to store
     */
    protected void doAggregationRepositoryAdd(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) {
        LOG.trace("In progress aggregated oldExchange: {}, newExchange: {} with correlation key: {}", new Object[]{oldExchange, newExchange, key});
        if (optimisticLocking) {
            try {
                ((OptimisticLockingAggregationRepository)aggregationRepository).add(camelContext, key, oldExchange, newExchange);
            } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
                onOptimisticLockingFailure(oldExchange, newExchange);
                throw e;
            }
        } else {
            aggregationRepository.add(camelContext, key, newExchange);
        }
    }

    /**
     * Notifies the aggregation strategy (unwrapped from a {@link DelegateAggregationStrategy} if needed)
     * about an optimistic locking failure, when it is an {@link OptimisticLockingAwareAggregationStrategy}.
     *
     * @param oldExchange the exchange previously loaded from the repository
     * @param newExchange the aggregated exchange which failed to be stored
     */
    protected void onOptimisticLockingFailure(Exchange oldExchange, Exchange newExchange) {
        AggregationStrategy strategy = aggregationStrategy;
        if (strategy instanceof DelegateAggregationStrategy) {
            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
        }
        if (strategy instanceof OptimisticLockingAwareAggregationStrategy) {
            LOG.trace("onOptimisticLockFailure with AggregationStrategy: {}, oldExchange: {}, newExchange: {}",
                      new Object[]{strategy, oldExchange, newExchange});
            ((OptimisticLockingAwareAggregationStrategy)strategy).onOptimisticLockFailure(oldExchange, newExchange);
        }
    }

    /**
     * Tests whether the given
exchanges is pre-complete or not 563 * 564 * @param key the correlation key 565 * @param oldExchange the existing exchange 566 * @param newExchange the incoming exchange 567 * @return <tt>null</tt> if not pre-completed, otherwise a String with the type that triggered the pre-completion 568 */ 569 protected String isPreCompleted(String key, Exchange oldExchange, Exchange newExchange) { 570 boolean complete = false; 571 AggregationStrategy strategy = aggregationStrategy; 572 if (strategy instanceof DelegateAggregationStrategy) { 573 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 574 } 575 if (strategy instanceof PreCompletionAwareAggregationStrategy) { 576 complete = ((PreCompletionAwareAggregationStrategy) strategy).preComplete(oldExchange, newExchange); 577 } 578 return complete ? "strategy" : null; 579 } 580 581 /** 582 * Tests whether the given exchange is complete or not 583 * 584 * @param key the correlation key 585 * @param exchange the incoming exchange 586 * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion 587 */ 588 protected String isCompleted(String key, Exchange exchange) { 589 // batch consumer completion must always run first 590 if (isCompletionFromBatchConsumer()) { 591 batchConsumerCorrelationKeys.add(key); 592 batchConsumerCounter.incrementAndGet(); 593 int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class); 594 if (size > 0 && batchConsumerCounter.intValue() >= size) { 595 // batch consumer is complete then reset the counter 596 batchConsumerCounter.set(0); 597 return "consumer"; 598 } 599 } 600 601 if (exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, boolean.class)) { 602 return "strategy"; 603 } 604 605 if (getCompletionPredicate() != null) { 606 boolean answer = getCompletionPredicate().matches(exchange); 607 if (answer) { 608 return "predicate"; 609 } 610 } 611 612 boolean sizeChecked = false; 613 if (getCompletionSizeExpression() 
!= null) { 614 Integer value = getCompletionSizeExpression().evaluate(exchange, Integer.class); 615 if (value != null && value > 0) { 616 // mark as already checked size as expression takes precedence over static configured 617 sizeChecked = true; 618 int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); 619 if (size >= value) { 620 return "size"; 621 } 622 } 623 } 624 if (!sizeChecked && getCompletionSize() > 0) { 625 int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); 626 if (size >= getCompletionSize()) { 627 return "size"; 628 } 629 } 630 631 // not complete 632 return null; 633 } 634 635 protected void trackTimeout(String key, Exchange exchange) { 636 // timeout can be either evaluated based on an expression or from a fixed value 637 // expression takes precedence 638 boolean timeoutSet = false; 639 if (getCompletionTimeoutExpression() != null) { 640 Long value = getCompletionTimeoutExpression().evaluate(exchange, Long.class); 641 if (value != null && value > 0) { 642 if (LOG.isTraceEnabled()) { 643 LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}", 644 new Object[]{key, value, exchange}); 645 } 646 addExchangeToTimeoutMap(key, exchange, value); 647 timeoutSet = true; 648 } 649 } 650 if (!timeoutSet && getCompletionTimeout() > 0) { 651 // timeout is used so use the timeout map to keep an eye on this 652 if (LOG.isTraceEnabled()) { 653 LOG.trace("Updating correlation key {} to timeout after {} ms. 
as exchange received: {}", 654 new Object[]{key, getCompletionTimeout(), exchange}); 655 } 656 addExchangeToTimeoutMap(key, exchange, getCompletionTimeout()); 657 } 658 } 659 660 protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) { 661 return aggregationStrategy.aggregate(oldExchange, newExchange); 662 } 663 664 protected boolean onPreCompletionAggregation(Exchange oldExchange, Exchange newExchange) { 665 AggregationStrategy strategy = aggregationStrategy; 666 if (strategy instanceof DelegateAggregationStrategy) { 667 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 668 } 669 if (strategy instanceof PreCompletionAwareAggregationStrategy) { 670 return ((PreCompletionAwareAggregationStrategy) strategy).preComplete(oldExchange, newExchange); 671 } 672 return false; 673 } 674 675 protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) { 676 // store the correlation key as property before we remove so the repository has that information 677 if (original != null) { 678 original.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key); 679 } 680 aggregated.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key); 681 682 // only remove if we have previous added (as we could potentially complete with only 1 exchange) 683 // (if we have previous added then we have that as the original exchange) 684 if (original != null) { 685 // remove from repository as its completed, we do this first as to trigger any OptimisticLockingException's 686 aggregationRepository.remove(aggregated.getContext(), key, original); 687 } 688 689 if (!fromTimeout && timeoutMap != null) { 690 // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker) 691 LOG.trace("Removing correlation key {} from timeout", key); 692 timeoutMap.remove(key); 693 } 694 695 // this key has been closed so add it to the closed map 696 if (closedCorrelationKeys != null) { 
697 closedCorrelationKeys.put(key, key); 698 } 699 700 if (fromTimeout) { 701 // invoke timeout if its timeout aware aggregation strategy, 702 // to allow any custom processing before discarding the exchange 703 AggregationStrategy strategy = aggregationStrategy; 704 if (strategy instanceof DelegateAggregationStrategy) { 705 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 706 } 707 if (strategy instanceof TimeoutAwareAggregationStrategy) { 708 long timeout = getCompletionTimeout() > 0 ? getCompletionTimeout() : -1; 709 ((TimeoutAwareAggregationStrategy) strategy).timeout(aggregated, -1, -1, timeout); 710 } 711 } 712 713 Exchange answer; 714 if (fromTimeout && isDiscardOnCompletionTimeout()) { 715 // discard due timeout 716 LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated); 717 // must confirm the discarded exchange 718 aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId()); 719 // and remove redelivery state as well 720 redeliveryState.remove(aggregated.getExchangeId()); 721 // the completion was from timeout and we should just discard it 722 answer = null; 723 } else { 724 // the aggregated exchange should be published (sent out) 725 answer = aggregated; 726 } 727 728 return answer; 729 } 730 731 private void onSubmitCompletion(final String key, final Exchange exchange) { 732 LOG.debug("Aggregation complete for correlation key {} sending aggregated exchange: {}", key, exchange); 733 734 // add this as in progress before we submit the task 735 inProgressCompleteExchanges.add(exchange.getExchangeId()); 736 737 // invoke the on completion callback 738 AggregationStrategy target = aggregationStrategy; 739 if (target instanceof DelegateAggregationStrategy) { 740 target = ((DelegateAggregationStrategy) target).getDelegate(); 741 } 742 if (target instanceof CompletionAwareAggregationStrategy) { 743 ((CompletionAwareAggregationStrategy) target).onCompletion(exchange); 744 } 
745 746 if (getStatistics().isStatisticsEnabled()) { 747 totalCompleted.incrementAndGet(); 748 749 String completedBy = exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class); 750 if ("interval".equals(completedBy)) { 751 completedByInterval.incrementAndGet(); 752 } else if ("timeout".equals(completedBy)) { 753 completedByTimeout.incrementAndGet(); 754 } else if ("force".equals(completedBy)) { 755 completedByForce.incrementAndGet(); 756 } else if ("consumer".equals(completedBy)) { 757 completedByBatchConsumer.incrementAndGet(); 758 } else if ("predicate".equals(completedBy)) { 759 completedByPredicate.incrementAndGet(); 760 } else if ("size".equals(completedBy)) { 761 completedBySize.incrementAndGet(); 762 } else if ("strategy".equals(completedBy)) { 763 completedByStrategy.incrementAndGet(); 764 } 765 } 766 767 // send this exchange 768 executorService.submit(new Runnable() { 769 public void run() { 770 LOG.debug("Processing aggregated exchange: {}", exchange); 771 772 // add on completion task so we remember to update the inProgressCompleteExchanges 773 exchange.addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId())); 774 775 try { 776 processor.process(exchange); 777 } catch (Throwable e) { 778 exchange.setException(e); 779 } 780 781 // log exception if there was a problem 782 if (exchange.getException() != null) { 783 // if there was an exception then let the exception handler handle it 784 getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); 785 } else { 786 LOG.trace("Processing aggregated exchange: {} complete.", exchange); 787 } 788 } 789 }); 790 } 791 792 /** 793 * Restores the timeout map with timeout values from the aggregation repository. 794 * <p/> 795 * This is needed in case the aggregator has been stopped and started again (for example a server restart). 
796 * Then the existing exchanges from the {@link AggregationRepository} must have their timeout conditions restored. 797 */ 798 protected void restoreTimeoutMapFromAggregationRepository() throws Exception { 799 // grab the timeout value for each partly aggregated exchange 800 Set<String> keys = aggregationRepository.getKeys(); 801 if (keys == null || keys.isEmpty()) { 802 return; 803 } 804 805 StopWatch watch = new StopWatch(); 806 LOG.trace("Starting restoring CompletionTimeout for {} existing exchanges from the aggregation repository...", keys.size()); 807 808 for (String key : keys) { 809 Exchange exchange = aggregationRepository.get(camelContext, key); 810 // grab the timeout value 811 long timeout = exchange.hasProperties() ? exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0; 812 if (timeout > 0) { 813 LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout); 814 addExchangeToTimeoutMap(key, exchange, timeout); 815 } 816 } 817 818 // log duration of this task so end user can see how long it takes to pre-check this upon starting 819 LOG.info("Restored {} CompletionTimeout conditions in the AggregationTimeoutChecker in {}", 820 timeoutMap.size(), TimeUtils.printDuration(watch.taken())); 821 } 822 823 /** 824 * Adds the given exchange to the timeout map, which is used by the timeout checker task to trigger timeouts. 
/**
 * Adds the given exchange to the timeout map, which is used by the timeout checker task to trigger timeouts.
 *
 * @param key      the correlation key
 * @param exchange the exchange
 * @param timeout  the timeout value in millis
 */
private void addExchangeToTimeoutMap(String key, Exchange exchange, long timeout) {
    // store the timeout value on the exchange as well, in case we need it later
    exchange.setProperty(Exchange.AGGREGATED_TIMEOUT, timeout);
    // the map entry is keyed by correlation key and holds the exchange id
    timeoutMap.put(key, exchange.getExchangeId(), timeout);
}

/**
 * Current number of closed correlation keys in the memory cache
 *
 * @return the cache size, or <tt>0</tt> if no closed correlation key cache has been created
 */
public int getClosedCorrelationKeysCacheSize() {
    if (closedCorrelationKeys != null) {
        return closedCorrelationKeys.size();
    } else {
        return 0;
    }
}

/**
 * Clear all the closed correlation keys stored in the cache
 * <p/>
 * Does nothing if no closed correlation key cache has been created.
 */
public void clearClosedCorrelationKeysCache() {
    if (closedCorrelationKeys != null) {
        closedCorrelationKeys.clear();
    }
}

/**
 * Gets the statistics object of this aggregator.
 */
public AggregateProcessorStatistics getStatistics() {
    return statistics;
}

/**
 * Current number of exchange ids tracked as completed and in progress.
 */
public int getInProgressCompleteExchanges() {
    return inProgressCompleteExchanges.size();
}

/**
 * Gets the configured completion predicate, may be <tt>null</tt>.
 */
public Predicate getCompletionPredicate() {
    return completionPredicate;
}

/**
 * Sets the completion predicate.
 */
public void setCompletionPredicate(Predicate completionPredicate) {
    this.completionPredicate = completionPredicate;
}

/**
 * Whether the eager check completion option is enabled.
 */
public boolean isEagerCheckCompletion() {
    return eagerCheckCompletion;
}

/**
 * Sets the eager check completion option.
 */
public void setEagerCheckCompletion(boolean eagerCheckCompletion) {
    this.eagerCheckCompletion = eagerCheckCompletion;
}

/**
 * Gets the completion timeout in millis; a value of zero or less means it is not in use.
 */
public long getCompletionTimeout() {
    return completionTimeout;
}

/**
 * Sets the completion timeout in millis.
 */
public void setCompletionTimeout(long completionTimeout) {
    this.completionTimeout = completionTimeout;
}

/**
 * Gets the expression used for the completion timeout, may be <tt>null</tt>.
 */
public Expression getCompletionTimeoutExpression() {
    return completionTimeoutExpression;
}

/**
 * Sets the expression used for the completion timeout.
 */
public void setCompletionTimeoutExpression(Expression completionTimeoutExpression) {
    this.completionTimeoutExpression = completionTimeoutExpression;
}

/**
 * Gets the completion interval in millis; a value of zero or less means it is not in use.
 */
public long getCompletionInterval() {
    return completionInterval;
}

/**
 * Sets the completion interval in millis.
 */
public void setCompletionInterval(long completionInterval) {
    this.completionInterval = completionInterval;
}

/**
 * Gets the completion size; a value of zero or less means it is not in use.
 */
public int getCompletionSize() {
    return completionSize;
}

/**
 * Sets the completion size.
 */
public void setCompletionSize(int completionSize) {
    this.completionSize = completionSize;
}

/**
 * Gets the expression used for the completion size, may be <tt>null</tt>.
 */
public Expression getCompletionSizeExpression() {
    return completionSizeExpression;
}

/**
 * Sets the expression used for the completion size.
 */
public void setCompletionSizeExpression(Expression completionSizeExpression) {
    this.completionSizeExpression = completionSizeExpression;
}

/**
 * Whether the ignore invalid correlation keys option is enabled.
 */
public boolean isIgnoreInvalidCorrelationKeys() {
    return ignoreInvalidCorrelationKeys;
}

/**
 * Sets the ignore invalid correlation keys option.
 */
public void setIgnoreInvalidCorrelationKeys(boolean ignoreInvalidCorrelationKeys) {
    this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys;
}

/**
 * Gets the close correlation key on completion setting.
 * <p/>
 * <tt>null</tt> means no closed correlation key cache is created on start, a positive value is
 * used as the capacity of the cache, and any other value creates an unbounded cache.
 */
public Integer getCloseCorrelationKeyOnCompletion() {
    return closeCorrelationKeyOnCompletion;
}

/**
 * Sets the close correlation key on completion setting.
 */
public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
    this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
}

/**
 * Whether completion from batch consumer is enabled.
 */
public boolean isCompletionFromBatchConsumer() {
    return completionFromBatchConsumer;
}

/**
 * Sets the completion from batch consumer option.
 */
public void setCompletionFromBatchConsumer(boolean completionFromBatchConsumer) {
    this.completionFromBatchConsumer = completionFromBatchConsumer;
}

/**
 * Whether completion on new correlation group is enabled.
 */
public boolean isCompletionOnNewCorrelationGroup() {
    return completionOnNewCorrelationGroup;
}

/**
 * Sets the completion on new correlation group option.
 */
public void setCompletionOnNewCorrelationGroup(boolean completionOnNewCorrelationGroup) {
    this.completionOnNewCorrelationGroup = completionOnNewCorrelationGroup;
}

/**
 * Whether the complete all on stop option is enabled.
 */
public boolean isCompleteAllOnStop() {
    return completeAllOnStop;
}

/**
 * Gets the interval passed to the timeout map as its poll time in millis.
 */
public long getCompletionTimeoutCheckerInterval() {
    return completionTimeoutCheckerInterval;
}

/**
 * Sets the interval passed to the timeout map as its poll time in millis.
 */
public void setCompletionTimeoutCheckerInterval(long completionTimeoutCheckerInterval) {
    this.completionTimeoutCheckerInterval = completionTimeoutCheckerInterval;
}

/**
 * Gets the exception handler used to report failures from background processing.
 */
public ExceptionHandler getExceptionHandler() {
    return exceptionHandler;
}

/**
 * Sets the exception handler.
 */
public void setExceptionHandler(ExceptionHandler exceptionHandler) {
    this.exceptionHandler = exceptionHandler;
}

/**
 * Whether the parallel processing option is enabled.
 */
public boolean isParallelProcessing() {
    return parallelProcessing;
}

/**
 * Sets the parallel processing option.
 */
public void setParallelProcessing(boolean parallelProcessing) {
    this.parallelProcessing = parallelProcessing;
}

/**
 * Whether optimistic locking is enabled, in which case the shared aggregation lock is not acquired.
 */
public boolean isOptimisticLocking() {
    return optimisticLocking;
}

/**
 * Sets the optimistic locking option.
 */
public void setOptimisticLocking(boolean optimisticLocking) {
    this.optimisticLocking = optimisticLocking;
}

/**
 * Gets the aggregation repository; may be <tt>null</tt> until a default is assigned on start.
 */
public AggregationRepository getAggregationRepository() {
    return aggregationRepository;
}

/**
 * Sets the aggregation repository.
 */
public void setAggregationRepository(AggregationRepository aggregationRepository) {
    this.aggregationRepository = aggregationRepository;
}

/**
 * Whether an exchange completed by timeout is discarded instead of being sent out.
 */
public boolean isDiscardOnCompletionTimeout() {
    return discardOnCompletionTimeout;
}

/**
 * Sets the discard on completion timeout option.
 */
public void setDiscardOnCompletionTimeout(boolean discardOnCompletionTimeout) {
    this.discardOnCompletionTimeout = discardOnCompletionTimeout;
}

/**
 * Sets whether all groups are force completed when preparing a non-forced shutdown.
 */
public void setForceCompletionOnStop(boolean forceCompletionOnStop) {
    this.forceCompletionOnStop = forceCompletionOnStop;
}

/**
 * Sets the complete all on stop option; when enabled the groups in the repository are
 * reported as pending exchanges.
 */
public void setCompleteAllOnStop(boolean completeAllOnStop) {
    this.completeAllOnStop = completeAllOnStop;
}

/**
 * Sets the scheduled executor service used by the timeout checker and the interval task.
 */
public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
    this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
}

/**
 * Gets the scheduled executor service used by the timeout checker and the interval task, may be <tt>null</tt>.
 */
public ScheduledExecutorService getTimeoutCheckerExecutorService() {
    return timeoutCheckerExecutorService;
}
boolean isShutdownTimeoutCheckerExecutorService() { 1021 return shutdownTimeoutCheckerExecutorService; 1022 } 1023 1024 public void setShutdownTimeoutCheckerExecutorService(boolean shutdownTimeoutCheckerExecutorService) { 1025 this.shutdownTimeoutCheckerExecutorService = shutdownTimeoutCheckerExecutorService; 1026 } 1027 1028 public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) { 1029 this.optimisticLockRetryPolicy = optimisticLockRetryPolicy; 1030 } 1031 1032 public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() { 1033 return optimisticLockRetryPolicy; 1034 } 1035 1036 public AggregationStrategy getAggregationStrategy() { 1037 return aggregationStrategy; 1038 } 1039 1040 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 1041 this.aggregationStrategy = aggregationStrategy; 1042 } 1043 1044 public Expression getCorrelationExpression() { 1045 return correlationExpression; 1046 } 1047 1048 public void setCorrelationExpression(Expression correlationExpression) { 1049 this.correlationExpression = correlationExpression; 1050 } 1051 1052 public AggregateController getAggregateController() { 1053 return aggregateController; 1054 } 1055 1056 public void setAggregateController(AggregateController aggregateController) { 1057 this.aggregateController = aggregateController; 1058 } 1059 1060 /** 1061 * On completion task which keeps the booking of the in progress up to date 1062 */ 1063 private final class AggregateOnCompletion implements Synchronization { 1064 private final String exchangeId; 1065 1066 private AggregateOnCompletion(String exchangeId) { 1067 // must use the original exchange id as it could potentially change if send over SEDA etc. 
1068 this.exchangeId = exchangeId; 1069 } 1070 1071 public void onFailure(Exchange exchange) { 1072 LOG.trace("Aggregated exchange onFailure: {}", exchange); 1073 1074 // must remember to remove in progress when we failed 1075 inProgressCompleteExchanges.remove(exchangeId); 1076 // do not remove redelivery state as we need it when we redeliver again later 1077 } 1078 1079 public void onComplete(Exchange exchange) { 1080 LOG.trace("Aggregated exchange onComplete: {}", exchange); 1081 1082 // only confirm if we processed without a problem 1083 try { 1084 aggregationRepository.confirm(exchange.getContext(), exchangeId); 1085 // and remove redelivery state as well 1086 redeliveryState.remove(exchangeId); 1087 } finally { 1088 // must remember to remove in progress when we are complete 1089 inProgressCompleteExchanges.remove(exchangeId); 1090 } 1091 } 1092 1093 @Override 1094 public String toString() { 1095 return "AggregateOnCompletion"; 1096 } 1097 } 1098 1099 /** 1100 * Background task that looks for aggregated exchanges which is triggered by completion timeouts. 
1101 */ 1102 private final class AggregationTimeoutMap extends DefaultTimeoutMap<String, String> { 1103 1104 private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) { 1105 // do NOT use locking on the timeout map as this aggregator has its own shared lock we will use instead 1106 super(executor, requestMapPollTimeMillis, optimisticLocking); 1107 } 1108 1109 @Override 1110 public void purge() { 1111 // must acquire the shared aggregation lock to be able to purge 1112 if (!optimisticLocking) { 1113 lock.lock(); 1114 } 1115 try { 1116 super.purge(); 1117 } finally { 1118 if (!optimisticLocking) { 1119 lock.unlock(); 1120 } 1121 } 1122 } 1123 1124 @Override 1125 public boolean onEviction(String key, String exchangeId) { 1126 log.debug("Completion timeout triggered for correlation key: {}", key); 1127 1128 boolean inProgress = inProgressCompleteExchanges.contains(exchangeId); 1129 if (inProgress) { 1130 LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId); 1131 return true; 1132 } 1133 1134 // get the aggregated exchange 1135 boolean evictionStolen = false; 1136 Exchange answer = aggregationRepository.get(camelContext, key); 1137 if (answer == null) { 1138 evictionStolen = true; 1139 } else { 1140 // indicate it was completed by timeout 1141 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); 1142 try { 1143 answer = onCompletion(key, answer, answer, true); 1144 if (answer != null) { 1145 onSubmitCompletion(key, answer); 1146 } 1147 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { 1148 evictionStolen = true; 1149 } 1150 } 1151 1152 if (optimisticLocking && evictionStolen) { 1153 LOG.debug("Another Camel instance has already successfully correlated or processed this timeout eviction " 1154 + "for exchange with id: {} and correlation id: {}", exchangeId, key); 1155 } 1156 return true; 1157 } 1158 } 1159 1160 /** 1161 * Background task that triggers completion 
based on interval. 1162 */ 1163 private final class AggregationIntervalTask implements Runnable { 1164 1165 public void run() { 1166 // only run if CamelContext has been fully started 1167 if (!camelContext.getStatus().isStarted()) { 1168 LOG.trace("Completion interval task cannot start due CamelContext({}) has not been started yet", camelContext.getName()); 1169 return; 1170 } 1171 1172 LOG.trace("Starting completion interval task"); 1173 1174 // trigger completion for all in the repository 1175 Set<String> keys = aggregationRepository.getKeys(); 1176 1177 if (keys != null && !keys.isEmpty()) { 1178 // must acquire the shared aggregation lock to be able to trigger interval completion 1179 if (!optimisticLocking) { 1180 lock.lock(); 1181 } 1182 try { 1183 for (String key : keys) { 1184 boolean stolenInterval = false; 1185 Exchange exchange = aggregationRepository.get(camelContext, key); 1186 if (exchange == null) { 1187 stolenInterval = true; 1188 } else { 1189 LOG.trace("Completion interval triggered for correlation key: {}", key); 1190 // indicate it was completed by interval 1191 exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval"); 1192 try { 1193 Exchange answer = onCompletion(key, exchange, exchange, false); 1194 if (answer != null) { 1195 onSubmitCompletion(key, answer); 1196 } 1197 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) { 1198 stolenInterval = true; 1199 } 1200 } 1201 if (optimisticLocking && stolenInterval) { 1202 LOG.debug("Another Camel instance has already processed this interval aggregation for exchange with correlation id: {}", key); 1203 } 1204 } 1205 } finally { 1206 if (!optimisticLocking) { 1207 lock.unlock(); 1208 } 1209 } 1210 } 1211 1212 LOG.trace("Completion interval task complete"); 1213 } 1214 } 1215 1216 /** 1217 * Background task that looks for aggregated exchanges to recover. 
1218 */ 1219 private final class RecoverTask implements Runnable { 1220 private final RecoverableAggregationRepository recoverable; 1221 1222 private RecoverTask(RecoverableAggregationRepository recoverable) { 1223 this.recoverable = recoverable; 1224 } 1225 1226 public void run() { 1227 // only run if CamelContext has been fully started 1228 if (!camelContext.getStatus().isStarted()) { 1229 LOG.trace("Recover check cannot start due CamelContext({}) has not been started yet", camelContext.getName()); 1230 return; 1231 } 1232 1233 LOG.trace("Starting recover check"); 1234 1235 // copy the current in progress before doing scan 1236 final Set<String> copyOfInProgress = new LinkedHashSet<String>(inProgressCompleteExchanges); 1237 1238 Set<String> exchangeIds = recoverable.scan(camelContext); 1239 for (String exchangeId : exchangeIds) { 1240 1241 // we may shutdown while doing recovery 1242 if (!isRunAllowed()) { 1243 LOG.info("We are shutting down so stop recovering"); 1244 return; 1245 } 1246 if (!optimisticLocking) { 1247 lock.lock(); 1248 } 1249 try { 1250 // consider in progress if it was in progress before we did the scan, or currently after we did the scan 1251 // its safer to consider it in progress than risk duplicates due both in progress + recovered 1252 boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId); 1253 if (inProgress) { 1254 LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId); 1255 } else { 1256 LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId); 1257 Exchange exchange = recoverable.recover(camelContext, exchangeId); 1258 if (exchange != null) { 1259 // get the correlation key 1260 String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class); 1261 // and mark it as redelivered 1262 exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE); 1263 1264 // get the current redelivery data 1265 RedeliveryData 
data = redeliveryState.get(exchange.getExchangeId()); 1266 1267 // if we are exhausted, then move to dead letter channel 1268 if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) { 1269 LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries() 1270 + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri()); 1271 1272 // send to DLC 1273 try { 1274 // set redelivery counter 1275 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); 1276 exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); 1277 deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange); 1278 } catch (Throwable e) { 1279 exchange.setException(e); 1280 } 1281 1282 // handle if failed 1283 if (exchange.getException() != null) { 1284 getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException()); 1285 } else { 1286 // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again 1287 recoverable.confirm(camelContext, exchangeId); 1288 } 1289 } else { 1290 // update current redelivery state 1291 if (data == null) { 1292 // create new data 1293 data = new RedeliveryData(); 1294 redeliveryState.put(exchange.getExchangeId(), data); 1295 } 1296 data.redeliveryCounter++; 1297 1298 // set redelivery counter 1299 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); 1300 if (recoverable.getMaximumRedeliveries() > 0) { 1301 exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries()); 1302 } 1303 1304 LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId); 1305 1306 // not exhaust so resubmit the recovered exchange 1307 onSubmitCompletion(key, exchange); 1308 } 
1309 } 1310 } 1311 } finally { 1312 if (!optimisticLocking) { 1313 lock.unlock(); 1314 } 1315 } 1316 } 1317 1318 LOG.trace("Recover check complete"); 1319 } 1320 } 1321 1322 @Override 1323 @SuppressWarnings("unchecked") 1324 protected void doStart() throws Exception { 1325 AggregationStrategy strategy = aggregationStrategy; 1326 if (strategy instanceof DelegateAggregationStrategy) { 1327 strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); 1328 } 1329 if (strategy instanceof CamelContextAware) { 1330 ((CamelContextAware) strategy).setCamelContext(camelContext); 1331 } 1332 if (strategy instanceof PreCompletionAwareAggregationStrategy) { 1333 preCompletion = true; 1334 LOG.info("PreCompletionAwareAggregationStrategy detected. Aggregator {} is in pre-completion mode.", getId()); 1335 } 1336 1337 if (!preCompletion) { 1338 // if not in pre completion mode then check we configured the completion required 1339 if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null 1340 && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null 1341 && getCompletionSizeExpression() == null) { 1342 throw new IllegalStateException("At least one of the completions options" 1343 + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set"); 1344 } 1345 } 1346 1347 if (getCloseCorrelationKeyOnCompletion() != null) { 1348 if (getCloseCorrelationKeyOnCompletion() > 0) { 1349 LOG.info("Using ClosedCorrelationKeys with a LRUCache with a capacity of {}", getCloseCorrelationKeyOnCompletion()); 1350 closedCorrelationKeys = LRUCacheFactory.newLRUCache(getCloseCorrelationKeyOnCompletion()); 1351 } else { 1352 LOG.info("Using ClosedCorrelationKeys with unbounded capacity"); 1353 closedCorrelationKeys = new ConcurrentHashMap<String, String>(); 1354 } 1355 } 1356 1357 if (aggregationRepository == null) { 1358 aggregationRepository = new 
MemoryAggregationRepository(optimisticLocking); 1359 LOG.info("Defaulting to MemoryAggregationRepository"); 1360 } 1361 1362 if (optimisticLocking) { 1363 if (!(aggregationRepository instanceof OptimisticLockingAggregationRepository)) { 1364 throw new IllegalArgumentException("Optimistic locking cannot be enabled without using an AggregationRepository that implements OptimisticLockingAggregationRepository"); 1365 } 1366 LOG.info("Optimistic locking is enabled"); 1367 } 1368 1369 ServiceHelper.startServices(aggregationStrategy, processor, aggregationRepository); 1370 1371 // should we use recover checker 1372 if (aggregationRepository instanceof RecoverableAggregationRepository) { 1373 RecoverableAggregationRepository recoverable = (RecoverableAggregationRepository) aggregationRepository; 1374 if (recoverable.isUseRecovery()) { 1375 long interval = recoverable.getRecoveryIntervalInMillis(); 1376 if (interval <= 0) { 1377 throw new IllegalArgumentException("AggregationRepository has recovery enabled and the RecoveryInterval option must be a positive number, was: " + interval); 1378 } 1379 1380 // create a background recover thread to check every interval 1381 recoverService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateRecoverChecker", 1); 1382 Runnable recoverTask = new RecoverTask(recoverable); 1383 LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every {} millis.", interval); 1384 // use fixed delay so there is X interval between each run 1385 recoverService.scheduleWithFixedDelay(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS); 1386 1387 if (recoverable.getDeadLetterUri() != null) { 1388 int max = recoverable.getMaximumRedeliveries(); 1389 if (max <= 0) { 1390 throw new IllegalArgumentException("Option maximumRedeliveries must be a positive number, was: " + max); 1391 } 1392 LOG.info("After {} failed redelivery attempts Exchanges will be moved to deadLetterUri: {}", max, 
recoverable.getDeadLetterUri()); 1393 1394 // dead letter uri must be a valid endpoint 1395 Endpoint endpoint = camelContext.getEndpoint(recoverable.getDeadLetterUri()); 1396 if (endpoint == null) { 1397 throw new NoSuchEndpointException(recoverable.getDeadLetterUri()); 1398 } 1399 deadLetterProducerTemplate = camelContext.createProducerTemplate(); 1400 } 1401 } 1402 } 1403 1404 if (getCompletionInterval() > 0 && getCompletionTimeout() > 0) { 1405 throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both."); 1406 } 1407 if (getCompletionInterval() > 0) { 1408 LOG.info("Using CompletionInterval to run every {} millis.", getCompletionInterval()); 1409 if (getTimeoutCheckerExecutorService() == null) { 1410 setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1)); 1411 shutdownTimeoutCheckerExecutorService = true; 1412 } 1413 // trigger completion based on interval 1414 getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS); 1415 } 1416 1417 // start timeout service if its in use 1418 if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) { 1419 LOG.info("Using CompletionTimeout to trigger after {} millis of inactivity.", getCompletionTimeout()); 1420 if (getTimeoutCheckerExecutorService() == null) { 1421 setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1)); 1422 shutdownTimeoutCheckerExecutorService = true; 1423 } 1424 // check for timed out aggregated messages once every second 1425 timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), getCompletionTimeoutCheckerInterval()); 1426 // fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we 1427 // need to re-establish 
/**
 * Stops this aggregator: notifies the aggregate controller, shuts down the recover checker,
 * stops the timeout map, the processor and the dead letter producer template, and clears
 * the in-memory bookkeeping (closed correlation keys, batch consumer correlation keys and
 * redelivery state).
 */
@Override
protected void doStop() throws Exception {
    // note: we cannot do doForceCompletionOnStop from this doStop method
    // as this is handled in the prepareShutdown method which is also invoked when stopping a route
    // and is better suited for preparing to shutdown than this doStop method is

    // the controller may not have been assigned yet
    if (aggregateController != null) {
        aggregateController.onStop(this);
    }

    // the recover checker is only present if one was created
    if (recoverService != null) {
        camelContext.getExecutorServiceManager().shutdown(recoverService);
    }
    ServiceHelper.stopServices(timeoutMap, processor, deadLetterProducerTemplate);

    if (closedCorrelationKeys != null) {
        // it may be a service so stop it as well
        ServiceHelper.stopService(closedCorrelationKeys);
        closedCorrelationKeys.clear();
    }
    batchConsumerCorrelationKeys.clear();
    redeliveryState.clear();
}
When forced=true, then 1467 // we have to shutdown in a hurry 1468 if (!forced && forceCompletionOnStop) { 1469 doForceCompletionOnStop(); 1470 } 1471 } 1472 1473 @Override 1474 public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) { 1475 // not in use 1476 return true; 1477 } 1478 1479 @Override 1480 public int getPendingExchangesSize() { 1481 if (completeAllOnStop) { 1482 // we want to regard all pending exchanges in the repo as inflight 1483 Set<String> keys = getAggregationRepository().getKeys(); 1484 return keys != null ? keys.size() : 0; 1485 } else { 1486 return 0; 1487 } 1488 } 1489 1490 private void doForceCompletionOnStop() { 1491 int expected = forceCompletionOfAllGroups(); 1492 1493 StopWatch watch = new StopWatch(); 1494 while (inProgressCompleteExchanges.size() > 0) { 1495 LOG.trace("Waiting for {} inflight exchanges to complete", getInProgressCompleteExchanges()); 1496 try { 1497 Thread.sleep(100); 1498 } catch (InterruptedException e) { 1499 // break out as we got interrupted such as the JVM terminating 1500 LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", getInProgressCompleteExchanges()); 1501 break; 1502 } 1503 } 1504 1505 if (expected > 0) { 1506 LOG.info("Forcing completion of all groups with {} exchanges completed in {}", expected, TimeUtils.printDuration(watch.taken())); 1507 } 1508 } 1509 1510 @Override 1511 protected void doShutdown() throws Exception { 1512 // shutdown aggregation repository and the strategy 1513 ServiceHelper.stopAndShutdownServices(aggregationRepository, aggregationStrategy); 1514 1515 // cleanup when shutting down 1516 inProgressCompleteExchanges.clear(); 1517 1518 if (shutdownExecutorService) { 1519 camelContext.getExecutorServiceManager().shutdownNow(executorService); 1520 } 1521 if (shutdownTimeoutCheckerExecutorService) { 1522 camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService); 1523 timeoutCheckerExecutorService = null; 1524 } 1525 1526 
super.doShutdown(); 1527 } 1528 1529 public int forceCompletionOfGroup(String key) { 1530 // must acquire the shared aggregation lock to be able to trigger force completion 1531 int total = 0; 1532 1533 if (!optimisticLocking) { 1534 lock.lock(); 1535 } 1536 try { 1537 Exchange exchange = aggregationRepository.get(camelContext, key); 1538 if (exchange != null) { 1539 total = 1; 1540 LOG.trace("Force completion triggered for correlation key: {}", key); 1541 // indicate it was completed by a force completion request 1542 exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force"); 1543 Exchange answer = onCompletion(key, exchange, exchange, false); 1544 if (answer != null) { 1545 onSubmitCompletion(key, answer); 1546 } 1547 } 1548 } finally { 1549 if (!optimisticLocking) { 1550 lock.unlock(); 1551 } 1552 } 1553 LOG.trace("Completed force completion of group {}", key); 1554 1555 if (total > 0) { 1556 LOG.debug("Forcing completion of group {} with {} exchanges", key, total); 1557 } 1558 return total; 1559 } 1560 1561 public int forceCompletionOfAllGroups() { 1562 1563 // only run if CamelContext has been fully started or is stopping 1564 boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping(); 1565 if (!allow) { 1566 LOG.warn("Cannot start force completion of all groups because CamelContext({}) has not been started", camelContext.getName()); 1567 return 0; 1568 } 1569 1570 LOG.trace("Starting force completion of all groups task"); 1571 1572 // trigger completion for all in the repository 1573 Set<String> keys = aggregationRepository.getKeys(); 1574 1575 int total = 0; 1576 if (keys != null && !keys.isEmpty()) { 1577 // must acquire the shared aggregation lock to be able to trigger force completion 1578 if (!optimisticLocking) { 1579 lock.lock(); 1580 } 1581 total = keys.size(); 1582 try { 1583 for (String key : keys) { 1584 Exchange exchange = aggregationRepository.get(camelContext, key); 1585 if (exchange != null) { 1586 
LOG.trace("Force completion triggered for correlation key: {}", key); 1587 // indicate it was completed by a force completion request 1588 exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force"); 1589 Exchange answer = onCompletion(key, exchange, exchange, false); 1590 if (answer != null) { 1591 onSubmitCompletion(key, answer); 1592 } 1593 } 1594 } 1595 } finally { 1596 if (!optimisticLocking) { 1597 lock.unlock(); 1598 } 1599 } 1600 } 1601 LOG.trace("Completed force completion of all groups task"); 1602 1603 if (total > 0) { 1604 LOG.debug("Forcing completion of all groups with {} exchanges", total); 1605 } 1606 return total; 1607 } 1608 1609}