001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.ExecutorService;
022import java.util.concurrent.ScheduledExecutorService;
023import javax.xml.bind.annotation.XmlAccessType;
024import javax.xml.bind.annotation.XmlAccessorType;
025import javax.xml.bind.annotation.XmlAttribute;
026import javax.xml.bind.annotation.XmlElement;
027import javax.xml.bind.annotation.XmlElementRef;
028import javax.xml.bind.annotation.XmlRootElement;
029import javax.xml.bind.annotation.XmlTransient;
030
031import org.apache.camel.CamelContextAware;
032import org.apache.camel.Expression;
033import org.apache.camel.Predicate;
034import org.apache.camel.Processor;
035import org.apache.camel.builder.AggregationStrategyClause;
036import org.apache.camel.builder.ExpressionClause;
037import org.apache.camel.builder.PredicateClause;
038import org.apache.camel.model.language.ExpressionDefinition;
039import org.apache.camel.processor.CamelInternalProcessor;
040import org.apache.camel.processor.aggregate.AggregateController;
041import org.apache.camel.processor.aggregate.AggregateProcessor;
042import org.apache.camel.processor.aggregate.AggregationStrategy;
043import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
044import org.apache.camel.processor.aggregate.ClosedCorrelationKeyException;
045import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
046import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
047import org.apache.camel.spi.AggregationRepository;
048import org.apache.camel.spi.AsPredicate;
049import org.apache.camel.spi.Metadata;
050import org.apache.camel.spi.RouteContext;
051import org.apache.camel.util.concurrent.SynchronousExecutorService;
052
053/**
054 * Aggregates many messages into a single message
055 *
056 * @version 
057 */
058@Metadata(label = "eip,routing")
059@XmlRootElement(name = "aggregate")
060@XmlAccessorType(XmlAccessType.FIELD)
061public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> {
    // ---- XML child elements ------------------------------------------------
    // NOTE: with XmlAccessType.FIELD the declarations below are what JAXB binds,
    // so keep the annotations (and the declaration order) intact when editing.

    // expression that computes the correlation key; marked as a required element
    @XmlElement(name = "correlationExpression", required = true)
    private ExpressionSubElementDefinition correlationExpression;
    // optional predicate that signals completion of an aggregation
    @XmlElement(name = "completionPredicate") @AsPredicate
    private ExpressionSubElementDefinition completionPredicate;
    // expression variant of the timeout; its element name matches the completionTimeout attribute below
    @XmlElement(name = "completionTimeout")
    private ExpressionSubElementDefinition completionTimeoutExpression;
    // expression variant of the size; its element name matches the completionSize attribute below
    @XmlElement(name = "completionSize")
    private ExpressionSubElementDefinition completionSizeExpression;
    @XmlElement(name = "optimisticLockRetryPolicy")
    private OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition;

    // ---- Transient state (excluded from the XML model via @XmlTransient) ----
    @XmlTransient
    private ExpressionDefinition expression;
    @XmlTransient
    private AggregationStrategy aggregationStrategy;
    @XmlTransient
    private ExecutorService executorService;
    @XmlTransient
    private ScheduledExecutorService timeoutCheckerExecutorService;
    @XmlTransient
    private AggregationRepository aggregationRepository;
    // when set, used by createAggregator in preference to optimisticLockRetryPolicyDefinition
    @XmlTransient
    private OptimisticLockRetryPolicy optimisticLockRetryPolicy;

    // ---- XML attributes ------------------------------------------------------
    // Boxed types are used so that "not configured" (null) can be told apart from
    // an explicit value; createAggregator only applies the non-null ones.
    @XmlAttribute
    private Boolean parallelProcessing;
    @XmlAttribute
    private Boolean optimisticLocking;
    @XmlAttribute
    private String executorServiceRef;
    @XmlAttribute
    private String timeoutCheckerExecutorServiceRef;
    @XmlAttribute
    private String aggregationRepositoryRef;
    // registry name of the aggregation strategy (an AggregationStrategy or a plain bean)
    @XmlAttribute
    private String strategyRef;
    // method name handed to the bean adapter when strategyRef resolves to a plain bean
    @XmlAttribute
    private String strategyMethodName;
    @XmlAttribute
    private Boolean strategyMethodAllowNull;
    @XmlAttribute
    private Integer completionSize;
    @XmlAttribute
    private Long completionInterval;
    @XmlAttribute
    private Long completionTimeout;
    // the only option with a non-null initial value
    @XmlAttribute @Metadata(defaultValue = "1000")
    private Long completionTimeoutCheckerInterval = 1000L;
    @XmlAttribute
    private Boolean completionFromBatchConsumer;
    @XmlAttribute
    private Boolean completionOnNewCorrelationGroup;
    // deprecated; when true, createAggregationStrategy installs a GroupedExchangeAggregationStrategy
    @XmlAttribute
    @Deprecated
    private Boolean groupExchanges;
    @XmlAttribute
    private Boolean eagerCheckCompletion;
    @XmlAttribute
    private Boolean ignoreInvalidCorrelationKeys;
    // capacity of the closed correlation key cache (see closeCorrelationKeyOnCompletion(int))
    @XmlAttribute
    private Integer closeCorrelationKeyOnCompletion;
    @XmlAttribute
    private Boolean discardOnCompletionTimeout;
    @XmlAttribute
    private Boolean forceCompletionOnStop;
    @XmlAttribute
    private Boolean completeAllOnStop;
    // controller instance; resolved from aggregateControllerRef by createAggregator when null
    @XmlTransient
    private AggregateController aggregateController;
    @XmlAttribute
    private String aggregateControllerRef;
    // child processor definitions of this aggregate
    @XmlElementRef
    private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
133
134    public AggregateDefinition() {
135    }
136
137    public AggregateDefinition(@AsPredicate Predicate predicate) {
138        this(ExpressionNodeHelper.toExpressionDefinition(predicate));
139    }
140    
141    public AggregateDefinition(Expression expression) {
142        this(ExpressionNodeHelper.toExpressionDefinition(expression));
143    }
144
145    public AggregateDefinition(ExpressionDefinition correlationExpression) {
146        setExpression(correlationExpression);
147
148        ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
149        cor.setExpressionType(correlationExpression);
150        setCorrelationExpression(cor);
151    }
152
153    public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
154        this(correlationExpression);
155        this.aggregationStrategy = aggregationStrategy;
156    }
157
158    @Override
159    public String toString() {
160        return "Aggregate[" + description() + " -> " + getOutputs() + "]";
161    }
162    
163    protected String description() {
164        return getExpression() != null ? getExpression().getLabel() : "";
165    }
166
167    @Override
168    public String getLabel() {
169        return "aggregate[" + description() + "]";
170    }
171
172    @Override
173    public Processor createProcessor(RouteContext routeContext) throws Exception {
174        return createAggregator(routeContext);
175    }
176
    /**
     * Builds the {@link AggregateProcessor} for this definition and copies the configured options onto it.
     * <p/>
     * Options whose getter returns <tt>null</tt> are not applied to the processor.
     * The order of the statements matters: the aggregation strategy is created before the
     * <tt>eagerCheckCompletion</tt> option is read, and the aggregate controller reference is resolved
     * before the controller is applied.
     *
     * @param routeContext the route context used to create the child processor and expressions,
     *                     and to look up referenced beans
     * @return the configured aggregate processor
     * @throws IllegalArgumentException if no aggregation strategy can be resolved, or if
     *                                  <tt>timeoutCheckerExecutorServiceRef</tt> is neither found in the
     *                                  registry nor usable to create a scheduled thread pool
     * @throws Exception                if creating the child processor or an expression fails
     */
    protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception {
        Processor childProcessor = this.createChildProcessor(routeContext, true);

        // wrap the aggregate route in a unit of work processor
        CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor);
        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));

        Expression correlation = getExpression().createExpression(routeContext);
        // NOTE: this call may enable eagerCheckCompletion on this definition (groupExchanges mode),
        // so it has to run before that option is copied to the processor further down
        AggregationStrategy strategy = createAggregationStrategy(routeContext);

        // a null parallelProcessing option is treated as false
        boolean parallel = getParallelProcessing() != null && getParallelProcessing();
        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, parallel);
        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, parallel);
        if (threadPool == null && !parallel) {
            // executor service is mandatory for the Aggregator
            // we do not run in parallel mode, but use a synchronous executor, so we run in current thread
            threadPool = new SynchronousExecutorService();
            shutdownThreadPool = true;
        }

        AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), internal,
                correlation, strategy, threadPool, shutdownThreadPool);

        // an explicitly set repository wins; otherwise the ref (if any) is looked up
        AggregationRepository repository = createAggregationRepository(routeContext);
        if (repository != null) {
            answer.setAggregationRepository(repository);
        }

        // resolve the controller from its ref and store it on this definition;
        // it is applied to the processor at the end of this method
        if (getAggregateController() == null && getAggregateControllerRef() != null) {
            setAggregateController(routeContext.mandatoryLookup(getAggregateControllerRef(), AggregateController.class));
        }

        // this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool
        boolean shutdownTimeoutThreadPool = false;
        ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService;
        if (timeoutThreadPool == null && timeoutCheckerExecutorServiceRef != null) {
            // lookup existing thread pool
            timeoutThreadPool = routeContext.getCamelContext().getRegistry().lookupByNameAndType(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class);
            if (timeoutThreadPool == null) {
                // then create a thread pool assuming the ref is a thread pool profile id
                timeoutThreadPool = routeContext.getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
                        AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER, timeoutCheckerExecutorServiceRef);
                if (timeoutThreadPool == null) {
                    throw new IllegalArgumentException("ExecutorServiceRef " + timeoutCheckerExecutorServiceRef 
                            + " not found in registry (as an ScheduledExecutorService instance) or as a thread pool profile.");
                }
                // only a pool created here is flagged for shutdown; an explicitly set or
                // registry provided pool keeps the flag at false
                shutdownTimeoutThreadPool = true;
            }
        }
        // may be null when neither an instance nor a ref was configured
        answer.setTimeoutCheckerExecutorService(timeoutThreadPool);
        answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool);

        // set other options
        answer.setParallelProcessing(parallel);
        if (getOptimisticLocking() != null) {
            answer.setOptimisticLocking(getOptimisticLocking());
        }
        if (getCompletionPredicate() != null) {
            Predicate predicate = getCompletionPredicate().createPredicate(routeContext);
            answer.setCompletionPredicate(predicate);
        } else if (strategy instanceof Predicate) {
            // if aggregation strategy implements predicate and was not configured then use as fallback
            log.debug("Using AggregationStrategy as completion predicate: {}", strategy);
            answer.setCompletionPredicate((Predicate) strategy);
        }
        // timeout and size can each be given as an expression and/or as a fixed value; both are passed on
        if (getCompletionTimeoutExpression() != null) {
            Expression expression = getCompletionTimeoutExpression().createExpression(routeContext);
            answer.setCompletionTimeoutExpression(expression);
        }
        if (getCompletionTimeout() != null) {
            answer.setCompletionTimeout(getCompletionTimeout());
        }
        if (getCompletionInterval() != null) {
            answer.setCompletionInterval(getCompletionInterval());
        }
        if (getCompletionSizeExpression() != null) {
            Expression expression = getCompletionSizeExpression().createExpression(routeContext);
            answer.setCompletionSizeExpression(expression);
        }
        if (getCompletionSize() != null) {
            answer.setCompletionSize(getCompletionSize());
        }
        if (getCompletionFromBatchConsumer() != null) {
            answer.setCompletionFromBatchConsumer(getCompletionFromBatchConsumer());
        }
        if (getCompletionOnNewCorrelationGroup() != null) {
            answer.setCompletionOnNewCorrelationGroup(getCompletionOnNewCorrelationGroup());
        }
        if (getEagerCheckCompletion() != null) {
            answer.setEagerCheckCompletion(getEagerCheckCompletion());
        }
        if (getIgnoreInvalidCorrelationKeys() != null) {
            answer.setIgnoreInvalidCorrelationKeys(getIgnoreInvalidCorrelationKeys());
        }
        if (getCloseCorrelationKeyOnCompletion() != null) {
            answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion());
        }
        if (getDiscardOnCompletionTimeout() != null) {
            answer.setDiscardOnCompletionTimeout(getDiscardOnCompletionTimeout());
        }
        if (getForceCompletionOnStop() != null) {
            answer.setForceCompletionOnStop(getForceCompletionOnStop());
        }
        if (getCompleteAllOnStop() != null) {
            answer.setCompleteAllOnStop(getCompleteAllOnStop());
        }
        // an explicitly set retry policy instance takes precedence over the policy definition
        if (optimisticLockRetryPolicy == null) {
            if (getOptimisticLockRetryPolicyDefinition() != null) {
                answer.setOptimisticLockRetryPolicy(getOptimisticLockRetryPolicyDefinition().createOptimisticLockRetryPolicy());
            }
        } else {
            answer.setOptimisticLockRetryPolicy(optimisticLockRetryPolicy);
        }
        if (getAggregateController() != null) {
            answer.setAggregateController(getAggregateController());
        }
        if (getCompletionTimeoutCheckerInterval() != null) {
            answer.setCompletionTimeoutCheckerInterval(getCompletionTimeoutCheckerInterval());
        }
        return answer;
    }
