001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 import java.util.concurrent.ExecutorService; 022 import java.util.concurrent.ScheduledExecutorService; 023 024 import javax.xml.bind.annotation.XmlAccessType; 025 import javax.xml.bind.annotation.XmlAccessorType; 026 import javax.xml.bind.annotation.XmlAttribute; 027 import javax.xml.bind.annotation.XmlElement; 028 import javax.xml.bind.annotation.XmlElementRef; 029 import javax.xml.bind.annotation.XmlRootElement; 030 import javax.xml.bind.annotation.XmlTransient; 031 032 import org.apache.camel.Expression; 033 import org.apache.camel.Predicate; 034 import org.apache.camel.Processor; 035 import org.apache.camel.builder.ExpressionClause; 036 import org.apache.camel.model.language.ExpressionDefinition; 037 import org.apache.camel.processor.UnitOfWorkProcessor; 038 import org.apache.camel.processor.aggregate.AggregateProcessor; 039 import org.apache.camel.processor.aggregate.AggregationStrategy; 040 import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy; 041 import org.apache.camel.spi.AggregationRepository; 042 import org.apache.camel.spi.RouteContext; 043 import org.apache.camel.util.concurrent.SynchronousExecutorService; 044 045 /** 046 * Represents an XML <aggregate/> element 047 * 048 * @version 049 */ 050 @XmlRootElement(name = "aggregate") 051 @XmlAccessorType(XmlAccessType.FIELD) 052 public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> { 053 @XmlElement(name = "correlationExpression", required = true) 054 private ExpressionSubElementDefinition correlationExpression; 055 @XmlElement(name = "completionPredicate") 056 private ExpressionSubElementDefinition completionPredicate; 057 @XmlElement(name = "completionTimeout") 058 private ExpressionSubElementDefinition completionTimeoutExpression; 059 @XmlElement(name = "completionSize") 060 private ExpressionSubElementDefinition completionSizeExpression; 061 @XmlTransient 062 private ExpressionDefinition expression; 063 @XmlElementRef 064 private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); 065 @XmlTransient 066 private AggregationStrategy aggregationStrategy; 067 @XmlTransient 068 private ExecutorService executorService; 069 @XmlTransient 070 private ScheduledExecutorService timeoutCheckerExecutorService; 071 @XmlTransient 072 private AggregationRepository aggregationRepository; 073 @XmlAttribute 074 private Boolean parallelProcessing; 075 @XmlAttribute 076 private String executorServiceRef; 077 @XmlAttribute 078 private String timeoutCheckerExecutorServiceRef; 079 @XmlAttribute 080 private String aggregationRepositoryRef; 081 @XmlAttribute 082 private String strategyRef; 083 @XmlAttribute 084 private Integer completionSize; 085 @XmlAttribute 086 private Long completionInterval; 087 @XmlAttribute 088 private Long completionTimeout; 089 @XmlAttribute 090 private Boolean completionFromBatchConsumer; 091 @XmlAttribute 092 private Boolean groupExchanges; 093 @XmlAttribute 094 private Boolean eagerCheckCompletion; 095 @XmlAttribute 096 private Boolean ignoreInvalidCorrelationKeys; 097 @XmlAttribute 098 private Integer closeCorrelationKeyOnCompletion; 099 @XmlAttribute 100 private Boolean discardOnCompletionTimeout; 101 @XmlAttribute 102 private Boolean forceCompletionOnStop; 103 104 public AggregateDefinition() { 105 } 106 107 public AggregateDefinition(Predicate predicate) { 108 if (predicate != null) { 109 setExpression(new ExpressionDefinition(predicate)); 110 } 111 } 112 113 public AggregateDefinition(Expression correlationExpression) { 114 if (correlationExpression != null) { 115 setExpression(new ExpressionDefinition(correlationExpression)); 116 } 117 } 118 119 public AggregateDefinition(ExpressionDefinition correlationExpression) { 120 this.expression = correlationExpression; 121 } 122 123 public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 124 this(correlationExpression); 125 this.aggregationStrategy = aggregationStrategy; 126 } 127 128 @Override 129 public String toString() { 130 return "Aggregate[" + description() + " -> " + getOutputs() + "]"; 131 } 132 133 protected String description() { 134 return getExpression() != null ? getExpression().getLabel() : ""; 135 } 136 137 @Override 138 public String getShortName() { 139 return "aggregate"; 140 } 141 142 @Override 143 public String getLabel() { 144 return "aggregate[" + description() + "]"; 145 } 146 147 @Override 148 public Processor createProcessor(RouteContext routeContext) throws Exception { 149 return createAggregator(routeContext); 150 } 151 152 protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception { 153 Processor processor = this.createChildProcessor(routeContext, true); 154 // wrap the aggregated route in a unit of work processor 155 processor = new UnitOfWorkProcessor(routeContext, processor); 156 157 Expression correlation = getExpression().createExpression(routeContext); 158 AggregationStrategy strategy = createAggregationStrategy(routeContext); 159 160 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing()); 161 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, isParallelProcessing()); 162 if (threadPool == null && !isParallelProcessing()) { 163 // executor service is mandatory for the Aggregator 164 // we do not run in parallel mode, but use a synchronous executor, so we run in current thread 165 threadPool = new SynchronousExecutorService(); 166 shutdownThreadPool = true; 167 } 168 169 AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor, 170 correlation, strategy, threadPool, shutdownThreadPool); 171 172 AggregationRepository repository = createAggregationRepository(routeContext); 173 if (repository != null) { 174 answer.setAggregationRepository(repository); 175 } 176 177 // this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool 178 boolean shutdownTimeoutThreadPool = false; 179 ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService; 180 if (timeoutThreadPool == null && timeoutCheckerExecutorServiceRef != null) { 181 // lookup existing thread pool 182 timeoutThreadPool = routeContext.getCamelContext().getRegistry().lookup(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class); 183 if (timeoutThreadPool == null) { 184 // then create a thread pool assuming the ref is a thread pool profile id 185 timeoutThreadPool = routeContext.getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, 186 AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER, timeoutCheckerExecutorServiceRef); 187 if (timeoutThreadPool == null) { 188 throw new IllegalArgumentException("ExecutorServiceRef " + timeoutCheckerExecutorServiceRef + " not found in registry or as a thread pool profile."); 189 } 190 shutdownTimeoutThreadPool = true; 191 } 192 } 193 answer.setTimeoutCheckerExecutorService(timeoutThreadPool); 194 answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool); 195 196 // set other options 197 answer.setParallelProcessing(isParallelProcessing()); 198 if (getCompletionPredicate() != null) { 199 Predicate predicate = getCompletionPredicate().createPredicate(routeContext); 200 answer.setCompletionPredicate(predicate); 201 } 202 if (getCompletionTimeoutExpression() != null) { 203 Expression expression = getCompletionTimeoutExpression().createExpression(routeContext); 204 answer.setCompletionTimeoutExpression(expression); 205 } 206 if (getCompletionTimeout() != null) { 207 answer.setCompletionTimeout(getCompletionTimeout()); 208 } 209 if (getCompletionInterval() != null) { 210 answer.setCompletionInterval(getCompletionInterval()); 211 } 212 if (getCompletionSizeExpression() != null) { 213 Expression expression = getCompletionSizeExpression().createExpression(routeContext); 214 answer.setCompletionSizeExpression(expression); 215 } 216 if (getCompletionSize() != null) { 217 answer.setCompletionSize(getCompletionSize()); 218 } 219 if (getCompletionFromBatchConsumer() != null) { 220 answer.setCompletionFromBatchConsumer(isCompletionFromBatchConsumer()); 221 } 222 if (getEagerCheckCompletion() != null) { 223 answer.setEagerCheckCompletion(isEagerCheckCompletion()); 224 } 225 if (getIgnoreInvalidCorrelationKeys() != null) { 226 answer.setIgnoreInvalidCorrelationKeys(isIgnoreInvalidCorrelationKeys()); 227 } 228 if (getCloseCorrelationKeyOnCompletion() != null) { 229 answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion()); 230 } 231 if (getDiscardOnCompletionTimeout() != null) { 232 answer.setDiscardOnCompletionTimeout(isDiscardOnCompletionTimeout()); 233 } 234 if (getForceCompletionOnStop() != null) { 235 answer.setForceCompletionOnStop(getForceCompletionOnStop()); 236 } 237 238 return answer; 239 } 240 241 @Override 242 protected void configureChild(ProcessorDefinition<?> output) { 243 if (expression != null && expression instanceof ExpressionClause) { 244 ExpressionClause<?> clause = (ExpressionClause<?>) expression; 245 if (clause.getExpressionType() != null) { 246 // if using the Java DSL then the expression may have been set using the 247 // ExpressionClause which is a fancy builder to define expressions and predicates 248 // using fluent builders in the DSL. However we need afterwards a callback to 249 // reset the expression to the expression type the ExpressionClause did build for us 250 expression = clause.getExpressionType(); 251 // set the correlation expression from the expression type, as the model definition 252 // would then be accurate 253 correlationExpression = new ExpressionSubElementDefinition(); 254 correlationExpression.setExpressionType(clause.getExpressionType()); 255 } 256 } 257 } 258 259 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 260 AggregationStrategy strategy = getAggregationStrategy(); 261 if (strategy == null && strategyRef != null) { 262 strategy = routeContext.mandatoryLookup(strategyRef, AggregationStrategy.class); 263 } 264 265 if (groupExchanges != null && groupExchanges) { 266 if (strategy != null || strategyRef != null) { 267 throw new IllegalArgumentException("Options groupExchanges and AggregationStrategy cannot be enabled at the same time"); 268 } 269 if (eagerCheckCompletion != null && !eagerCheckCompletion) { 270 throw new IllegalArgumentException("Option eagerCheckCompletion cannot be false when groupExchanges has been enabled"); 271 } 272 // set eager check to enabled by default when using grouped exchanges 273 setEagerCheckCompletion(true); 274 // if grouped exchange is enabled then use special strategy for that 275 strategy = new GroupedExchangeAggregationStrategy(); 276 } 277 278 if (strategy == null) { 279 throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this); 280 } 281 return strategy; 282 } 283 284 private AggregationRepository createAggregationRepository(RouteContext routeContext) { 285 AggregationRepository repository = getAggregationRepository(); 286 if (repository == null && aggregationRepositoryRef != null) { 287 repository = routeContext.mandatoryLookup(aggregationRepositoryRef, AggregationRepository.class); 288 } 289 return repository; 290 } 291 292 public AggregationStrategy getAggregationStrategy() { 293 return aggregationStrategy; 294 } 295 296 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 297 this.aggregationStrategy = aggregationStrategy; 298 } 299 300 public String getAggregationStrategyRef() { 301 return strategyRef; 302 } 303 304 public void setAggregationStrategyRef(String aggregationStrategyRef) { 305 this.strategyRef = aggregationStrategyRef; 306 } 307 308 public Integer getCompletionSize() { 309 return completionSize; 310 } 311 312 public void setCompletionSize(Integer completionSize) { 313 this.completionSize = completionSize; 314 } 315 316 public Long getCompletionInterval() { 317 return completionInterval; 318 } 319 320 public void setCompletionInterval(Long completionInterval) { 321 this.completionInterval = completionInterval; 322 } 323 324 public Long getCompletionTimeout() { 325 return completionTimeout; 326 } 327 328 public void setCompletionTimeout(Long completionTimeout) { 329 this.completionTimeout = completionTimeout; 330 } 331 332 public ExpressionSubElementDefinition getCompletionPredicate() { 333 return completionPredicate; 334 } 335 336 public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) { 337 this.completionPredicate = completionPredicate; 338 } 339 340 public ExpressionSubElementDefinition getCompletionTimeoutExpression() { 341 return completionTimeoutExpression; 342 } 343 344 public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) { 345 this.completionTimeoutExpression = completionTimeoutExpression; 346 } 347 348 public ExpressionSubElementDefinition getCompletionSizeExpression() { 349 return completionSizeExpression; 350 } 351 352 public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) { 353 this.completionSizeExpression = completionSizeExpression; 354 } 355 356 public Boolean getGroupExchanges() { 357 return groupExchanges; 358 } 359 360 public boolean isGroupExchanges() { 361 return groupExchanges != null && groupExchanges; 362 } 363 364 public void setGroupExchanges(Boolean groupExchanges) { 365 this.groupExchanges = groupExchanges; 366 } 367 368 public Boolean getCompletionFromBatchConsumer() { 369 return completionFromBatchConsumer; 370 } 371 372 public boolean isCompletionFromBatchConsumer() { 373 return completionFromBatchConsumer != null && completionFromBatchConsumer; 374 } 375 376 public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) { 377 this.completionFromBatchConsumer = completionFromBatchConsumer; 378 } 379 380 public ExecutorService getExecutorService() { 381 return executorService; 382 } 383 384 public void setExecutorService(ExecutorService executorService) { 385 this.executorService = executorService; 386 } 387 388 public Boolean getParallelProcessing() { 389 return parallelProcessing; 390 } 391 392 public boolean isParallelProcessing() { 393 return parallelProcessing != null && parallelProcessing; 394 } 395 396 public void setParallelProcessing(boolean parallelProcessing) { 397 this.parallelProcessing = parallelProcessing; 398 } 399 400 public String getExecutorServiceRef() { 401 return executorServiceRef; 402 } 403 404 public void setExecutorServiceRef(String executorServiceRef) { 405 this.executorServiceRef = executorServiceRef; 406 } 407 408 public String getStrategyRef() { 409 return strategyRef; 410 } 411 412 public void setStrategyRef(String strategyRef) { 413 this.strategyRef = strategyRef; 414 } 415 416 public Boolean getEagerCheckCompletion() { 417 return eagerCheckCompletion; 418 } 419 420 public boolean isEagerCheckCompletion() { 421 return eagerCheckCompletion != null && eagerCheckCompletion; 422 } 423 424 public void setEagerCheckCompletion(Boolean eagerCheckCompletion) { 425 this.eagerCheckCompletion = eagerCheckCompletion; 426 } 427 428 public Boolean getIgnoreInvalidCorrelationKeys() { 429 return ignoreInvalidCorrelationKeys; 430 } 431 432 public boolean isIgnoreInvalidCorrelationKeys() { 433 return ignoreInvalidCorrelationKeys != null && ignoreInvalidCorrelationKeys; 434 } 435 436 public void setIgnoreInvalidCorrelationKeys(Boolean ignoreInvalidCorrelationKeys) { 437 this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys; 438 } 439 440 public Integer getCloseCorrelationKeyOnCompletion() { 441 return closeCorrelationKeyOnCompletion; 442 } 443 444 public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) { 445 this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion; 446 } 447 448 public AggregationRepository getAggregationRepository() { 449 return aggregationRepository; 450 } 451 452 public void setAggregationRepository(AggregationRepository aggregationRepository) { 453 this.aggregationRepository = aggregationRepository; 454 } 455 456 public String getAggregationRepositoryRef() { 457 return aggregationRepositoryRef; 458 } 459 460 public void setAggregationRepositoryRef(String aggregationRepositoryRef) { 461 this.aggregationRepositoryRef = aggregationRepositoryRef; 462 } 463 464 public Boolean getDiscardOnCompletionTimeout() { 465 return discardOnCompletionTimeout; 466 } 467 468 public boolean isDiscardOnCompletionTimeout() { 469 return discardOnCompletionTimeout != null && discardOnCompletionTimeout; 470 } 471 472 public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) { 473 this.discardOnCompletionTimeout = discardOnCompletionTimeout; 474 } 475 476 public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) { 477 this.timeoutCheckerExecutorService = timeoutCheckerExecutorService; 478 } 479 480 public ScheduledExecutorService getTimeoutCheckerExecutorService() { 481 return timeoutCheckerExecutorService; 482 } 483 484 public void setTimeoutCheckerExecutorServiceRef(String timeoutCheckerExecutorServiceRef) { 485 this.timeoutCheckerExecutorServiceRef = timeoutCheckerExecutorServiceRef; 486 } 487 488 public String getTimeoutCheckerExecutorServiceRef() { 489 return timeoutCheckerExecutorServiceRef; 490 } 491 492 // Fluent API 493 //------------------------------------------------------------------------- 494 495 /** 496 * Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange. 497 * At opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange. 498 * 499 * @return builder 500 */ 501 public AggregateDefinition eagerCheckCompletion() { 502 setEagerCheckCompletion(true); 503 return this; 504 } 505 506 /** 507 * If a correlation key cannot be successfully evaluated it will be ignored by logging a {{DEBUG}} and then just 508 * ignore the incoming Exchange. 509 * 510 * @return builder 511 */ 512 public AggregateDefinition ignoreInvalidCorrelationKeys() { 513 setIgnoreInvalidCorrelationKeys(true); 514 return this; 515 } 516 517 /** 518 * Closes a correlation key when its complete. Any <i>late</i> received exchanges which has a correlation key 519 * that has been closed, it will be defined and a {@link org.apache.camel.processor.aggregate.ClosedCorrelationKeyException} 520 * is thrown. 521 * 522 * @param capacity the maximum capacity of the closed correlation key cache. 523 * Use <tt>0</tt> or negative value for unbounded capacity. 524 * @return builder 525 */ 526 public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) { 527 setCloseCorrelationKeyOnCompletion(capacity); 528 return this; 529 } 530 531 /** 532 * Discards the aggregated message on completion timeout. 533 * <p/> 534 * This means on timeout the aggregated message is dropped and not sent out of the aggregator. 535 * 536 * @return builder 537 */ 538 public AggregateDefinition discardOnCompletionTimeout() { 539 setDiscardOnCompletionTimeout(true); 540 return this; 541 } 542 543 /** 544 * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer} 545 * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported 546 * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete. 547 * 548 * @return builder 549 */ 550 public AggregateDefinition completionFromBatchConsumer() { 551 setCompletionFromBatchConsumer(true); 552 return this; 553 } 554 555 /** 556 * Sets the completion size, which is the number of aggregated exchanges which would 557 * cause the aggregate to consider the group as complete and send out the aggregated exchange. 558 * 559 * @param completionSize the completion size 560 * @return builder 561 */ 562 public AggregateDefinition completionSize(int completionSize) { 563 setCompletionSize(completionSize); 564 return this; 565 } 566 567 /** 568 * Sets the completion size, which is the number of aggregated exchanges which would 569 * cause the aggregate to consider the group as complete and send out the aggregated exchange. 570 * 571 * @param completionSize the completion size as an {@link org.apache.camel.Expression} which is evaluated as a {@link Integer} type 572 * @return builder 573 */ 574 public AggregateDefinition completionSize(Expression completionSize) { 575 setCompletionSizeExpression(new ExpressionSubElementDefinition(completionSize)); 576 return this; 577 } 578 579 /** 580 * Sets the completion interval, which would cause the aggregate to consider the group as complete 581 * and send out the aggregated exchange. 582 * 583 * @param completionInterval the interval in millis 584 * @return the builder 585 */ 586 public AggregateDefinition completionInterval(long completionInterval) { 587 setCompletionInterval(completionInterval); 588 return this; 589 } 590 591 /** 592 * Sets the completion timeout, which would cause the aggregate to consider the group as complete 593 * and send out the aggregated exchange. 594 * 595 * @param completionTimeout the timeout in millis 596 * @return the builder 597 */ 598 public AggregateDefinition completionTimeout(long completionTimeout) { 599 setCompletionTimeout(completionTimeout); 600 return this; 601 } 602 603 /** 604 * Sets the completion timeout, which would cause the aggregate to consider the group as complete 605 * and send out the aggregated exchange. 606 * 607 * @param completionTimeout the timeout as an {@link Expression} which is evaluated as a {@link Long} type 608 * @return the builder 609 */ 610 public AggregateDefinition completionTimeout(Expression completionTimeout) { 611 setCompletionTimeoutExpression(new ExpressionSubElementDefinition(completionTimeout)); 612 return this; 613 } 614 615 /** 616 * Sets the aggregate strategy to use 617 * 618 * @param aggregationStrategy the aggregate strategy to use 619 * @return the builder 620 */ 621 public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 622 setAggregationStrategy(aggregationStrategy); 623 return this; 624 } 625 626 /** 627 * Sets the aggregate strategy to use 628 * 629 * @param aggregationStrategyRef reference to the strategy to lookup in the registry 630 * @return the builder 631 */ 632 public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) { 633 setAggregationStrategyRef(aggregationStrategyRef); 634 return this; 635 } 636 637 /** 638 * Sets the custom aggregate repository to use. 639 * <p/> 640 * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository} 641 * 642 * @param aggregationRepository the aggregate repository to use 643 * @return the builder 644 */ 645 public AggregateDefinition aggregationRepository(AggregationRepository aggregationRepository) { 646 setAggregationRepository(aggregationRepository); 647 return this; 648 } 649 650 /** 651 * Sets the custom aggregate repository to use 652 * <p/> 653 * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository} 654 * 655 * @param aggregationRepositoryRef reference to the repository to lookup in the registry 656 * @return the builder 657 */ 658 public AggregateDefinition aggregationRepositoryRef(String aggregationRepositoryRef) { 659 setAggregationRepositoryRef(aggregationRepositoryRef); 660 return this; 661 } 662 663 /** 664 * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single 665 * combined Exchange holding all the aggregated exchanges in a {@link java.util.List} as a exchange 666 * property with the key {@link org.apache.camel.Exchange#GROUPED_EXCHANGE}. 667 * 668 * @return the builder 669 */ 670 public AggregateDefinition groupExchanges() { 671 setGroupExchanges(true); 672 // must use eager check when using grouped exchanges 673 setEagerCheckCompletion(true); 674 return this; 675 } 676 677 /** 678 * Sets the predicate used to determine if the aggregation is completed 679 * 680 * @param predicate the predicate 681 * @return the builder 682 */ 683 public AggregateDefinition completionPredicate(Predicate predicate) { 684 checkNoCompletedPredicate(); 685 setCompletionPredicate(new ExpressionSubElementDefinition(predicate)); 686 return this; 687 } 688 689 /** 690 * Sets the force completion on stop flag, which considers the current group as complete 691 * and sends out the aggregated exchange when the stop event is executed 692 * 693 * @return builder 694 */ 695 public AggregateDefinition forceCompletionOnStop() { 696 setForceCompletionOnStop(true); 697 return this; 698 } 699 700 public Boolean getForceCompletionOnStop() { 701 return forceCompletionOnStop; 702 } 703 704 public boolean isForceCompletionOnStop() { 705 return forceCompletionOnStop != null && forceCompletionOnStop; 706 } 707 708 public void setForceCompletionOnStop(Boolean forceCompletionOnStop) { 709 this.forceCompletionOnStop = forceCompletionOnStop; 710 } 711 712 /** 713 * Sending the aggregated output in parallel 714 * 715 * @return the builder 716 */ 717 public AggregateDefinition parallelProcessing() { 718 setParallelProcessing(true); 719 return this; 720 } 721 722 public AggregateDefinition executorService(ExecutorService executorService) { 723 setExecutorService(executorService); 724 return this; 725 } 726 727 public AggregateDefinition executorServiceRef(String executorServiceRef) { 728 setExecutorServiceRef(executorServiceRef); 729 return this; 730 } 731 732 public AggregateDefinition timeoutCheckerExecutorService(ScheduledExecutorService executorService) { 733 setTimeoutCheckerExecutorService(executorService); 734 return this; 735 } 736 737 public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) { 738 setTimeoutCheckerExecutorServiceRef(executorServiceRef); 739 return this; 740 } 741 742 protected void checkNoCompletedPredicate() { 743 if (getCompletionPredicate() != null) { 744 throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this); 745 } 746 } 747 748 public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) { 749 this.correlationExpression = correlationExpression; 750 } 751 752 public ExpressionSubElementDefinition getCorrelationExpression() { 753 return correlationExpression; 754 } 755 756 // Section - Methods from ExpressionNode 757 // Needed to copy methods from ExpressionNode here so that I could specify the 758 // correlation expression as optional in JAXB 759 760 public ExpressionDefinition getExpression() { 761 if (expression == null && correlationExpression != null) { 762 expression = correlationExpression.getExpressionType(); 763 } 764 return expression; 765 } 766 767 public void setExpression(ExpressionDefinition expression) { 768 this.expression = expression; 769 } 770 771 @Override 772 public List<ProcessorDefinition<?>> getOutputs() { 773 return outputs; 774 } 775 776 public boolean isOutputSupported() { 777 return true; 778 } 779 780 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 781 this.outputs = outputs; 782 } 783 784 }