001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.ScheduledExecutorService;
023import javax.xml.bind.annotation.XmlAccessType;
024import javax.xml.bind.annotation.XmlAccessorType;
025import javax.xml.bind.annotation.XmlAttribute;
026import javax.xml.bind.annotation.XmlElement;
027import javax.xml.bind.annotation.XmlElementRef;
028import javax.xml.bind.annotation.XmlRootElement;
029import javax.xml.bind.annotation.XmlTransient;
030
031import org.apache.camel.CamelContextAware;
032import org.apache.camel.Expression;
033import org.apache.camel.Predicate;
034import org.apache.camel.Processor;
035import org.apache.camel.builder.ExpressionClause;
036import org.apache.camel.model.language.ExpressionDefinition;
037import org.apache.camel.processor.CamelInternalProcessor;
038import org.apache.camel.processor.aggregate.AggregateController;
039import org.apache.camel.processor.aggregate.AggregateProcessor;
040import org.apache.camel.processor.aggregate.AggregationStrategy;
041import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
042import org.apache.camel.processor.aggregate.ClosedCorrelationKeyException;
043import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
044import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
045import org.apache.camel.spi.AggregationRepository;
046import org.apache.camel.spi.Metadata;
047import org.apache.camel.spi.RouteContext;
048import org.apache.camel.util.concurrent.SynchronousExecutorService;
049
050/**
051 * Aggregates many messages into a single message
052 *
053 * @version 
054 */
055@Metadata(label = "eip,routing")
056@XmlRootElement(name = "aggregate")
057@XmlAccessorType(XmlAccessType.FIELD)
058public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> {
059    @XmlElement(name = "correlationExpression", required = true)
060    private ExpressionSubElementDefinition correlationExpression;
061    @XmlElement(name = "completionPredicate")
062    private ExpressionSubElementDefinition completionPredicate;
063    @XmlElement(name = "completionTimeout")
064    private ExpressionSubElementDefinition completionTimeoutExpression;
065    @XmlElement(name = "completionSize")
066    private ExpressionSubElementDefinition completionSizeExpression;
067    @XmlElement(name = "optimisticLockRetryPolicy")
068    private OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition;
069    @XmlTransient
070    private ExpressionDefinition expression;
071    @XmlTransient
072    private AggregationStrategy aggregationStrategy;
073    @XmlTransient
074    private ExecutorService executorService;
075    @XmlTransient
076    private ScheduledExecutorService timeoutCheckerExecutorService;
077    @XmlTransient
078    private AggregationRepository aggregationRepository;
079    @XmlTransient
080    private OptimisticLockRetryPolicy optimisticLockRetryPolicy;
081    @XmlAttribute
082    private Boolean parallelProcessing;
083    @XmlAttribute
084    private Boolean optimisticLocking;
085    @XmlAttribute
086    private String executorServiceRef;
087    @XmlAttribute
088    private String timeoutCheckerExecutorServiceRef;
089    @XmlAttribute
090    private String aggregationRepositoryRef;
091    @XmlAttribute
092    private String strategyRef;
093    @XmlAttribute
094    private String strategyMethodName;
095    @XmlAttribute
096    private Boolean strategyMethodAllowNull;
097    @XmlAttribute
098    private Integer completionSize;
099    @XmlAttribute
100    private Long completionInterval;
101    @XmlAttribute
102    private Long completionTimeout;
103    @XmlAttribute
104    private Boolean completionFromBatchConsumer;
105    @XmlAttribute
106    @Deprecated
107    private Boolean groupExchanges;
108    @XmlAttribute
109    private Boolean eagerCheckCompletion;
110    @XmlAttribute
111    private Boolean ignoreInvalidCorrelationKeys;
112    @XmlAttribute
113    private Integer closeCorrelationKeyOnCompletion;
114    @XmlAttribute
115    private Boolean discardOnCompletionTimeout;
116    @XmlAttribute
117    private Boolean forceCompletionOnStop;
118    @XmlAttribute
119    private Boolean completeAllOnStop;
120    @XmlTransient
121    private AggregateController aggregateController;
122    @XmlAttribute
123    private String aggregateControllerRef;
124    @XmlElementRef
125    private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
126
127    public AggregateDefinition() {
128    }
129
130    public AggregateDefinition(Predicate predicate) {
131        this(ExpressionNodeHelper.toExpressionDefinition(predicate));
132    }
133    
134    public AggregateDefinition(Expression expression) {
135        this(ExpressionNodeHelper.toExpressionDefinition(expression));
136    }
137
138    public AggregateDefinition(ExpressionDefinition correlationExpression) {
139        setExpression(correlationExpression);
140
141        ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
142        cor.setExpressionType(correlationExpression);
143        setCorrelationExpression(cor);
144    }
145
146    public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
147        this(correlationExpression);
148        this.aggregationStrategy = aggregationStrategy;
149    }
150
151    @Override
152    public String toString() {
153        return "Aggregate[" + description() + " -> " + getOutputs() + "]";
154    }
155    
156    protected String description() {
157        return getExpression() != null ? getExpression().getLabel() : "";
158    }
159
160    @Override
161    public String getLabel() {
162        return "aggregate[" + description() + "]";
163    }
164
165    @Override
166    public Processor createProcessor(RouteContext routeContext) throws Exception {
167        return createAggregator(routeContext);
168    }
169
170    protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception {
171        Processor childProcessor = this.createChildProcessor(routeContext, true);
172
173        // wrap the aggregate route in a unit of work processor
174        CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor);
175        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
176
177        Expression correlation = getExpression().createExpression(routeContext);
178        AggregationStrategy strategy = createAggregationStrategy(routeContext);
179
180        boolean parallel = getParallelProcessing() != null && getParallelProcessing();
181        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, parallel);
182        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, parallel);
183        if (threadPool == null && !parallel) {
184            // executor service is mandatory for the Aggregator
185            // we do not run in parallel mode, but use a synchronous executor, so we run in current thread
186            threadPool = new SynchronousExecutorService();
187            shutdownThreadPool = true;
188        }
189
190        AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), internal,
191                correlation, strategy, threadPool, shutdownThreadPool);
192
193        AggregationRepository repository = createAggregationRepository(routeContext);
194        if (repository != null) {
195            answer.setAggregationRepository(repository);
196        }
197
198        if (getAggregateController() == null && getAggregateControllerRef() != null) {
199            setAggregateController(routeContext.mandatoryLookup(getAggregateControllerRef(), AggregateController.class));
200        }
201
202        // this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool
203        boolean shutdownTimeoutThreadPool = false;
204        ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService;
205        if (timeoutThreadPool == null && timeoutCheckerExecutorServiceRef != null) {
206            // lookup existing thread pool
207            timeoutThreadPool = routeContext.getCamelContext().getRegistry().lookupByNameAndType(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class);
208            if (timeoutThreadPool == null) {
209                // then create a thread pool assuming the ref is a thread pool profile id
210                timeoutThreadPool = routeContext.getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
211                        AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER, timeoutCheckerExecutorServiceRef);
212                if (timeoutThreadPool == null) {
213                    throw new IllegalArgumentException("ExecutorServiceRef " + timeoutCheckerExecutorServiceRef + " not found in registry or as a thread pool profile.");
214                }
215                shutdownTimeoutThreadPool = true;
216            }
217        }
218        answer.setTimeoutCheckerExecutorService(timeoutThreadPool);
219        answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool);
220
221        // set other options
222        answer.setParallelProcessing(parallel);
223        if (getOptimisticLocking() != null) {
224            answer.setOptimisticLocking(getOptimisticLocking());
225        }
226        if (getCompletionPredicate() != null) {
227            Predicate predicate = getCompletionPredicate().createPredicate(routeContext);
228            answer.setCompletionPredicate(predicate);
229        } else if (strategy instanceof Predicate) {
230            // if aggregation strategy implements predicate and was not configured then use as fallback
231            log.debug("Using AggregationStrategy as completion predicate: {}", strategy);
232            answer.setCompletionPredicate((Predicate) strategy);
233        }
234        if (getCompletionTimeoutExpression() != null) {
235            Expression expression = getCompletionTimeoutExpression().createExpression(routeContext);
236            answer.setCompletionTimeoutExpression(expression);
237        }
238        if (getCompletionTimeout() != null) {
239            answer.setCompletionTimeout(getCompletionTimeout());
240        }
241        if (getCompletionInterval() != null) {
242            answer.setCompletionInterval(getCompletionInterval());
243        }
244        if (getCompletionSizeExpression() != null) {
245            Expression expression = getCompletionSizeExpression().createExpression(routeContext);
246            answer.setCompletionSizeExpression(expression);
247        }
248        if (getCompletionSize() != null) {
249            answer.setCompletionSize(getCompletionSize());
250        }
251        if (getCompletionFromBatchConsumer() != null) {
252            answer.setCompletionFromBatchConsumer(getCompletionFromBatchConsumer());
253        }
254        if (getEagerCheckCompletion() != null) {
255            answer.setEagerCheckCompletion(getEagerCheckCompletion());
256        }
257        if (getIgnoreInvalidCorrelationKeys() != null) {
258            answer.setIgnoreInvalidCorrelationKeys(getIgnoreInvalidCorrelationKeys());
259        }
260        if (getCloseCorrelationKeyOnCompletion() != null) {
261            answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion());
262        }
263        if (getDiscardOnCompletionTimeout() != null) {
264            answer.setDiscardOnCompletionTimeout(getDiscardOnCompletionTimeout());
265        }
266        if (getForceCompletionOnStop() != null) {
267            answer.setForceCompletionOnStop(getForceCompletionOnStop());
268        }
269        if (getCompleteAllOnStop() != null) {
270            answer.setCompleteAllOnStop(getCompleteAllOnStop());
271        }
272        if (optimisticLockRetryPolicy == null) {
273            if (getOptimisticLockRetryPolicyDefinition() != null) {
274                answer.setOptimisticLockRetryPolicy(getOptimisticLockRetryPolicyDefinition().createOptimisticLockRetryPolicy());
275            }
276        } else {
277            answer.setOptimisticLockRetryPolicy(optimisticLockRetryPolicy);
278        }
279        if (getAggregateController() != null) {
280            answer.setAggregateController(getAggregateController());
281        }
282        return answer;
283    }
284
285    @Override
286    public void configureChild(ProcessorDefinition<?> output) {
287        if (expression != null && expression instanceof ExpressionClause) {
288            ExpressionClause<?> clause = (ExpressionClause<?>) expression;
289            if (clause.getExpressionType() != null) {
290                // if using the Java DSL then the expression may have been set using the
291                // ExpressionClause which is a fancy builder to define expressions and predicates
292                // using fluent builders in the DSL. However we need afterwards a callback to
293                // reset the expression to the expression type the ExpressionClause did build for us
294                expression = clause.getExpressionType();
295                // set the correlation expression from the expression type, as the model definition
296                // would then be accurate
297                correlationExpression = new ExpressionSubElementDefinition();
298                correlationExpression.setExpressionType(clause.getExpressionType());
299            }
300        }
301    }
302
303    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
304        AggregationStrategy strategy = getAggregationStrategy();
305        if (strategy == null && strategyRef != null) {
306            Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
307            if (aggStrategy instanceof AggregationStrategy) {
308                strategy = (AggregationStrategy) aggStrategy;
309            } else if (aggStrategy != null) {
310                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName());
311                if (getStrategyMethodAllowNull() != null) {
312                    adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
313                    adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
314                }
315                strategy = adapter;
316            } else {
317                throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
318            }
319        }
320
321        if (groupExchanges != null && groupExchanges) {
322            if (strategy != null || strategyRef != null) {
323                throw new IllegalArgumentException("Options groupExchanges and AggregationStrategy cannot be enabled at the same time");
324            }
325            if (eagerCheckCompletion != null && !eagerCheckCompletion) {
326                throw new IllegalArgumentException("Option eagerCheckCompletion cannot be false when groupExchanges has been enabled");
327            }
328            // set eager check to enabled by default when using grouped exchanges
329            setEagerCheckCompletion(true);
330            // if grouped exchange is enabled then use special strategy for that
331            strategy = new GroupedExchangeAggregationStrategy();
332        }
333
334        if (strategy == null) {
335            throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this);
336        }
337
338        if (strategy instanceof CamelContextAware) {
339            ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
340        }
341
342        return strategy;
343    }
344
345    private AggregationRepository createAggregationRepository(RouteContext routeContext) {
346        AggregationRepository repository = getAggregationRepository();
347        if (repository == null && aggregationRepositoryRef != null) {
348            repository = routeContext.mandatoryLookup(aggregationRepositoryRef, AggregationRepository.class);
349        }
350        return repository;
351    }
352
353    public AggregationStrategy getAggregationStrategy() {
354        return aggregationStrategy;
355    }
356
357    /**
358     * The AggregationStrategy to use.
359     * <p/>
360     * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges.
361     * At first call the oldExchange parameter is null.
362     * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange.
363     */
364    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
365        this.aggregationStrategy = aggregationStrategy;
366    }
367
368    public String getAggregationStrategyRef() {
369        return strategyRef;
370    }
371
372    /**
373     * A reference to lookup the AggregationStrategy in the Registry.
374     * <p/>
375     * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges.
376     * At first call the oldExchange parameter is null.
377     * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange.
378     */
379    public void setAggregationStrategyRef(String aggregationStrategyRef) {
380        this.strategyRef = aggregationStrategyRef;
381    }
382
383    public String getStrategyRef() {
384        return strategyRef;
385    }
386
387    /**
388     * A reference to lookup the AggregationStrategy in the Registry.
389     * <p/>
390     * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges.
391     * At first call the oldExchange parameter is null.
392     * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange.
393     */
394    public void setStrategyRef(String strategyRef) {
395        this.strategyRef = strategyRef;
396    }
397
398    public String getAggregationStrategyMethodName() {
399        return strategyMethodName;
400    }
401
402    /**
403     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
404     */
405    public void setAggregationStrategyMethodName(String strategyMethodName) {
406        this.strategyMethodName = strategyMethodName;
407    }
408
409    public Boolean getStrategyMethodAllowNull() {
410        return strategyMethodAllowNull;
411    }
412
413    public String getStrategyMethodName() {
414        return strategyMethodName;
415    }
416
417    /**
418     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
419     */
420    public void setStrategyMethodName(String strategyMethodName) {
421        this.strategyMethodName = strategyMethodName;
422    }
423
424    /**
425     * If this option is false then the aggregate method is not used for the very first aggregation.
426     * If this option is true then null values is used as the oldExchange (at the very first aggregation),
427     * when using POJOs as the AggregationStrategy.
428     */
429    public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
430        this.strategyMethodAllowNull = strategyMethodAllowNull;
431    }
432
433    /**
434     * The expression used to calculate the correlation key to use for aggregation.
435     * The Exchange which has the same correlation key is aggregated together.
436     * If the correlation key could not be evaluated an Exception is thrown.
437     * You can disable this by using the ignoreBadCorrelationKeys option.
438     */
439    public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
440        this.correlationExpression = correlationExpression;
441    }
442
443    public ExpressionSubElementDefinition getCorrelationExpression() {
444        return correlationExpression;
445    }
446
447    public Integer getCompletionSize() {
448        return completionSize;
449    }
450
451    public void setCompletionSize(Integer completionSize) {
452        this.completionSize = completionSize;
453    }
454
455    public OptimisticLockRetryPolicyDefinition getOptimisticLockRetryPolicyDefinition() {
456        return optimisticLockRetryPolicyDefinition;
457    }
458
459    public void setOptimisticLockRetryPolicyDefinition(OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition) {
460        this.optimisticLockRetryPolicyDefinition = optimisticLockRetryPolicyDefinition;
461    }
462
463    public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() {
464        return optimisticLockRetryPolicy;
465    }
466
467    public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) {
468        this.optimisticLockRetryPolicy = optimisticLockRetryPolicy;
469    }
470
471    public Long getCompletionInterval() {
472        return completionInterval;
473    }
474
475    public void setCompletionInterval(Long completionInterval) {
476        this.completionInterval = completionInterval;
477    }
478
479    public Long getCompletionTimeout() {
480        return completionTimeout;
481    }
482
483    public void setCompletionTimeout(Long completionTimeout) {
484        this.completionTimeout = completionTimeout;
485    }
486
487    public ExpressionSubElementDefinition getCompletionPredicate() {
488        return completionPredicate;
489    }
490
491    public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) {
492        this.completionPredicate = completionPredicate;
493    }
494
495    public ExpressionSubElementDefinition getCompletionTimeoutExpression() {
496        return completionTimeoutExpression;
497    }
498
499    public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) {
500        this.completionTimeoutExpression = completionTimeoutExpression;
501    }
502
503    public ExpressionSubElementDefinition getCompletionSizeExpression() {
504        return completionSizeExpression;
505    }
506
507    public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) {
508        this.completionSizeExpression = completionSizeExpression;
509    }
510
511    public Boolean getGroupExchanges() {
512        return groupExchanges;
513    }
514
515    public void setGroupExchanges(Boolean groupExchanges) {
516        this.groupExchanges = groupExchanges;
517    }
518
519    public Boolean getCompletionFromBatchConsumer() {
520        return completionFromBatchConsumer;
521    }
522
523    public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) {
524        this.completionFromBatchConsumer = completionFromBatchConsumer;
525    }
526
527    public ExecutorService getExecutorService() {
528        return executorService;
529    }
530
531    public void setExecutorService(ExecutorService executorService) {
532        this.executorService = executorService;
533    }
534
535    public Boolean getOptimisticLocking() {
536        return optimisticLocking;
537    }
538
539    public void setOptimisticLocking(boolean optimisticLocking) {
540        this.optimisticLocking = optimisticLocking;
541    }
542
543    public Boolean getParallelProcessing() {
544        return parallelProcessing;
545    }
546
547    public void setParallelProcessing(boolean parallelProcessing) {
548        this.parallelProcessing = parallelProcessing;
549    }
550
551    public String getExecutorServiceRef() {
552        return executorServiceRef;
553    }
554
555    public void setExecutorServiceRef(String executorServiceRef) {
556        this.executorServiceRef = executorServiceRef;
557    }
558
559    public Boolean getEagerCheckCompletion() {
560        return eagerCheckCompletion;
561    }
562
563    public void setEagerCheckCompletion(Boolean eagerCheckCompletion) {
564        this.eagerCheckCompletion = eagerCheckCompletion;
565    }
566
567    public Boolean getIgnoreInvalidCorrelationKeys() {
568        return ignoreInvalidCorrelationKeys;
569    }
570
571    public void setIgnoreInvalidCorrelationKeys(Boolean ignoreInvalidCorrelationKeys) {
572        this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys;
573    }
574
575    public Integer getCloseCorrelationKeyOnCompletion() {
576        return closeCorrelationKeyOnCompletion;
577    }
578
579    public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
580        this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
581    }
582
583    public AggregationRepository getAggregationRepository() {
584        return aggregationRepository;
585    }
586
587    public void setAggregationRepository(AggregationRepository aggregationRepository) {
588        this.aggregationRepository = aggregationRepository;
589    }
590
591    public String getAggregationRepositoryRef() {
592        return aggregationRepositoryRef;
593    }
594
595    public void setAggregationRepositoryRef(String aggregationRepositoryRef) {
596        this.aggregationRepositoryRef = aggregationRepositoryRef;
597    }
598
599    public Boolean getDiscardOnCompletionTimeout() {
600        return discardOnCompletionTimeout;
601    }
602
603    public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
604        this.discardOnCompletionTimeout = discardOnCompletionTimeout;
605    }
606    
607    public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
608        this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
609    }
610
611    public ScheduledExecutorService getTimeoutCheckerExecutorService() {
612        return timeoutCheckerExecutorService;
613    }
614
615    public void setTimeoutCheckerExecutorServiceRef(String timeoutCheckerExecutorServiceRef) {
616        this.timeoutCheckerExecutorServiceRef = timeoutCheckerExecutorServiceRef;
617    }
618
619    public String getTimeoutCheckerExecutorServiceRef() {
620        return timeoutCheckerExecutorServiceRef;
621    }
622
623    public Boolean getForceCompletionOnStop() {
624        return forceCompletionOnStop;
625    }
626
627    public void setForceCompletionOnStop(Boolean forceCompletionOnStop) {
628        this.forceCompletionOnStop = forceCompletionOnStop;
629    }
630
631    public Boolean getCompleteAllOnStop() {
632        return completeAllOnStop;
633    }
634
635    public void setCompleteAllOnStop(Boolean completeAllOnStop) {
636        this.completeAllOnStop = completeAllOnStop;
637    }
638
639    public AggregateController getAggregateController() {
640        return aggregateController;
641    }
642
643    public void setAggregateController(AggregateController aggregateController) {
644        this.aggregateController = aggregateController;
645    }
646
647    public String getAggregateControllerRef() {
648        return aggregateControllerRef;
649    }
650
651    /**
652     * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control
653     * this aggregator.
654     */
655    public void setAggregateControllerRef(String aggregateControllerRef) {
656        this.aggregateControllerRef = aggregateControllerRef;
657    }
658
659    // Fluent API
660    //-------------------------------------------------------------------------
661
662    /**
663     * Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange.
664     * As opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange.
665     *
666     * @return builder
667     */
668    public AggregateDefinition eagerCheckCompletion() {
669        setEagerCheckCompletion(true);
670        return this;
671    }
672
673    /**
674     * If a correlation key cannot be successfully evaluated it will be ignored by logging a {{DEBUG}} and then just
675     * ignore the incoming Exchange.
676     *
677     * @return builder
678     */
679    public AggregateDefinition ignoreInvalidCorrelationKeys() {
680        setIgnoreInvalidCorrelationKeys(true);
681        return this;
682    }
683
684    /**
685     * Closes a correlation key when its complete. Any <i>late</i> received exchanges which has a correlation key
686     * that has been closed, it will be defined and a {@link ClosedCorrelationKeyException}
687     * is thrown.
688     *
689     * @param capacity the maximum capacity of the closed correlation key cache.
690     *                 Use <tt>0</tt> or negative value for unbounded capacity.
691     * @return builder
692     */
693    public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) {
694        setCloseCorrelationKeyOnCompletion(capacity);
695        return this;
696    }
697
698    /**
699     * Discards the aggregated message on completion timeout.
700     * <p/>
701     * This means on timeout the aggregated message is dropped and not sent out of the aggregator.
702     *
703     * @return builder
704     */
705    public AggregateDefinition discardOnCompletionTimeout() {
706        setDiscardOnCompletionTimeout(true);
707        return this;
708    }
709
710    /**
711     * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
712     * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
713     * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
714     *
715     * @return builder
716     */
717    public AggregateDefinition completionFromBatchConsumer() {
718        setCompletionFromBatchConsumer(true);
719        return this;
720    }
721
722    /**
723     * Sets the completion size, which is the number of aggregated exchanges which would
724     * cause the aggregate to consider the group as complete and send out the aggregated exchange.
725     *
726     * @param completionSize  the completion size
727     * @return builder
728     */
729    public AggregateDefinition completionSize(int completionSize) {
730        setCompletionSize(completionSize);
731        return this;
732    }
733
734    /**
735     * Sets the completion size, which is the number of aggregated exchanges which would
736     * cause the aggregate to consider the group as complete and send out the aggregated exchange.
737     *
738     * @param completionSize  the completion size as an {@link org.apache.camel.Expression} which is evaluated as a {@link Integer} type
739     * @return builder
740     */
741    public AggregateDefinition completionSize(Expression completionSize) {
742        setCompletionSizeExpression(new ExpressionSubElementDefinition(completionSize));
743        return this;
744    }
745
746    /**
747     * Sets the completion interval, which would cause the aggregate to consider the group as complete
748     * and send out the aggregated exchange.
749     *
750     * @param completionInterval  the interval in millis
751     * @return the builder
752     */
753    public AggregateDefinition completionInterval(long completionInterval) {
754        setCompletionInterval(completionInterval);
755        return this;
756    }
757
758    /**
759     * Sets the completion timeout, which would cause the aggregate to consider the group as complete
760     * and send out the aggregated exchange.
761     *
762     * @param completionTimeout  the timeout in millis
763     * @return the builder
764     */
765    public AggregateDefinition completionTimeout(long completionTimeout) {
766        setCompletionTimeout(completionTimeout);
767        return this;
768    }
769
770    /**
771     * Sets the completion timeout, which would cause the aggregate to consider the group as complete
772     * and send out the aggregated exchange.
773     *
774     * @param completionTimeout  the timeout as an {@link Expression} which is evaluated as a {@link Long} type
775     * @return the builder
776     */
777    public AggregateDefinition completionTimeout(Expression completionTimeout) {
778        setCompletionTimeoutExpression(new ExpressionSubElementDefinition(completionTimeout));
779        return this;
780    }
781
782    /**
783     * Sets the aggregate strategy to use
784     *
785     * @param aggregationStrategy  the aggregate strategy to use
786     * @return the builder
787     */
788    public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
789        setAggregationStrategy(aggregationStrategy);
790        return this;
791    }
792
793    /**
794     * Sets the aggregate strategy to use
795     *
796     * @param aggregationStrategyRef  reference to the strategy to lookup in the registry
797     * @return the builder
798     */
799    public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) {
800        setAggregationStrategyRef(aggregationStrategyRef);
801        return this;
802    }
803
804    /**
805     * Sets the method name to use when using a POJO as {@link AggregationStrategy}.
806     *
807     * @param  methodName the method name to call
808     * @return the builder
809     */
810    public AggregateDefinition aggregationStrategyMethodName(String methodName) {
811        setAggregationStrategyMethodName(methodName);
812        return this;
813    }
814
815    /**
816     * Sets allowing null when using a POJO as {@link AggregationStrategy}.
817     *
818     * @return the builder
819     */
820    public AggregateDefinition aggregationStrategyMethodAllowNull() {
821        setStrategyMethodAllowNull(true);
822        return this;
823    }
824
825    /**
826     * Sets the custom aggregate repository to use.
827     * <p/>
828     * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
829     *
830     * @param aggregationRepository  the aggregate repository to use
831     * @return the builder
832     */
833    public AggregateDefinition aggregationRepository(AggregationRepository aggregationRepository) {
834        setAggregationRepository(aggregationRepository);
835        return this;
836    }
837
838    /**
839     * Sets the custom aggregate repository to use
840     * <p/>
841     * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
842     *
843     * @param aggregationRepositoryRef  reference to the repository to lookup in the registry
844     * @return the builder
845     */
846    public AggregateDefinition aggregationRepositoryRef(String aggregationRepositoryRef) {
847        setAggregationRepositoryRef(aggregationRepositoryRef);
848        return this;
849    }
850
851    /**
852     * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single
853     * combined Exchange holding all the aggregated exchanges in a {@link java.util.List}.
854     *
855     * @deprecated use {@link GroupedExchangeAggregationStrategy} as aggregation strategy instead.
856     */
857    @Deprecated
858    public AggregateDefinition groupExchanges() {
859        setGroupExchanges(true);
860        // must use eager check when using grouped exchanges
861        setEagerCheckCompletion(true);
862        return this;
863    }
864
865    /**
866     * Sets the predicate used to determine if the aggregation is completed
867     */
868    public AggregateDefinition completionPredicate(Predicate predicate) {
869        checkNoCompletedPredicate();
870        setCompletionPredicate(new ExpressionSubElementDefinition(predicate));
871        return this;
872    }
873
874    /**
875     * Indicates to complete all current aggregated exchanges when the context is stopped
876     */
877    public AggregateDefinition forceCompletionOnStop() {
878        setForceCompletionOnStop(true);
879        return this;
880    }
881
882    /**
883     * Indicates to wait to complete all current and partial (pending) aggregated exchanges when the context is stopped.
884     * <p/>
885     * This also means that we will wait for all pending exchanges which are stored in the aggregation repository
886     * to complete so the repository is empty before we can stop.
887     * <p/>
888     * You may want to enable this when using the memory based aggregation repository that is memory based only,
889     * and do not store data on disk. When this option is enabled, then the aggregator is waiting to complete
890     * all those exchanges before its stopped, when stopping CamelContext or the route using it.
891     */
892    public AggregateDefinition completeAllOnStop() {
893        setCompleteAllOnStop(true);
894        return this;
895    }
896
897    /**
898     * When aggregated are completed they are being send out of the aggregator.
899     * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency.
900     * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads.
901     */
902    public AggregateDefinition parallelProcessing() {
903        setParallelProcessing(true);
904        return this;
905    }
906
907    /**
908     * When aggregated are completed they are being send out of the aggregator.
909     * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency.
910     * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads.
911     */
912    public AggregateDefinition parallelProcessing(boolean parallelProcessing) {
913        setParallelProcessing(parallelProcessing);
914        return this;
915    }
916
917    /**
918     * Turns on using optimistic locking, which requires the aggregationRepository being used,
919     * is supporting this by implementing {@link org.apache.camel.spi.OptimisticLockingAggregationRepository}.
920     */
921    public AggregateDefinition optimisticLocking() {
922        setOptimisticLocking(true);
923        return this;
924    }
925
926    /**
927     * Allows to configure retry settings when using optimistic locking.
928     */
929    public AggregateDefinition optimisticLockRetryPolicy(OptimisticLockRetryPolicy policy) {
930        setOptimisticLockRetryPolicy(policy);
931        return this;
932    }
933
934    /**
935     * If using parallelProcessing you can specify a custom thread pool to be used.
936     * In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well.
937     */
938    public AggregateDefinition executorService(ExecutorService executorService) {
939        setExecutorService(executorService);
940        return this;
941    }
942
943    /**
944     * If using parallelProcessing you can specify a custom thread pool to be used.
945     * In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well.
946     */
947    public AggregateDefinition executorServiceRef(String executorServiceRef) {
948        setExecutorServiceRef(executorServiceRef);
949        return this;
950    }
951
952    /**
953     * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a
954     * background thread is created to check for the completion for every aggregator.
955     * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator.
956     */
957    public AggregateDefinition timeoutCheckerExecutorService(ScheduledExecutorService executorService) {
958        setTimeoutCheckerExecutorService(executorService);
959        return this;
960    }
961
962    /**
963     * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a
964     * background thread is created to check for the completion for every aggregator.
965     * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator.
966     */
967    public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) {
968        setTimeoutCheckerExecutorServiceRef(executorServiceRef);
969        return this;
970    }
971
972    /**
973     * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control
974     * this aggregator.
975     */
976    public AggregateDefinition aggregateController(AggregateController aggregateController) {
977        setAggregateController(aggregateController);
978        return this;
979    }
980
981    // Section - Methods from ExpressionNode
982    // Needed to copy methods from ExpressionNode here so that I could specify the
983    // correlation expression as optional in JAXB
984
985    public ExpressionDefinition getExpression() {
986        if (expression == null && correlationExpression != null) {
987            expression = correlationExpression.getExpressionType();            
988        }
989        return expression;
990    }
991
992    public void setExpression(ExpressionDefinition expression) {
993        this.expression = expression;
994    }
995
996    protected void checkNoCompletedPredicate() {
997        if (getCompletionPredicate() != null) {
998            throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this);
999        }
1000    }
1001
1002    @Override
1003    public List<ProcessorDefinition<?>> getOutputs() {
1004        return outputs;
1005    }
1006
1007    public boolean isOutputSupported() {
1008        return true;
1009    }
1010
1011    public void setOutputs(List<ProcessorDefinition<?>> outputs) {
1012        this.outputs = outputs;
1013    }
1014
1015}