001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.ExecutorService; 022import java.util.concurrent.ScheduledExecutorService; 023import javax.xml.bind.annotation.XmlAccessType; 024import javax.xml.bind.annotation.XmlAccessorType; 025import javax.xml.bind.annotation.XmlAttribute; 026import javax.xml.bind.annotation.XmlElement; 027import javax.xml.bind.annotation.XmlElementRef; 028import javax.xml.bind.annotation.XmlRootElement; 029import javax.xml.bind.annotation.XmlTransient; 030 031import org.apache.camel.CamelContextAware; 032import org.apache.camel.Expression; 033import org.apache.camel.Predicate; 034import org.apache.camel.Processor; 035import org.apache.camel.builder.AggregationStrategyClause; 036import org.apache.camel.builder.ExpressionClause; 037import org.apache.camel.builder.PredicateClause; 038import org.apache.camel.model.language.ExpressionDefinition; 039import org.apache.camel.processor.CamelInternalProcessor; 040import org.apache.camel.processor.aggregate.AggregateController; 041import org.apache.camel.processor.aggregate.AggregateProcessor; 042import org.apache.camel.processor.aggregate.AggregationStrategy; 043import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; 044import org.apache.camel.processor.aggregate.ClosedCorrelationKeyException; 045import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy; 046import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; 047import org.apache.camel.spi.AggregationRepository; 048import org.apache.camel.spi.AsPredicate; 049import org.apache.camel.spi.Metadata; 050import org.apache.camel.spi.RouteContext; 051import org.apache.camel.util.concurrent.SynchronousExecutorService; 052 053/** 054 * Aggregates many messages into a single message 055 * 056 * @version 057 */ 058@Metadata(label = "eip,routing") 059@XmlRootElement(name = "aggregate") 060@XmlAccessorType(XmlAccessType.FIELD) 061public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> { 062 @XmlElement(name = "correlationExpression", required = true) 063 private ExpressionSubElementDefinition correlationExpression; 064 @XmlElement(name = "completionPredicate") @AsPredicate 065 private ExpressionSubElementDefinition completionPredicate; 066 @XmlElement(name = "completionTimeout") 067 private ExpressionSubElementDefinition completionTimeoutExpression; 068 @XmlElement(name = "completionSize") 069 private ExpressionSubElementDefinition completionSizeExpression; 070 @XmlElement(name = "optimisticLockRetryPolicy") 071 private OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition; 072 @XmlTransient 073 private ExpressionDefinition expression; 074 @XmlTransient 075 private AggregationStrategy aggregationStrategy; 076 @XmlTransient 077 private ExecutorService executorService; 078 @XmlTransient 079 private ScheduledExecutorService timeoutCheckerExecutorService; 080 @XmlTransient 081 private AggregationRepository aggregationRepository; 082 @XmlTransient 083 private OptimisticLockRetryPolicy optimisticLockRetryPolicy; 084 @XmlAttribute 085 private Boolean parallelProcessing; 086 @XmlAttribute 087 private Boolean optimisticLocking; 088 @XmlAttribute 089 private String executorServiceRef; 090 @XmlAttribute 091 private String timeoutCheckerExecutorServiceRef; 092 @XmlAttribute 093 private String aggregationRepositoryRef; 094 @XmlAttribute 095 private String strategyRef; 096 @XmlAttribute 097 private String strategyMethodName; 098 @XmlAttribute 099 private Boolean strategyMethodAllowNull; 100 @XmlAttribute 101 private Integer completionSize; 102 @XmlAttribute 103 private Long completionInterval; 104 @XmlAttribute 105 private Long completionTimeout; 106 @XmlAttribute @Metadata(defaultValue = "1000") 107 private Long completionTimeoutCheckerInterval = 1000L; 108 @XmlAttribute 109 private Boolean completionFromBatchConsumer; 110 @XmlAttribute 111 private Boolean completionOnNewCorrelationGroup; 112 @XmlAttribute 113 @Deprecated 114 private Boolean groupExchanges; 115 @XmlAttribute 116 private Boolean eagerCheckCompletion; 117 @XmlAttribute 118 private Boolean ignoreInvalidCorrelationKeys; 119 @XmlAttribute 120 private Integer closeCorrelationKeyOnCompletion; 121 @XmlAttribute 122 private Boolean discardOnCompletionTimeout; 123 @XmlAttribute 124 private Boolean forceCompletionOnStop; 125 @XmlAttribute 126 private Boolean completeAllOnStop; 127 @XmlTransient 128 private AggregateController aggregateController; 129 @XmlAttribute 130 private String aggregateControllerRef; 131 @XmlElementRef 132 private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); 133 134 public AggregateDefinition() { 135 } 136 137 public AggregateDefinition(@AsPredicate Predicate predicate) { 138 this(ExpressionNodeHelper.toExpressionDefinition(predicate)); 139 } 140 141 public AggregateDefinition(Expression expression) { 142 this(ExpressionNodeHelper.toExpressionDefinition(expression)); 143 } 144 145 public AggregateDefinition(ExpressionDefinition correlationExpression) { 146 setExpression(correlationExpression); 147 148 ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition(); 149 cor.setExpressionType(correlationExpression); 150 setCorrelationExpression(cor); 151 } 152 153 public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 154 this(correlationExpression); 155 this.aggregationStrategy = aggregationStrategy; 156 } 157 158 @Override 159 public String toString() { 160 return "Aggregate[" + description() + " -> " + getOutputs() + "]"; 161 } 162 163 protected String description() { 164 return getExpression() != null ? getExpression().getLabel() : ""; 165 } 166 167 @Override 168 public String getLabel() { 169 return "aggregate[" + description() + "]"; 170 } 171 172 @Override 173 public Processor createProcessor(RouteContext routeContext) throws Exception { 174 return createAggregator(routeContext); 175 } 176 177 protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception { 178 Processor childProcessor = this.createChildProcessor(routeContext, true); 179 180 // wrap the aggregate route in a unit of work processor 181 CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor); 182 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); 183 184 Expression correlation = getExpression().createExpression(routeContext); 185 AggregationStrategy strategy = createAggregationStrategy(routeContext); 186 187 boolean parallel = getParallelProcessing() != null && getParallelProcessing(); 188 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, parallel); 189 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, parallel); 190 if (threadPool == null && !parallel) { 191 // executor service is mandatory for the Aggregator 192 // we do not run in parallel mode, but use a synchronous executor, so we run in current thread 193 threadPool = new SynchronousExecutorService(); 194 shutdownThreadPool = true; 195 } 196 197 AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), internal, 198 correlation, strategy, threadPool, shutdownThreadPool); 199 200 AggregationRepository repository = createAggregationRepository(routeContext); 201 if (repository != null) { 202 answer.setAggregationRepository(repository); 203 } 204 205 if (getAggregateController() == null && getAggregateControllerRef() != null) { 206 setAggregateController(routeContext.mandatoryLookup(getAggregateControllerRef(), AggregateController.class)); 207 } 208 209 // this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool 210 boolean shutdownTimeoutThreadPool = false; 211 ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService; 212 if (timeoutThreadPool == null && timeoutCheckerExecutorServiceRef != null) { 213 // lookup existing thread pool 214 timeoutThreadPool = routeContext.getCamelContext().getRegistry().lookupByNameAndType(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class); 215 if (timeoutThreadPool == null) { 216 // then create a thread pool assuming the ref is a thread pool profile id 217 timeoutThreadPool = routeContext.getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, 218 AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER, timeoutCheckerExecutorServiceRef); 219 if (timeoutThreadPool == null) { 220 throw new IllegalArgumentException("ExecutorServiceRef " + timeoutCheckerExecutorServiceRef 221 + " not found in registry (as an ScheduledExecutorService instance) or as a thread pool profile."); 222 } 223 shutdownTimeoutThreadPool = true; 224 } 225 } 226 answer.setTimeoutCheckerExecutorService(timeoutThreadPool); 227 answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool); 228 229 // set other options 230 answer.setParallelProcessing(parallel); 231 if (getOptimisticLocking() != null) { 232 answer.setOptimisticLocking(getOptimisticLocking()); 233 } 234 if (getCompletionPredicate() != null) { 235 Predicate predicate = getCompletionPredicate().createPredicate(routeContext); 236 answer.setCompletionPredicate(predicate); 237 } else if (strategy instanceof Predicate) { 238 // if aggregation strategy implements predicate and was not configured then use as fallback 239 log.debug("Using AggregationStrategy as completion predicate: {}", strategy); 240 answer.setCompletionPredicate((Predicate) strategy); 241 } 242 if (getCompletionTimeoutExpression() != null) { 243 Expression expression = getCompletionTimeoutExpression().createExpression(routeContext); 244 answer.setCompletionTimeoutExpression(expression); 245 } 246 if (getCompletionTimeout() != null) { 247 answer.setCompletionTimeout(getCompletionTimeout()); 248 } 249 if (getCompletionInterval() != null) { 250 answer.setCompletionInterval(getCompletionInterval()); 251 } 252 if (getCompletionSizeExpression() != null) { 253 Expression expression = getCompletionSizeExpression().createExpression(routeContext); 254 answer.setCompletionSizeExpression(expression); 255 } 256 if (getCompletionSize() != null) { 257 answer.setCompletionSize(getCompletionSize()); 258 } 259 if (getCompletionFromBatchConsumer() != null) { 260 answer.setCompletionFromBatchConsumer(getCompletionFromBatchConsumer()); 261 } 262 if (getCompletionOnNewCorrelationGroup() != null) { 263 answer.setCompletionOnNewCorrelationGroup(getCompletionOnNewCorrelationGroup()); 264 } 265 if (getEagerCheckCompletion() != null) { 266 answer.setEagerCheckCompletion(getEagerCheckCompletion()); 267 } 268 if (getIgnoreInvalidCorrelationKeys() != null) { 269 answer.setIgnoreInvalidCorrelationKeys(getIgnoreInvalidCorrelationKeys()); 270 } 271 if (getCloseCorrelationKeyOnCompletion() != null) { 272 answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion()); 273 } 274 if (getDiscardOnCompletionTimeout() != null) { 275 answer.setDiscardOnCompletionTimeout(getDiscardOnCompletionTimeout()); 276 } 277 if (getForceCompletionOnStop() != null) { 278 answer.setForceCompletionOnStop(getForceCompletionOnStop()); 279 } 280 if (getCompleteAllOnStop() != null) { 281 answer.setCompleteAllOnStop(getCompleteAllOnStop()); 282 } 283 if (optimisticLockRetryPolicy == null) { 284 if (getOptimisticLockRetryPolicyDefinition() != null) { 285 answer.setOptimisticLockRetryPolicy(getOptimisticLockRetryPolicyDefinition().createOptimisticLockRetryPolicy()); 286 } 287 } else { 288 answer.setOptimisticLockRetryPolicy(optimisticLockRetryPolicy); 289 } 290 if (getAggregateController() != null) { 291 answer.setAggregateController(getAggregateController()); 292 } 293 if (getCompletionTimeoutCheckerInterval() != null) { 294 answer.setCompletionTimeoutCheckerInterval(getCompletionTimeoutCheckerInterval()); 295 } 296 return answer; 297 } 298 299 @Override 300 public void configureChild(ProcessorDefinition<?> output) { 301 if (expression instanceof ExpressionClause) { 302 ExpressionClause<?> clause = (ExpressionClause<?>) expression; 303 if (clause.getExpressionType() != null) { 304 // if using the Java DSL then the expression may have been set using the 305 // ExpressionClause which is a fancy builder to define expressions and predicates 306 // using fluent builders in the DSL. However we need afterwards a callback to 307 // reset the expression to the expression type the ExpressionClause did build for us 308 expression = clause.getExpressionType(); 309 // set the correlation expression from the expression type, as the model definition 310 // would then be accurate 311 correlationExpression = new ExpressionSubElementDefinition(); 312 correlationExpression.setExpressionType(clause.getExpressionType()); 313 } 314 } 315 } 316 317 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 318 AggregationStrategy strategy = getAggregationStrategy(); 319 if (strategy == null && strategyRef != null) { 320 Object aggStrategy = routeContext.lookup(strategyRef, Object.class); 321 if (aggStrategy instanceof AggregationStrategy) { 322 strategy = (AggregationStrategy) aggStrategy; 323 } else if (aggStrategy != null) { 324 AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName()); 325 if (getStrategyMethodAllowNull() != null) { 326 adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); 327 adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); 328 } 329 strategy = adapter; 330 } else { 331 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); 332 } 333 } 334 335 if (groupExchanges != null && groupExchanges) { 336 if (strategy != null || strategyRef != null) { 337 throw new IllegalArgumentException("Options groupExchanges and AggregationStrategy cannot be enabled at the same time"); 338 } 339 if (eagerCheckCompletion != null && !eagerCheckCompletion) { 340 throw new IllegalArgumentException("Option eagerCheckCompletion cannot be false when groupExchanges has been enabled"); 341 } 342 // set eager check to enabled by default when using grouped exchanges 343 setEagerCheckCompletion(true); 344 // if grouped exchange is enabled then use special strategy for that 345 strategy = new GroupedExchangeAggregationStrategy(); 346 } 347 348 if (strategy == null) { 349 throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this); 350 } 351 352 if (strategy instanceof CamelContextAware) { 353 ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); 354 } 355 356 return strategy; 357 } 358 359 private AggregationRepository createAggregationRepository(RouteContext routeContext) { 360 AggregationRepository repository = getAggregationRepository(); 361 if (repository == null && aggregationRepositoryRef != null) { 362 repository = routeContext.mandatoryLookup(aggregationRepositoryRef, AggregationRepository.class); 363 } 364 return repository; 365 } 366 367 public AggregationStrategy getAggregationStrategy() { 368 return aggregationStrategy; 369 } 370 371 /** 372 * The AggregationStrategy to use. 373 * <p/> 374 * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges. 375 * At first call the oldExchange parameter is null. 376 * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange. 377 */ 378 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 379 this.aggregationStrategy = aggregationStrategy; 380 } 381 382 public String getAggregationStrategyRef() { 383 return strategyRef; 384 } 385 386 /** 387 * A reference to lookup the AggregationStrategy in the Registry. 388 * <p/> 389 * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges. 390 * At first call the oldExchange parameter is null. 391 * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange. 392 */ 393 public void setAggregationStrategyRef(String aggregationStrategyRef) { 394 this.strategyRef = aggregationStrategyRef; 395 } 396 397 public String getStrategyRef() { 398 return strategyRef; 399 } 400 401 /** 402 * A reference to lookup the AggregationStrategy in the Registry. 403 * <p/> 404 * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges. 405 * At first call the oldExchange parameter is null. 406 * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange. 407 */ 408 public void setStrategyRef(String strategyRef) { 409 this.strategyRef = strategyRef; 410 } 411 412 public String getAggregationStrategyMethodName() { 413 return strategyMethodName; 414 } 415 416 /** 417 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 418 */ 419 public void setAggregationStrategyMethodName(String strategyMethodName) { 420 this.strategyMethodName = strategyMethodName; 421 } 422 423 public Boolean getStrategyMethodAllowNull() { 424 return strategyMethodAllowNull; 425 } 426 427 public String getStrategyMethodName() { 428 return strategyMethodName; 429 } 430 431 /** 432 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 433 */ 434 public void setStrategyMethodName(String strategyMethodName) { 435 this.strategyMethodName = strategyMethodName; 436 } 437 438 /** 439 * If this option is false then the aggregate method is not used for the very first aggregation. 440 * If this option is true then null values is used as the oldExchange (at the very first aggregation), 441 * when using POJOs as the AggregationStrategy. 442 */ 443 public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { 444 this.strategyMethodAllowNull = strategyMethodAllowNull; 445 } 446 447 /** 448 * The expression used to calculate the correlation key to use for aggregation. 449 * The Exchange which has the same correlation key is aggregated together. 450 * If the correlation key could not be evaluated an Exception is thrown. 451 * You can disable this by using the ignoreBadCorrelationKeys option. 452 */ 453 public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) { 454 this.correlationExpression = correlationExpression; 455 } 456 457 public ExpressionSubElementDefinition getCorrelationExpression() { 458 return correlationExpression; 459 } 460 461 public Integer getCompletionSize() { 462 return completionSize; 463 } 464 465 public void setCompletionSize(Integer completionSize) { 466 this.completionSize = completionSize; 467 } 468 469 public OptimisticLockRetryPolicyDefinition getOptimisticLockRetryPolicyDefinition() { 470 return optimisticLockRetryPolicyDefinition; 471 } 472 473 public void setOptimisticLockRetryPolicyDefinition(OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition) { 474 this.optimisticLockRetryPolicyDefinition = optimisticLockRetryPolicyDefinition; 475 } 476 477 public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() { 478 return optimisticLockRetryPolicy; 479 } 480 481 public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) { 482 this.optimisticLockRetryPolicy = optimisticLockRetryPolicy; 483 } 484 485 public Long getCompletionInterval() { 486 return completionInterval; 487 } 488 489 public void setCompletionInterval(Long completionInterval) { 490 this.completionInterval = completionInterval; 491 } 492 493 public Long getCompletionTimeout() { 494 return completionTimeout; 495 } 496 497 public void setCompletionTimeout(Long completionTimeout) { 498 this.completionTimeout = completionTimeout; 499 } 500 501 public Long getCompletionTimeoutCheckerInterval() { 502 return completionTimeoutCheckerInterval; 503 } 504 505 public void setCompletionTimeoutCheckerInterval(Long completionTimeoutCheckerInterval) { 506 this.completionTimeoutCheckerInterval = completionTimeoutCheckerInterval; 507 } 508 509 public ExpressionSubElementDefinition getCompletionPredicate() { 510 return completionPredicate; 511 } 512 513 public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) { 514 this.completionPredicate = completionPredicate; 515 } 516 517 public ExpressionSubElementDefinition getCompletionTimeoutExpression() { 518 return completionTimeoutExpression; 519 } 520 521 public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) { 522 this.completionTimeoutExpression = completionTimeoutExpression; 523 } 524 525 public ExpressionSubElementDefinition getCompletionSizeExpression() { 526 return completionSizeExpression; 527 } 528 529 public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) { 530 this.completionSizeExpression = completionSizeExpression; 531 } 532 533 public Boolean getGroupExchanges() { 534 return groupExchanges; 535 } 536 537 public void setGroupExchanges(Boolean groupExchanges) { 538 this.groupExchanges = groupExchanges; 539 } 540 541 public Boolean getCompletionFromBatchConsumer() { 542 return completionFromBatchConsumer; 543 } 544 545 public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) { 546 this.completionFromBatchConsumer = completionFromBatchConsumer; 547 } 548 549 public Boolean getCompletionOnNewCorrelationGroup() { 550 return completionOnNewCorrelationGroup; 551 } 552 553 public void setCompletionOnNewCorrelationGroup(Boolean completionOnNewCorrelationGroup) { 554 this.completionOnNewCorrelationGroup = completionOnNewCorrelationGroup; 555 } 556 557 public ExecutorService getExecutorService() { 558 return executorService; 559 } 560 561 public void setExecutorService(ExecutorService executorService) { 562 this.executorService = executorService; 563 } 564 565 public Boolean getOptimisticLocking() { 566 return optimisticLocking; 567 } 568 569 public void setOptimisticLocking(boolean optimisticLocking) { 570 this.optimisticLocking = optimisticLocking; 571 } 572 573 public Boolean getParallelProcessing() { 574 return parallelProcessing; 575 } 576 577 public void setParallelProcessing(boolean parallelProcessing) { 578 this.parallelProcessing = parallelProcessing; 579 } 580 581 public String getExecutorServiceRef() { 582 return executorServiceRef; 583 } 584 585 public void setExecutorServiceRef(String executorServiceRef) { 586 this.executorServiceRef = executorServiceRef; 587 } 588 589 public Boolean getEagerCheckCompletion() { 590 return eagerCheckCompletion; 591 } 592 593 public void setEagerCheckCompletion(Boolean eagerCheckCompletion) { 594 this.eagerCheckCompletion = eagerCheckCompletion; 595 } 596 597 public Boolean getIgnoreInvalidCorrelationKeys() { 598 return ignoreInvalidCorrelationKeys; 599 } 600 601 public void setIgnoreInvalidCorrelationKeys(Boolean ignoreInvalidCorrelationKeys) { 602 this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys; 603 } 604 605 public Integer getCloseCorrelationKeyOnCompletion() { 606 return closeCorrelationKeyOnCompletion; 607 } 608 609 public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) { 610 this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion; 611 } 612 613 public AggregationRepository getAggregationRepository() { 614 return aggregationRepository; 615 } 616 617 public void setAggregationRepository(AggregationRepository aggregationRepository) { 618 this.aggregationRepository = aggregationRepository; 619 } 620 621 public String getAggregationRepositoryRef() { 622 return aggregationRepositoryRef; 623 } 624 625 public void setAggregationRepositoryRef(String aggregationRepositoryRef) { 626 this.aggregationRepositoryRef = aggregationRepositoryRef; 627 } 628 629 public Boolean getDiscardOnCompletionTimeout() { 630 return discardOnCompletionTimeout; 631 } 632 633 public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) { 634 this.discardOnCompletionTimeout = discardOnCompletionTimeout; 635 } 636 637 public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { 638 this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; 639 } 640 641 public ScheduledExecutorService getTimeoutCheckerExecutorService() { 642 return timeoutCheckerExecutorService; 643 } 644 645 public void setTimeoutCheckerExecutorServiceRef(String timeoutCheckerExecutorServiceRef) { 646 this.timeoutCheckerExecutorServiceRef = timeoutCheckerExecutorServiceRef; 647 } 648 649 public String getTimeoutCheckerExecutorServiceRef() { 650 return timeoutCheckerExecutorServiceRef; 651 } 652 653 public Boolean getForceCompletionOnStop() { 654 return forceCompletionOnStop; 655 } 656 657 public void setForceCompletionOnStop(Boolean forceCompletionOnStop) { 658 this.forceCompletionOnStop = forceCompletionOnStop; 659 } 660 661 public Boolean getCompleteAllOnStop() { 662 return completeAllOnStop; 663 } 664 665 public void setCompleteAllOnStop(Boolean completeAllOnStop) { 666 this.completeAllOnStop = completeAllOnStop; 667 } 668 669 public AggregateController getAggregateController() { 670 return aggregateController; 671 } 672 673 public void setAggregateController(AggregateController aggregateController) { 674 this.aggregateController = aggregateController; 675 } 676 677 public String getAggregateControllerRef() { 678 return aggregateControllerRef; 679 } 680 681 /** 682 * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control 683 * this aggregator. 684 */ 685 public void setAggregateControllerRef(String aggregateControllerRef) { 686 this.aggregateControllerRef = aggregateControllerRef; 687 } 688 689 // Fluent API 690 //------------------------------------------------------------------------- 691 692 /** 693 * Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange. 694 * As opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange. 695 * 696 * @return builder 697 */ 698 public AggregateDefinition eagerCheckCompletion() { 699 setEagerCheckCompletion(true); 700 return this; 701 } 702 703 /** 704 * If a correlation key cannot be successfully evaluated it will be ignored by logging a {{DEBUG}} and then just 705 * ignore the incoming Exchange. 706 * 707 * @return builder 708 */ 709 public AggregateDefinition ignoreInvalidCorrelationKeys() { 710 setIgnoreInvalidCorrelationKeys(true); 711 return this; 712 } 713 714 /** 715 * Closes a correlation key when its complete. Any <i>late</i> received exchanges which has a correlation key 716 * that has been closed, it will be defined and a {@link ClosedCorrelationKeyException} 717 * is thrown. 718 * 719 * @param capacity the maximum capacity of the closed correlation key cache. 720 * Use <tt>0</tt> or negative value for unbounded capacity. 721 * @return builder 722 */ 723 public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) { 724 setCloseCorrelationKeyOnCompletion(capacity); 725 return this; 726 } 727 728 /** 729 * Discards the aggregated message on completion timeout. 730 * <p/> 731 * This means on timeout the aggregated message is dropped and not sent out of the aggregator. 732 * 733 * @return builder 734 */ 735 public AggregateDefinition discardOnCompletionTimeout() { 736 setDiscardOnCompletionTimeout(true); 737 return this; 738 } 739 740 /** 741 * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer} 742 * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported 743 * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete. 744 * 745 * @return builder 746 */ 747 public AggregateDefinition completionFromBatchConsumer() { 748 setCompletionFromBatchConsumer(true); 749 return this; 750 } 751 752 /** 753 * Enables completion on all previous groups when a new incoming correlation group. This can for example be used 754 * to complete groups with same correlation keys when they are in consecutive order. 755 * Notice when this is enabled then only 1 correlation group can be in progress as when a new correlation group 756 * starts, then the previous groups is forced completed. 757 * 758 * @return builder 759 */ 760 public AggregateDefinition completionOnNewCorrelationGroup() { 761 setCompletionOnNewCorrelationGroup(true); 762 return this; 763 } 764 765 /** 766 * Number of messages aggregated before the aggregation is complete. This option can be set as either 767 * a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result. 768 * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. 769 * 770 * @param completionSize the completion size, must be a positive number 771 * @return builder 772 */ 773 public AggregateDefinition completionSize(int completionSize) { 774 setCompletionSize(completionSize); 775 return this; 776 } 777 778 /** 779 * Number of messages aggregated before the aggregation is complete. This option can be set as either 780 * a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result. 781 * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. 782 * 783 * @param completionSize the completion size as an {@link org.apache.camel.Expression} which is evaluated as a {@link Integer} type 784 * @return builder 785 */ 786 public AggregateDefinition completionSize(Expression completionSize) { 787 setCompletionSizeExpression(new ExpressionSubElementDefinition(completionSize)); 788 return this; 789 } 790 791 /** 792 * A repeating period in millis by which the aggregator will complete all current aggregated exchanges. 793 * Camel has a background task which is triggered every period. You cannot use this option together 794 * with completionTimeout, only one of them can be used. 795 * 796 * @param completionInterval the interval in millis, must be a positive value 797 * @return the builder 798 */ 799 public AggregateDefinition completionInterval(long completionInterval) { 800 setCompletionInterval(completionInterval); 801 return this; 802 } 803 804 /** 805 * Time in millis that an aggregated exchange should be inactive before its complete (timeout). 806 * This option can be set as either a fixed value or using an Expression which allows you to evaluate 807 * a timeout dynamically - will use Long as result. 808 * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. 809 * You cannot use this option together with completionInterval, only one of the two can be used. 810 * <p/> 811 * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval option 812 * to configure how frequently to run the checker. 813 * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value. 814 * It is not recommended to use very low timeout values or checker intervals. 815 * 816 * @param completionTimeout the timeout in millis, must be a positive value 817 * @return the builder 818 */ 819 public AggregateDefinition completionTimeout(long completionTimeout) { 820 setCompletionTimeout(completionTimeout); 821 return this; 822 } 823 824 /** 825 * Time in millis that an aggregated exchange should be inactive before its complete (timeout). 826 * This option can be set as either a fixed value or using an Expression which allows you to evaluate 827 * a timeout dynamically - will use Long as result. 828 * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0. 829 * You cannot use this option together with completionInterval, only one of the two can be used. 830 * <p/> 831 * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval option 832 * to configure how frequently to run the checker. 833 * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value. 834 * It is not recommended to use very low timeout values or checker intervals. 835 * 836 * @param completionTimeout the timeout as an {@link Expression} which is evaluated as a {@link Long} type 837 * @return the builder 838 */ 839 public AggregateDefinition completionTimeout(Expression completionTimeout) { 840 setCompletionTimeoutExpression(new ExpressionSubElementDefinition(completionTimeout)); 841 return this; 842 } 843 844 /** 845 * Interval in millis that is used by the background task that checks for timeouts ({@link org.apache.camel.TimeoutMap}). 846 * <p/> 847 * By default the timeout checker runs every second. 848 * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value. 849 * It is not recommended to use very low timeout values or checker intervals. 850 * 851 * @param completionTimeoutCheckerInterval the interval in millis, must be a positive value 852 * @return the builder 853 */ 854 public AggregateDefinition completionTimeoutCheckerInterval(long completionTimeoutCheckerInterval) { 855 setCompletionTimeoutCheckerInterval(completionTimeoutCheckerInterval); 856 return this; 857 } 858 859 /** 860 * Sets the AggregationStrategy to use with a fluent builder. 861 */ 862 public AggregationStrategyClause<AggregateDefinition> aggregationStrategy() { 863 AggregationStrategyClause<AggregateDefinition> clause = new AggregationStrategyClause<>(this); 864 setAggregationStrategy(clause); 865 return clause; 866 } 867 868 /** 869 * Sets the AggregationStrategy to use with a fluent builder. 870 */ 871 public AggregationStrategyClause<AggregateDefinition> strategy() { 872 return aggregationStrategy(); 873 } 874 875 /** 876 * Sets the aggregate strategy to use 877 * 878 * @param aggregationStrategy the aggregate strategy to use 879 * @return the builder 880 */ 881 public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) { 882 return aggregationStrategy(aggregationStrategy); 883 } 884 885 /** 886 * Sets the aggregate strategy to use 887 * 888 * @param aggregationStrategy the aggregate strategy to use 889 * @return the builder 890 */ 891 public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 892 setAggregationStrategy(aggregationStrategy); 893 return this; 894 } 895 896 /** 897 * Sets the aggregate strategy to use 898 * 899 * @param aggregationStrategyRef reference to the strategy to lookup in the registry 900 * @return the builder 901 */ 902 public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) { 903 setAggregationStrategyRef(aggregationStrategyRef); 904 return this; 905 } 906 907 /** 908 * Sets the method name to use when using a POJO as {@link AggregationStrategy}. 909 * 910 * @param methodName the method name to call 911 * @return the builder 912 */ 913 public AggregateDefinition aggregationStrategyMethodName(String methodName) { 914 setAggregationStrategyMethodName(methodName); 915 return this; 916 } 917 918 /** 919 * Sets allowing null when using a POJO as {@link AggregationStrategy}. 920 * 921 * @return the builder 922 */ 923 public AggregateDefinition aggregationStrategyMethodAllowNull() { 924 setStrategyMethodAllowNull(true); 925 return this; 926 } 927 928 /** 929 * Sets the custom aggregate repository to use. 930 * <p/> 931 * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository} 932 * 933 * @param aggregationRepository the aggregate repository to use 934 * @return the builder 935 */ 936 public AggregateDefinition aggregationRepository(AggregationRepository aggregationRepository) { 937 setAggregationRepository(aggregationRepository); 938 return this; 939 } 940 941 /** 942 * Sets the custom aggregate repository to use 943 * <p/> 944 * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository} 945 * 946 * @param aggregationRepositoryRef reference to the repository to lookup in the registry 947 * @return the builder 948 */ 949 public AggregateDefinition aggregationRepositoryRef(String aggregationRepositoryRef) { 950 setAggregationRepositoryRef(aggregationRepositoryRef); 951 return this; 952 } 953 954 /** 955 * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single 956 * combined Exchange holding all the aggregated exchanges in a {@link java.util.List}. 957 * 958 * @deprecated use {@link GroupedExchangeAggregationStrategy} as aggregation strategy instead. 959 */ 960 @Deprecated 961 public AggregateDefinition groupExchanges() { 962 setGroupExchanges(true); 963 // must use eager check when using grouped exchanges 964 setEagerCheckCompletion(true); 965 return this; 966 } 967 968 /** 969 * A Predicate to indicate when an aggregated exchange is complete. 970 * If this is not specified and the AggregationStrategy object implements Predicate, 971 * the aggregationStrategy object will be used as the completionPredicate. 972 */ 973 public AggregateDefinition completionPredicate(@AsPredicate Predicate predicate) { 974 checkNoCompletedPredicate(); 975 setCompletionPredicate(new ExpressionSubElementDefinition(predicate)); 976 return this; 977 } 978 979 /** 980 * A Predicate to indicate when an aggregated exchange is complete. 981 * If this is not specified and the AggregationStrategy object implements Predicate, 982 * the aggregationStrategy object will be used as the completionPredicate. 983 */ 984 @AsPredicate 985 public PredicateClause<AggregateDefinition> completionPredicate() { 986 PredicateClause<AggregateDefinition> clause = new PredicateClause<>(this); 987 completionPredicate(clause); 988 return clause; 989 } 990 991 /** 992 * A Predicate to indicate when an aggregated exchange is complete. 993 * If this is not specified and the AggregationStrategy object implements Predicate, 994 * the aggregationStrategy object will be used as the completionPredicate. 995 */ 996 @AsPredicate 997 public PredicateClause<AggregateDefinition> completion() { 998 return completionPredicate(); 999 } 1000 1001 /** 1002 * A Predicate to indicate when an aggregated exchange is complete. 1003 * If this is not specified and the AggregationStrategy object implements Predicate, 1004 * the aggregationStrategy object will be used as the completionPredicate. 1005 */ 1006 public AggregateDefinition completion(@AsPredicate Predicate predicate) { 1007 return completionPredicate(predicate); 1008 } 1009 1010 /** 1011 * Indicates to complete all current aggregated exchanges when the context is stopped 1012 */ 1013 public AggregateDefinition forceCompletionOnStop() { 1014 setForceCompletionOnStop(true); 1015 return this; 1016 } 1017 1018 /** 1019 * Indicates to wait to complete all current and partial (pending) aggregated exchanges when the context is stopped. 1020 * <p/> 1021 * This also means that we will wait for all pending exchanges which are stored in the aggregation repository 1022 * to complete so the repository is empty before we can stop. 1023 * <p/> 1024 * You may want to enable this when using the memory based aggregation repository that is memory based only, 1025 * and do not store data on disk. When this option is enabled, then the aggregator is waiting to complete 1026 * all those exchanges before its stopped, when stopping CamelContext or the route using it. 1027 */ 1028 public AggregateDefinition completeAllOnStop() { 1029 setCompleteAllOnStop(true); 1030 return this; 1031 } 1032 1033 /** 1034 * When aggregated are completed they are being send out of the aggregator. 1035 * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency. 1036 * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads. 1037 */ 1038 public AggregateDefinition parallelProcessing() { 1039 setParallelProcessing(true); 1040 return this; 1041 } 1042 1043 /** 1044 * When aggregated are completed they are being send out of the aggregator. 1045 * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency. 1046 * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads. 1047 */ 1048 public AggregateDefinition parallelProcessing(boolean parallelProcessing) { 1049 setParallelProcessing(parallelProcessing); 1050 return this; 1051 } 1052 1053 /** 1054 * Turns on using optimistic locking, which requires the aggregationRepository being used, 1055 * is supporting this by implementing {@link org.apache.camel.spi.OptimisticLockingAggregationRepository}. 1056 */ 1057 public AggregateDefinition optimisticLocking() { 1058 setOptimisticLocking(true); 1059 return this; 1060 } 1061 1062 /** 1063 * Allows to configure retry settings when using optimistic locking. 1064 */ 1065 public AggregateDefinition optimisticLockRetryPolicy(OptimisticLockRetryPolicy policy) { 1066 setOptimisticLockRetryPolicy(policy); 1067 return this; 1068 } 1069 1070 /** 1071 * If using parallelProcessing you can specify a custom thread pool to be used. 1072 * In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well. 1073 */ 1074 public AggregateDefinition executorService(ExecutorService executorService) { 1075 setExecutorService(executorService); 1076 return this; 1077 } 1078 1079 /** 1080 * If using parallelProcessing you can specify a custom thread pool to be used. 1081 * In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well. 1082 */ 1083 public AggregateDefinition executorServiceRef(String executorServiceRef) { 1084 setExecutorServiceRef(executorServiceRef); 1085 return this; 1086 } 1087 1088 /** 1089 * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a 1090 * background thread is created to check for the completion for every aggregator. 1091 * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator. 1092 */ 1093 public AggregateDefinition timeoutCheckerExecutorService(ScheduledExecutorService executorService) { 1094 setTimeoutCheckerExecutorService(executorService); 1095 return this; 1096 } 1097 1098 /** 1099 * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a 1100 * background thread is created to check for the completion for every aggregator. 1101 * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator. 1102 */ 1103 public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) { 1104 setTimeoutCheckerExecutorServiceRef(executorServiceRef); 1105 return this; 1106 } 1107 1108 /** 1109 * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control 1110 * this aggregator. 1111 */ 1112 public AggregateDefinition aggregateController(AggregateController aggregateController) { 1113 setAggregateController(aggregateController); 1114 return this; 1115 } 1116 1117 // Section - Methods from ExpressionNode 1118 // Needed to copy methods from ExpressionNode here so that I could specify the 1119 // correlation expression as optional in JAXB 1120 1121 public ExpressionDefinition getExpression() { 1122 if (expression == null && correlationExpression != null) { 1123 expression = correlationExpression.getExpressionType(); 1124 } 1125 return expression; 1126 } 1127 1128 public void setExpression(ExpressionDefinition expression) { 1129 this.expression = expression; 1130 } 1131 1132 protected void checkNoCompletedPredicate() { 1133 if (getCompletionPredicate() != null) { 1134 throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this); 1135 } 1136 } 1137 1138 @Override 1139 public List<ProcessorDefinition<?>> getOutputs() { 1140 return outputs; 1141 } 1142 1143 public boolean isOutputSupported() { 1144 return true; 1145 } 1146 1147 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 1148 this.outputs = outputs; 1149 } 1150 1151}