001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.ExecutorService; 022import java.util.concurrent.ScheduledExecutorService; 023 024import javax.xml.bind.annotation.XmlAccessType; 025import javax.xml.bind.annotation.XmlAccessorType; 026import javax.xml.bind.annotation.XmlAttribute; 027import javax.xml.bind.annotation.XmlElement; 028import javax.xml.bind.annotation.XmlElementRef; 029import javax.xml.bind.annotation.XmlRootElement; 030import javax.xml.bind.annotation.XmlTransient; 031 032import org.apache.camel.CamelContextAware; 033import org.apache.camel.Expression; 034import org.apache.camel.Predicate; 035import org.apache.camel.Processor; 036import org.apache.camel.builder.AggregationStrategyClause; 037import org.apache.camel.builder.ExpressionClause; 038import org.apache.camel.builder.PredicateClause; 039import org.apache.camel.model.language.ExpressionDefinition; 040import org.apache.camel.processor.CamelInternalProcessor; 041import org.apache.camel.processor.aggregate.AggregateController; 042import org.apache.camel.processor.aggregate.AggregateProcessor; 043import org.apache.camel.processor.aggregate.AggregationStrategy; 044import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; 045import org.apache.camel.processor.aggregate.ClosedCorrelationKeyException; 046import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy; 047import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; 048import org.apache.camel.spi.AggregationRepository; 049import org.apache.camel.spi.AsPredicate; 050import org.apache.camel.spi.Metadata; 051import org.apache.camel.spi.RouteContext; 052import org.apache.camel.util.concurrent.SynchronousExecutorService; 053 054/** 055 * Aggregates many messages into a single message 056 * 057 * @version 058 */ 059@Metadata(label = "eip,routing") 060@XmlRootElement(name = "aggregate") 061@XmlAccessorType(XmlAccessType.FIELD) 062public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> { 063 @XmlElement(name = "correlationExpression", required = true) 064 private ExpressionSubElementDefinition correlationExpression; 065 @XmlElement(name = "completionPredicate") @AsPredicate 066 private ExpressionSubElementDefinition completionPredicate; 067 @XmlElement(name = "completionTimeout") 068 private ExpressionSubElementDefinition completionTimeoutExpression; 069 @XmlElement(name = "completionSize") 070 private ExpressionSubElementDefinition completionSizeExpression; 071 @XmlElement(name = "optimisticLockRetryPolicy") 072 private OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition; 073 @XmlTransient 074 private ExpressionDefinition expression; 075 @XmlTransient 076 private AggregationStrategy aggregationStrategy; 077 @XmlTransient 078 private ExecutorService executorService; 079 @XmlTransient 080 private ScheduledExecutorService timeoutCheckerExecutorService; 081 @XmlTransient 082 private AggregationRepository aggregationRepository; 083 @XmlTransient 084 private OptimisticLockRetryPolicy optimisticLockRetryPolicy; 085 @XmlAttribute 086 private Boolean parallelProcessing; 087 @XmlAttribute 088 private Boolean optimisticLocking; 089 @XmlAttribute 090 private String executorServiceRef; 091 @XmlAttribute 092 private String timeoutCheckerExecutorServiceRef; 093 @XmlAttribute 094 private String aggregationRepositoryRef; 095 @XmlAttribute 096 private String strategyRef; 097 @XmlAttribute 098 private String strategyMethodName; 099 @XmlAttribute 100 private Boolean strategyMethodAllowNull; 101 @XmlAttribute 102 private Integer completionSize; 103 @XmlAttribute 104 private Long completionInterval; 105 @XmlAttribute 106 private Long completionTimeout; 107 @XmlAttribute @Metadata(defaultValue = "1000") 108 private Long completionTimeoutCheckerInterval = 1000L; 109 @XmlAttribute 110 private Boolean completionFromBatchConsumer; 111 @XmlAttribute 112 private Boolean completionOnNewCorrelationGroup; 113 @XmlAttribute 114 @Deprecated 115 private Boolean groupExchanges; 116 @XmlAttribute 117 private Boolean eagerCheckCompletion; 118 @XmlAttribute 119 private Boolean ignoreInvalidCorrelationKeys; 120 @XmlAttribute 121 private Integer closeCorrelationKeyOnCompletion; 122 @XmlAttribute 123 private Boolean discardOnCompletionTimeout; 124 @XmlAttribute 125 private Boolean forceCompletionOnStop; 126 @XmlAttribute 127 private Boolean completeAllOnStop; 128 @XmlTransient 129 private AggregateController aggregateController; 130 @XmlAttribute 131 private String aggregateControllerRef; 132 @XmlElementRef 133 private List<ProcessorDefinition<?>> outputs = new ArrayList<>(); 134 135 public AggregateDefinition() { 136 } 137 138 public AggregateDefinition(@AsPredicate Predicate predicate) { 139 this(ExpressionNodeHelper.toExpressionDefinition(predicate)); 140 } 141 142 public AggregateDefinition(Expression expression) { 143 this(ExpressionNodeHelper.toExpressionDefinition(expression)); 144 } 145 146 public AggregateDefinition(ExpressionDefinition correlationExpression) { 147 setExpression(correlationExpression); 148 149 ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition(); 150 cor.setExpressionType(correlationExpression); 151 setCorrelationExpression(cor); 152 } 153 154 public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 155 this(correlationExpression); 156 this.aggregationStrategy = aggregationStrategy; 157 } 158 159 @Override 160 public String toString() { 161 return "Aggregate[" + description() + " -> " + getOutputs() + "]"; 162 } 163 164 protected String description() { 165 return getExpression() != null ? getExpression().getLabel() : ""; 166 } 167 168 @Override 169 public String getShortName() { 170 return "aggregate"; 171 } 172 173 @Override 174 public String getLabel() { 175 return "aggregate[" + description() + "]"; 176 } 177 178 @Override 179 public Processor createProcessor(RouteContext routeContext) throws Exception { 180 return createAggregator(routeContext); 181 } 182 183 protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception { 184 Processor childProcessor = this.createChildProcessor(routeContext, true); 185 186 // wrap the aggregate route in a unit of work processor 187 CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor); 188 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); 189 190 Expression correlation = getExpression().createExpression(routeContext); 191 AggregationStrategy strategy = createAggregationStrategy(routeContext); 192 193 boolean parallel = getParallelProcessing() != null && getParallelProcessing(); 194 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, parallel); 195 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, parallel); 196 if (threadPool == null && !parallel) { 197 // executor service is mandatory for the Aggregator 198 // we do not run in parallel mode, but use a synchronous executor, so we run in current thread 199 threadPool = new SynchronousExecutorService(); 200 shutdownThreadPool = true; 201 } 202 203 AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), internal, 204 correlation, strategy, threadPool, shutdownThreadPool); 205 206 AggregationRepository repository = createAggregationRepository(routeContext); 207 if (repository != null) { 208 answer.setAggregationRepository(repository); 209 } 210 211 if (getAggregateController() == null && getAggregateControllerRef() != null) { 212 setAggregateController(routeContext.mandatoryLookup(getAggregateControllerRef(), AggregateController.class)); 213 } 214 215 // this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool 216 boolean shutdownTimeoutThreadPool = false; 217 ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService; 218 if (timeoutThreadPool == null && timeoutCheckerExecutorServiceRef != null) { 219 // lookup existing thread pool 220 timeoutThreadPool = routeContext.getCamelContext().getRegistry().lookupByNameAndType(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class); 221 if (timeoutThreadPool == null) { 222 // then create a thread pool assuming the ref is a thread pool profile id 223 timeoutThreadPool = routeContext.getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, 224 AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER, timeoutCheckerExecutorServiceRef); 225 if (timeoutThreadPool == null) { 226 throw new IllegalArgumentException("ExecutorServiceRef " + timeoutCheckerExecutorServiceRef 227 + " not found in registry (as an ScheduledExecutorService instance) or as a thread pool profile."); 228 } 229 shutdownTimeoutThreadPool = true; 230 } 231 } 232 answer.setTimeoutCheckerExecutorService(timeoutThreadPool); 233 answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool); 234 235 // set other options 236 answer.setParallelProcessing(parallel); 237 if (getOptimisticLocking() != null) { 238 answer.setOptimisticLocking(getOptimisticLocking()); 239 } 240 if (getCompletionPredicate() != null) { 241 Predicate predicate = getCompletionPredicate().createPredicate(routeContext); 242 answer.setCompletionPredicate(predicate); 243 } else if (strategy instanceof Predicate) { 244 // if aggregation strategy implements predicate and was not configured then use as fallback 245 log.debug("Using AggregationStrategy as completion predicate: {}", strategy); 246 answer.setCompletionPredicate((Predicate) strategy); 247 } 248 if (getCompletionTimeoutExpression() != null) { 249 Expression expression = getCompletionTimeoutExpression().createExpression(routeContext); 250 answer.setCompletionTimeoutExpression(expression); 251 } 252 if (getCompletionTimeout() != null) { 253 answer.setCompletionTimeout(getCompletionTimeout()); 254 } 255 if (getCompletionInterval() != null) { 256 answer.setCompletionInterval(getCompletionInterval()); 257 } 258 if (getCompletionSizeExpression() != null) { 259 Expression expression = getCompletionSizeExpression().createExpression(routeContext); 260 answer.setCompletionSizeExpression(expression); 261 } 262 if (getCompletionSize() != null) { 263 answer.setCompletionSize(getCompletionSize()); 264 } 265 if (getCompletionFromBatchConsumer() != null) { 266 answer.setCompletionFromBatchConsumer(getCompletionFromBatchConsumer()); 267 } 268 if (getCompletionOnNewCorrelationGroup() != null) { 269 answer.setCompletionOnNewCorrelationGroup(getCompletionOnNewCorrelationGroup()); 270 } 271 if (getEagerCheckCompletion() != null) { 272 answer.setEagerCheckCompletion(getEagerCheckCompletion()); 273 } 274 if (getIgnoreInvalidCorrelationKeys() != null) { 275 answer.setIgnoreInvalidCorrelationKeys(getIgnoreInvalidCorrelationKeys()); 276 } 277 if (getCloseCorrelationKeyOnCompletion() != null) { 278 answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion()); 279 } 280 if (getDiscardOnCompletionTimeout() != null) { 281 answer.setDiscardOnCompletionTimeout(getDiscardOnCompletionTimeout()); 282 } 283 if (getForceCompletionOnStop() != null) { 284 answer.setForceCompletionOnStop(getForceCompletionOnStop()); 285 } 286 if (getCompleteAllOnStop() != null) { 287 answer.setCompleteAllOnStop(getCompleteAllOnStop()); 288 } 289 if (optimisticLockRetryPolicy == null) { 290 if (getOptimisticLockRetryPolicyDefinition() != null) { 291 answer.setOptimisticLockRetryPolicy(getOptimisticLockRetryPolicyDefinition().createOptimisticLockRetryPolicy()); 292 } 293 } else { 294 answer.setOptimisticLockRetryPolicy(optimisticLockRetryPolicy); 295 } 296 if (getAggregateController() != null) { 297 answer.setAggregateController(getAggregateController()); 298 } 299 if (getCompletionTimeoutCheckerInterval() != null) { 300 answer.setCompletionTimeoutCheckerInterval(getCompletionTimeoutCheckerInterval()); 301 } 302 return answer; 303 } 304 305 @Override 306 public void configureChild(ProcessorDefinition<?> output) { 307 if (expression instanceof ExpressionClause) { 308 ExpressionClause<?> clause = (ExpressionClause<?>) expression; 309 if (clause.getExpressionType() != null) { 310 // if using the Java DSL then the expression may have been set using the 311 // ExpressionClause which is a fancy builder to define expressions and predicates 312 // using fluent builders in the DSL. However we need afterwards a callback to 313 // reset the expression to the expression type the ExpressionClause did build for us 314 expression = clause.getExpressionType(); 315 // set the correlation expression from the expression type, as the model definition 316 // would then be accurate 317 correlationExpression = new ExpressionSubElementDefinition(); 318 correlationExpression.setExpressionType(clause.getExpressionType()); 319 } 320 } 321 } 322 323 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 324 AggregationStrategy strategy = getAggregationStrategy(); 325 if (strategy == null && strategyRef != null) { 326 Object aggStrategy = routeContext.lookup(strategyRef, Object.class); 327 if (aggStrategy instanceof AggregationStrategy) { 328 strategy = (AggregationStrategy) aggStrategy; 329 } else if (aggStrategy != null) { 330 AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName()); 331 if (getStrategyMethodAllowNull() != null) { 332 adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); 333 adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); 334 } 335 strategy = adapter; 336 } else { 337 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); 338 } 339 } 340 341 if (groupExchanges != null && groupExchanges) { 342 if (strategy != null || strategyRef != null) { 343 throw new IllegalArgumentException("Options groupExchanges and AggregationStrategy cannot be enabled at the same time"); 344 } 345 if (eagerCheckCompletion != null && !eagerCheckCompletion) { 346 throw new IllegalArgumentException("Option eagerCheckCompletion cannot be false when groupExchanges has been enabled"); 347 } 348 // set eager check to enabled by default when using grouped exchanges 349 setEagerCheckCompletion(true); 350 // if grouped exchange is enabled then use special strategy for that 351 strategy = new GroupedExchangeAggregationStrategy(); 352 } 353 354 if (strategy == null) { 355 throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this); 356 } 357 358 if (strategy instanceof CamelContextAware) { 359 ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); 360 } 361 362 return strategy; 363 } 364 365 private AggregationRepository createAggregationRepository(RouteContext routeContext) { 366 AggregationRepository repository = getAggregationRepository(); 367 if (repository == null && aggregationRepositoryRef != null) { 368 repository = routeContext.mandatoryLookup(aggregationRepositoryRef, AggregationRepository.class); 369 } 370 return repository; 371 } 372 373 public AggregationStrategy getAggregationStrategy() { 374 return aggregationStrategy; 375 } 376 377 /** 378 * The AggregationStrategy to use. 379 * <p/> 380 * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges. 381 * At first call the oldExchange parameter is null. 382 * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange. 383 */ 384 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 385 this.aggregationStrategy = aggregationStrategy; 386 } 387 388 public String getAggregationStrategyRef() { 389 return strategyRef; 390 } 391 392 /** 393 * A reference to lookup the AggregationStrategy in the Registry. 394 * <p/> 395 * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges. 396 * At first call the oldExchange parameter is null. 397 * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange. 398 */ 399 public void setAggregationStrategyRef(String aggregationStrategyRef) { 400 this.strategyRef = aggregationStrategyRef; 401 } 402 403 public String getStrategyRef() { 404 return strategyRef; 405 } 406 407 /** 408 * A reference to lookup the AggregationStrategy in the Registry. 409 * <p/> 410 * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges. 411 * At first call the oldExchange parameter is null. 412 * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange. 413 */ 414 public void setStrategyRef(String strategyRef) { 415 this.strategyRef = strategyRef; 416 } 417 418 public String getAggregationStrategyMethodName() { 419 return strategyMethodName; 420 } 421 422 /** 423 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 424 */ 425 public void setAggregationStrategyMethodName(String strategyMethodName) { 426 this.strategyMethodName = strategyMethodName; 427 } 428 429 public Boolean getStrategyMethodAllowNull() { 430 return strategyMethodAllowNull; 431 } 432 433 public String getStrategyMethodName() { 434 return strategyMethodName; 435 } 436 437 /** 438 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 439 */ 440 public void setStrategyMethodName(String strategyMethodName) { 441 this.strategyMethodName = strategyMethodName; 442 } 443 444 /** 445 * If this option is false then the aggregate method is not used for the very first aggregation. 446 * If this option is true then null values is used as the oldExchange (at the very first aggregation), 447 * when using POJOs as the AggregationStrategy. 448 */ 449 public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { 450 this.strategyMethodAllowNull = strategyMethodAllowNull; 451 } 452 453 /** 454 * The expression used to calculate the correlation key to use for aggregation. 455 * The Exchange which has the same correlation key is aggregated together. 456 * If the correlation key could not be evaluated an Exception is thrown. 457 * You can disable this by using the ignoreBadCorrelationKeys option. 458 */ 459 public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) { 460 this.correlationExpression = correlationExpression; 461 } 462 463 public ExpressionSubElementDefinition getCorrelationExpression() { 464 return correlationExpression; 465 } 466 467 public Integer getCompletionSize() { 468 return completionSize; 469 } 470 471 public void setCompletionSize(Integer completionSize) { 472 this.completionSize = completionSize; 473 } 474 475 public OptimisticLockRetryPolicyDefinition getOptimisticLockRetryPolicyDefinition() { 476 return optimisticLockRetryPolicyDefinition; 477 } 478 479 public void setOptimisticLockRetryPolicyDefinition(OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition) { 480 this.optimisticLockRetryPolicyDefinition = optimisticLockRetryPolicyDefinition; 481 } 482 483 public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() { 484 return optimisticLockRetryPolicy; 485 } 486 487 public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) { 488 this.optimisticLockRetryPolicy = optimisticLockRetryPolicy; 489 } 490 491 public Long getCompletionInterval() { 492 return completionInterval; 493 } 494 495 public void setCompletionInterval(Long completionInterval) { 496 this.completionInterval = completionInterval; 497 } 498 499 public Long getCompletionTimeout() { 500 return completionTimeout; 501 } 502 503 public void setCompletionTimeout(Long completionTimeout) { 504 this.completionTimeout = completionTimeout; 505 } 506 507 public Long getCompletionTimeoutCheckerInterval() { 508 return completionTimeoutCheckerInterval; 509 } 510 511 public void setCompletionTimeoutCheckerInterval(Long completionTimeoutCheckerInterval) { 512 this.completionTimeoutCheckerInterval = completionTimeoutCheckerInterval; 513 } 514 515 public ExpressionSubElementDefinition getCompletionPredicate() { 516 return completionPredicate; 517 } 518 519 public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) { 520 this.completionPredicate = completionPredicate; 521 } 522 523 public ExpressionSubElementDefinition getCompletionTimeoutExpression() { 524 return completionTimeoutExpression; 525 } 526 527 public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) { 528 this.completionTimeoutExpression = completionTimeoutExpression; 529 } 530 531 public ExpressionSubElementDefinition getCompletionSizeExpression() { 532 return completionSizeExpression; 533 } 534 535 public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) { 536 this.completionSizeExpression = completionSizeExpression; 537 } 538 539 public Boolean getGroupExchanges() { 540 return groupExchanges; 541 } 542 543 public void setGroupExchanges(Boolean groupExchanges) { 544 this.groupExchanges = groupExchanges; 545 } 546 547 public Boolean getCompletionFromBatchConsumer() { 548 return completionFromBatchConsumer; 549 } 550 551 public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) { 552 this.completionFromBatchConsumer = completionFromBatchConsumer; 553 } 554 555 public Boolean getCompletionOnNewCorrelationGroup() { 556 return completionOnNewCorrelationGroup; 557 } 558 559 public void setCompletionOnNewCorrelationGroup(Boolean completionOnNewCorrelationGroup) { 560 this.completionOnNewCorrelationGroup = completionOnNewCorrelationGroup; 561 } 562 563 public ExecutorService getExecutorService() { 564 return executorService; 565 } 566 567 public void setExecutorService(ExecutorService executorService) { 568 this.executorService = executorService; 569 } 570 571 public Boolean getOptimisticLocking() { 572 return optimisticLocking; 573 } 574 575 public void setOptimisticLocking(boolean optimisticLocking) { 576 this.optimisticLocking = optimisticLocking; 577 } 578 579 public Boolean getParallelProcessing() { 580 return parallelProcessing; 581 } 582 583 public void setParallelProcessing(boolean parallelProcessing) { 584 this.parallelProcessing = parallelProcessing; 585 } 586 587 public String getExecutorServiceRef() { 588 return executorServiceRef; 589 } 590 591 public void setExecutorServiceRef(String executorServiceRef) { 592 this.executorServiceRef = executorServiceRef; 593 } 594 595 public Boolean getEagerCheckCompletion() { 596 return eagerCheckCompletion; 597 } 598 599 public void setEagerCheckCompletion(Boolean eagerCheckCompletion) { 600 this.eagerCheckCompletion = eagerCheckCompletion; 601 } 602 603 public Boolean getIgnoreInvalidCorrelationKeys() { 604 return ignoreInvalidCorrelationKeys; 605 } 606 607 public void setIgnoreInvalidCorrelationKeys(Boolean ignoreInvalidCorrelationKeys) { 608 this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys; 609 } 610 611 public Integer getCloseCorrelationKeyOnCompletion() { 612 return closeCorrelationKeyOnCompletion; 613 } 614 615 public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) { 616 this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion; 617 } 618 619 public AggregationRepository getAggregationRepository() { 620 return aggregationRepository; 621 } 622 623 public void setAggregationRepository(AggregationRepository aggregationRepository) { 624 this.aggregationRepository = aggregationRepository; 625 } 626 627 public String getAggregationRepositoryRef() { 628 return aggregationRepositoryRef; 629 } 630 631 public void setAggregationRepositoryRef(String aggregationRepositoryRef) { 632 this.aggregationRepositoryRef = aggregationRepositoryRef; 633 } 634 635 public Boolean getDiscardOnCompletionTimeout() { 636 return discardOnCompletionTimeout; 637 } 638 639 public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) { 640 this.discardOnCompletionTimeout = discardOnCompletionTimeout; 641 } 642 643 public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { 644 this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; 645 } 646 647 public ScheduledExecutorService getTimeoutCheckerExecutorService() { 648 return timeoutCheckerExecutorService; 649 } 650 651 public void setTimeoutCheckerExecutorServiceRef(String timeoutCheckerExecutorServiceRef) { 652 this.timeoutCheckerExecutorServiceRef = timeoutCheckerExecutorServiceRef; 653 } 654 655 public String getTimeoutCheckerExecutorServiceRef() { 656 return timeoutCheckerExecutorServiceRef; 657 } 658 659 public Boolean getForceCompletionOnStop() { 660 return forceCompletionOnStop; 661 } 662 663 public void setForceCompletionOnStop(Boolean forceCompletionOnStop) { 664 this.forceCompletionOnStop = forceCompletionOnStop; 665 } 666 667 public Boolean getCompleteAllOnStop() { 668 return completeAllOnStop; 669 } 670 671 public void setCompleteAllOnStop(Boolean completeAllOnStop) { 672 this.completeAllOnStop = completeAllOnStop; 673 } 674 675 public AggregateController getAggregateController() { 676 return aggregateController; 677 } 678 679 public void setAggregateController(AggregateController aggregateController) { 680 this.aggregateController = aggregateController; 681 } 682 683 public String getAggregateControllerRef() { 684 return aggregateControllerRef; 685 } 686 687 /** 688 * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control 689 * this aggregator. 690 */ 691 public void setAggregateControllerRef(String aggregateControllerRef) { 692 this.aggregateControllerRef = aggregateControllerRef; 693 } 694 695 // Fluent API 696 //------------------------------------------------------------------------- 697 698 /** 699 * Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange. 700 * As opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange. 701 * 702 * @return builder 703 */ 704 public AggregateDefinition eagerCheckCompletion() { 705 setEagerCheckCompletion(true); 706 return this; 707 } 708 709 /** 710 * If a correlation key cannot be successfully evaluated it will be ignored by logging a {{DEBUG}} and then just 711 * ignore the incoming Exchange. 712 * 713 * @return builder 714 */ 715 public AggregateDefinition ignoreInvalidCorrelationKeys() { 716 setIgnoreInvalidCorrelationKeys(true); 717 return this; 718 } 719 720 /** 721 * Closes a correlation key when its complete. Any <i>late</i> received exchanges which has a correlation key 722 * that has been closed, it will be defined and a {@link ClosedCorrelationKeyException} 723 * is thrown. 724 * 725 * @param capacity the maximum capacity of the closed correlation key cache. 726 * Use <tt>0</tt> or negative value for unbounded capacity. 727 * @return builder 728 */ 729 public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) { 730 setCloseCorrelationKeyOnCompletion(capacity); 731 return this; 732 } 733 734 /** 735 * Discards the aggregated message on completion timeout. 736 * <p/> 737 * This means on timeout the aggregated message is dropped and not sent out of the aggregator. 738 * 739 * @return builder 740 */ 741 public AggregateDefinition discardOnCompletionTimeout() { 742 setDiscardOnCompletionTimeout(true); 743 return this; 744 } 745 746 /** 747 * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer} 748 * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported 749 * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete. 750 * 751 * @return builder 752 */ 753 public AggregateDefinition completionFromBatchConsumer() { 754 setCompletionFromBatchConsumer(true); 755 return this; 756 } 757 758 /** 759 * Enables completion on all previous groups when a new incoming correlation group. This can for example be used 760 * to complete groups with same correlation keys when they are in consecutive order. 761 * Notice when this is enabled then only 1 correlation group can be in progress as when a new correlation group 762 * starts, then the previous groups is forced completed. 763 * 764 * @return builder 765 */ 766 public AggregateDefinition completionOnNewCorrelationGroup() { 767 setCompletionOnNewCorrelationGroup(true); 768 return this; 769 } 770 771 /** 772 * Number of messages aggregated before the aggregation is complete. This option can be set as either 773 * a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result. 774 * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. 775 * 776 * @param completionSize the completion size, must be a positive number 777 * @return builder 778 */ 779 public AggregateDefinition completionSize(int completionSize) { 780 setCompletionSize(completionSize); 781 return this; 782 } 783 784 /** 785 * Number of messages aggregated before the aggregation is complete. This option can be set as either 786 * a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result. 787 * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. 788 * 789 * @param completionSize the completion size as an {@link org.apache.camel.Expression} which is evaluated as a {@link Integer} type 790 * @return builder 791 */ 792 public AggregateDefinition completionSize(Expression completionSize) { 793 setCompletionSizeExpression(new ExpressionSubElementDefinition(completionSize)); 794 return this; 795 } 796 797 /** 798 * A repeating period in millis by which the aggregator will complete all current aggregated exchanges. 799 * Camel has a background task which is triggered every period. You cannot use this option together 800 * with completionTimeout, only one of them can be used. 801 * 802 * @param completionInterval the interval in millis, must be a positive value 803 * @return the builder 804 */ 805 public AggregateDefinition completionInterval(long completionInterval) { 806 setCompletionInterval(completionInterval); 807 return this; 808 } 809 810 /** 811 * Time in millis that an aggregated exchange should be inactive before its complete (timeout). 812 * This option can be set as either a fixed value or using an Expression which allows you to evaluate 813 * a timeout dynamically - will use Long as result. 814 * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. 815 * You cannot use this option together with completionInterval, only one of the two can be used. 816 * <p/> 817 * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval option 818 * to configure how frequently to run the checker. 819 * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value. 820 * It is not recommended to use very low timeout values or checker intervals. 821 * 822 * @param completionTimeout the timeout in millis, must be a positive value 823 * @return the builder 824 */ 825 public AggregateDefinition completionTimeout(long completionTimeout) { 826 setCompletionTimeout(completionTimeout); 827 return this; 828 } 829 830 /** 831 * Time in millis that an aggregated exchange should be inactive before its complete (timeout). 832 * This option can be set as either a fixed value or using an Expression which allows you to evaluate 833 * a timeout dynamically - will use Long as result. 834 * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. 835 * You cannot use this option together with completionInterval, only one of the two can be used. 836 * <p/> 837 * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval option 838 * to configure how frequently to run the checker. 839 * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value. 840 * It is not recommended to use very low timeout values or checker intervals. 841 * 842 * @param completionTimeout the timeout as an {@link Expression} which is evaluated as a {@link Long} type 843 * @return the builder 844 */ 845 public AggregateDefinition completionTimeout(Expression completionTimeout) { 846 setCompletionTimeoutExpression(new ExpressionSubElementDefinition(completionTimeout)); 847 return this; 848 } 849 850 /** 851 * Interval in millis that is used by the background task that checks for timeouts ({@link org.apache.camel.TimeoutMap}). 852 * <p/> 853 * By default the timeout checker runs every second. 854 * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value. 855 * It is not recommended to use very low timeout values or checker intervals. 856 * 857 * @param completionTimeoutCheckerInterval the interval in millis, must be a positive value 858 * @return the builder 859 */ 860 public AggregateDefinition completionTimeoutCheckerInterval(long completionTimeoutCheckerInterval) { 861 setCompletionTimeoutCheckerInterval(completionTimeoutCheckerInterval); 862 return this; 863 } 864 865 /** 866 * Sets the AggregationStrategy to use with a fluent builder. 867 */ 868 public AggregationStrategyClause<AggregateDefinition> aggregationStrategy() { 869 AggregationStrategyClause<AggregateDefinition> clause = new AggregationStrategyClause<>(this); 870 setAggregationStrategy(clause); 871 return clause; 872 } 873 874 /** 875 * Sets the AggregationStrategy to use with a fluent builder. 876 */ 877 public AggregationStrategyClause<AggregateDefinition> strategy() { 878 return aggregationStrategy(); 879 } 880 881 /** 882 * Sets the aggregate strategy to use 883 * 884 * @param aggregationStrategy the aggregate strategy to use 885 * @return the builder 886 */ 887 public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) { 888 return aggregationStrategy(aggregationStrategy); 889 } 890 891 /** 892 * Sets the aggregate strategy to use 893 * 894 * @param aggregationStrategy the aggregate strategy to use 895 * @return the builder 896 */ 897 public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 898 setAggregationStrategy(aggregationStrategy); 899 return this; 900 } 901 902 /** 903 * Sets the aggregate strategy to use 904 * 905 * @param aggregationStrategyRef reference to the strategy to lookup in the registry 906 * @return the builder 907 */ 908 public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) { 909 setAggregationStrategyRef(aggregationStrategyRef); 910 return this; 911 } 912 913 /** 914 * Sets the method name to use when using a POJO as {@link AggregationStrategy}. 915 * 916 * @param methodName the method name to call 917 * @return the builder 918 */ 919 public AggregateDefinition aggregationStrategyMethodName(String methodName) { 920 setAggregationStrategyMethodName(methodName); 921 return this; 922 } 923 924 /** 925 * Sets allowing null when using a POJO as {@link AggregationStrategy}. 926 * 927 * @return the builder 928 */ 929 public AggregateDefinition aggregationStrategyMethodAllowNull() { 930 setStrategyMethodAllowNull(true); 931 return this; 932 } 933 934 /** 935 * Sets the custom aggregate repository to use. 936 * <p/> 937 * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository} 938 * 939 * @param aggregationRepository the aggregate repository to use 940 * @return the builder 941 */ 942 public AggregateDefinition aggregationRepository(AggregationRepository aggregationRepository) { 943 setAggregationRepository(aggregationRepository); 944 return this; 945 } 946 947 /** 948 * Sets the custom aggregate repository to use 949 * <p/> 950 * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository} 951 * 952 * @param aggregationRepositoryRef reference to the repository to lookup in the registry 953 * @return the builder 954 */ 955 public AggregateDefinition aggregationRepositoryRef(String aggregationRepositoryRef) { 956 setAggregationRepositoryRef(aggregationRepositoryRef); 957 return this; 958 } 959 960 /** 961 * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single 962 * combined Exchange holding all the aggregated exchanges in a {@link java.util.List}. 963 * 964 * @deprecated use {@link GroupedExchangeAggregationStrategy} as aggregation strategy instead. 965 */ 966 @Deprecated 967 public AggregateDefinition groupExchanges() { 968 setGroupExchanges(true); 969 // must use eager check when using grouped exchanges 970 setEagerCheckCompletion(true); 971 return this; 972 } 973 974 /** 975 * A Predicate to indicate when an aggregated exchange is complete. 976 * If this is not specified and the AggregationStrategy object implements Predicate, 977 * the aggregationStrategy object will be used as the completionPredicate. 978 */ 979 public AggregateDefinition completionPredicate(@AsPredicate Predicate predicate) { 980 checkNoCompletedPredicate(); 981 setCompletionPredicate(new ExpressionSubElementDefinition(predicate)); 982 return this; 983 } 984 985 /** 986 * A Predicate to indicate when an aggregated exchange is complete. 987 * If this is not specified and the AggregationStrategy object implements Predicate, 988 * the aggregationStrategy object will be used as the completionPredicate. 989 */ 990 @AsPredicate 991 public PredicateClause<AggregateDefinition> completionPredicate() { 992 PredicateClause<AggregateDefinition> clause = new PredicateClause<>(this); 993 completionPredicate(clause); 994 return clause; 995 } 996 997 /** 998 * A Predicate to indicate when an aggregated exchange is complete. 999 * If this is not specified and the AggregationStrategy object implements Predicate, 1000 * the aggregationStrategy object will be used as the completionPredicate. 1001 */ 1002 @AsPredicate 1003 public PredicateClause<AggregateDefinition> completion() { 1004 return completionPredicate(); 1005 } 1006 1007 /** 1008 * A Predicate to indicate when an aggregated exchange is complete. 1009 * If this is not specified and the AggregationStrategy object implements Predicate, 1010 * the aggregationStrategy object will be used as the completionPredicate. 1011 */ 1012 public AggregateDefinition completion(@AsPredicate Predicate predicate) { 1013 return completionPredicate(predicate); 1014 } 1015 1016 /** 1017 * Indicates to complete all current aggregated exchanges when the context is stopped 1018 */ 1019 public AggregateDefinition forceCompletionOnStop() { 1020 setForceCompletionOnStop(true); 1021 return this; 1022 } 1023 1024 /** 1025 * Indicates to wait to complete all current and partial (pending) aggregated exchanges when the context is stopped. 1026 * <p/> 1027 * This also means that we will wait for all pending exchanges which are stored in the aggregation repository 1028 * to complete so the repository is empty before we can stop. 1029 * <p/> 1030 * You may want to enable this when using the memory based aggregation repository that is memory based only, 1031 * and do not store data on disk. When this option is enabled, then the aggregator is waiting to complete 1032 * all those exchanges before its stopped, when stopping CamelContext or the route using it. 1033 */ 1034 public AggregateDefinition completeAllOnStop() { 1035 setCompleteAllOnStop(true); 1036 return this; 1037 } 1038 1039 /** 1040 * When aggregated are completed they are being send out of the aggregator. 1041 * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency. 1042 * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads. 1043 */ 1044 public AggregateDefinition parallelProcessing() { 1045 setParallelProcessing(true); 1046 return this; 1047 } 1048 1049 /** 1050 * When aggregated are completed they are being send out of the aggregator. 1051 * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency. 1052 * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads. 1053 */ 1054 public AggregateDefinition parallelProcessing(boolean parallelProcessing) { 1055 setParallelProcessing(parallelProcessing); 1056 return this; 1057 } 1058 1059 /** 1060 * Turns on using optimistic locking, which requires the aggregationRepository being used, 1061 * is supporting this by implementing {@link org.apache.camel.spi.OptimisticLockingAggregationRepository}. 1062 */ 1063 public AggregateDefinition optimisticLocking() { 1064 setOptimisticLocking(true); 1065 return this; 1066 } 1067 1068 /** 1069 * Allows to configure retry settings when using optimistic locking. 1070 */ 1071 public AggregateDefinition optimisticLockRetryPolicy(OptimisticLockRetryPolicy policy) { 1072 setOptimisticLockRetryPolicy(policy); 1073 return this; 1074 } 1075 1076 /** 1077 * If using parallelProcessing you can specify a custom thread pool to be used. 1078 * In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well. 1079 */ 1080 public AggregateDefinition executorService(ExecutorService executorService) { 1081 setExecutorService(executorService); 1082 return this; 1083 } 1084 1085 /** 1086 * If using parallelProcessing you can specify a custom thread pool to be used. 1087 * In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well. 1088 */ 1089 public AggregateDefinition executorServiceRef(String executorServiceRef) { 1090 setExecutorServiceRef(executorServiceRef); 1091 return this; 1092 } 1093 1094 /** 1095 * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a 1096 * background thread is created to check for the completion for every aggregator. 1097 * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator. 1098 */ 1099 public AggregateDefinition timeoutCheckerExecutorService(ScheduledExecutorService executorService) { 1100 setTimeoutCheckerExecutorService(executorService); 1101 return this; 1102 } 1103 1104 /** 1105 * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a 1106 * background thread is created to check for the completion for every aggregator. 1107 * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator. 1108 */ 1109 public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) { 1110 setTimeoutCheckerExecutorServiceRef(executorServiceRef); 1111 return this; 1112 } 1113 1114 /** 1115 * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control 1116 * this aggregator. 1117 */ 1118 public AggregateDefinition aggregateController(AggregateController aggregateController) { 1119 setAggregateController(aggregateController); 1120 return this; 1121 } 1122 1123 // Section - Methods from ExpressionNode 1124 // Needed to copy methods from ExpressionNode here so that I could specify the 1125 // correlation expression as optional in JAXB 1126 1127 public ExpressionDefinition getExpression() { 1128 if (expression == null && correlationExpression != null) { 1129 expression = correlationExpression.getExpressionType(); 1130 } 1131 return expression; 1132 } 1133 1134 public void setExpression(ExpressionDefinition expression) { 1135 this.expression = expression; 1136 } 1137 1138 protected void checkNoCompletedPredicate() { 1139 if (getCompletionPredicate() != null) { 1140 throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this); 1141 } 1142 } 1143 1144 @Override 1145 public List<ProcessorDefinition<?>> getOutputs() { 1146 return outputs; 1147 } 1148 1149 public boolean isOutputSupported() { 1150 return true; 1151 } 1152 1153 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 1154 this.outputs = outputs; 1155 } 1156 1157}