001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.ScheduledExecutorService;
023
024import javax.xml.bind.annotation.XmlAccessType;
025import javax.xml.bind.annotation.XmlAccessorType;
026import javax.xml.bind.annotation.XmlAttribute;
027import javax.xml.bind.annotation.XmlElement;
028import javax.xml.bind.annotation.XmlElementRef;
029import javax.xml.bind.annotation.XmlRootElement;
030import javax.xml.bind.annotation.XmlTransient;
031
032import org.apache.camel.CamelContextAware;
033import org.apache.camel.Expression;
034import org.apache.camel.Predicate;
035import org.apache.camel.Processor;
036import org.apache.camel.builder.AggregationStrategyClause;
037import org.apache.camel.builder.ExpressionClause;
038import org.apache.camel.builder.PredicateClause;
039import org.apache.camel.model.language.ExpressionDefinition;
040import org.apache.camel.processor.CamelInternalProcessor;
041import org.apache.camel.processor.aggregate.AggregateController;
042import org.apache.camel.processor.aggregate.AggregateProcessor;
043import org.apache.camel.processor.aggregate.AggregationStrategy;
044import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
045import org.apache.camel.processor.aggregate.ClosedCorrelationKeyException;
046import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
047import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
048import org.apache.camel.spi.AggregationRepository;
049import org.apache.camel.spi.AsPredicate;
050import org.apache.camel.spi.Metadata;
051import org.apache.camel.spi.RouteContext;
052import org.apache.camel.util.concurrent.SynchronousExecutorService;
053
054/**
055 * Aggregates many messages into a single message
056 *
057 * @version 
058 */
059@Metadata(label = "eip,routing")
060@XmlRootElement(name = "aggregate")
061@XmlAccessorType(XmlAccessType.FIELD)
062public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> {
063    @XmlElement(name = "correlationExpression", required = true)
064    private ExpressionSubElementDefinition correlationExpression;
065    @XmlElement(name = "completionPredicate") @AsPredicate
066    private ExpressionSubElementDefinition completionPredicate;
067    @XmlElement(name = "completionTimeout")
068    private ExpressionSubElementDefinition completionTimeoutExpression;
069    @XmlElement(name = "completionSize")
070    private ExpressionSubElementDefinition completionSizeExpression;
071    @XmlElement(name = "optimisticLockRetryPolicy")
072    private OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition;
073    @XmlTransient
074    private ExpressionDefinition expression;
075    @XmlTransient
076    private AggregationStrategy aggregationStrategy;
077    @XmlTransient
078    private ExecutorService executorService;
079    @XmlTransient
080    private ScheduledExecutorService timeoutCheckerExecutorService;
081    @XmlTransient
082    private AggregationRepository aggregationRepository;
083    @XmlTransient
084    private OptimisticLockRetryPolicy optimisticLockRetryPolicy;
085    @XmlAttribute
086    private Boolean parallelProcessing;
087    @XmlAttribute
088    private Boolean optimisticLocking;
089    @XmlAttribute
090    private String executorServiceRef;
091    @XmlAttribute
092    private String timeoutCheckerExecutorServiceRef;
093    @XmlAttribute
094    private String aggregationRepositoryRef;
095    @XmlAttribute
096    private String strategyRef;
097    @XmlAttribute
098    private String strategyMethodName;
099    @XmlAttribute
100    private Boolean strategyMethodAllowNull;
101    @XmlAttribute
102    private Integer completionSize;
103    @XmlAttribute
104    private Long completionInterval;
105    @XmlAttribute
106    private Long completionTimeout;
107    @XmlAttribute @Metadata(defaultValue = "1000")
108    private Long completionTimeoutCheckerInterval = 1000L;
109    @XmlAttribute
110    private Boolean completionFromBatchConsumer;
111    @XmlAttribute
112    private Boolean completionOnNewCorrelationGroup;
113    @XmlAttribute
114    @Deprecated
115    private Boolean groupExchanges;
116    @XmlAttribute
117    private Boolean eagerCheckCompletion;
118    @XmlAttribute
119    private Boolean ignoreInvalidCorrelationKeys;
120    @XmlAttribute
121    private Integer closeCorrelationKeyOnCompletion;
122    @XmlAttribute
123    private Boolean discardOnCompletionTimeout;
124    @XmlAttribute
125    private Boolean forceCompletionOnStop;
126    @XmlAttribute
127    private Boolean completeAllOnStop;
128    @XmlTransient
129    private AggregateController aggregateController;
130    @XmlAttribute
131    private String aggregateControllerRef;
132    @XmlElementRef
133    private List<ProcessorDefinition<?>> outputs = new ArrayList<>();
134
135    public AggregateDefinition() {
136    }
137
138    public AggregateDefinition(@AsPredicate Predicate predicate) {
139        this(ExpressionNodeHelper.toExpressionDefinition(predicate));
140    }
141    
142    public AggregateDefinition(Expression expression) {
143        this(ExpressionNodeHelper.toExpressionDefinition(expression));
144    }
145
146    public AggregateDefinition(ExpressionDefinition correlationExpression) {
147        setExpression(correlationExpression);
148
149        ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
150        cor.setExpressionType(correlationExpression);
151        setCorrelationExpression(cor);
152    }
153
154    public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
155        this(correlationExpression);
156        this.aggregationStrategy = aggregationStrategy;
157    }
158
159    @Override
160    public String toString() {
161        return "Aggregate[" + description() + " -> " + getOutputs() + "]";
162    }
163    
164    protected String description() {
165        return getExpression() != null ? getExpression().getLabel() : "";
166    }
167
168    @Override
169    public String getShortName() {
170        return "aggregate";
171    }
172
173    @Override
174    public String getLabel() {
175        return "aggregate[" + description() + "]";
176    }
177
178    @Override
179    public Processor createProcessor(RouteContext routeContext) throws Exception {
180        return createAggregator(routeContext);
181    }
182
183    protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception {
184        Processor childProcessor = this.createChildProcessor(routeContext, true);
185
186        // wrap the aggregate route in a unit of work processor
187        CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor);
188        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
189
190        Expression correlation = getExpression().createExpression(routeContext);
191        AggregationStrategy strategy = createAggregationStrategy(routeContext);
192
193        boolean parallel = getParallelProcessing() != null && getParallelProcessing();
194        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, parallel);
195        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, parallel);
196        if (threadPool == null && !parallel) {
197            // executor service is mandatory for the Aggregator
198            // we do not run in parallel mode, but use a synchronous executor, so we run in current thread
199            threadPool = new SynchronousExecutorService();
200            shutdownThreadPool = true;
201        }
202
203        AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), internal,
204                correlation, strategy, threadPool, shutdownThreadPool);
205
206        AggregationRepository repository = createAggregationRepository(routeContext);
207        if (repository != null) {
208            answer.setAggregationRepository(repository);
209        }
210
211        if (getAggregateController() == null && getAggregateControllerRef() != null) {
212            setAggregateController(routeContext.mandatoryLookup(getAggregateControllerRef(), AggregateController.class));
213        }
214
215        // this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool
216        boolean shutdownTimeoutThreadPool = false;
217        ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService;
218        if (timeoutThreadPool == null && timeoutCheckerExecutorServiceRef != null) {
219            // lookup existing thread pool
220            timeoutThreadPool = routeContext.getCamelContext().getRegistry().lookupByNameAndType(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class);
221            if (timeoutThreadPool == null) {
222                // then create a thread pool assuming the ref is a thread pool profile id
223                timeoutThreadPool = routeContext.getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
224                        AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER, timeoutCheckerExecutorServiceRef);
225                if (timeoutThreadPool == null) {
226                    throw new IllegalArgumentException("ExecutorServiceRef " + timeoutCheckerExecutorServiceRef 
227                            + " not found in registry (as an ScheduledExecutorService instance) or as a thread pool profile.");
228                }
229                shutdownTimeoutThreadPool = true;
230            }
231        }
232        answer.setTimeoutCheckerExecutorService(timeoutThreadPool);
233        answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool);
234
235        // set other options
236        answer.setParallelProcessing(parallel);
237        if (getOptimisticLocking() != null) {
238            answer.setOptimisticLocking(getOptimisticLocking());
239        }
240        if (getCompletionPredicate() != null) {
241            Predicate predicate = getCompletionPredicate().createPredicate(routeContext);
242            answer.setCompletionPredicate(predicate);
243        } else if (strategy instanceof Predicate) {
244            // if aggregation strategy implements predicate and was not configured then use as fallback
245            log.debug("Using AggregationStrategy as completion predicate: {}", strategy);
246            answer.setCompletionPredicate((Predicate) strategy);
247        }
248        if (getCompletionTimeoutExpression() != null) {
249            Expression expression = getCompletionTimeoutExpression().createExpression(routeContext);
250            answer.setCompletionTimeoutExpression(expression);
251        }
252        if (getCompletionTimeout() != null) {
253            answer.setCompletionTimeout(getCompletionTimeout());
254        }
255        if (getCompletionInterval() != null) {
256            answer.setCompletionInterval(getCompletionInterval());
257        }
258        if (getCompletionSizeExpression() != null) {
259            Expression expression = getCompletionSizeExpression().createExpression(routeContext);
260            answer.setCompletionSizeExpression(expression);
261        }
262        if (getCompletionSize() != null) {
263            answer.setCompletionSize(getCompletionSize());
264        }
265        if (getCompletionFromBatchConsumer() != null) {
266            answer.setCompletionFromBatchConsumer(getCompletionFromBatchConsumer());
267        }
268        if (getCompletionOnNewCorrelationGroup() != null) {
269            answer.setCompletionOnNewCorrelationGroup(getCompletionOnNewCorrelationGroup());
270        }
271        if (getEagerCheckCompletion() != null) {
272            answer.setEagerCheckCompletion(getEagerCheckCompletion());
273        }
274        if (getIgnoreInvalidCorrelationKeys() != null) {
275            answer.setIgnoreInvalidCorrelationKeys(getIgnoreInvalidCorrelationKeys());
276        }
277        if (getCloseCorrelationKeyOnCompletion() != null) {
278            answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion());
279        }
280        if (getDiscardOnCompletionTimeout() != null) {
281            answer.setDiscardOnCompletionTimeout(getDiscardOnCompletionTimeout());
282        }
283        if (getForceCompletionOnStop() != null) {
284            answer.setForceCompletionOnStop(getForceCompletionOnStop());
285        }
286        if (getCompleteAllOnStop() != null) {
287            answer.setCompleteAllOnStop(getCompleteAllOnStop());
288        }
289        if (optimisticLockRetryPolicy == null) {
290            if (getOptimisticLockRetryPolicyDefinition() != null) {
291                answer.setOptimisticLockRetryPolicy(getOptimisticLockRetryPolicyDefinition().createOptimisticLockRetryPolicy());
292            }
293        } else {
294            answer.setOptimisticLockRetryPolicy(optimisticLockRetryPolicy);
295        }
296        if (getAggregateController() != null) {
297            answer.setAggregateController(getAggregateController());
298        }
299        if (getCompletionTimeoutCheckerInterval() != null) {
300            answer.setCompletionTimeoutCheckerInterval(getCompletionTimeoutCheckerInterval());
301        }
302        return answer;
303    }
304
305    @Override
306    public void configureChild(ProcessorDefinition<?> output) {
307        if (expression instanceof ExpressionClause) {
308            ExpressionClause<?> clause = (ExpressionClause<?>) expression;
309            if (clause.getExpressionType() != null) {
310                // if using the Java DSL then the expression may have been set using the
311                // ExpressionClause which is a fancy builder to define expressions and predicates
312                // using fluent builders in the DSL. However we need afterwards a callback to
313                // reset the expression to the expression type the ExpressionClause did build for us
314                expression = clause.getExpressionType();
315                // set the correlation expression from the expression type, as the model definition
316                // would then be accurate
317                correlationExpression = new ExpressionSubElementDefinition();
318                correlationExpression.setExpressionType(clause.getExpressionType());
319            }
320        }
321    }
322
323    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
324        AggregationStrategy strategy = getAggregationStrategy();
325        if (strategy == null && strategyRef != null) {
326            Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
327            if (aggStrategy instanceof AggregationStrategy) {
328                strategy = (AggregationStrategy) aggStrategy;
329            } else if (aggStrategy != null) {
330                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName());
331                if (getStrategyMethodAllowNull() != null) {
332                    adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
333                    adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
334                }
335                strategy = adapter;
336            } else {
337                throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
338            }
339        }
340
341        if (groupExchanges != null && groupExchanges) {
342            if (strategy != null || strategyRef != null) {
343                throw new IllegalArgumentException("Options groupExchanges and AggregationStrategy cannot be enabled at the same time");
344            }
345            if (eagerCheckCompletion != null && !eagerCheckCompletion) {
346                throw new IllegalArgumentException("Option eagerCheckCompletion cannot be false when groupExchanges has been enabled");
347            }
348            // set eager check to enabled by default when using grouped exchanges
349            setEagerCheckCompletion(true);
350            // if grouped exchange is enabled then use special strategy for that
351            strategy = new GroupedExchangeAggregationStrategy();
352        }
353
354        if (strategy == null) {
355            throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this);
356        }
357
358        if (strategy instanceof CamelContextAware) {
359            ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
360        }
361
362        return strategy;
363    }
364
365    private AggregationRepository createAggregationRepository(RouteContext routeContext) {
366        AggregationRepository repository = getAggregationRepository();
367        if (repository == null && aggregationRepositoryRef != null) {
368            repository = routeContext.mandatoryLookup(aggregationRepositoryRef, AggregationRepository.class);
369        }
370        return repository;
371    }
372
373    public AggregationStrategy getAggregationStrategy() {
374        return aggregationStrategy;
375    }
376
377    /**
378     * The AggregationStrategy to use.
379     * <p/>
380     * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges.
381     * At first call the oldExchange parameter is null.
382     * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange.
383     */
384    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
385        this.aggregationStrategy = aggregationStrategy;
386    }
387
388    public String getAggregationStrategyRef() {
389        return strategyRef;
390    }
391
392    /**
393     * A reference to lookup the AggregationStrategy in the Registry.
394     * <p/>
395     * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges.
396     * At first call the oldExchange parameter is null.
397     * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange.
398     */
399    public void setAggregationStrategyRef(String aggregationStrategyRef) {
400        this.strategyRef = aggregationStrategyRef;
401    }
402
403    public String getStrategyRef() {
404        return strategyRef;
405    }
406
407    /**
408     * A reference to lookup the AggregationStrategy in the Registry.
409     * <p/>
410     * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges.
411     * At first call the oldExchange parameter is null.
412     * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange.
413     */
414    public void setStrategyRef(String strategyRef) {
415        this.strategyRef = strategyRef;
416    }
417
418    public String getAggregationStrategyMethodName() {
419        return strategyMethodName;
420    }
421
422    /**
423     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
424     */
425    public void setAggregationStrategyMethodName(String strategyMethodName) {
426        this.strategyMethodName = strategyMethodName;
427    }
428
429    public Boolean getStrategyMethodAllowNull() {
430        return strategyMethodAllowNull;
431    }
432
433    public String getStrategyMethodName() {
434        return strategyMethodName;
435    }
436
437    /**
438     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
439     */
440    public void setStrategyMethodName(String strategyMethodName) {
441        this.strategyMethodName = strategyMethodName;
442    }
443
444    /**
445     * If this option is false then the aggregate method is not used for the very first aggregation.
446     * If this option is true then null values is used as the oldExchange (at the very first aggregation),
447     * when using POJOs as the AggregationStrategy.
448     */
449    public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
450        this.strategyMethodAllowNull = strategyMethodAllowNull;
451    }
452
453    /**
454     * The expression used to calculate the correlation key to use for aggregation.
455     * The Exchange which has the same correlation key is aggregated together.
456     * If the correlation key could not be evaluated an Exception is thrown.
457     * You can disable this by using the ignoreBadCorrelationKeys option.
458     */
459    public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
460        this.correlationExpression = correlationExpression;
461    }
462
463    public ExpressionSubElementDefinition getCorrelationExpression() {
464        return correlationExpression;
465    }
466
467    public Integer getCompletionSize() {
468        return completionSize;
469    }
470
471    public void setCompletionSize(Integer completionSize) {
472        this.completionSize = completionSize;
473    }
474
475    public OptimisticLockRetryPolicyDefinition getOptimisticLockRetryPolicyDefinition() {
476        return optimisticLockRetryPolicyDefinition;
477    }
478
479    public void setOptimisticLockRetryPolicyDefinition(OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition) {
480        this.optimisticLockRetryPolicyDefinition = optimisticLockRetryPolicyDefinition;
481    }
482
483    public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() {
484        return optimisticLockRetryPolicy;
485    }
486
487    public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) {
488        this.optimisticLockRetryPolicy = optimisticLockRetryPolicy;
489    }
490
491    public Long getCompletionInterval() {
492        return completionInterval;
493    }
494
495    public void setCompletionInterval(Long completionInterval) {
496        this.completionInterval = completionInterval;
497    }
498
499    public Long getCompletionTimeout() {
500        return completionTimeout;
501    }
502
503    public void setCompletionTimeout(Long completionTimeout) {
504        this.completionTimeout = completionTimeout;
505    }
506
507    public Long getCompletionTimeoutCheckerInterval() {
508        return completionTimeoutCheckerInterval;
509    }
510
511    public void setCompletionTimeoutCheckerInterval(Long completionTimeoutCheckerInterval) {
512        this.completionTimeoutCheckerInterval = completionTimeoutCheckerInterval;
513    }
514
515    public ExpressionSubElementDefinition getCompletionPredicate() {
516        return completionPredicate;
517    }
518
519    public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) {
520        this.completionPredicate = completionPredicate;
521    }
522
523    public ExpressionSubElementDefinition getCompletionTimeoutExpression() {
524        return completionTimeoutExpression;
525    }
526
527    public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) {
528        this.completionTimeoutExpression = completionTimeoutExpression;
529    }
530
531    public ExpressionSubElementDefinition getCompletionSizeExpression() {
532        return completionSizeExpression;
533    }
534
535    public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) {
536        this.completionSizeExpression = completionSizeExpression;
537    }
538
539    public Boolean getGroupExchanges() {
540        return groupExchanges;
541    }
542
543    public void setGroupExchanges(Boolean groupExchanges) {
544        this.groupExchanges = groupExchanges;
545    }
546
547    public Boolean getCompletionFromBatchConsumer() {
548        return completionFromBatchConsumer;
549    }
550
551    public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) {
552        this.completionFromBatchConsumer = completionFromBatchConsumer;
553    }
554
555    public Boolean getCompletionOnNewCorrelationGroup() {
556        return completionOnNewCorrelationGroup;
557    }
558
559    public void setCompletionOnNewCorrelationGroup(Boolean completionOnNewCorrelationGroup) {
560        this.completionOnNewCorrelationGroup = completionOnNewCorrelationGroup;
561    }
562
563    public ExecutorService getExecutorService() {
564        return executorService;
565    }
566
567    public void setExecutorService(ExecutorService executorService) {
568        this.executorService = executorService;
569    }
570
571    public Boolean getOptimisticLocking() {
572        return optimisticLocking;
573    }
574
575    public void setOptimisticLocking(boolean optimisticLocking) {
576        this.optimisticLocking = optimisticLocking;
577    }
578
579    public Boolean getParallelProcessing() {
580        return parallelProcessing;
581    }
582
583    public void setParallelProcessing(boolean parallelProcessing) {
584        this.parallelProcessing = parallelProcessing;
585    }
586
587    public String getExecutorServiceRef() {
588        return executorServiceRef;
589    }
590
591    public void setExecutorServiceRef(String executorServiceRef) {
592        this.executorServiceRef = executorServiceRef;
593    }
594
595    public Boolean getEagerCheckCompletion() {
596        return eagerCheckCompletion;
597    }
598
599    public void setEagerCheckCompletion(Boolean eagerCheckCompletion) {
600        this.eagerCheckCompletion = eagerCheckCompletion;
601    }
602
603    public Boolean getIgnoreInvalidCorrelationKeys() {
604        return ignoreInvalidCorrelationKeys;
605    }
606
607    public void setIgnoreInvalidCorrelationKeys(Boolean ignoreInvalidCorrelationKeys) {
608        this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys;
609    }
610
611    public Integer getCloseCorrelationKeyOnCompletion() {
612        return closeCorrelationKeyOnCompletion;
613    }
614
615    public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
616        this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
617    }
618
619    public AggregationRepository getAggregationRepository() {
620        return aggregationRepository;
621    }
622
623    public void setAggregationRepository(AggregationRepository aggregationRepository) {
624        this.aggregationRepository = aggregationRepository;
625    }
626
627    public String getAggregationRepositoryRef() {
628        return aggregationRepositoryRef;
629    }
630
631    public void setAggregationRepositoryRef(String aggregationRepositoryRef) {
632        this.aggregationRepositoryRef = aggregationRepositoryRef;
633    }
634
635    public Boolean getDiscardOnCompletionTimeout() {
636        return discardOnCompletionTimeout;
637    }
638
639    public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
640        this.discardOnCompletionTimeout = discardOnCompletionTimeout;
641    }
642    
643    public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
644        this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
645    }
646
647    public ScheduledExecutorService getTimeoutCheckerExecutorService() {
648        return timeoutCheckerExecutorService;
649    }
650
651    public void setTimeoutCheckerExecutorServiceRef(String timeoutCheckerExecutorServiceRef) {
652        this.timeoutCheckerExecutorServiceRef = timeoutCheckerExecutorServiceRef;
653    }
654
655    public String getTimeoutCheckerExecutorServiceRef() {
656        return timeoutCheckerExecutorServiceRef;
657    }
658
659    public Boolean getForceCompletionOnStop() {
660        return forceCompletionOnStop;
661    }
662
663    public void setForceCompletionOnStop(Boolean forceCompletionOnStop) {
664        this.forceCompletionOnStop = forceCompletionOnStop;
665    }
666
667    public Boolean getCompleteAllOnStop() {
668        return completeAllOnStop;
669    }
670
671    public void setCompleteAllOnStop(Boolean completeAllOnStop) {
672        this.completeAllOnStop = completeAllOnStop;
673    }
674
675    public AggregateController getAggregateController() {
676        return aggregateController;
677    }
678
679    public void setAggregateController(AggregateController aggregateController) {
680        this.aggregateController = aggregateController;
681    }
682
683    public String getAggregateControllerRef() {
684        return aggregateControllerRef;
685    }
686
687    /**
688     * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control
689     * this aggregator.
690     */
691    public void setAggregateControllerRef(String aggregateControllerRef) {
692        this.aggregateControllerRef = aggregateControllerRef;
693    }
694
695    // Fluent API
696    //-------------------------------------------------------------------------
697
698    /**
699     * Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange.
700     * As opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange.
701     *
702     * @return builder
703     */
704    public AggregateDefinition eagerCheckCompletion() {
705        setEagerCheckCompletion(true);
706        return this;
707    }
708
709    /**
710     * If a correlation key cannot be successfully evaluated it will be ignored by logging a {{DEBUG}} and then just
711     * ignore the incoming Exchange.
712     *
713     * @return builder
714     */
715    public AggregateDefinition ignoreInvalidCorrelationKeys() {
716        setIgnoreInvalidCorrelationKeys(true);
717        return this;
718    }
719
720    /**
721     * Closes a correlation key when its complete. Any <i>late</i> received exchanges which has a correlation key
722     * that has been closed, it will be defined and a {@link ClosedCorrelationKeyException}
723     * is thrown.
724     *
725     * @param capacity the maximum capacity of the closed correlation key cache.
726     *                 Use <tt>0</tt> or negative value for unbounded capacity.
727     * @return builder
728     */
729    public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) {
730        setCloseCorrelationKeyOnCompletion(capacity);
731        return this;
732    }
733
734    /**
735     * Discards the aggregated message on completion timeout.
736     * <p/>
737     * This means on timeout the aggregated message is dropped and not sent out of the aggregator.
738     *
739     * @return builder
740     */
741    public AggregateDefinition discardOnCompletionTimeout() {
742        setDiscardOnCompletionTimeout(true);
743        return this;
744    }
745
746    /**
747     * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
748     * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
749     * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
750     *
751     * @return builder
752     */
753    public AggregateDefinition completionFromBatchConsumer() {
754        setCompletionFromBatchConsumer(true);
755        return this;
756    }
757
758    /**
759     * Enables completion on all previous groups when a new incoming correlation group. This can for example be used
760     * to complete groups with same correlation keys when they are in consecutive order.
761     * Notice when this is enabled then only 1 correlation group can be in progress as when a new correlation group
762     * starts, then the previous groups is forced completed.
763     *
764     * @return builder
765     */
766    public AggregateDefinition completionOnNewCorrelationGroup() {
767        setCompletionOnNewCorrelationGroup(true);
768        return this;
769    }
770
771    /**
772     * Number of messages aggregated before the aggregation is complete. This option can be set as either
773     * a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result.
774     * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
775     *
776     * @param completionSize  the completion size, must be a positive number
777     * @return builder
778     */
779    public AggregateDefinition completionSize(int completionSize) {
780        setCompletionSize(completionSize);
781        return this;
782    }
783
784    /**
785     * Number of messages aggregated before the aggregation is complete. This option can be set as either
786     * a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result.
787     * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
788     *
789     * @param completionSize  the completion size as an {@link org.apache.camel.Expression} which is evaluated as a {@link Integer} type
790     * @return builder
791     */
792    public AggregateDefinition completionSize(Expression completionSize) {
793        setCompletionSizeExpression(new ExpressionSubElementDefinition(completionSize));
794        return this;
795    }
796
797    /**
798     * A repeating period in millis by which the aggregator will complete all current aggregated exchanges.
799     * Camel has a background task which is triggered every period. You cannot use this option together
800     * with completionTimeout, only one of them can be used.
801     *
802     * @param completionInterval  the interval in millis, must be a positive value
803     * @return the builder
804     */
805    public AggregateDefinition completionInterval(long completionInterval) {
806        setCompletionInterval(completionInterval);
807        return this;
808    }
809
810    /**
811     * Time in millis that an aggregated exchange should be inactive before its complete (timeout).
812     * This option can be set as either a fixed value or using an Expression which allows you to evaluate
813     * a timeout dynamically - will use Long as result.
814     * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
815     * You cannot use this option together with completionInterval, only one of the two can be used.
816     * <p/>
817     * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval option
818     * to configure how frequently to run the checker.
819     * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value.
820     * It is not recommended to use very low timeout values or checker intervals.
821     *
822     * @param completionTimeout  the timeout in millis, must be a positive value
823     * @return the builder
824     */
825    public AggregateDefinition completionTimeout(long completionTimeout) {
826        setCompletionTimeout(completionTimeout);
827        return this;
828    }
829
830    /**
831     * Time in millis that an aggregated exchange should be inactive before its complete (timeout).
832     * This option can be set as either a fixed value or using an Expression which allows you to evaluate
833     * a timeout dynamically - will use Long as result.
834     * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
835     * You cannot use this option together with completionInterval, only one of the two can be used.
836     * <p/>
837     * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval option
838     * to configure how frequently to run the checker.
839     * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value.
840     * It is not recommended to use very low timeout values or checker intervals.
841     *
842     * @param completionTimeout  the timeout as an {@link Expression} which is evaluated as a {@link Long} type
843     * @return the builder
844     */
845    public AggregateDefinition completionTimeout(Expression completionTimeout) {
846        setCompletionTimeoutExpression(new ExpressionSubElementDefinition(completionTimeout));
847        return this;
848    }
849
850    /**
851     * Interval in millis that is used by the background task that checks for timeouts ({@link org.apache.camel.TimeoutMap}).
852     * <p/>
853     * By default the timeout checker runs every second.
854     * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value.
855     * It is not recommended to use very low timeout values or checker intervals.
856     *
857     * @param completionTimeoutCheckerInterval  the interval in millis, must be a positive value
858     * @return the builder
859     */
860    public AggregateDefinition completionTimeoutCheckerInterval(long completionTimeoutCheckerInterval) {
861        setCompletionTimeoutCheckerInterval(completionTimeoutCheckerInterval);
862        return this;
863    }
864
865    /**
866     * Sets the AggregationStrategy to use with a fluent builder.
867     */
868    public AggregationStrategyClause<AggregateDefinition> aggregationStrategy() {
869        AggregationStrategyClause<AggregateDefinition> clause = new AggregationStrategyClause<>(this);
870        setAggregationStrategy(clause);
871        return clause;
872    }
873
874    /**
875     * Sets the AggregationStrategy to use with a fluent builder.
876     */
877    public AggregationStrategyClause<AggregateDefinition> strategy() {
878        return aggregationStrategy();
879    }
880
881    /**
882     * Sets the aggregate strategy to use
883     *
884     * @param aggregationStrategy  the aggregate strategy to use
885     * @return the builder
886     */
887    public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) {
888        return aggregationStrategy(aggregationStrategy);
889    }
890
891    /**
892     * Sets the aggregate strategy to use
893     *
894     * @param aggregationStrategy  the aggregate strategy to use
895     * @return the builder
896     */
897    public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
898        setAggregationStrategy(aggregationStrategy);
899        return this;
900    }
901
902    /**
903     * Sets the aggregate strategy to use
904     *
905     * @param aggregationStrategyRef  reference to the strategy to lookup in the registry
906     * @return the builder
907     */
908    public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) {
909        setAggregationStrategyRef(aggregationStrategyRef);
910        return this;
911    }
912
913    /**
914     * Sets the method name to use when using a POJO as {@link AggregationStrategy}.
915     *
916     * @param  methodName the method name to call
917     * @return the builder
918     */
919    public AggregateDefinition aggregationStrategyMethodName(String methodName) {
920        setAggregationStrategyMethodName(methodName);
921        return this;
922    }
923
924    /**
925     * Sets allowing null when using a POJO as {@link AggregationStrategy}.
926     *
927     * @return the builder
928     */
929    public AggregateDefinition aggregationStrategyMethodAllowNull() {
930        setStrategyMethodAllowNull(true);
931        return this;
932    }
933
934    /**
935     * Sets the custom aggregate repository to use.
936     * <p/>
937     * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
938     *
939     * @param aggregationRepository  the aggregate repository to use
940     * @return the builder
941     */
942    public AggregateDefinition aggregationRepository(AggregationRepository aggregationRepository) {
943        setAggregationRepository(aggregationRepository);
944        return this;
945    }
946
947    /**
948     * Sets the custom aggregate repository to use
949     * <p/>
950     * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
951     *
952     * @param aggregationRepositoryRef  reference to the repository to lookup in the registry
953     * @return the builder
954     */
955    public AggregateDefinition aggregationRepositoryRef(String aggregationRepositoryRef) {
956        setAggregationRepositoryRef(aggregationRepositoryRef);
957        return this;
958    }
959
960    /**
961     * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single
962     * combined Exchange holding all the aggregated exchanges in a {@link java.util.List}.
963     *
964     * @deprecated use {@link GroupedExchangeAggregationStrategy} as aggregation strategy instead.
965     */
966    @Deprecated
967    public AggregateDefinition groupExchanges() {
968        setGroupExchanges(true);
969        // must use eager check when using grouped exchanges
970        setEagerCheckCompletion(true);
971        return this;
972    }
973
974    /**
975     * A Predicate to indicate when an aggregated exchange is complete.
976     * If this is not specified and the AggregationStrategy object implements Predicate,
977     * the aggregationStrategy object will be used as the completionPredicate.
978     */
979    public AggregateDefinition completionPredicate(@AsPredicate Predicate predicate) {
980        checkNoCompletedPredicate();
981        setCompletionPredicate(new ExpressionSubElementDefinition(predicate));
982        return this;
983    }
984
985    /**
986     * A Predicate to indicate when an aggregated exchange is complete.
987     * If this is not specified and the AggregationStrategy object implements Predicate,
988     * the aggregationStrategy object will be used as the completionPredicate.
989     */
990    @AsPredicate
991    public PredicateClause<AggregateDefinition> completionPredicate() {
992        PredicateClause<AggregateDefinition> clause = new PredicateClause<>(this);
993        completionPredicate(clause);
994        return clause;
995    }
996
997    /**
998     * A Predicate to indicate when an aggregated exchange is complete.
999     * If this is not specified and the AggregationStrategy object implements Predicate,
1000     * the aggregationStrategy object will be used as the completionPredicate.
1001     */
1002    @AsPredicate
1003    public PredicateClause<AggregateDefinition> completion() {
1004        return completionPredicate();
1005    }
1006
1007    /**
1008     * A Predicate to indicate when an aggregated exchange is complete.
1009     * If this is not specified and the AggregationStrategy object implements Predicate,
1010     * the aggregationStrategy object will be used as the completionPredicate.
1011     */
1012    public AggregateDefinition completion(@AsPredicate Predicate predicate) {
1013        return completionPredicate(predicate);
1014    }
1015
1016    /**
1017     * Indicates to complete all current aggregated exchanges when the context is stopped
1018     */
1019    public AggregateDefinition forceCompletionOnStop() {
1020        setForceCompletionOnStop(true);
1021        return this;
1022    }
1023
1024    /**
1025     * Indicates to wait to complete all current and partial (pending) aggregated exchanges when the context is stopped.
1026     * <p/>
1027     * This also means that we will wait for all pending exchanges which are stored in the aggregation repository
1028     * to complete so the repository is empty before we can stop.
1029     * <p/>
1030     * You may want to enable this when using the memory based aggregation repository that is memory based only,
1031     * and do not store data on disk. When this option is enabled, then the aggregator is waiting to complete
1032     * all those exchanges before its stopped, when stopping CamelContext or the route using it.
1033     */
1034    public AggregateDefinition completeAllOnStop() {
1035        setCompleteAllOnStop(true);
1036        return this;
1037    }
1038
1039    /**
1040     * When aggregated are completed they are being send out of the aggregator.
1041     * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency.
1042     * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads.
1043     */
1044    public AggregateDefinition parallelProcessing() {
1045        setParallelProcessing(true);
1046        return this;
1047    }
1048
1049    /**
1050     * When aggregated are completed they are being send out of the aggregator.
1051     * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency.
1052     * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads.
1053     */
1054    public AggregateDefinition parallelProcessing(boolean parallelProcessing) {
1055        setParallelProcessing(parallelProcessing);
1056        return this;
1057    }
1058
1059    /**
1060     * Turns on using optimistic locking, which requires the aggregationRepository being used,
1061     * is supporting this by implementing {@link org.apache.camel.spi.OptimisticLockingAggregationRepository}.
1062     */
1063    public AggregateDefinition optimisticLocking() {
1064        setOptimisticLocking(true);
1065        return this;
1066    }
1067
1068    /**
1069     * Allows to configure retry settings when using optimistic locking.
1070     */
1071    public AggregateDefinition optimisticLockRetryPolicy(OptimisticLockRetryPolicy policy) {
1072        setOptimisticLockRetryPolicy(policy);
1073        return this;
1074    }
1075
1076    /**
1077     * If using parallelProcessing you can specify a custom thread pool to be used.
1078     * In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well.
1079     */
1080    public AggregateDefinition executorService(ExecutorService executorService) {
1081        setExecutorService(executorService);
1082        return this;
1083    }
1084
1085    /**
1086     * If using parallelProcessing you can specify a custom thread pool to be used.
1087     * In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well.
1088     */
1089    public AggregateDefinition executorServiceRef(String executorServiceRef) {
1090        setExecutorServiceRef(executorServiceRef);
1091        return this;
1092    }
1093
1094    /**
1095     * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a
1096     * background thread is created to check for the completion for every aggregator.
1097     * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator.
1098     */
1099    public AggregateDefinition timeoutCheckerExecutorService(ScheduledExecutorService executorService) {
1100        setTimeoutCheckerExecutorService(executorService);
1101        return this;
1102    }
1103
1104    /**
1105     * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a
1106     * background thread is created to check for the completion for every aggregator.
1107     * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator.
1108     */
1109    public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) {
1110        setTimeoutCheckerExecutorServiceRef(executorServiceRef);
1111        return this;
1112    }
1113
1114    /**
1115     * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control
1116     * this aggregator.
1117     */
1118    public AggregateDefinition aggregateController(AggregateController aggregateController) {
1119        setAggregateController(aggregateController);
1120        return this;
1121    }
1122
1123    // Section - Methods from ExpressionNode
1124    // Needed to copy methods from ExpressionNode here so that I could specify the
1125    // correlation expression as optional in JAXB
1126
1127    public ExpressionDefinition getExpression() {
1128        if (expression == null && correlationExpression != null) {
1129            expression = correlationExpression.getExpressionType();            
1130        }
1131        return expression;
1132    }
1133
1134    public void setExpression(ExpressionDefinition expression) {
1135        this.expression = expression;
1136    }
1137
1138    protected void checkNoCompletedPredicate() {
1139        if (getCompletionPredicate() != null) {
1140            throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this);
1141        }
1142    }
1143
1144    @Override
1145    public List<ProcessorDefinition<?>> getOutputs() {
1146        return outputs;
1147    }
1148
1149    public boolean isOutputSupported() {
1150        return true;
1151    }
1152
1153    public void setOutputs(List<ProcessorDefinition<?>> outputs) {
1154        this.outputs = outputs;
1155    }
1156
1157}