001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.ExecutorService; 022import java.util.concurrent.ScheduledExecutorService; 023import javax.xml.bind.annotation.XmlAccessType; 024import javax.xml.bind.annotation.XmlAccessorType; 025import javax.xml.bind.annotation.XmlAttribute; 026import javax.xml.bind.annotation.XmlElement; 027import javax.xml.bind.annotation.XmlElementRef; 028import javax.xml.bind.annotation.XmlRootElement; 029import javax.xml.bind.annotation.XmlTransient; 030 031import org.apache.camel.CamelContextAware; 032import org.apache.camel.Expression; 033import org.apache.camel.Predicate; 034import org.apache.camel.Processor; 035import org.apache.camel.builder.ExpressionClause; 036import org.apache.camel.model.language.ExpressionDefinition; 037import org.apache.camel.processor.CamelInternalProcessor; 038import org.apache.camel.processor.aggregate.AggregateController; 039import org.apache.camel.processor.aggregate.AggregateProcessor; 040import org.apache.camel.processor.aggregate.AggregationStrategy; 041import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; 042import org.apache.camel.processor.aggregate.ClosedCorrelationKeyException; 043import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy; 044import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy; 045import org.apache.camel.spi.AggregationRepository; 046import org.apache.camel.spi.Metadata; 047import org.apache.camel.spi.RouteContext; 048import org.apache.camel.util.concurrent.SynchronousExecutorService; 049 050/** 051 * Aggregates many messages into a single message 052 * 053 * @version 054 */ 055@Metadata(label = "eip,routing") 056@XmlRootElement(name = "aggregate") 057@XmlAccessorType(XmlAccessType.FIELD) 058public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> { 059 @XmlElement(name = "correlationExpression", required = true) 060 private ExpressionSubElementDefinition correlationExpression; 061 @XmlElement(name = "completionPredicate") 062 private ExpressionSubElementDefinition completionPredicate; 063 @XmlElement(name = "completionTimeout") 064 private ExpressionSubElementDefinition completionTimeoutExpression; 065 @XmlElement(name = "completionSize") 066 private ExpressionSubElementDefinition completionSizeExpression; 067 @XmlElement(name = "optimisticLockRetryPolicy") 068 private OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition; 069 @XmlTransient 070 private ExpressionDefinition expression; 071 @XmlTransient 072 private AggregationStrategy aggregationStrategy; 073 @XmlTransient 074 private ExecutorService executorService; 075 @XmlTransient 076 private ScheduledExecutorService timeoutCheckerExecutorService; 077 @XmlTransient 078 private AggregationRepository aggregationRepository; 079 @XmlTransient 080 private OptimisticLockRetryPolicy optimisticLockRetryPolicy; 081 @XmlAttribute 082 private Boolean parallelProcessing; 083 @XmlAttribute 084 private Boolean optimisticLocking; 085 @XmlAttribute 086 private String executorServiceRef; 087 @XmlAttribute 088 private String timeoutCheckerExecutorServiceRef; 089 @XmlAttribute 090 private String aggregationRepositoryRef; 091 @XmlAttribute 092 private String strategyRef; 093 @XmlAttribute 094 private String strategyMethodName; 095 @XmlAttribute 096 private Boolean strategyMethodAllowNull; 097 @XmlAttribute 098 private Integer completionSize; 099 @XmlAttribute 100 private Long completionInterval; 101 @XmlAttribute 102 private Long completionTimeout; 103 @XmlAttribute 104 private Boolean completionFromBatchConsumer; 105 @XmlAttribute 106 @Deprecated 107 private Boolean groupExchanges; 108 @XmlAttribute 109 private Boolean eagerCheckCompletion; 110 @XmlAttribute 111 private Boolean ignoreInvalidCorrelationKeys; 112 @XmlAttribute 113 private Integer closeCorrelationKeyOnCompletion; 114 @XmlAttribute 115 private Boolean discardOnCompletionTimeout; 116 @XmlAttribute 117 private Boolean forceCompletionOnStop; 118 @XmlAttribute 119 private Boolean completeAllOnStop; 120 @XmlTransient 121 private AggregateController aggregateController; 122 @XmlAttribute 123 private String aggregateControllerRef; 124 @XmlElementRef 125 private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); 126 127 public AggregateDefinition() { 128 } 129 130 public AggregateDefinition(Predicate predicate) { 131 this(ExpressionNodeHelper.toExpressionDefinition(predicate)); 132 } 133 134 public AggregateDefinition(Expression expression) { 135 this(ExpressionNodeHelper.toExpressionDefinition(expression)); 136 } 137 138 public AggregateDefinition(ExpressionDefinition correlationExpression) { 139 setExpression(correlationExpression); 140 141 ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition(); 142 cor.setExpressionType(correlationExpression); 143 setCorrelationExpression(cor); 144 } 145 146 public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 147 this(correlationExpression); 148 this.aggregationStrategy = aggregationStrategy; 149 } 150 151 @Override 152 public String toString() { 153 return "Aggregate[" + description() + " -> " + getOutputs() + "]"; 154 } 155 156 protected String description() { 157 return getExpression() != null ? getExpression().getLabel() : ""; 158 } 159 160 @Override 161 public String getLabel() { 162 return "aggregate[" + description() + "]"; 163 } 164 165 @Override 166 public Processor createProcessor(RouteContext routeContext) throws Exception { 167 return createAggregator(routeContext); 168 } 169 170 protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception { 171 Processor childProcessor = this.createChildProcessor(routeContext, true); 172 173 // wrap the aggregate route in a unit of work processor 174 CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor); 175 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); 176 177 Expression correlation = getExpression().createExpression(routeContext); 178 AggregationStrategy strategy = createAggregationStrategy(routeContext); 179 180 boolean parallel = getParallelProcessing() != null && getParallelProcessing(); 181 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, parallel); 182 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, parallel); 183 if (threadPool == null && !parallel) { 184 // executor service is mandatory for the Aggregator 185 // we do not run in parallel mode, but use a synchronous executor, so we run in current thread 186 threadPool = new SynchronousExecutorService(); 187 shutdownThreadPool = true; 188 } 189 190 AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), internal, 191 correlation, strategy, threadPool, shutdownThreadPool); 192 193 AggregationRepository repository = createAggregationRepository(routeContext); 194 if (repository != null) { 195 answer.setAggregationRepository(repository); 196 } 197 198 if (getAggregateController() == null && getAggregateControllerRef() != null) { 199 setAggregateController(routeContext.mandatoryLookup(getAggregateControllerRef(), AggregateController.class)); 200 } 201 202 // this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool 203 boolean shutdownTimeoutThreadPool = false; 204 ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService; 205 if (timeoutThreadPool == null && timeoutCheckerExecutorServiceRef != null) { 206 // lookup existing thread pool 207 timeoutThreadPool = routeContext.getCamelContext().getRegistry().lookupByNameAndType(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class); 208 if (timeoutThreadPool == null) { 209 // then create a thread pool assuming the ref is a thread pool profile id 210 timeoutThreadPool = routeContext.getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, 211 AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER, timeoutCheckerExecutorServiceRef); 212 if (timeoutThreadPool == null) { 213 throw new IllegalArgumentException("ExecutorServiceRef " + timeoutCheckerExecutorServiceRef + " not found in registry or as a thread pool profile."); 214 } 215 shutdownTimeoutThreadPool = true; 216 } 217 } 218 answer.setTimeoutCheckerExecutorService(timeoutThreadPool); 219 answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool); 220 221 // set other options 222 answer.setParallelProcessing(parallel); 223 if (getOptimisticLocking() != null) { 224 answer.setOptimisticLocking(getOptimisticLocking()); 225 } 226 if (getCompletionPredicate() != null) { 227 Predicate predicate = getCompletionPredicate().createPredicate(routeContext); 228 answer.setCompletionPredicate(predicate); 229 } else if (strategy instanceof Predicate) { 230 // if aggregation strategy implements predicate and was not configured then use as fallback 231 log.debug("Using AggregationStrategy as completion predicate: {}", strategy); 232 answer.setCompletionPredicate((Predicate) strategy); 233 } 234 if (getCompletionTimeoutExpression() != null) { 235 Expression expression = getCompletionTimeoutExpression().createExpression(routeContext); 236 answer.setCompletionTimeoutExpression(expression); 237 } 238 if (getCompletionTimeout() != null) { 239 answer.setCompletionTimeout(getCompletionTimeout()); 240 } 241 if (getCompletionInterval() != null) { 242 answer.setCompletionInterval(getCompletionInterval()); 243 } 244 if (getCompletionSizeExpression() != null) { 245 Expression expression = getCompletionSizeExpression().createExpression(routeContext); 246 answer.setCompletionSizeExpression(expression); 247 } 248 if (getCompletionSize() != null) { 249 answer.setCompletionSize(getCompletionSize()); 250 } 251 if (getCompletionFromBatchConsumer() != null) { 252 answer.setCompletionFromBatchConsumer(getCompletionFromBatchConsumer()); 253 } 254 if (getEagerCheckCompletion() != null) { 255 answer.setEagerCheckCompletion(getEagerCheckCompletion()); 256 } 257 if (getIgnoreInvalidCorrelationKeys() != null) { 258 answer.setIgnoreInvalidCorrelationKeys(getIgnoreInvalidCorrelationKeys()); 259 } 260 if (getCloseCorrelationKeyOnCompletion() != null) { 261 answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion()); 262 } 263 if (getDiscardOnCompletionTimeout() != null) { 264 answer.setDiscardOnCompletionTimeout(getDiscardOnCompletionTimeout()); 265 } 266 if (getForceCompletionOnStop() != null) { 267 answer.setForceCompletionOnStop(getForceCompletionOnStop()); 268 } 269 if (getCompleteAllOnStop() != null) { 270 answer.setCompleteAllOnStop(getCompleteAllOnStop()); 271 } 272 if (optimisticLockRetryPolicy == null) { 273 if (getOptimisticLockRetryPolicyDefinition() != null) { 274 answer.setOptimisticLockRetryPolicy(getOptimisticLockRetryPolicyDefinition().createOptimisticLockRetryPolicy()); 275 } 276 } else { 277 answer.setOptimisticLockRetryPolicy(optimisticLockRetryPolicy); 278 } 279 if (getAggregateController() != null) { 280 answer.setAggregateController(getAggregateController()); 281 } 282 return answer; 283 } 284 285 @Override 286 public void configureChild(ProcessorDefinition<?> output) { 287 if (expression != null && expression instanceof ExpressionClause) { 288 ExpressionClause<?> clause = (ExpressionClause<?>) expression; 289 if (clause.getExpressionType() != null) { 290 // if using the Java DSL then the expression may have been set using the 291 // ExpressionClause which is a fancy builder to define expressions and predicates 292 // using fluent builders in the DSL. However we need afterwards a callback to 293 // reset the expression to the expression type the ExpressionClause did build for us 294 expression = clause.getExpressionType(); 295 // set the correlation expression from the expression type, as the model definition 296 // would then be accurate 297 correlationExpression = new ExpressionSubElementDefinition(); 298 correlationExpression.setExpressionType(clause.getExpressionType()); 299 } 300 } 301 } 302 303 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 304 AggregationStrategy strategy = getAggregationStrategy(); 305 if (strategy == null && strategyRef != null) { 306 Object aggStrategy = routeContext.lookup(strategyRef, Object.class); 307 if (aggStrategy instanceof AggregationStrategy) { 308 strategy = (AggregationStrategy) aggStrategy; 309 } else if (aggStrategy != null) { 310 AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName()); 311 if (getStrategyMethodAllowNull() != null) { 312 adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); 313 adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); 314 } 315 strategy = adapter; 316 } else { 317 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); 318 } 319 } 320 321 if (groupExchanges != null && groupExchanges) { 322 if (strategy != null || strategyRef != null) { 323 throw new IllegalArgumentException("Options groupExchanges and AggregationStrategy cannot be enabled at the same time"); 324 } 325 if (eagerCheckCompletion != null && !eagerCheckCompletion) { 326 throw new IllegalArgumentException("Option eagerCheckCompletion cannot be false when groupExchanges has been enabled"); 327 } 328 // set eager check to enabled by default when using grouped exchanges 329 setEagerCheckCompletion(true); 330 // if grouped exchange is enabled then use special strategy for that 331 strategy = new GroupedExchangeAggregationStrategy(); 332 } 333 334 if (strategy == null) { 335 throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this); 336 } 337 338 if (strategy instanceof CamelContextAware) { 339 ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); 340 } 341 342 return strategy; 343 } 344 345 private AggregationRepository createAggregationRepository(RouteContext routeContext) { 346 AggregationRepository repository = getAggregationRepository(); 347 if (repository == null && aggregationRepositoryRef != null) { 348 repository = routeContext.mandatoryLookup(aggregationRepositoryRef, AggregationRepository.class); 349 } 350 return repository; 351 } 352 353 public AggregationStrategy getAggregationStrategy() { 354 return aggregationStrategy; 355 } 356 357 /** 358 * The AggregationStrategy to use. 359 * <p/> 360 * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges. 361 * At first call the oldExchange parameter is null. 362 * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange. 363 */ 364 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 365 this.aggregationStrategy = aggregationStrategy; 366 } 367 368 public String getAggregationStrategyRef() { 369 return strategyRef; 370 } 371 372 /** 373 * A reference to lookup the AggregationStrategy in the Registry. 374 * <p/> 375 * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges. 376 * At first call the oldExchange parameter is null. 377 * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange. 378 */ 379 public void setAggregationStrategyRef(String aggregationStrategyRef) { 380 this.strategyRef = aggregationStrategyRef; 381 } 382 383 public String getStrategyRef() { 384 return strategyRef; 385 } 386 387 /** 388 * A reference to lookup the AggregationStrategy in the Registry. 389 * <p/> 390 * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges. 391 * At first call the oldExchange parameter is null. 392 * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange. 393 */ 394 public void setStrategyRef(String strategyRef) { 395 this.strategyRef = strategyRef; 396 } 397 398 public String getAggregationStrategyMethodName() { 399 return strategyMethodName; 400 } 401 402 /** 403 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 404 */ 405 public void setAggregationStrategyMethodName(String strategyMethodName) { 406 this.strategyMethodName = strategyMethodName; 407 } 408 409 public Boolean getStrategyMethodAllowNull() { 410 return strategyMethodAllowNull; 411 } 412 413 public String getStrategyMethodName() { 414 return strategyMethodName; 415 } 416 417 /** 418 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 419 */ 420 public void setStrategyMethodName(String strategyMethodName) { 421 this.strategyMethodName = strategyMethodName; 422 } 423 424 /** 425 * If this option is false then the aggregate method is not used for the very first aggregation. 426 * If this option is true then null values is used as the oldExchange (at the very first aggregation), 427 * when using POJOs as the AggregationStrategy. 428 */ 429 public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { 430 this.strategyMethodAllowNull = strategyMethodAllowNull; 431 } 432 433 /** 434 * The expression used to calculate the correlation key to use for aggregation. 435 * The Exchange which has the same correlation key is aggregated together. 436 * If the correlation key could not be evaluated an Exception is thrown. 437 * You can disable this by using the ignoreBadCorrelationKeys option. 438 */ 439 public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) { 440 this.correlationExpression = correlationExpression; 441 } 442 443 public ExpressionSubElementDefinition getCorrelationExpression() { 444 return correlationExpression; 445 } 446 447 public Integer getCompletionSize() { 448 return completionSize; 449 } 450 451 public void setCompletionSize(Integer completionSize) { 452 this.completionSize = completionSize; 453 } 454 455 public OptimisticLockRetryPolicyDefinition getOptimisticLockRetryPolicyDefinition() { 456 return optimisticLockRetryPolicyDefinition; 457 } 458 459 public void setOptimisticLockRetryPolicyDefinition(OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition) { 460 this.optimisticLockRetryPolicyDefinition = optimisticLockRetryPolicyDefinition; 461 } 462 463 public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() { 464 return optimisticLockRetryPolicy; 465 } 466 467 public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) { 468 this.optimisticLockRetryPolicy = optimisticLockRetryPolicy; 469 } 470 471 public Long getCompletionInterval() { 472 return completionInterval; 473 } 474 475 public void setCompletionInterval(Long completionInterval) { 476 this.completionInterval = completionInterval; 477 } 478 479 public Long getCompletionTimeout() { 480 return completionTimeout; 481 } 482 483 public void setCompletionTimeout(Long completionTimeout) { 484 this.completionTimeout = completionTimeout; 485 } 486 487 public ExpressionSubElementDefinition getCompletionPredicate() { 488 return completionPredicate; 489 } 490 491 public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) { 492 this.completionPredicate = completionPredicate; 493 } 494 495 public ExpressionSubElementDefinition getCompletionTimeoutExpression() { 496 return completionTimeoutExpression; 497 } 498 499 public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) { 500 this.completionTimeoutExpression = completionTimeoutExpression; 501 } 502 503 public ExpressionSubElementDefinition getCompletionSizeExpression() { 504 return completionSizeExpression; 505 } 506 507 public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) { 508 this.completionSizeExpression = completionSizeExpression; 509 } 510 511 public Boolean getGroupExchanges() { 512 return groupExchanges; 513 } 514 515 public void setGroupExchanges(Boolean groupExchanges) { 516 this.groupExchanges = groupExchanges; 517 } 518 519 public Boolean getCompletionFromBatchConsumer() { 520 return completionFromBatchConsumer; 521 } 522 523 public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) { 524 this.completionFromBatchConsumer = completionFromBatchConsumer; 525 } 526 527 public ExecutorService getExecutorService() { 528 return executorService; 529 } 530 531 public void setExecutorService(ExecutorService executorService) { 532 this.executorService = executorService; 533 } 534 535 public Boolean getOptimisticLocking() { 536 return optimisticLocking; 537 } 538 539 public void setOptimisticLocking(boolean optimisticLocking) { 540 this.optimisticLocking = optimisticLocking; 541 } 542 543 public Boolean getParallelProcessing() { 544 return parallelProcessing; 545 } 546 547 public void setParallelProcessing(boolean parallelProcessing) { 548 this.parallelProcessing = parallelProcessing; 549 } 550 551 public String getExecutorServiceRef() { 552 return executorServiceRef; 553 } 554 555 public void setExecutorServiceRef(String executorServiceRef) { 556 this.executorServiceRef = executorServiceRef; 557 } 558 559 public Boolean getEagerCheckCompletion() { 560 return eagerCheckCompletion; 561 } 562 563 public void setEagerCheckCompletion(Boolean eagerCheckCompletion) { 564 this.eagerCheckCompletion = eagerCheckCompletion; 565 } 566 567 public Boolean getIgnoreInvalidCorrelationKeys() { 568 return ignoreInvalidCorrelationKeys; 569 } 570 571 public void setIgnoreInvalidCorrelationKeys(Boolean ignoreInvalidCorrelationKeys) { 572 this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys; 573 } 574 575 public Integer getCloseCorrelationKeyOnCompletion() { 576 return closeCorrelationKeyOnCompletion; 577 } 578 579 public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) { 580 this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion; 581 } 582 583 public AggregationRepository getAggregationRepository() { 584 return aggregationRepository; 585 } 586 587 public void setAggregationRepository(AggregationRepository aggregationRepository) { 588 this.aggregationRepository = aggregationRepository; 589 } 590 591 public String getAggregationRepositoryRef() { 592 return aggregationRepositoryRef; 593 } 594 595 public void setAggregationRepositoryRef(String aggregationRepositoryRef) { 596 this.aggregationRepositoryRef = aggregationRepositoryRef; 597 } 598 599 public Boolean getDiscardOnCompletionTimeout() { 600 return discardOnCompletionTimeout; 601 } 602 603 public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) { 604 this.discardOnCompletionTimeout = discardOnCompletionTimeout; 605 } 606 607 public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { 608 this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; 609 } 610 611 public ScheduledExecutorService getTimeoutCheckerExecutorService() { 612 return timeoutCheckerExecutorService; 613 } 614 615 public void setTimeoutCheckerExecutorServiceRef(String timeoutCheckerExecutorServiceRef) { 616 this.timeoutCheckerExecutorServiceRef = timeoutCheckerExecutorServiceRef; 617 } 618 619 public String getTimeoutCheckerExecutorServiceRef() { 620 return timeoutCheckerExecutorServiceRef; 621 } 622 623 public Boolean getForceCompletionOnStop() { 624 return forceCompletionOnStop; 625 } 626 627 public void setForceCompletionOnStop(Boolean forceCompletionOnStop) { 628 this.forceCompletionOnStop = forceCompletionOnStop; 629 } 630 631 public Boolean getCompleteAllOnStop() { 632 return completeAllOnStop; 633 } 634 635 public void setCompleteAllOnStop(Boolean completeAllOnStop) { 636 this.completeAllOnStop = completeAllOnStop; 637 } 638 639 public AggregateController getAggregateController() { 640 return aggregateController; 641 } 642 643 public void setAggregateController(AggregateController aggregateController) { 644 this.aggregateController = aggregateController; 645 } 646 647 public String getAggregateControllerRef() { 648 return aggregateControllerRef; 649 } 650 651 /** 652 * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control 653 * this aggregator. 654 */ 655 public void setAggregateControllerRef(String aggregateControllerRef) { 656 this.aggregateControllerRef = aggregateControllerRef; 657 } 658 659 // Fluent API 660 //------------------------------------------------------------------------- 661 662 /** 663 * Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange. 664 * At opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange. 665 * 666 * @return builder 667 */ 668 public AggregateDefinition eagerCheckCompletion() { 669 setEagerCheckCompletion(true); 670 return this; 671 } 672 673 /** 674 * If a correlation key cannot be successfully evaluated it will be ignored by logging a {{DEBUG}} and then just 675 * ignore the incoming Exchange. 676 * 677 * @return builder 678 */ 679 public AggregateDefinition ignoreInvalidCorrelationKeys() { 680 setIgnoreInvalidCorrelationKeys(true); 681 return this; 682 } 683 684 /** 685 * Closes a correlation key when its complete. Any <i>late</i> received exchanges which has a correlation key 686 * that has been closed, it will be defined and a {@link ClosedCorrelationKeyException} 687 * is thrown. 688 * 689 * @param capacity the maximum capacity of the closed correlation key cache. 690 * Use <tt>0</tt> or negative value for unbounded capacity. 691 * @return builder 692 */ 693 public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) { 694 setCloseCorrelationKeyOnCompletion(capacity); 695 return this; 696 } 697 698 /** 699 * Discards the aggregated message on completion timeout. 700 * <p/> 701 * This means on timeout the aggregated message is dropped and not sent out of the aggregator. 702 * 703 * @return builder 704 */ 705 public AggregateDefinition discardOnCompletionTimeout() { 706 setDiscardOnCompletionTimeout(true); 707 return this; 708 } 709 710 /** 711 * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer} 712 * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported 713 * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete. 714 * 715 * @return builder 716 */ 717 public AggregateDefinition completionFromBatchConsumer() { 718 setCompletionFromBatchConsumer(true); 719 return this; 720 } 721 722 /** 723 * Sets the completion size, which is the number of aggregated exchanges which would 724 * cause the aggregate to consider the group as complete and send out the aggregated exchange. 725 * 726 * @param completionSize the completion size 727 * @return builder 728 */ 729 public AggregateDefinition completionSize(int completionSize) { 730 setCompletionSize(completionSize); 731 return this; 732 } 733 734 /** 735 * Sets the completion size, which is the number of aggregated exchanges which would 736 * cause the aggregate to consider the group as complete and send out the aggregated exchange. 737 * 738 * @param completionSize the completion size as an {@link org.apache.camel.Expression} which is evaluated as a {@link Integer} type 739 * @return builder 740 */ 741 public AggregateDefinition completionSize(Expression completionSize) { 742 setCompletionSizeExpression(new ExpressionSubElementDefinition(completionSize)); 743 return this; 744 } 745 746 /** 747 * Sets the completion interval, which would cause the aggregate to consider the group as complete 748 * and send out the aggregated exchange. 749 * 750 * @param completionInterval the interval in millis 751 * @return the builder 752 */ 753 public AggregateDefinition completionInterval(long completionInterval) { 754 setCompletionInterval(completionInterval); 755 return this; 756 } 757 758 /** 759 * Sets the completion timeout, which would cause the aggregate to consider the group as complete 760 * and send out the aggregated exchange. 761 * 762 * @param completionTimeout the timeout in millis 763 * @return the builder 764 */ 765 public AggregateDefinition completionTimeout(long completionTimeout) { 766 setCompletionTimeout(completionTimeout); 767 return this; 768 } 769 770 /** 771 * Sets the completion timeout, which would cause the aggregate to consider the group as complete 772 * and send out the aggregated exchange. 773 * 774 * @param completionTimeout the timeout as an {@link Expression} which is evaluated as a {@link Long} type 775 * @return the builder 776 */ 777 public AggregateDefinition completionTimeout(Expression completionTimeout) { 778 setCompletionTimeoutExpression(new ExpressionSubElementDefinition(completionTimeout)); 779 return this; 780 } 781 782 /** 783 * Sets the aggregate strategy to use 784 * 785 * @param aggregationStrategy the aggregate strategy to use 786 * @return the builder 787 */ 788 public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 789 setAggregationStrategy(aggregationStrategy); 790 return this; 791 } 792 793 /** 794 * Sets the aggregate strategy to use 795 * 796 * @param aggregationStrategyRef reference to the strategy to lookup in the registry 797 * @return the builder 798 */ 799 public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) { 800 setAggregationStrategyRef(aggregationStrategyRef); 801 return this; 802 } 803 804 /** 805 * Sets the method name to use when using a POJO as {@link AggregationStrategy}. 806 * 807 * @param methodName the method name to call 808 * @return the builder 809 */ 810 public AggregateDefinition aggregationStrategyMethodName(String methodName) { 811 setAggregationStrategyMethodName(methodName); 812 return this; 813 } 814 815 /** 816 * Sets allowing null when using a POJO as {@link AggregationStrategy}. 817 * 818 * @return the builder 819 */ 820 public AggregateDefinition aggregationStrategyMethodAllowNull() { 821 setStrategyMethodAllowNull(true); 822 return this; 823 } 824 825 /** 826 * Sets the custom aggregate repository to use. 827 * <p/> 828 * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository} 829 * 830 * @param aggregationRepository the aggregate repository to use 831 * @return the builder 832 */ 833 public AggregateDefinition aggregationRepository(AggregationRepository aggregationRepository) { 834 setAggregationRepository(aggregationRepository); 835 return this; 836 } 837 838 /** 839 * Sets the custom aggregate repository to use 840 * <p/> 841 * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository} 842 * 843 * @param aggregationRepositoryRef reference to the repository to lookup in the registry 844 * @return the builder 845 */ 846 public AggregateDefinition aggregationRepositoryRef(String aggregationRepositoryRef) { 847 setAggregationRepositoryRef(aggregationRepositoryRef); 848 return this; 849 } 850 851 /** 852 * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single 853 * combined Exchange holding all the aggregated exchanges in a {@link java.util.List}. 854 * 855 * @deprecated use {@link GroupedExchangeAggregationStrategy} as aggregation strategy instead. 856 */ 857 @Deprecated 858 public AggregateDefinition groupExchanges() { 859 setGroupExchanges(true); 860 // must use eager check when using grouped exchanges 861 setEagerCheckCompletion(true); 862 return this; 863 } 864 865 /** 866 * Sets the predicate used to determine if the aggregation is completed 867 */ 868 public AggregateDefinition completionPredicate(Predicate predicate) { 869 checkNoCompletedPredicate(); 870 setCompletionPredicate(new ExpressionSubElementDefinition(predicate)); 871 return this; 872 } 873 874 /** 875 * Indicates to complete all current aggregated exchanges when the context is stopped 876 */ 877 public AggregateDefinition forceCompletionOnStop() { 878 setForceCompletionOnStop(true); 879 return this; 880 } 881 882 /** 883 * Indicates to wait to complete all current and partial (pending) aggregated exchanges when the context is stopped. 884 * <p/> 885 * This also means that we will wait for all pending exchanges which are stored in the aggregation repository 886 * to complete so the repository is empty before we can stop. 887 * <p/> 888 * You may want to enable this when using the memory based aggregation repository that is memory based only, 889 * and do not store data on disk. When this option is enabled, then the aggregator is waiting to complete 890 * all those exchanges before its stopped, when stopping CamelContext or the route using it. 891 */ 892 public AggregateDefinition completeAllOnStop() { 893 setCompleteAllOnStop(true); 894 return this; 895 } 896 897 /** 898 * When aggregated are completed they are being send out of the aggregator. 899 * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency. 900 * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads. 901 */ 902 public AggregateDefinition parallelProcessing() { 903 setParallelProcessing(true); 904 return this; 905 } 906 907 /** 908 * When aggregated are completed they are being send out of the aggregator. 909 * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency. 910 * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads. 911 */ 912 public AggregateDefinition parallelProcessing(boolean parallelProcessing) { 913 setParallelProcessing(parallelProcessing); 914 return this; 915 } 916 917 /** 918 * Turns on using optimistic locking, which requires the aggregationRepository being used, 919 * is supporting this by implementing {@link org.apache.camel.spi.OptimisticLockingAggregationRepository}. 920 */ 921 public AggregateDefinition optimisticLocking() { 922 setOptimisticLocking(true); 923 return this; 924 } 925 926 /** 927 * Allows to configure retry settings when using optimistic locking. 928 */ 929 public AggregateDefinition optimisticLockRetryPolicy(OptimisticLockRetryPolicy policy) { 930 setOptimisticLockRetryPolicy(policy); 931 return this; 932 } 933 934 /** 935 * If using parallelProcessing you can specify a custom thread pool to be used. 936 * In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well. 937 */ 938 public AggregateDefinition executorService(ExecutorService executorService) { 939 setExecutorService(executorService); 940 return this; 941 } 942 943 /** 944 * If using parallelProcessing you can specify a custom thread pool to be used. 945 * In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well. 946 */ 947 public AggregateDefinition executorServiceRef(String executorServiceRef) { 948 setExecutorServiceRef(executorServiceRef); 949 return this; 950 } 951 952 /** 953 * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a 954 * background thread is created to check for the completion for every aggregator. 955 * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator. 956 */ 957 public AggregateDefinition timeoutCheckerExecutorService(ScheduledExecutorService executorService) { 958 setTimeoutCheckerExecutorService(executorService); 959 return this; 960 } 961 962 /** 963 * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a 964 * background thread is created to check for the completion for every aggregator. 965 * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator. 966 */ 967 public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) { 968 setTimeoutCheckerExecutorServiceRef(executorServiceRef); 969 return this; 970 } 971 972 /** 973 * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control 974 * this aggregator. 975 */ 976 public AggregateDefinition aggregateController(AggregateController aggregateController) { 977 setAggregateController(aggregateController); 978 return this; 979 } 980 981 // Section - Methods from ExpressionNode 982 // Needed to copy methods from ExpressionNode here so that I could specify the 983 // correlation expression as optional in JAXB 984 985 public ExpressionDefinition getExpression() { 986 if (expression == null && correlationExpression != null) { 987 expression = correlationExpression.getExpressionType(); 988 } 989 return expression; 990 } 991 992 public void setExpression(ExpressionDefinition expression) { 993 this.expression = expression; 994 } 995 996 protected void checkNoCompletedPredicate() { 997 if (getCompletionPredicate() != null) { 998 throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this); 999 } 1000 } 1001 1002 @Override 1003 public List<ProcessorDefinition<?>> getOutputs() { 1004 return outputs; 1005 } 1006 1007 public boolean isOutputSupported() { 1008 return true; 1009 } 1010 1011 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 1012 this.outputs = outputs; 1013 } 1014 1015}