298
299    @Override
300    public void configureChild(ProcessorDefinition<?> output) {
301        if (expression instanceof ExpressionClause) {
302            ExpressionClause<?> clause = (ExpressionClause<?>) expression;
303            if (clause.getExpressionType() != null) {
304                // if using the Java DSL then the expression may have been set using the
305                // ExpressionClause which is a fancy builder to define expressions and predicates
306                // using fluent builders in the DSL. However we need afterwards a callback to
307                // reset the expression to the expression type the ExpressionClause did build for us
308                expression = clause.getExpressionType();
309                // set the correlation expression from the expression type, as the model definition
310                // would then be accurate
311                correlationExpression = new ExpressionSubElementDefinition();
312                correlationExpression.setExpressionType(clause.getExpressionType());
313            }
314        }
315    }
316
317    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
318        AggregationStrategy strategy = getAggregationStrategy();
319        if (strategy == null && strategyRef != null) {
320            Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
321            if (aggStrategy instanceof AggregationStrategy) {
322                strategy = (AggregationStrategy) aggStrategy;
323            } else if (aggStrategy != null) {
324                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName());
325                if (getStrategyMethodAllowNull() != null) {
326                    adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
327                    adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
328                }
329                strategy = adapter;
330            } else {
331                throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
332            }
333        }
334
335        if (groupExchanges != null && groupExchanges) {
336            if (strategy != null || strategyRef != null) {
337                throw new IllegalArgumentException("Options groupExchanges and AggregationStrategy cannot be enabled at the same time");
338            }
339            if (eagerCheckCompletion != null && !eagerCheckCompletion) {
340                throw new IllegalArgumentException("Option eagerCheckCompletion cannot be false when groupExchanges has been enabled");
341            }
342            // set eager check to enabled by default when using grouped exchanges
343            setEagerCheckCompletion(true);
344            // if grouped exchange is enabled then use special strategy for that
345            strategy = new GroupedExchangeAggregationStrategy();
346        }
347
348        if (strategy == null) {
349            throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this);
350        }
351
352        if (strategy instanceof CamelContextAware) {
353            ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
354        }
355
356        return strategy;
357    }
358
359    private AggregationRepository createAggregationRepository(RouteContext routeContext) {
360        AggregationRepository repository = getAggregationRepository();
361        if (repository == null && aggregationRepositoryRef != null) {
362            repository = routeContext.mandatoryLookup(aggregationRepositoryRef, AggregationRepository.class);
363        }
364        return repository;
365    }
366
367    public AggregationStrategy getAggregationStrategy() {
368        return aggregationStrategy;
369    }
370
371    /**
372     * The AggregationStrategy to use.
373     * <p/>
374     * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges.
375     * At first call the oldExchange parameter is null.
376     * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange.
377     */
378    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
379        this.aggregationStrategy = aggregationStrategy;
380    }
381
382    public String getAggregationStrategyRef() {
383        return strategyRef;
384    }
385
386    /**
387     * A reference to lookup the AggregationStrategy in the Registry.
388     * <p/>
389     * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges.
390     * At first call the oldExchange parameter is null.
391     * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange.
392     */
393    public void setAggregationStrategyRef(String aggregationStrategyRef) {
394        this.strategyRef = aggregationStrategyRef;
395    }
396
397    public String getStrategyRef() {
398        return strategyRef;
399    }
400
401    /**
402     * A reference to lookup the AggregationStrategy in the Registry.
403     * <p/>
404     * Configuring an AggregationStrategy is required, and is used to merge the incoming Exchange with the existing already merged exchanges.
405     * At first call the oldExchange parameter is null.
406     * On subsequent invocations the oldExchange contains the merged exchanges and newExchange is of course the new incoming Exchange.
407     */
408    public void setStrategyRef(String strategyRef) {
409        this.strategyRef = strategyRef;
410    }
411
412    public String getAggregationStrategyMethodName() {
413        return strategyMethodName;
414    }
415
416    /**
417     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
418     */
419    public void setAggregationStrategyMethodName(String strategyMethodName) {
420        this.strategyMethodName = strategyMethodName;
421    }
422
423    public Boolean getStrategyMethodAllowNull() {
424        return strategyMethodAllowNull;
425    }
426
427    public String getStrategyMethodName() {
428        return strategyMethodName;
429    }
430
431    /**
432     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
433     */
434    public void setStrategyMethodName(String strategyMethodName) {
435        this.strategyMethodName = strategyMethodName;
436    }
437
438    /**
439     * If this option is false then the aggregate method is not used for the very first aggregation.
440     * If this option is true then null values is used as the oldExchange (at the very first aggregation),
441     * when using POJOs as the AggregationStrategy.
442     */
443    public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
444        this.strategyMethodAllowNull = strategyMethodAllowNull;
445    }
446
447    /**
448     * The expression used to calculate the correlation key to use for aggregation.
449     * The Exchange which has the same correlation key is aggregated together.
450     * If the correlation key could not be evaluated an Exception is thrown.
451     * You can disable this by using the ignoreBadCorrelationKeys option.
452     */
453    public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
454        this.correlationExpression = correlationExpression;
455    }
456
457    public ExpressionSubElementDefinition getCorrelationExpression() {
458        return correlationExpression;
459    }
460
461    public Integer getCompletionSize() {
462        return completionSize;
463    }
464
465    public void setCompletionSize(Integer completionSize) {
466        this.completionSize = completionSize;
467    }
468
469    public OptimisticLockRetryPolicyDefinition getOptimisticLockRetryPolicyDefinition() {
470        return optimisticLockRetryPolicyDefinition;
471    }
472
473    public void setOptimisticLockRetryPolicyDefinition(OptimisticLockRetryPolicyDefinition optimisticLockRetryPolicyDefinition) {
474        this.optimisticLockRetryPolicyDefinition = optimisticLockRetryPolicyDefinition;
475    }
476
477    public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() {
478        return optimisticLockRetryPolicy;
479    }
480
481    public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) {
482        this.optimisticLockRetryPolicy = optimisticLockRetryPolicy;
483    }
484
485    public Long getCompletionInterval() {
486        return completionInterval;
487    }
488
489    public void setCompletionInterval(Long completionInterval) {
490        this.completionInterval = completionInterval;
491    }
492
493    public Long getCompletionTimeout() {
494        return completionTimeout;
495    }
496
497    public void setCompletionTimeout(Long completionTimeout) {
498        this.completionTimeout = completionTimeout;
499    }
500
501    public Long getCompletionTimeoutCheckerInterval() {
502        return completionTimeoutCheckerInterval;
503    }
504
505    public void setCompletionTimeoutCheckerInterval(Long completionTimeoutCheckerInterval) {
506        this.completionTimeoutCheckerInterval = completionTimeoutCheckerInterval;
507    }
508
509    public ExpressionSubElementDefinition getCompletionPredicate() {
510        return completionPredicate;
511    }
512
513    public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) {
514        this.completionPredicate = completionPredicate;
515    }
516
517    public ExpressionSubElementDefinition getCompletionTimeoutExpression() {
518        return completionTimeoutExpression;
519    }
520
521    public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) {
522        this.completionTimeoutExpression = completionTimeoutExpression;
523    }
524
525    public ExpressionSubElementDefinition getCompletionSizeExpression() {
526        return completionSizeExpression;
527    }
528
529    public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) {
530        this.completionSizeExpression = completionSizeExpression;
531    }
532
533    public Boolean getGroupExchanges() {
534        return groupExchanges;
535    }
536
537    public void setGroupExchanges(Boolean groupExchanges) {
538        this.groupExchanges = groupExchanges;
539    }
540
541    public Boolean getCompletionFromBatchConsumer() {
542        return completionFromBatchConsumer;
543    }
544
545    public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) {
546        this.completionFromBatchConsumer = completionFromBatchConsumer;
547    }
548
549    public Boolean getCompletionOnNewCorrelationGroup() {
550        return completionOnNewCorrelationGroup;
551    }
552
553    public void setCompletionOnNewCorrelationGroup(Boolean completionOnNewCorrelationGroup) {
554        this.completionOnNewCorrelationGroup = completionOnNewCorrelationGroup;
555    }
556
557    public ExecutorService getExecutorService() {
558        return executorService;
559    }
560
561    public void setExecutorService(ExecutorService executorService) {
562        this.executorService = executorService;
563    }
564
565    public Boolean getOptimisticLocking() {
566        return optimisticLocking;
567    }
568
569    public void setOptimisticLocking(boolean optimisticLocking) {
570        this.optimisticLocking = optimisticLocking;
571    }
572
573    public Boolean getParallelProcessing() {
574        return parallelProcessing;
575    }
576
577    public void setParallelProcessing(boolean parallelProcessing) {
578        this.parallelProcessing = parallelProcessing;
579    }
580
581    public String getExecutorServiceRef() {
582        return executorServiceRef;
583    }
584
585    public void setExecutorServiceRef(String executorServiceRef) {
586        this.executorServiceRef = executorServiceRef;
587    }
588
589    public Boolean getEagerCheckCompletion() {
590        return eagerCheckCompletion;
591    }
592
593    public void setEagerCheckCompletion(Boolean eagerCheckCompletion) {
594        this.eagerCheckCompletion = eagerCheckCompletion;
595    }
596
597    public Boolean getIgnoreInvalidCorrelationKeys() {
598        return ignoreInvalidCorrelationKeys;
599    }
600
601    public void setIgnoreInvalidCorrelationKeys(Boolean ignoreInvalidCorrelationKeys) {
602        this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys;
603    }
604
605    public Integer getCloseCorrelationKeyOnCompletion() {
606        return closeCorrelationKeyOnCompletion;
607    }
608
609    public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
610        this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
611    }
612
613    public AggregationRepository getAggregationRepository() {
614        return aggregationRepository;
615    }
616
617    public void setAggregationRepository(AggregationRepository aggregationRepository) {
618        this.aggregationRepository = aggregationRepository;
619    }
620
621    public String getAggregationRepositoryRef() {
622        return aggregationRepositoryRef;
623    }
624
625    public void setAggregationRepositoryRef(String aggregationRepositoryRef) {
626        this.aggregationRepositoryRef = aggregationRepositoryRef;
627    }
628
629    public Boolean getDiscardOnCompletionTimeout() {
630        return discardOnCompletionTimeout;
631    }
632
633    public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
634        this.discardOnCompletionTimeout = discardOnCompletionTimeout;
635    }
636    
637    public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
638        this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
639    }
640
641    public ScheduledExecutorService getTimeoutCheckerExecutorService() {
642        return timeoutCheckerExecutorService;
643    }
644
645    public void setTimeoutCheckerExecutorServiceRef(String timeoutCheckerExecutorServiceRef) {
646        this.timeoutCheckerExecutorServiceRef = timeoutCheckerExecutorServiceRef;
647    }
648
649    public String getTimeoutCheckerExecutorServiceRef() {
650        return timeoutCheckerExecutorServiceRef;
651    }
652
653    public Boolean getForceCompletionOnStop() {
654        return forceCompletionOnStop;
655    }
656
657    public void setForceCompletionOnStop(Boolean forceCompletionOnStop) {
658        this.forceCompletionOnStop = forceCompletionOnStop;
659    }
660
661    public Boolean getCompleteAllOnStop() {
662        return completeAllOnStop;
663    }
664
665    public void setCompleteAllOnStop(Boolean completeAllOnStop) {
666        this.completeAllOnStop = completeAllOnStop;
667    }
668
669    public AggregateController getAggregateController() {
670        return aggregateController;
671    }
672
673    public void setAggregateController(AggregateController aggregateController) {
674        this.aggregateController = aggregateController;
675    }
676
677    public String getAggregateControllerRef() {
678        return aggregateControllerRef;
679    }
680
681    /**
682     * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control
683     * this aggregator.
684     */
685    public void setAggregateControllerRef(String aggregateControllerRef) {
686        this.aggregateControllerRef = aggregateControllerRef;
687    }
688
689    // Fluent API
690    //-------------------------------------------------------------------------
691
692    /**
693     * Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange.
694     * As opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange.
695     *
696     * @return builder
697     */
698    public AggregateDefinition eagerCheckCompletion() {
699        setEagerCheckCompletion(true);
700        return this;
701    }
702
703    /**
704     * If a correlation key cannot be successfully evaluated it will be ignored by logging a {{DEBUG}} and then just
705     * ignore the incoming Exchange.
706     *
707     * @return builder
708     */
709    public AggregateDefinition ignoreInvalidCorrelationKeys() {
710        setIgnoreInvalidCorrelationKeys(true);
711        return this;
712    }
713
714    /**
715     * Closes a correlation key when its complete. Any <i>late</i> received exchanges which has a correlation key
716     * that has been closed, it will be defined and a {@link ClosedCorrelationKeyException}
717     * is thrown.
718     *
719     * @param capacity the maximum capacity of the closed correlation key cache.
720     *                 Use <tt>0</tt> or negative value for unbounded capacity.
721     * @return builder
722     */
723    public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) {
724        setCloseCorrelationKeyOnCompletion(capacity);
725        return this;
726    }
727
728    /**
729     * Discards the aggregated message on completion timeout.
730     * <p/>
731     * This means on timeout the aggregated message is dropped and not sent out of the aggregator.
732     *
733     * @return builder
734     */
735    public AggregateDefinition discardOnCompletionTimeout() {
736        setDiscardOnCompletionTimeout(true);
737        return this;
738    }
739
740    /**
741     * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
742     * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
743     * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
744     *
745     * @return builder
746     */
747    public AggregateDefinition completionFromBatchConsumer() {
748        setCompletionFromBatchConsumer(true);
749        return this;
750    }
751
752    /**
753     * Enables completion on all previous groups when a new incoming correlation group. This can for example be used
754     * to complete groups with same correlation keys when they are in consecutive order.
755     * Notice when this is enabled then only 1 correlation group can be in progress as when a new correlation group
756     * starts, then the previous groups is forced completed.
757     *
758     * @return builder
759     */
760    public AggregateDefinition completionOnNewCorrelationGroup() {
761        setCompletionOnNewCorrelationGroup(true);
762        return this;
763    }
764
765    /**
766     * Number of messages aggregated before the aggregation is complete. This option can be set as either
767     * a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result.
768     * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
769     *
770     * @param completionSize  the completion size, must be a positive number
771     * @return builder
772     */
773    public AggregateDefinition completionSize(int completionSize) {
774        setCompletionSize(completionSize);
775        return this;
776    }
777
778    /**
779     * Number of messages aggregated before the aggregation is complete. This option can be set as either
780     * a fixed value or using an Expression which allows you to evaluate a size dynamically - will use Integer as result.
781     * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
782     *
783     * @param completionSize  the completion size as an {@link org.apache.camel.Expression} which is evaluated as a {@link Integer} type
784     * @return builder
785     */
786    public AggregateDefinition completionSize(Expression completionSize) {
787        setCompletionSizeExpression(new ExpressionSubElementDefinition(completionSize));
788        return this;
789    }
790
791    /**
792     * A repeating period in millis by which the aggregator will complete all current aggregated exchanges.
793     * Camel has a background task which is triggered every period. You cannot use this option together
794     * with completionTimeout, only one of them can be used.
795     *
796     * @param completionInterval  the interval in millis, must be a positive value
797     * @return the builder
798     */
799    public AggregateDefinition completionInterval(long completionInterval) {
800        setCompletionInterval(completionInterval);
801        return this;
802    }
803
804    /**
805     * Time in millis that an aggregated exchange should be inactive before its complete (timeout).
806     * This option can be set as either a fixed value or using an Expression which allows you to evaluate
807     * a timeout dynamically - will use Long as result.
808     * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
809     * You cannot use this option together with completionInterval, only one of the two can be used.
810     * <p/>
811     * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval option
812     * to configure how frequently to run the checker.
813     * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value.
814     * It is not recommended to use very low timeout values or checker intervals.
815     *
816     * @param completionTimeout  the timeout in millis, must be a positive value
817     * @return the builder
818     */
819    public AggregateDefinition completionTimeout(long completionTimeout) {
820        setCompletionTimeout(completionTimeout);
821        return this;
822    }
823
824    /**
825     * Time in millis that an aggregated exchange should be inactive before its complete (timeout).
826     * This option can be set as either a fixed value or using an Expression which allows you to evaluate
827     * a timeout dynamically - will use Long as result.
828     * If both are set Camel will fallback to use the fixed value if the Expression result was null or 0.
829     * You cannot use this option together with completionInterval, only one of the two can be used.
830     * <p/>
831     * By default the timeout checker runs every second, you can use the completionTimeoutCheckerInterval option
832     * to configure how frequently to run the checker.
833     * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value.
834     * It is not recommended to use very low timeout values or checker intervals.
835     *
836     * @param completionTimeout  the timeout as an {@link Expression} which is evaluated as a {@link Long} type
837     * @return the builder
838     */
839    public AggregateDefinition completionTimeout(Expression completionTimeout) {
840        setCompletionTimeoutExpression(new ExpressionSubElementDefinition(completionTimeout));
841        return this;
842    }
843
844    /**
845     * Interval in millis that is used by the background task that checks for timeouts ({@link org.apache.camel.TimeoutMap}).
846     * <p/>
847     * By default the timeout checker runs every second.
848     * The timeout is an approximation and there is no guarantee that the a timeout is triggered exactly after the timeout value.
849     * It is not recommended to use very low timeout values or checker intervals.
850     *
851     * @param completionTimeoutCheckerInterval  the interval in millis, must be a positive value
852     * @return the builder
853     */
854    public AggregateDefinition completionTimeoutCheckerInterval(long completionTimeoutCheckerInterval) {
855        setCompletionTimeoutCheckerInterval(completionTimeoutCheckerInterval);
856        return this;
857    }
858
859    /**
860     * Sets the AggregationStrategy to use with a fluent builder.
861     */
862    public AggregationStrategyClause<AggregateDefinition> aggregationStrategy() {
863        AggregationStrategyClause<AggregateDefinition> clause = new AggregationStrategyClause<>(this);
864        setAggregationStrategy(clause);
865        return clause;
866    }
867
868    /**
869     * Sets the AggregationStrategy to use with a fluent builder.
870     */
871    public AggregationStrategyClause<AggregateDefinition> strategy() {
872        return aggregationStrategy();
873    }
874
875    /**
876     * Sets the aggregate strategy to use
877     *
878     * @param aggregationStrategy  the aggregate strategy to use
879     * @return the builder
880     */
881    public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) {
882        return aggregationStrategy(aggregationStrategy);
883    }
884
885    /**
886     * Sets the aggregate strategy to use
887     *
888     * @param aggregationStrategy  the aggregate strategy to use
889     * @return the builder
890     */
891    public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
892        setAggregationStrategy(aggregationStrategy);
893        return this;
894    }
895
896    /**
897     * Sets the aggregate strategy to use
898     *
899     * @param aggregationStrategyRef  reference to the strategy to lookup in the registry
900     * @return the builder
901     */
902    public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) {
903        setAggregationStrategyRef(aggregationStrategyRef);
904        return this;
905    }
906
907    /**
908     * Sets the method name to use when using a POJO as {@link AggregationStrategy}.
909     *
910     * @param  methodName the method name to call
911     * @return the builder
912     */
913    public AggregateDefinition aggregationStrategyMethodName(String methodName) {
914        setAggregationStrategyMethodName(methodName);
915        return this;
916    }
917
918    /**
919     * Sets allowing null when using a POJO as {@link AggregationStrategy}.
920     *
921     * @return the builder
922     */
923    public AggregateDefinition aggregationStrategyMethodAllowNull() {
924        setStrategyMethodAllowNull(true);
925        return this;
926    }
927
928    /**
929     * Sets the custom aggregate repository to use.
930     * <p/>
931     * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
932     *
933     * @param aggregationRepository  the aggregate repository to use
934     * @return the builder
935     */
936    public AggregateDefinition aggregationRepository(AggregationRepository aggregationRepository) {
937        setAggregationRepository(aggregationRepository);
938        return this;
939    }
940
941    /**
942     * Sets the custom aggregate repository to use
943     * <p/>
944     * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
945     *
946     * @param aggregationRepositoryRef  reference to the repository to lookup in the registry
947     * @return the builder
948     */
949    public AggregateDefinition aggregationRepositoryRef(String aggregationRepositoryRef) {
950        setAggregationRepositoryRef(aggregationRepositoryRef);
951        return this;
952    }
953
954    /**
955     * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single
956     * combined Exchange holding all the aggregated exchanges in a {@link java.util.List}.
957     *
958     * @deprecated use {@link GroupedExchangeAggregationStrategy} as aggregation strategy instead.
959     */
960    @Deprecated
961    public AggregateDefinition groupExchanges() {
962        setGroupExchanges(true);
963        // must use eager check when using grouped exchanges
964        setEagerCheckCompletion(true);
965        return this;
966    }
967
968    /**
969     * A Predicate to indicate when an aggregated exchange is complete.
970     * If this is not specified and the AggregationStrategy object implements Predicate,
971     * the aggregationStrategy object will be used as the completionPredicate.
972     */
973    public AggregateDefinition completionPredicate(@AsPredicate Predicate predicate) {
974        checkNoCompletedPredicate();
975        setCompletionPredicate(new ExpressionSubElementDefinition(predicate));
976        return this;
977    }
978
979    /**
980     * A Predicate to indicate when an aggregated exchange is complete.
981     * If this is not specified and the AggregationStrategy object implements Predicate,
982     * the aggregationStrategy object will be used as the completionPredicate.
983     */
984    @AsPredicate
985    public PredicateClause<AggregateDefinition> completionPredicate() {
986        PredicateClause<AggregateDefinition> clause = new PredicateClause<>(this);
987        completionPredicate(clause);
988        return clause;
989    }
990
991    /**
992     * A Predicate to indicate when an aggregated exchange is complete.
993     * If this is not specified and the AggregationStrategy object implements Predicate,
994     * the aggregationStrategy object will be used as the completionPredicate.
995     */
996    @AsPredicate
997    public PredicateClause<AggregateDefinition> completion() {
998        return completionPredicate();
999    }
1000
1001    /**
1002     * A Predicate to indicate when an aggregated exchange is complete.
1003     * If this is not specified and the AggregationStrategy object implements Predicate,
1004     * the aggregationStrategy object will be used as the completionPredicate.
1005     */
1006    public AggregateDefinition completion(@AsPredicate Predicate predicate) {
1007        return completionPredicate(predicate);
1008    }
1009
1010    /**
1011     * Indicates to complete all current aggregated exchanges when the context is stopped
1012     */
1013    public AggregateDefinition forceCompletionOnStop() {
1014        setForceCompletionOnStop(true);
1015        return this;
1016    }
1017
1018    /**
1019     * Indicates to wait to complete all current and partial (pending) aggregated exchanges when the context is stopped.
1020     * <p/>
1021     * This also means that we will wait for all pending exchanges which are stored in the aggregation repository
1022     * to complete so the repository is empty before we can stop.
1023     * <p/>
1024     * You may want to enable this when using the memory based aggregation repository that is memory based only,
1025     * and do not store data on disk. When this option is enabled, then the aggregator is waiting to complete
1026     * all those exchanges before its stopped, when stopping CamelContext or the route using it.
1027     */
1028    public AggregateDefinition completeAllOnStop() {
1029        setCompleteAllOnStop(true);
1030        return this;
1031    }
1032
1033    /**
1034     * When aggregated are completed they are being send out of the aggregator.
1035     * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency.
1036     * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads.
1037     */
1038    public AggregateDefinition parallelProcessing() {
1039        setParallelProcessing(true);
1040        return this;
1041    }
1042
1043    /**
1044     * When aggregated are completed they are being send out of the aggregator.
1045     * This option indicates whether or not Camel should use a thread pool with multiple threads for concurrency.
1046     * If no custom thread pool has been specified then Camel creates a default pool with 10 concurrent threads.
1047     */
1048    public AggregateDefinition parallelProcessing(boolean parallelProcessing) {
1049        setParallelProcessing(parallelProcessing);
1050        return this;
1051    }
1052
1053    /**
1054     * Turns on using optimistic locking, which requires the aggregationRepository being used,
1055     * is supporting this by implementing {@link org.apache.camel.spi.OptimisticLockingAggregationRepository}.
1056     */
1057    public AggregateDefinition optimisticLocking() {
1058        setOptimisticLocking(true);
1059        return this;
1060    }
1061
1062    /**
1063     * Allows to configure retry settings when using optimistic locking.
1064     */
1065    public AggregateDefinition optimisticLockRetryPolicy(OptimisticLockRetryPolicy policy) {
1066        setOptimisticLockRetryPolicy(policy);
1067        return this;
1068    }
1069
1070    /**
1071     * If using parallelProcessing you can specify a custom thread pool to be used.
1072     * In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well.
1073     */
1074    public AggregateDefinition executorService(ExecutorService executorService) {
1075        setExecutorService(executorService);
1076        return this;
1077    }
1078
1079    /**
1080     * If using parallelProcessing you can specify a custom thread pool to be used.
1081     * In fact also if you are not using parallelProcessing this custom thread pool is used to send out aggregated exchanges as well.
1082     */
1083    public AggregateDefinition executorServiceRef(String executorServiceRef) {
1084        setExecutorServiceRef(executorServiceRef);
1085        return this;
1086    }
1087
1088    /**
1089     * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a
1090     * background thread is created to check for the completion for every aggregator.
1091     * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator.
1092     */
1093    public AggregateDefinition timeoutCheckerExecutorService(ScheduledExecutorService executorService) {
1094        setTimeoutCheckerExecutorService(executorService);
1095        return this;
1096    }
1097
1098    /**
1099     * If using either of the completionTimeout, completionTimeoutExpression, or completionInterval options a
1100     * background thread is created to check for the completion for every aggregator.
1101     * Set this option to provide a custom thread pool to be used rather than creating a new thread for every aggregator.
1102     */
1103    public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) {
1104        setTimeoutCheckerExecutorServiceRef(executorServiceRef);
1105        return this;
1106    }
1107
1108    /**
1109     * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow external sources to control
1110     * this aggregator.
1111     */
1112    public AggregateDefinition aggregateController(AggregateController aggregateController) {
1113        setAggregateController(aggregateController);
1114        return this;
1115    }
1116
1117    // Section - Methods from ExpressionNode
1118    // Needed to copy methods from ExpressionNode here so that I could specify the
1119    // correlation expression as optional in JAXB
1120
1121    public ExpressionDefinition getExpression() {
1122        if (expression == null && correlationExpression != null) {
1123            expression = correlationExpression.getExpressionType();            
1124        }
1125        return expression;
1126    }
1127
    /**
     * Sets the expression of this definition.
     *
     * @param expression  the expression to store; stored as-is without any validation
     */
    public void setExpression(ExpressionDefinition expression) {
        this.expression = expression;
    }
1131
1132    protected void checkNoCompletedPredicate() {
1133        if (getCompletionPredicate() != null) {
1134            throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this);
1135        }
1136    }
1137
    /**
     * Returns the output definitions of this aggregator.
     *
     * @return the outputs list held by this definition (the same instance, not a copy)
     */
    @Override
    public List<ProcessorDefinition<?>> getOutputs() {
        return outputs;
    }
1142
    /**
     * Whether this definition supports outputs.
     *
     * @return always <tt>true</tt>
     */
    public boolean isOutputSupported() {
        return true;
    }
1146
    /**
     * Replaces the output definitions of this aggregator.
     *
     * @param outputs  the new outputs list; stored by reference, not copied
     */
    public void setOutputs(List<ProcessorDefinition<?>> outputs) {
        this.outputs = outputs;
    }
1150
1151}