001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import java.util.concurrent.ExecutorService;
022    import java.util.concurrent.ScheduledExecutorService;
023    
024    import javax.xml.bind.annotation.XmlAccessType;
025    import javax.xml.bind.annotation.XmlAccessorType;
026    import javax.xml.bind.annotation.XmlAttribute;
027    import javax.xml.bind.annotation.XmlElement;
028    import javax.xml.bind.annotation.XmlElementRef;
029    import javax.xml.bind.annotation.XmlRootElement;
030    import javax.xml.bind.annotation.XmlTransient;
031    
032    import org.apache.camel.Expression;
033    import org.apache.camel.Predicate;
034    import org.apache.camel.Processor;
035    import org.apache.camel.builder.ExpressionClause;
036    import org.apache.camel.model.language.ExpressionDefinition;
037    import org.apache.camel.processor.UnitOfWorkProcessor;
038    import org.apache.camel.processor.aggregate.AggregateProcessor;
039    import org.apache.camel.processor.aggregate.AggregationStrategy;
040    import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
041    import org.apache.camel.spi.AggregationRepository;
042    import org.apache.camel.spi.RouteContext;
043    import org.apache.camel.util.concurrent.SynchronousExecutorService;
044    
045    /**
046     * Represents an XML <aggregate/> element
047     *
048     * @version 
049     */
050    @XmlRootElement(name = "aggregate")
051    @XmlAccessorType(XmlAccessType.FIELD)
052    public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> {
053        @XmlElement(name = "correlationExpression", required = true)
054        private ExpressionSubElementDefinition correlationExpression;
055        @XmlElement(name = "completionPredicate")
056        private ExpressionSubElementDefinition completionPredicate;
057        @XmlElement(name = "completionTimeout")
058        private ExpressionSubElementDefinition completionTimeoutExpression;
059        @XmlElement(name = "completionSize")
060        private ExpressionSubElementDefinition completionSizeExpression;
061        @XmlTransient
062        private ExpressionDefinition expression;
063        @XmlElementRef
064        private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
065        @XmlTransient
066        private AggregationStrategy aggregationStrategy;
067        @XmlTransient
068        private ExecutorService executorService;
069        @XmlTransient
070        private ScheduledExecutorService timeoutCheckerExecutorService;
071        @XmlTransient
072        private AggregationRepository aggregationRepository;
073        @XmlAttribute
074        private Boolean parallelProcessing;
075        @XmlAttribute
076        private String executorServiceRef;
077        @XmlAttribute
078        private String timeoutCheckerExecutorServiceRef;
079        @XmlAttribute
080        private String aggregationRepositoryRef;
081        @XmlAttribute
082        private String strategyRef;
083        @XmlAttribute
084        private Integer completionSize;
085        @XmlAttribute
086        private Long completionInterval;
087        @XmlAttribute
088        private Long completionTimeout;
089        @XmlAttribute
090        private Boolean completionFromBatchConsumer;
091        @XmlAttribute
092        private Boolean groupExchanges;
093        @XmlAttribute
094        private Boolean eagerCheckCompletion;
095        @XmlAttribute
096        private Boolean ignoreInvalidCorrelationKeys;
097        @XmlAttribute
098        private Integer closeCorrelationKeyOnCompletion;
099        @XmlAttribute
100        private Boolean discardOnCompletionTimeout;
101        @XmlAttribute
102        private Boolean forceCompletionOnStop;
103    
104        public AggregateDefinition() {
105        }
106    
107        public AggregateDefinition(Predicate predicate) {
108            if (predicate != null) {
109                setExpression(new ExpressionDefinition(predicate));
110            }
111        }    
112        
113        public AggregateDefinition(Expression correlationExpression) {
114            if (correlationExpression != null) {
115                setExpression(new ExpressionDefinition(correlationExpression));
116            }
117        }
118    
119        public AggregateDefinition(ExpressionDefinition correlationExpression) {
120            this.expression = correlationExpression;
121        }
122    
123        public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
124            this(correlationExpression);
125            this.aggregationStrategy = aggregationStrategy;
126        }
127    
128        @Override
129        public String toString() {
130            return "Aggregate[" + description() + " -> " + getOutputs() + "]";
131        }
132        
133        protected String description() {
134            return getExpression() != null ? getExpression().getLabel() : "";
135        }
136    
137        @Override
138        public String getShortName() {
139            return "aggregate";
140        }
141    
142        @Override
143        public String getLabel() {
144            return "aggregate[" + description() + "]";
145        }
146    
147        @Override
148        public Processor createProcessor(RouteContext routeContext) throws Exception {
149            return createAggregator(routeContext);
150        }
151    
152        protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception {
153            Processor processor = this.createChildProcessor(routeContext, true);
154            // wrap the aggregated route in a unit of work processor
155            processor = new UnitOfWorkProcessor(routeContext, processor);
156    
157            Expression correlation = getExpression().createExpression(routeContext);
158            AggregationStrategy strategy = createAggregationStrategy(routeContext);
159    
160            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
161            ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Aggregator", this, isParallelProcessing());
162            if (threadPool == null && !isParallelProcessing()) {
163                // executor service is mandatory for the Aggregator
164                // we do not run in parallel mode, but use a synchronous executor, so we run in current thread
165                threadPool = new SynchronousExecutorService();
166                shutdownThreadPool = true;
167            }
168    
169            AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor,
170                    correlation, strategy, threadPool, shutdownThreadPool);
171    
172            AggregationRepository repository = createAggregationRepository(routeContext);
173            if (repository != null) {
174                answer.setAggregationRepository(repository);
175            }
176    
177            // this EIP supports using a shared timeout checker thread pool or fallback to create a new thread pool
178            boolean shutdownTimeoutThreadPool = false;
179            ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService;
180            if (timeoutThreadPool == null && timeoutCheckerExecutorServiceRef != null) {
181                // lookup existing thread pool
182                timeoutThreadPool = routeContext.getCamelContext().getRegistry().lookup(timeoutCheckerExecutorServiceRef, ScheduledExecutorService.class);
183                if (timeoutThreadPool == null) {
184                    // then create a thread pool assuming the ref is a thread pool profile id
185                    timeoutThreadPool = routeContext.getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
186                            AggregateProcessor.AGGREGATE_TIMEOUT_CHECKER, timeoutCheckerExecutorServiceRef);
187                    if (timeoutThreadPool == null) {
188                        throw new IllegalArgumentException("ExecutorServiceRef " + timeoutCheckerExecutorServiceRef + " not found in registry or as a thread pool profile.");
189                    }
190                    shutdownTimeoutThreadPool = true;
191                }
192            }
193            answer.setTimeoutCheckerExecutorService(timeoutThreadPool);
194            answer.setShutdownTimeoutCheckerExecutorService(shutdownTimeoutThreadPool);
195    
196            // set other options
197            answer.setParallelProcessing(isParallelProcessing());
198            if (getCompletionPredicate() != null) {
199                Predicate predicate = getCompletionPredicate().createPredicate(routeContext);
200                answer.setCompletionPredicate(predicate);
201            }
202            if (getCompletionTimeoutExpression() != null) {
203                Expression expression = getCompletionTimeoutExpression().createExpression(routeContext);
204                answer.setCompletionTimeoutExpression(expression);
205            }
206            if (getCompletionTimeout() != null) {
207                answer.setCompletionTimeout(getCompletionTimeout());
208            }
209            if (getCompletionInterval() != null) {
210                answer.setCompletionInterval(getCompletionInterval());
211            }
212            if (getCompletionSizeExpression() != null) {
213                Expression expression = getCompletionSizeExpression().createExpression(routeContext);
214                answer.setCompletionSizeExpression(expression);
215            }
216            if (getCompletionSize() != null) {
217                answer.setCompletionSize(getCompletionSize());
218            }
219            if (getCompletionFromBatchConsumer() != null) {
220                answer.setCompletionFromBatchConsumer(isCompletionFromBatchConsumer());
221            }
222            if (getEagerCheckCompletion() != null) {
223                answer.setEagerCheckCompletion(isEagerCheckCompletion());
224            }
225            if (getIgnoreInvalidCorrelationKeys() != null) {
226                answer.setIgnoreInvalidCorrelationKeys(isIgnoreInvalidCorrelationKeys());
227            }
228            if (getCloseCorrelationKeyOnCompletion() != null) {
229                answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion());
230            }
231            if (getDiscardOnCompletionTimeout() != null) {
232                answer.setDiscardOnCompletionTimeout(isDiscardOnCompletionTimeout());
233            }
234            if (getForceCompletionOnStop() != null) {
235                answer.setForceCompletionOnStop(getForceCompletionOnStop());
236            }
237    
238            return answer;
239        }
240    
241        @Override
242        protected void configureChild(ProcessorDefinition<?> output) {
243            if (expression != null && expression instanceof ExpressionClause) {
244                ExpressionClause<?> clause = (ExpressionClause<?>) expression;
245                if (clause.getExpressionType() != null) {
246                    // if using the Java DSL then the expression may have been set using the
247                    // ExpressionClause which is a fancy builder to define expressions and predicates
248                    // using fluent builders in the DSL. However we need afterwards a callback to
249                    // reset the expression to the expression type the ExpressionClause did build for us
250                    expression = clause.getExpressionType();
251                    // set the correlation expression from the expression type, as the model definition
252                    // would then be accurate
253                    correlationExpression = new ExpressionSubElementDefinition();
254                    correlationExpression.setExpressionType(clause.getExpressionType());
255                }
256            }
257        }
258    
259        private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
260            AggregationStrategy strategy = getAggregationStrategy();
261            if (strategy == null && strategyRef != null) {
262                strategy = routeContext.mandatoryLookup(strategyRef, AggregationStrategy.class);
263            }
264    
265            if (groupExchanges != null && groupExchanges) {
266                if (strategy != null || strategyRef != null) {
267                    throw new IllegalArgumentException("Options groupExchanges and AggregationStrategy cannot be enabled at the same time");
268                }
269                if (eagerCheckCompletion != null && !eagerCheckCompletion) {
270                    throw new IllegalArgumentException("Option eagerCheckCompletion cannot be false when groupExchanges has been enabled");
271                }
272                // set eager check to enabled by default when using grouped exchanges
273                setEagerCheckCompletion(true);
274                // if grouped exchange is enabled then use special strategy for that
275                strategy = new GroupedExchangeAggregationStrategy();
276            }
277    
278            if (strategy == null) {
279                throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this);
280            }
281            return strategy;
282        }
283    
284        private AggregationRepository createAggregationRepository(RouteContext routeContext) {
285            AggregationRepository repository = getAggregationRepository();
286            if (repository == null && aggregationRepositoryRef != null) {
287                repository = routeContext.mandatoryLookup(aggregationRepositoryRef, AggregationRepository.class);
288            }
289            return repository;
290        }
291    
292        public AggregationStrategy getAggregationStrategy() {
293            return aggregationStrategy;
294        }
295    
296        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
297            this.aggregationStrategy = aggregationStrategy;
298        }
299    
300        public String getAggregationStrategyRef() {
301            return strategyRef;
302        }
303    
304        public void setAggregationStrategyRef(String aggregationStrategyRef) {
305            this.strategyRef = aggregationStrategyRef;
306        }
307    
308        public Integer getCompletionSize() {
309            return completionSize;
310        }
311    
312        public void setCompletionSize(Integer completionSize) {
313            this.completionSize = completionSize;
314        }
315    
316        public Long getCompletionInterval() {
317            return completionInterval;
318        }
319    
320        public void setCompletionInterval(Long completionInterval) {
321            this.completionInterval = completionInterval;
322        }
323    
324        public Long getCompletionTimeout() {
325            return completionTimeout;
326        }
327    
328        public void setCompletionTimeout(Long completionTimeout) {
329            this.completionTimeout = completionTimeout;
330        }
331    
332        public ExpressionSubElementDefinition getCompletionPredicate() {
333            return completionPredicate;
334        }
335    
336        public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) {
337            this.completionPredicate = completionPredicate;
338        }
339    
340        public ExpressionSubElementDefinition getCompletionTimeoutExpression() {
341            return completionTimeoutExpression;
342        }
343    
344        public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) {
345            this.completionTimeoutExpression = completionTimeoutExpression;
346        }
347    
348        public ExpressionSubElementDefinition getCompletionSizeExpression() {
349            return completionSizeExpression;
350        }
351    
352        public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) {
353            this.completionSizeExpression = completionSizeExpression;
354        }
355    
356        public Boolean getGroupExchanges() {
357            return groupExchanges;
358        }
359    
360        public boolean isGroupExchanges() {
361            return groupExchanges != null && groupExchanges;
362        }
363    
364        public void setGroupExchanges(Boolean groupExchanges) {
365            this.groupExchanges = groupExchanges;
366        }
367    
368        public Boolean getCompletionFromBatchConsumer() {
369            return completionFromBatchConsumer;
370        }
371    
372        public boolean isCompletionFromBatchConsumer() {
373            return completionFromBatchConsumer != null && completionFromBatchConsumer;
374        }
375    
376        public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) {
377            this.completionFromBatchConsumer = completionFromBatchConsumer;
378        }
379    
380        public ExecutorService getExecutorService() {
381            return executorService;
382        }
383    
384        public void setExecutorService(ExecutorService executorService) {
385            this.executorService = executorService;
386        }
387    
388        public Boolean getParallelProcessing() {
389            return parallelProcessing;
390        }
391    
392        public boolean isParallelProcessing() {
393            return parallelProcessing != null && parallelProcessing;
394        }
395    
396        public void setParallelProcessing(boolean parallelProcessing) {
397            this.parallelProcessing = parallelProcessing;
398        }
399    
400        public String getExecutorServiceRef() {
401            return executorServiceRef;
402        }
403    
404        public void setExecutorServiceRef(String executorServiceRef) {
405            this.executorServiceRef = executorServiceRef;
406        }
407    
408        public String getStrategyRef() {
409            return strategyRef;
410        }
411    
412        public void setStrategyRef(String strategyRef) {
413            this.strategyRef = strategyRef;
414        }
415    
416        public Boolean getEagerCheckCompletion() {
417            return eagerCheckCompletion;
418        }
419    
420        public boolean isEagerCheckCompletion() {
421            return eagerCheckCompletion != null && eagerCheckCompletion;
422        }
423    
424        public void setEagerCheckCompletion(Boolean eagerCheckCompletion) {
425            this.eagerCheckCompletion = eagerCheckCompletion;
426        }
427    
428        public Boolean getIgnoreInvalidCorrelationKeys() {
429            return ignoreInvalidCorrelationKeys;
430        }
431    
432        public boolean isIgnoreInvalidCorrelationKeys() {
433            return ignoreInvalidCorrelationKeys != null && ignoreInvalidCorrelationKeys;
434        }
435    
436        public void setIgnoreInvalidCorrelationKeys(Boolean ignoreInvalidCorrelationKeys) {
437            this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys;
438        }
439    
440        public Integer getCloseCorrelationKeyOnCompletion() {
441            return closeCorrelationKeyOnCompletion;
442        }
443    
444        public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
445            this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
446        }
447    
448        public AggregationRepository getAggregationRepository() {
449            return aggregationRepository;
450        }
451    
452        public void setAggregationRepository(AggregationRepository aggregationRepository) {
453            this.aggregationRepository = aggregationRepository;
454        }
455    
456        public String getAggregationRepositoryRef() {
457            return aggregationRepositoryRef;
458        }
459    
460        public void setAggregationRepositoryRef(String aggregationRepositoryRef) {
461            this.aggregationRepositoryRef = aggregationRepositoryRef;
462        }
463    
464        public Boolean getDiscardOnCompletionTimeout() {
465            return discardOnCompletionTimeout;
466        }
467    
468        public boolean isDiscardOnCompletionTimeout() {
469            return discardOnCompletionTimeout != null && discardOnCompletionTimeout;
470        }
471    
472        public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
473            this.discardOnCompletionTimeout = discardOnCompletionTimeout;
474        }
475        
476        public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
477            this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
478        }
479    
480        public ScheduledExecutorService getTimeoutCheckerExecutorService() {
481            return timeoutCheckerExecutorService;
482        }
483    
484        public void setTimeoutCheckerExecutorServiceRef(String timeoutCheckerExecutorServiceRef) {
485            this.timeoutCheckerExecutorServiceRef = timeoutCheckerExecutorServiceRef;
486        }
487    
488        public String getTimeoutCheckerExecutorServiceRef() {
489            return timeoutCheckerExecutorServiceRef;
490        }
491    
492        // Fluent API
493        //-------------------------------------------------------------------------
494    
495        /**
496         * Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange.
497         * At opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange.
498         *
499         * @return builder
500         */
501        public AggregateDefinition eagerCheckCompletion() {
502            setEagerCheckCompletion(true);
503            return this;
504        }
505    
506        /**
507         * If a correlation key cannot be successfully evaluated it will be ignored by logging a {{DEBUG}} and then just
508         * ignore the incoming Exchange.
509         *
510         * @return builder
511         */
512        public AggregateDefinition ignoreInvalidCorrelationKeys() {
513            setIgnoreInvalidCorrelationKeys(true);
514            return this;
515        }
516    
517        /**
518         * Closes a correlation key when its complete. Any <i>late</i> received exchanges which has a correlation key
519         * that has been closed, it will be defined and a {@link org.apache.camel.processor.aggregate.ClosedCorrelationKeyException}
520         * is thrown.
521         *
522         * @param capacity the maximum capacity of the closed correlation key cache.
523         *                 Use <tt>0</tt> or negative value for unbounded capacity.
524         * @return builder
525         */
526        public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) {
527            setCloseCorrelationKeyOnCompletion(capacity);
528            return this;
529        }
530    
531        /**
532         * Discards the aggregated message on completion timeout.
533         * <p/>
534         * This means on timeout the aggregated message is dropped and not sent out of the aggregator.
535         *
536         * @return builder
537         */
538        public AggregateDefinition discardOnCompletionTimeout() {
539            setDiscardOnCompletionTimeout(true);
540            return this;
541        }
542    
543        /**
544         * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
545         * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
546         * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
547         *
548         * @return builder
549         */
550        public AggregateDefinition completionFromBatchConsumer() {
551            setCompletionFromBatchConsumer(true);
552            return this;
553        }
554    
555        /**
556         * Sets the completion size, which is the number of aggregated exchanges which would
557         * cause the aggregate to consider the group as complete and send out the aggregated exchange.
558         *
559         * @param completionSize  the completion size
560         * @return builder
561         */
562        public AggregateDefinition completionSize(int completionSize) {
563            setCompletionSize(completionSize);
564            return this;
565        }
566    
567        /**
568         * Sets the completion size, which is the number of aggregated exchanges which would
569         * cause the aggregate to consider the group as complete and send out the aggregated exchange.
570         *
571         * @param completionSize  the completion size as an {@link org.apache.camel.Expression} which is evaluated as a {@link Integer} type
572         * @return builder
573         */
574        public AggregateDefinition completionSize(Expression completionSize) {
575            setCompletionSizeExpression(new ExpressionSubElementDefinition(completionSize));
576            return this;
577        }
578    
579        /**
580         * Sets the completion interval, which would cause the aggregate to consider the group as complete
581         * and send out the aggregated exchange.
582         *
583         * @param completionInterval  the interval in millis
584         * @return the builder
585         */
586        public AggregateDefinition completionInterval(long completionInterval) {
587            setCompletionInterval(completionInterval);
588            return this;
589        }
590    
591        /**
592         * Sets the completion timeout, which would cause the aggregate to consider the group as complete
593         * and send out the aggregated exchange.
594         *
595         * @param completionTimeout  the timeout in millis
596         * @return the builder
597         */
598        public AggregateDefinition completionTimeout(long completionTimeout) {
599            setCompletionTimeout(completionTimeout);
600            return this;
601        }
602    
603        /**
604         * Sets the completion timeout, which would cause the aggregate to consider the group as complete
605         * and send out the aggregated exchange.
606         *
607         * @param completionTimeout  the timeout as an {@link Expression} which is evaluated as a {@link Long} type
608         * @return the builder
609         */
610        public AggregateDefinition completionTimeout(Expression completionTimeout) {
611            setCompletionTimeoutExpression(new ExpressionSubElementDefinition(completionTimeout));
612            return this;
613        }
614    
615        /**
616         * Sets the aggregate strategy to use
617         *
618         * @param aggregationStrategy  the aggregate strategy to use
619         * @return the builder
620         */
621        public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
622            setAggregationStrategy(aggregationStrategy);
623            return this;
624        }
625    
626        /**
627         * Sets the aggregate strategy to use
628         *
629         * @param aggregationStrategyRef  reference to the strategy to lookup in the registry
630         * @return the builder
631         */
632        public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) {
633            setAggregationStrategyRef(aggregationStrategyRef);
634            return this;
635        }
636    
637        /**
638         * Sets the custom aggregate repository to use.
639         * <p/>
640         * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
641         *
642         * @param aggregationRepository  the aggregate repository to use
643         * @return the builder
644         */
645        public AggregateDefinition aggregationRepository(AggregationRepository aggregationRepository) {
646            setAggregationRepository(aggregationRepository);
647            return this;
648        }
649    
650        /**
651         * Sets the custom aggregate repository to use
652         * <p/>
653         * Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
654         *
655         * @param aggregationRepositoryRef  reference to the repository to lookup in the registry
656         * @return the builder
657         */
658        public AggregateDefinition aggregationRepositoryRef(String aggregationRepositoryRef) {
659            setAggregationRepositoryRef(aggregationRepositoryRef);
660            return this;
661        }
662    
663        /**
664         * Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single
665         * combined Exchange holding all the aggregated exchanges in a {@link java.util.List} as a exchange
666         * property with the key {@link org.apache.camel.Exchange#GROUPED_EXCHANGE}.
667         *
668         * @return the builder
669         */
670        public AggregateDefinition groupExchanges() {
671            setGroupExchanges(true);
672            // must use eager check when using grouped exchanges
673            setEagerCheckCompletion(true);
674            return this;
675        }
676    
677        /**
678         * Sets the predicate used to determine if the aggregation is completed
679         *
680         * @param predicate  the predicate
681         * @return the builder
682         */
683        public AggregateDefinition completionPredicate(Predicate predicate) {
684            checkNoCompletedPredicate();
685            setCompletionPredicate(new ExpressionSubElementDefinition(predicate));
686            return this;
687        }
688    
689         /**
690         * Sets the force completion on stop flag, which considers the current group as complete
691         * and sends out the aggregated exchange when the stop event is executed
692         *
693         * @return builder
694         */
695        public AggregateDefinition forceCompletionOnStop() {
696            setForceCompletionOnStop(true);
697            return this;
698        }
699    
700        public Boolean getForceCompletionOnStop() {
701            return forceCompletionOnStop;
702        }
703    
704        public boolean isForceCompletionOnStop() {
705            return forceCompletionOnStop != null && forceCompletionOnStop;
706        }
707    
708        public void setForceCompletionOnStop(Boolean forceCompletionOnStop) {
709            this.forceCompletionOnStop = forceCompletionOnStop;
710        }
711    
712        /**
713         * Sending the aggregated output in parallel
714         *
715         * @return the builder
716         */
717        public AggregateDefinition parallelProcessing() {
718            setParallelProcessing(true);
719            return this;
720        }
721        
722        public AggregateDefinition executorService(ExecutorService executorService) {
723            setExecutorService(executorService);
724            return this;
725        }
726    
727        public AggregateDefinition executorServiceRef(String executorServiceRef) {
728            setExecutorServiceRef(executorServiceRef);
729            return this;
730        }
731    
732        public AggregateDefinition timeoutCheckerExecutorService(ScheduledExecutorService executorService) {
733            setTimeoutCheckerExecutorService(executorService);
734            return this;
735        }
736    
737        public AggregateDefinition timeoutCheckerExecutorServiceRef(String executorServiceRef) {
738            setTimeoutCheckerExecutorServiceRef(executorServiceRef);
739            return this;
740        }
741        
742        protected void checkNoCompletedPredicate() {
743            if (getCompletionPredicate() != null) {
744                throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this);
745            }
746        }
747    
748        public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
749            this.correlationExpression = correlationExpression;
750        }
751    
752        public ExpressionSubElementDefinition getCorrelationExpression() {
753            return correlationExpression;
754        }
755    
756        // Section - Methods from ExpressionNode
757        // Needed to copy methods from ExpressionNode here so that I could specify the
758        // correlation expression as optional in JAXB
759    
760        public ExpressionDefinition getExpression() {
761            if (expression == null && correlationExpression != null) {
762                expression = correlationExpression.getExpressionType();            
763            }
764            return expression;
765        }
766    
767        public void setExpression(ExpressionDefinition expression) {
768            this.expression = expression;
769        }
770    
771        @Override
772        public List<ProcessorDefinition<?>> getOutputs() {
773            return outputs;
774        }
775    
776        public boolean isOutputSupported() {
777            return true;
778        }
779    
780        public void setOutputs(List<ProcessorDefinition<?>> outputs) {
781            this.outputs = outputs;
782        }
783    
784    }