001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor.aggregate;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.LinkedHashSet;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentSkipListSet;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.ScheduledExecutorService;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicInteger;
031import java.util.concurrent.atomic.AtomicLong;
032import java.util.concurrent.locks.Lock;
033import java.util.concurrent.locks.ReentrantLock;
034
035import org.apache.camel.AsyncCallback;
036import org.apache.camel.AsyncProcessor;
037import org.apache.camel.CamelContext;
038import org.apache.camel.CamelExchangeException;
039import org.apache.camel.Endpoint;
040import org.apache.camel.Exchange;
041import org.apache.camel.Expression;
042import org.apache.camel.Navigate;
043import org.apache.camel.NoSuchEndpointException;
044import org.apache.camel.Predicate;
045import org.apache.camel.Processor;
046import org.apache.camel.ProducerTemplate;
047import org.apache.camel.ShutdownRunningTask;
048import org.apache.camel.TimeoutMap;
049import org.apache.camel.Traceable;
050import org.apache.camel.spi.AggregationRepository;
051import org.apache.camel.spi.ExceptionHandler;
052import org.apache.camel.spi.IdAware;
053import org.apache.camel.spi.OptimisticLockingAggregationRepository;
054import org.apache.camel.spi.RecoverableAggregationRepository;
055import org.apache.camel.spi.ShutdownAware;
056import org.apache.camel.spi.ShutdownPrepared;
057import org.apache.camel.spi.Synchronization;
058import org.apache.camel.support.DefaultTimeoutMap;
059import org.apache.camel.support.LoggingExceptionHandler;
060import org.apache.camel.support.ServiceSupport;
061import org.apache.camel.util.AsyncProcessorHelper;
062import org.apache.camel.util.ExchangeHelper;
063import org.apache.camel.util.LRUCache;
064import org.apache.camel.util.ObjectHelper;
065import org.apache.camel.util.ServiceHelper;
066import org.apache.camel.util.StopWatch;
067import org.apache.camel.util.TimeUtils;
068import org.slf4j.Logger;
069import org.slf4j.LoggerFactory;
070
071/**
072 * An implementation of the <a
073 * href="http://camel.apache.org/aggregator2.html">Aggregator</a>
 * pattern, where a batch of messages is processed (up to a maximum amount or
 * until some timeout is reached) and messages for the same correlation key are
 * combined using an {@link AggregationStrategy}
 * (by default the latest message is used) to compress many message exchanges
 * into a smaller number of exchanges.
079 * <p/>
080 * A good example of this is stock market data; you may be receiving 30,000
081 * messages/second and you may want to throttle it right down so that multiple
082 * messages for the same stock are combined (or just the latest message is used
083 * and older prices are discarded). Another idea is to combine line item messages
084 * together into a single invoice message.
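 * <p/>
 * As an illustrative sketch only (the endpoint URIs and the strategy class below are placeholders, not part
 * of this class), an aggregator is typically configured from a route using the Java DSL along these lines:
 * <pre>
 * from("direct:orders")
 *     .aggregate(header("orderId"), new MyAggregationStrategy())
 *         .completionSize(10)
 *         .completionTimeout(1000)
 *     .to("direct:aggregatedOrders");
 * </pre>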
085 */
086public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, ShutdownAware, IdAware {
087
088    public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
089
090    private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class);
091
092    private final Lock lock = new ReentrantLock();
093    private final CamelContext camelContext;
094    private final Processor processor;
095    private String id;
096    private AggregationStrategy aggregationStrategy;
097    private boolean preCompletion;
098    private Expression correlationExpression;
099    private AggregateController aggregateController;
100    private final ExecutorService executorService;
101    private final boolean shutdownExecutorService;
102    private OptimisticLockRetryPolicy optimisticLockRetryPolicy = new OptimisticLockRetryPolicy();
103    private ScheduledExecutorService timeoutCheckerExecutorService;
104    private boolean shutdownTimeoutCheckerExecutorService;
105    private ScheduledExecutorService recoverService;
106    // store correlation key -> exchange id in timeout map
107    private TimeoutMap<String, String> timeoutMap;
108    private ExceptionHandler exceptionHandler;
109    private AggregationRepository aggregationRepository;
110    private Map<String, String> closedCorrelationKeys;
111    private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<String>();
112    private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
113    private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>();
114
115    private final AggregateProcessorStatistics statistics = new Statistics();
116    private final AtomicLong totalIn = new AtomicLong();
117    private final AtomicLong totalCompleted = new AtomicLong();
118    private final AtomicLong completedBySize = new AtomicLong();
119    private final AtomicLong completedByStrategy = new AtomicLong();
120    private final AtomicLong completedByInterval = new AtomicLong();
121    private final AtomicLong completedByTimeout = new AtomicLong();
122    private final AtomicLong completedByPredicate = new AtomicLong();
123    private final AtomicLong completedByBatchConsumer = new AtomicLong();
124    private final AtomicLong completedByForce = new AtomicLong();
125
    // bookkeeping of redelivery attempts per exchange
127    private class RedeliveryData {
128        int redeliveryCounter;
129    }
130
131    private class Statistics implements AggregateProcessorStatistics {
132
133        private boolean statisticsEnabled = true;
134
135        public long getTotalIn() {
136            return totalIn.get();
137        }
138
139        public long getTotalCompleted() {
140            return totalCompleted.get();
141        }
142
143        public long getCompletedBySize() {
144            return completedBySize.get();
145        }
146
147        public long getCompletedByStrategy() {
148            return completedByStrategy.get();
149        }
150
151        public long getCompletedByInterval() {
152            return completedByInterval.get();
153        }
154
155        public long getCompletedByTimeout() {
156            return completedByTimeout.get();
157        }
158
159        public long getCompletedByPredicate() {
160            return completedByPredicate.get();
161        }
162
163        public long getCompletedByBatchConsumer() {
164            return completedByBatchConsumer.get();
165        }
166
167        public long getCompletedByForce() {
168            return completedByForce.get();
169        }
170
171        public void reset() {
172            totalIn.set(0);
173            totalCompleted.set(0);
174            completedBySize.set(0);
            completedByStrategy.set(0);
            completedByInterval.set(0);
            completedByTimeout.set(0);
177            completedByPredicate.set(0);
178            completedByBatchConsumer.set(0);
179            completedByForce.set(0);
180        }
181
182        public boolean isStatisticsEnabled() {
183            return statisticsEnabled;
184        }
185
186        public void setStatisticsEnabled(boolean statisticsEnabled) {
187            this.statisticsEnabled = statisticsEnabled;
188        }
189    }
190
191    // options
192    private boolean ignoreInvalidCorrelationKeys;
193    private Integer closeCorrelationKeyOnCompletion;
194    private boolean parallelProcessing;
195    private boolean optimisticLocking;
196
197    // different ways to have completion triggered
198    private boolean eagerCheckCompletion;
199    private Predicate completionPredicate;
200    private long completionTimeout;
201    private Expression completionTimeoutExpression;
202    private long completionInterval;
203    private int completionSize;
204    private Expression completionSizeExpression;
205    private boolean completionFromBatchConsumer;
206    private AtomicInteger batchConsumerCounter = new AtomicInteger();
207    private boolean discardOnCompletionTimeout;
208    private boolean forceCompletionOnStop;
209    private boolean completeAllOnStop;
210
211    private ProducerTemplate deadLetterProducerTemplate;
212
213    public AggregateProcessor(CamelContext camelContext, Processor processor,
214                              Expression correlationExpression, AggregationStrategy aggregationStrategy,
215                              ExecutorService executorService, boolean shutdownExecutorService) {
216        ObjectHelper.notNull(camelContext, "camelContext");
217        ObjectHelper.notNull(processor, "processor");
218        ObjectHelper.notNull(correlationExpression, "correlationExpression");
219        ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
220        ObjectHelper.notNull(executorService, "executorService");
221        this.camelContext = camelContext;
222        this.processor = processor;
223        this.correlationExpression = correlationExpression;
224        this.aggregationStrategy = aggregationStrategy;
225        this.executorService = executorService;
226        this.shutdownExecutorService = shutdownExecutorService;
227        this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
228    }
229
230    @Override
231    public String toString() {
232        return "AggregateProcessor[to: " + processor + "]";
233    }
234
235    public String getTraceLabel() {
236        return "aggregate[" + correlationExpression + "]";
237    }
238
239    public List<Processor> next() {
240        if (!hasNext()) {
241            return null;
242        }
243        List<Processor> answer = new ArrayList<Processor>(1);
244        answer.add(processor);
245        return answer;
246    }
247
248    public boolean hasNext() {
249        return processor != null;
250    }
251
252    public String getId() {
253        return id;
254    }
255
256    public void setId(String id) {
257        this.id = id;
258    }
259
260    public void process(Exchange exchange) throws Exception {
261        AsyncProcessorHelper.process(this, exchange);
262    }
263
264    public boolean process(Exchange exchange, AsyncCallback callback) {
265        try {
266            doProcess(exchange);
267        } catch (Throwable e) {
268            exchange.setException(e);
269        }
270        callback.done(true);
271        return true;
272    }
273
274    protected void doProcess(Exchange exchange) throws Exception {
275
276        if (getStatistics().isStatisticsEnabled()) {
277            totalIn.incrementAndGet();
278        }
279
        // check for the special header to force completion of all groups (such an exchange is only used as a signal and is not itself aggregated)
281        boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
282        if (completeAllGroups) {
283            forceCompletionOfAllGroups();
284            return;
285        }
286
287        // compute correlation expression
288        String key = correlationExpression.evaluate(exchange, String.class);
289        if (ObjectHelper.isEmpty(key)) {
290            // we have a bad correlation key
291            if (isIgnoreInvalidCorrelationKeys()) {
292                LOG.debug("Invalid correlation key. This Exchange will be ignored: {}", exchange);
293                return;
294            } else {
295                throw new CamelExchangeException("Invalid correlation key", exchange);
296            }
297        }
298
299        // is the correlation key closed?
300        if (closedCorrelationKeys != null && closedCorrelationKeys.containsKey(key)) {
301            throw new ClosedCorrelationKeyException(key, exchange);
302        }
303
        // when optimistic locking is enabled we keep trying until we succeed
305        if (optimisticLocking) {
306            List<Exchange> aggregated = null;
307            boolean exhaustedRetries = true;
308            int attempt = 0;
309            do {
310                attempt++;
311                // copy exchange, and do not share the unit of work
312                // the aggregated output runs in another unit of work
313                Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
314                try {
315                    aggregated = doAggregation(key, copy);
316                    exhaustedRetries = false;
317                    break;
318                } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
319                    LOG.trace("On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException while trying to add() key: {} and exchange: {}",
320                              new Object[]{attempt, aggregationRepository, key, copy, e});
321                    optimisticLockRetryPolicy.doDelay(attempt);
322                }
323            } while (optimisticLockRetryPolicy.shouldRetry(attempt));
324
325            if (exhaustedRetries) {
326                throw new CamelExchangeException("Exhausted optimistic locking retry attempts, tried " + attempt + " times", exchange,
327                        new OptimisticLockingAggregationRepository.OptimisticLockingException());
328            } else if (aggregated != null) {
329                // we are completed so submit to completion
330                for (Exchange agg : aggregated) {
331                    onSubmitCompletion(key, agg);
332                }
333            }
334        } else {
335            // copy exchange, and do not share the unit of work
336            // the aggregated output runs in another unit of work
337            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
338
            // when memory based then it's fast using synchronized, but if the aggregation repository is IO
            // bound (such as JPA) then concurrent aggregation per correlation key could
            // improve performance, as we could run aggregation repository get/add in parallel
342            List<Exchange> aggregated = null;
343            lock.lock();
344            try {
345                aggregated = doAggregation(key, copy);
346            } finally {
347                lock.unlock();
348            }
349
350            // we are completed so do that work outside the lock
351            if (aggregated != null) {
352                for (Exchange agg : aggregated) {
353                    onSubmitCompletion(key, agg);
354                }
355            }
356        }
357
358        // check for the special header to force completion of all groups (inclusive of the message)
359        boolean completeAllGroupsInclusive = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class);
360        if (completeAllGroupsInclusive) {
361            forceCompletionOfAllGroups();
362        }
363    }
364
365    /**
366     * Aggregates the exchange with the given correlation key
367     * <p/>
368     * This method <b>must</b> be run synchronized as we cannot aggregate the same correlation key
369     * in parallel.
370     * <p/>
     * The returned {@link Exchange} should be sent downstream using the {@link #onSubmitCompletion(String, org.apache.camel.Exchange)}
     * method, which sends out the aggregated and completed {@link Exchange}.
373     *
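     * <p/>
     * The actual combining of exchanges is delegated to the configured {@link AggregationStrategy} via
     * {@link #onAggregation(Exchange, Exchange)}. As a minimal illustrative sketch only (not part of this class),
     * a strategy that appends message bodies could look like:
     * <pre>
     * public class BodyAppendingStrategy implements AggregationStrategy {
     *     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
     *         if (oldExchange == null) {
     *             // first exchange for this correlation key
     *             return newExchange;
     *         }
     *         String body = oldExchange.getIn().getBody(String.class) + "," + newExchange.getIn().getBody(String.class);
     *         oldExchange.getIn().setBody(body);
     *         return oldExchange;
     *     }
     * }
     * </pre>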
374     * @param key      the correlation key
375     * @param newExchange the exchange
     * @return the aggregated exchange(s) which are complete, or an empty list if not yet complete
     * @throws org.apache.camel.CamelExchangeException is thrown if an error occurs during aggregation
378     */
379    private List<Exchange> doAggregation(String key, Exchange newExchange) throws CamelExchangeException {
380        LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
381
382        List<Exchange> list = new ArrayList<Exchange>();
383        String complete = null;
384
385        Exchange answer;
386        Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key);
387        Exchange oldExchange = originalExchange;
388
389        Integer size = 1;
390        if (oldExchange != null) {
            // hack to support legacy AggregationStrategy implementations that modify and return the oldExchange; these will not
            // work when using an identity-based approach for optimistic locking such as the MemoryAggregationRepository.
393            if (optimisticLocking && aggregationRepository instanceof MemoryAggregationRepository) {
394                oldExchange = originalExchange.copy();
395            }
396            size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class);
397            size++;
398        }
399
400        // prepare the exchanges for aggregation
401        ExchangeHelper.prepareAggregation(oldExchange, newExchange);
402
403        // check if we are pre complete
404        if (preCompletion) {
405            try {
                // put the current aggregated size on the exchange so it's available during the completion check
407                newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
408                complete = isPreCompleted(key, oldExchange, newExchange);
409                // make sure to track timeouts if not complete
410                if (complete == null) {
411                    trackTimeout(key, newExchange);
412                }
413                // remove it afterwards
414                newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
415            } catch (Throwable e) {
416                // must catch any exception from aggregation
417                throw new CamelExchangeException("Error occurred during preComplete", newExchange, e);
418            }
419        } else if (isEagerCheckCompletion()) {
            // put the current aggregated size on the exchange so it's available during the completion check
421            newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
422            complete = isCompleted(key, newExchange);
423            // make sure to track timeouts if not complete
424            if (complete == null) {
425                trackTimeout(key, newExchange);
426            }
427            // remove it afterwards
428            newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
429        }
430
431        if (preCompletion && complete != null) {
432            // need to pre complete the current group before we aggregate
433            doAggregationComplete(complete, list, key, originalExchange, oldExchange);
            // as we complete the current group eagerly, we should indicate the new group is not complete
435            complete = null;
436            // and clear old/original exchange as we start on a new group
437            oldExchange = null;
438            originalExchange = null;
439            // and reset the size to 1
440            size = 1;
            // make sure to track the timeout as we just restarted the correlation group in pre-completion mode
442            trackTimeout(key, newExchange);
443        }
444
445        // aggregate the exchanges
446        try {
447            answer = onAggregation(oldExchange, newExchange);
448        } catch (Throwable e) {
449            // must catch any exception from aggregation
450            throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
451        }
452        if (answer == null) {
453            throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", newExchange);
454        }
455
456        // update the aggregated size
457        answer.setProperty(Exchange.AGGREGATED_SIZE, size);
458
        // when not using pre-completion or eager checking, check for completion after the aggregation
460        if (!preCompletion && !isEagerCheckCompletion()) {
461            complete = isCompleted(key, answer);
462            // make sure to track timeouts if not complete
463            if (complete == null) {
464                trackTimeout(key, newExchange);
465            }
466        }
467
468        if (complete == null) {
469            // only need to update aggregation repository if we are not complete
470            doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
471        } else {
472            // if we are complete then add the answer to the list
473            doAggregationComplete(complete, list, key, originalExchange, answer);
474        }
475
476        LOG.trace("onAggregation +++  end  +++ with correlation key: {}", key);
477        return list;
478    }
479
480    protected void doAggregationComplete(String complete, List<Exchange> list, String key, Exchange originalExchange, Exchange answer) {
481        if ("consumer".equals(complete)) {
482            for (String batchKey : batchConsumerCorrelationKeys) {
483                Exchange batchAnswer;
484                if (batchKey.equals(key)) {
485                    // skip the current aggregated key as we have already aggregated it and have the answer
486                    batchAnswer = answer;
487                } else {
488                    batchAnswer = aggregationRepository.get(camelContext, batchKey);
489                }
490
491                if (batchAnswer != null) {
492                    batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
493                    onCompletion(batchKey, originalExchange, batchAnswer, false);
494                    list.add(batchAnswer);
495                }
496            }
497            batchConsumerCorrelationKeys.clear();
498            // we have already submitted to completion, so answer should be null
499            answer = null;
500        } else if (answer != null) {
501            // we are complete for this exchange
502            answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
503            answer = onCompletion(key, originalExchange, answer, false);
504        }
505
506        if (answer != null) {
507            list.add(answer);
508        }
509    }
510
511    protected void doAggregationRepositoryAdd(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) {
512        LOG.trace("In progress aggregated oldExchange: {}, newExchange: {} with correlation key: {}", new Object[]{oldExchange, newExchange, key});
513        if (optimisticLocking) {
514            try {
515                ((OptimisticLockingAggregationRepository)aggregationRepository).add(camelContext, key, oldExchange, newExchange);
516            } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
517                onOptimisticLockingFailure(oldExchange, newExchange);
518                throw e;
519            }
520        } else {
521            aggregationRepository.add(camelContext, key, newExchange);
522        }
523    }
524
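    /**
     * Invoked when an {@link OptimisticLockingAggregationRepository.OptimisticLockingException} was thrown
     * while adding to the repository, before the exception is rethrown to the retry loop.
     * <p/>
     * If the configured strategy (or its delegate) implements {@link OptimisticLockingAwareAggregationStrategy}
     * it is given a chance to react to the conflicting update. As an illustrative sketch only (not part of this
     * class), such a strategy could look like:
     * <pre>
     * public class LockAwareStrategy implements OptimisticLockingAwareAggregationStrategy {
     *     public void onOptimisticLockFailure(Exchange oldExchange, Exchange newExchange) {
     *         // react to the conflicting update here, for example by logging it or adjusting internal state
     *     }
     *
     *     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
     *         return oldExchange == null ? newExchange : oldExchange;
     *     }
     * }
     * </pre>
     */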
525    protected void onOptimisticLockingFailure(Exchange oldExchange, Exchange newExchange) {
526        AggregationStrategy strategy = aggregationStrategy;
527        if (strategy instanceof DelegateAggregationStrategy) {
528            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
529        }
530        if (strategy instanceof OptimisticLockingAwareAggregationStrategy) {
531            LOG.trace("onOptimisticLockFailure with AggregationStrategy: {}, oldExchange: {}, newExchange: {}",
532                      new Object[]{strategy, oldExchange, newExchange});
533            ((OptimisticLockingAwareAggregationStrategy)strategy).onOptimisticLockFailure(oldExchange, newExchange);
534        }
535    }
536
537    /**
     * Tests whether the given exchange is pre-complete or not
539     *
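     * <p/>
     * Pre-completion is only triggered when the configured strategy (or its delegate) implements
     * {@link PreCompletionAwareAggregationStrategy}. As an illustrative sketch only (not part of this class),
     * such a strategy could signal that a new group should be started whenever the incoming body differs
     * from the currently aggregated body:
     * <pre>
     * public class StartNewGroupOnChange implements PreCompletionAwareAggregationStrategy {
     *     public boolean preComplete(Exchange oldExchange, Exchange newExchange) {
     *         if (oldExchange == null) {
     *             return false;
     *         }
     *         Object oldBody = oldExchange.getIn().getBody();
     *         Object newBody = newExchange.getIn().getBody();
     *         if (oldBody == null) {
     *             return false;
     *         }
     *         return !oldBody.equals(newBody);
     *     }
     *
     *     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
     *         return oldExchange == null ? newExchange : oldExchange;
     *     }
     * }
     * </pre>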
540     * @param key      the correlation key
541     * @param oldExchange   the existing exchange
542     * @param newExchange the incoming exchange
543     * @return <tt>null</tt> if not pre-completed, otherwise a String with the type that triggered the pre-completion
544     */
545    protected String isPreCompleted(String key, Exchange oldExchange, Exchange newExchange) {
546        boolean complete = false;
547        AggregationStrategy strategy = aggregationStrategy;
548        if (strategy instanceof DelegateAggregationStrategy) {
549            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
550        }
551        if (strategy instanceof PreCompletionAwareAggregationStrategy) {
552            complete = ((PreCompletionAwareAggregationStrategy) strategy).preComplete(oldExchange, newExchange);
553        }
554        return complete ? "strategy" : null;
555    }
556
557    /**
558     * Tests whether the given exchange is complete or not
559     *
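     * <p/>
     * The completion triggers evaluated here are configured on this processor via its setters. For example
     * (illustrative only, the values and the <tt>header(..)</tt> builder usage are placeholders):
     * <pre>
     * AggregateProcessor aggregateProcessor = ...; // obtained or constructed elsewhere
     * aggregateProcessor.setCompletionSize(10);
     * aggregateProcessor.setCompletionTimeout(1000);
     * aggregateProcessor.setCompletionPredicate(header("lastRecord").isEqualTo(true));
     * </pre>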
560     * @param key      the correlation key
561     * @param exchange the incoming exchange
562     * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion
563     */
564    protected String isCompleted(String key, Exchange exchange) {
565        // batch consumer completion must always run first
566        if (isCompletionFromBatchConsumer()) {
567            batchConsumerCorrelationKeys.add(key);
568            batchConsumerCounter.incrementAndGet();
569            int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
570            if (size > 0 && batchConsumerCounter.intValue() >= size) {
                // batch consumer is complete, so reset the counter
572                batchConsumerCounter.set(0);
573                return "consumer";
574            }
575        }
576
577        if (exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, boolean.class)) {
578            return "strategy";
579        }
580
581        if (getCompletionPredicate() != null) {
582            boolean answer = getCompletionPredicate().matches(exchange);
583            if (answer) {
584                return "predicate";
585            }
586        }
587
588        boolean sizeChecked = false;
589        if (getCompletionSizeExpression() != null) {
590            Integer value = getCompletionSizeExpression().evaluate(exchange, Integer.class);
591            if (value != null && value > 0) {
                // mark size as already checked, as the expression takes precedence over the statically configured size
593                sizeChecked = true;
594                int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
595                if (size >= value) {
596                    return "size";
597                }
598            }
599        }
600        if (!sizeChecked && getCompletionSize() > 0) {
601            int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
602            if (size >= getCompletionSize()) {
603                return "size";
604            }
605        }
606
607        // not complete
608        return null;
609    }
610
611    protected void trackTimeout(String key, Exchange exchange) {
        // the timeout can either be evaluated from an expression or taken from a fixed value
        // the expression takes precedence
614        boolean timeoutSet = false;
615        if (getCompletionTimeoutExpression() != null) {
616            Long value = getCompletionTimeoutExpression().evaluate(exchange, Long.class);
617            if (value != null && value > 0) {
618                if (LOG.isTraceEnabled()) {
619                    LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}",
620                            new Object[]{key, value, exchange});
621                }
622                addExchangeToTimeoutMap(key, exchange, value);
623                timeoutSet = true;
624            }
625        }
626        if (!timeoutSet && getCompletionTimeout() > 0) {
627            // timeout is used so use the timeout map to keep an eye on this
628            if (LOG.isTraceEnabled()) {
629                LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}",
630                        new Object[]{key, getCompletionTimeout(), exchange});
631            }
632            addExchangeToTimeoutMap(key, exchange, getCompletionTimeout());
633        }
634    }
635
636    protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
637        return aggregationStrategy.aggregate(oldExchange, newExchange);
638    }
639
640    protected boolean onPreCompletionAggregation(Exchange oldExchange, Exchange newExchange) {
641        AggregationStrategy strategy = aggregationStrategy;
642        if (strategy instanceof DelegateAggregationStrategy) {
643            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
644        }
645        if (strategy instanceof PreCompletionAwareAggregationStrategy) {
646            return ((PreCompletionAwareAggregationStrategy) strategy).preComplete(oldExchange, newExchange);
647        }
648        return false;
649    }
650
651    protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) {
652        // store the correlation key as property before we remove so the repository has that information
653        if (original != null) {
654            original.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
655        }
656        aggregated.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
657
        // only remove if we have previously added (as we could potentially complete with only 1 exchange)
        // (if we have previously added then we have that as the original exchange)
660        if (original != null) {
            // remove from the repository as it's completed; we do this first so as to trigger any OptimisticLockingExceptions
662            aggregationRepository.remove(aggregated.getContext(), key, original);
663        }
664
665        if (!fromTimeout && timeoutMap != null) {
            // clean up the timeout map if it was an incoming exchange which triggered the completion (and not the timeout checker)
667            LOG.trace("Removing correlation key {} from timeout", key);
668            timeoutMap.remove(key);
669        }
670
671        // this key has been closed so add it to the closed map
672        if (closedCorrelationKeys != null) {
673            closedCorrelationKeys.put(key, key);
674        }
675
676        if (fromTimeout) {
            // invoke timeout if it's a timeout-aware aggregation strategy,
            // to allow any custom processing before discarding the exchange
679            AggregationStrategy strategy = aggregationStrategy;
680            if (strategy instanceof DelegateAggregationStrategy) {
681                strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
682            }
683            if (strategy instanceof TimeoutAwareAggregationStrategy) {
684                long timeout = getCompletionTimeout() > 0 ? getCompletionTimeout() : -1;
685                ((TimeoutAwareAggregationStrategy) strategy).timeout(aggregated, -1, -1, timeout);
686            }
687        }
688
689        Exchange answer;
690        if (fromTimeout && isDiscardOnCompletionTimeout()) {
            // discard due to timeout
692            LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
693            // must confirm the discarded exchange
694            aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
695            // and remove redelivery state as well
696            redeliveryState.remove(aggregated.getExchangeId());
697            // the completion was from timeout and we should just discard it
698            answer = null;
699        } else {
700            // the aggregated exchange should be published (sent out)
701            answer = aggregated;
702        }
703
704        return answer;
705    }
706
707    private void onSubmitCompletion(final String key, final Exchange exchange) {
708        LOG.debug("Aggregation complete for correlation key {} sending aggregated exchange: {}", key, exchange);
709
710        // add this as in progress before we submit the task
711        inProgressCompleteExchanges.add(exchange.getExchangeId());
712
713        // invoke the on completion callback
714        AggregationStrategy target = aggregationStrategy;
715        if (target instanceof DelegateAggregationStrategy) {
716            target = ((DelegateAggregationStrategy) target).getDelegate();
717        }
718        if (target instanceof CompletionAwareAggregationStrategy) {
719            ((CompletionAwareAggregationStrategy) target).onCompletion(exchange);
720        }
721
722        if (getStatistics().isStatisticsEnabled()) {
723            totalCompleted.incrementAndGet();
724
725            String completedBy = exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class);
726            if ("interval".equals(completedBy)) {
727                completedByInterval.incrementAndGet();
728            } else if ("timeout".equals(completedBy)) {
729                completedByTimeout.incrementAndGet();
730            } else if ("force".equals(completedBy)) {
731                completedByForce.incrementAndGet();
732            } else if ("consumer".equals(completedBy)) {
733                completedByBatchConsumer.incrementAndGet();
734            } else if ("predicate".equals(completedBy)) {
735                completedByPredicate.incrementAndGet();
736            } else if ("size".equals(completedBy)) {
737                completedBySize.incrementAndGet();
738            } else if ("strategy".equals(completedBy)) {
739                completedByStrategy.incrementAndGet();
740            }
741        }
742
743        // send this exchange
744        executorService.submit(new Runnable() {
745            public void run() {
746                LOG.debug("Processing aggregated exchange: {}", exchange);
747
748                // add on completion task so we remember to update the inProgressCompleteExchanges
749                exchange.addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId()));
750
751                try {
752                    processor.process(exchange);
753                } catch (Throwable e) {
754                    exchange.setException(e);
755                }
756
757                // log exception if there was a problem
758                if (exchange.getException() != null) {
759                    // if there was an exception then let the exception handler handle it
760                    getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException());
761                } else {
762                    LOG.trace("Processing aggregated exchange: {} complete.", exchange);
763                }
764            }
765        });
766    }
767
768    /**
769     * Restores the timeout map with timeout values from the aggregation repository.
770     * <p/>
771     * This is needed in case the aggregator has been stopped and started again (for example a server restart).
772     * Then the existing exchanges from the {@link AggregationRepository} must have their timeout conditions restored.
773     */
774    protected void restoreTimeoutMapFromAggregationRepository() throws Exception {
775        // grab the timeout value for each partly aggregated exchange
776        Set<String> keys = aggregationRepository.getKeys();
777        if (keys == null || keys.isEmpty()) {
778            return;
779        }
780
781        StopWatch watch = new StopWatch();
782        LOG.trace("Starting restoring CompletionTimeout for {} existing exchanges from the aggregation repository...", keys.size());
783
784        for (String key : keys) {
785            Exchange exchange = aggregationRepository.get(camelContext, key);
786            // grab the timeout value
787            long timeout = exchange.hasProperties() ? exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0;
788            if (timeout > 0) {
789                LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout);
790                addExchangeToTimeoutMap(key, exchange, timeout);
791            }
792        }
793
794        // log duration of this task so end user can see how long it takes to pre-check this upon starting
795        LOG.info("Restored {} CompletionTimeout conditions in the AggregationTimeoutChecker in {}",
796                timeoutMap.size(), TimeUtils.printDuration(watch.stop()));
797    }
798
799    /**
800     * Adds the given exchange to the timeout map, which is used by the timeout checker task to trigger timeouts.
801     *
802     * @param key      the correlation key
803     * @param exchange the exchange
804     * @param timeout  the timeout value in millis
805     */
806    private void addExchangeToTimeoutMap(String key, Exchange exchange, long timeout) {
807        // store the timeout value on the exchange as well, in case we need it later
808        exchange.setProperty(Exchange.AGGREGATED_TIMEOUT, timeout);
809        timeoutMap.put(key, exchange.getExchangeId(), timeout);
810    }
811
812    /**
813     * Current number of closed correlation keys in the memory cache
814     */
815    public int getClosedCorrelationKeysCacheSize() {
816        if (closedCorrelationKeys != null) {
817            return closedCorrelationKeys.size();
818        } else {
819            return 0;
820        }
821    }
822
823    /**
824     * Clear all the closed correlation keys stored in the cache
825     */
826    public void clearClosedCorrelationKeysCache() {
827        if (closedCorrelationKeys != null) {
828            closedCorrelationKeys.clear();
829        }
830    }
831
832    public AggregateProcessorStatistics getStatistics() {
833        return statistics;
834    }
835
836    public int getInProgressCompleteExchanges() {
837        return inProgressCompleteExchanges.size();
838    }
839
840    public Predicate getCompletionPredicate() {
841        return completionPredicate;
842    }
843
844    public void setCompletionPredicate(Predicate completionPredicate) {
845        this.completionPredicate = completionPredicate;
846    }
847
848    public boolean isEagerCheckCompletion() {
849        return eagerCheckCompletion;
850    }
851
852    public void setEagerCheckCompletion(boolean eagerCheckCompletion) {
853        this.eagerCheckCompletion = eagerCheckCompletion;
854    }
855
856    public long getCompletionTimeout() {
857        return completionTimeout;
858    }
859
860    public void setCompletionTimeout(long completionTimeout) {
861        this.completionTimeout = completionTimeout;
862    }
863
864    public Expression getCompletionTimeoutExpression() {
865        return completionTimeoutExpression;
866    }
867
868    public void setCompletionTimeoutExpression(Expression completionTimeoutExpression) {
869        this.completionTimeoutExpression = completionTimeoutExpression;
870    }
871
872    public long getCompletionInterval() {
873        return completionInterval;
874    }
875
876    public void setCompletionInterval(long completionInterval) {
877        this.completionInterval = completionInterval;
878    }
879
880    public int getCompletionSize() {
881        return completionSize;
882    }
883
884    public void setCompletionSize(int completionSize) {
885        this.completionSize = completionSize;
886    }
887
888    public Expression getCompletionSizeExpression() {
889        return completionSizeExpression;
890    }
891
892    public void setCompletionSizeExpression(Expression completionSizeExpression) {
893        this.completionSizeExpression = completionSizeExpression;
894    }
895
896    public boolean isIgnoreInvalidCorrelationKeys() {
897        return ignoreInvalidCorrelationKeys;
898    }
899
900    public void setIgnoreInvalidCorrelationKeys(boolean ignoreInvalidCorrelationKeys) {
901        this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys;
902    }
903
904    public Integer getCloseCorrelationKeyOnCompletion() {
905        return closeCorrelationKeyOnCompletion;
906    }
907
908    public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
909        this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
910    }
911
912    public boolean isCompletionFromBatchConsumer() {
913        return completionFromBatchConsumer;
914    }
915
916    public void setCompletionFromBatchConsumer(boolean completionFromBatchConsumer) {
917        this.completionFromBatchConsumer = completionFromBatchConsumer;
918    }
919
920    public boolean isCompleteAllOnStop() {
921        return completeAllOnStop;
922    }
923
924    public ExceptionHandler getExceptionHandler() {
925        return exceptionHandler;
926    }
927
928    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
929        this.exceptionHandler = exceptionHandler;
930    }
931
932    public boolean isParallelProcessing() {
933        return parallelProcessing;
934    }
935
936    public void setParallelProcessing(boolean parallelProcessing) {
937        this.parallelProcessing = parallelProcessing;
938    }
939
940    public boolean isOptimisticLocking() {
941        return optimisticLocking;
942    }
943
944    public void setOptimisticLocking(boolean optimisticLocking) {
945        this.optimisticLocking = optimisticLocking;
946    }
947
948    public AggregationRepository getAggregationRepository() {
949        return aggregationRepository;
950    }
951
952    public void setAggregationRepository(AggregationRepository aggregationRepository) {
953        this.aggregationRepository = aggregationRepository;
954    }
955
956    public boolean isDiscardOnCompletionTimeout() {
957        return discardOnCompletionTimeout;
958    }
959
960    public void setDiscardOnCompletionTimeout(boolean discardOnCompletionTimeout) {
961        this.discardOnCompletionTimeout = discardOnCompletionTimeout;
962    }
963
964    public void setForceCompletionOnStop(boolean forceCompletionOnStop) {
965        this.forceCompletionOnStop = forceCompletionOnStop;
966    }
967
968    public void setCompleteAllOnStop(boolean completeAllOnStop) {
969        this.completeAllOnStop = completeAllOnStop;
970    }
971
972    public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
973        this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
974    }
975
976    public ScheduledExecutorService getTimeoutCheckerExecutorService() {
977        return timeoutCheckerExecutorService;
978    }
979
980    public boolean isShutdownTimeoutCheckerExecutorService() {
981        return shutdownTimeoutCheckerExecutorService;
982    }
983
984    public void setShutdownTimeoutCheckerExecutorService(boolean shutdownTimeoutCheckerExecutorService) {
985        this.shutdownTimeoutCheckerExecutorService = shutdownTimeoutCheckerExecutorService;
986    }
987
988    public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) {
989        this.optimisticLockRetryPolicy = optimisticLockRetryPolicy;
990    }
991
992    public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() {
993        return optimisticLockRetryPolicy;
994    }
995
996    public AggregationStrategy getAggregationStrategy() {
997        return aggregationStrategy;
998    }
999
1000    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
1001        this.aggregationStrategy = aggregationStrategy;
1002    }
1003
1004    public Expression getCorrelationExpression() {
1005        return correlationExpression;
1006    }
1007
1008    public void setCorrelationExpression(Expression correlationExpression) {
1009        this.correlationExpression = correlationExpression;
1010    }
1011
1012    public AggregateController getAggregateController() {
1013        return aggregateController;
1014    }
1015
1016    public void setAggregateController(AggregateController aggregateController) {
1017        this.aggregateController = aggregateController;
1018    }
1019
1020    /**
     * On completion task which keeps the bookkeeping of the in-progress exchanges up to date
1022     */
1023    private final class AggregateOnCompletion implements Synchronization {
1024        private final String exchangeId;
1025
1026        private AggregateOnCompletion(String exchangeId) {
            // must use the original exchange id as it could potentially change if sent over SEDA etc.
1028            this.exchangeId = exchangeId;
1029        }
1030
1031        public void onFailure(Exchange exchange) {
1032            LOG.trace("Aggregated exchange onFailure: {}", exchange);
1033
1034            // must remember to remove in progress when we failed
1035            inProgressCompleteExchanges.remove(exchangeId);
1036            // do not remove redelivery state as we need it when we redeliver again later
1037        }
1038
1039        public void onComplete(Exchange exchange) {
1040            LOG.trace("Aggregated exchange onComplete: {}", exchange);
1041
1042            // only confirm if we processed without a problem
1043            try {
1044                aggregationRepository.confirm(exchange.getContext(), exchangeId);
1045                // and remove redelivery state as well
1046                redeliveryState.remove(exchangeId);
1047            } finally {
1048                // must remember to remove in progress when we are complete
1049                inProgressCompleteExchanges.remove(exchangeId);
1050            }
1051        }
1052
1053        @Override
1054        public String toString() {
1055            return "AggregateOnCompletion";
1056        }
1057    }
1058
1059    /**
     * Background task that completes aggregated exchanges whose completion timeout has been reached.
1061     */
1062    private final class AggregationTimeoutMap extends DefaultTimeoutMap<String, String> {
1063
1064        private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
1065            // do NOT use locking on the timeout map as this aggregator has its own shared lock we will use instead
1066            super(executor, requestMapPollTimeMillis, optimisticLocking);
1067        }
1068
1069        @Override
1070        public void purge() {
1071            // must acquire the shared aggregation lock to be able to purge
1072            if (!optimisticLocking) {
1073                lock.lock();
1074            }
1075            try {
1076                super.purge();
1077            } finally {
1078                if (!optimisticLocking) {
1079                    lock.unlock();
1080                }
1081            }
1082        }
1083
1084        @Override
1085        public boolean onEviction(String key, String exchangeId) {
1086            log.debug("Completion timeout triggered for correlation key: {}", key);
1087
1088            boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
1089            if (inProgress) {
1090                LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
1091                return true;
1092            }
1093
1094            // get the aggregated exchange
1095            boolean evictionStolen = false;
1096            Exchange answer = aggregationRepository.get(camelContext, key);
1097            if (answer == null) {
1098                evictionStolen = true;
1099            } else {
1100                // indicate it was completed by timeout
1101                answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
1102                try {
1103                    answer = onCompletion(key, answer, answer, true);
1104                    if (answer != null) {
1105                        onSubmitCompletion(key, answer);
1106                    }
1107                } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
1108                    evictionStolen = true;
1109                }
1110            }
1111
1112            if (optimisticLocking && evictionStolen) {
1113                LOG.debug("Another Camel instance has already successfully correlated or processed this timeout eviction "
1114                          + "for exchange with id: {} and correlation id: {}", exchangeId, key);
1115            }
1116            return true;
1117        }
1118    }
1119
1120    /**
1121     * Background task that triggers completion based on interval.
1122     */
1123    private final class AggregationIntervalTask implements Runnable {
1124
1125        public void run() {
1126            // only run if CamelContext has been fully started
1127            if (!camelContext.getStatus().isStarted()) {
                LOG.trace("Completion interval task cannot start because CamelContext({}) has not been started yet", camelContext.getName());
1129                return;
1130            }
1131
1132            LOG.trace("Starting completion interval task");
1133
1134            // trigger completion for all in the repository
1135            Set<String> keys = aggregationRepository.getKeys();
1136
1137            if (keys != null && !keys.isEmpty()) {
1138                // must acquire the shared aggregation lock to be able to trigger interval completion
1139                if (!optimisticLocking) {
1140                    lock.lock();
1141                }
1142                try {
1143                    for (String key : keys) {
1144                        boolean stolenInterval = false;
1145                        Exchange exchange = aggregationRepository.get(camelContext, key);
1146                        if (exchange == null) {
1147                            stolenInterval = true;
1148                        } else {
1149                            LOG.trace("Completion interval triggered for correlation key: {}", key);
1150                            // indicate it was completed by interval
1151                            exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
1152                            try {
1153                                Exchange answer = onCompletion(key, exchange, exchange, false);
1154                                if (answer != null) {
1155                                    onSubmitCompletion(key, answer);
1156                                }
1157                            } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
1158                                stolenInterval = true;
1159                            }
1160                        }
1161                        if (optimisticLocking && stolenInterval) {
1162                            LOG.debug("Another Camel instance has already processed this interval aggregation for exchange with correlation id: {}", key);
1163                        }
1164                    }
1165                } finally {
1166                    if (!optimisticLocking) {
1167                        lock.unlock();
1168                    }
1169                }
1170            }
1171
1172            LOG.trace("Completion interval task complete");
1173        }
1174    }
1175
1176    /**
1177     * Background task that looks for aggregated exchanges to recover.
1178     */
1179    private final class RecoverTask implements Runnable {
1180        private final RecoverableAggregationRepository recoverable;
1181
1182        private RecoverTask(RecoverableAggregationRepository recoverable) {
1183            this.recoverable = recoverable;
1184        }
1185
1186        public void run() {
1187            // only run if CamelContext has been fully started
1188            if (!camelContext.getStatus().isStarted()) {
                LOG.trace("Recover check cannot start because CamelContext({}) has not been started yet", camelContext.getName());
1190                return;
1191            }
1192
1193            LOG.trace("Starting recover check");
1194
1195            // copy the current in progress before doing scan
1196            final Set<String> copyOfInProgress = new LinkedHashSet<String>(inProgressCompleteExchanges);
1197
1198            Set<String> exchangeIds = recoverable.scan(camelContext);
1199            for (String exchangeId : exchangeIds) {
1200
1201                // we may shutdown while doing recovery
1202                if (!isRunAllowed()) {
1203                    LOG.info("We are shutting down so stop recovering");
1204                    return;
1205                }
1206
1207                // consider in progress if it was in progress before we did the scan, or currently after we did the scan
                // it's safer to consider it in progress than risk duplicates due to being both in progress and recovered
1209                boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId);
1210                if (inProgress) {
1211                    LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
1212                } else {
1213                    LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
1214                    Exchange exchange = recoverable.recover(camelContext, exchangeId);
1215                    if (exchange != null) {
1216                        // get the correlation key
1217                        String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
1218                        // and mark it as redelivered
1219                        exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
1220
1221                        // get the current redelivery data
1222                        RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
1223
1224                        // if we are exhausted, then move to dead letter channel
1225                        if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
1226                            LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries()
1227                                    + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri());
1228
1229                            // send to DLC
1230                            try {
1231                                // set redelivery counter
1232                                exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
1233                                exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
1234                                deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
1235                            } catch (Throwable e) {
1236                                exchange.setException(e);
1237                            }
1238
1239                            // handle if failed
1240                            if (exchange.getException() != null) {
1241                                getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException());
1242                            } else {
                                // it was ok, so confirm after it has been moved to the dead letter channel, so we won't recover it again
1244                                recoverable.confirm(camelContext, exchangeId);
1245                            }
1246                        } else {
1247                            // update current redelivery state
1248                            if (data == null) {
1249                                // create new data
1250                                data = new RedeliveryData();
1251                                redeliveryState.put(exchange.getExchangeId(), data);
1252                            }
1253                            data.redeliveryCounter++;
1254
1255                            // set redelivery counter
1256                            exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
1257                            if (recoverable.getMaximumRedeliveries() > 0) {
1258                                exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries());
1259                            }
1260
1261                            LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId);
1262
1263                            // not exhausted, so resubmit the recovered exchange
1264                            onSubmitCompletion(key, exchange);
1265                        }
1266                    }
1267                }
1268            }
1269
1270            LOG.trace("Recover check complete");
1271        }
1272    }
1273
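    // doStart wires up the aggregator: it detects pre-completion mode from the aggregation strategy,
    // validates that at least one completion option is configured, sets up the closed correlation key
    // cache, defaults to a MemoryAggregationRepository when none is configured, and schedules the
    // background recovery, completion interval and completion timeout tasks before starting the controller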
1274    @Override
1275    protected void doStart() throws Exception {
1276        AggregationStrategy strategy = aggregationStrategy;
1277        if (strategy instanceof DelegateAggregationStrategy) {
1278            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
1279        }
1280        if (strategy instanceof PreCompletionAwareAggregationStrategy) {
1281            preCompletion = true;
1282            LOG.info("PreCompletionAwareAggregationStrategy detected. Aggregator {} is in pre-completion mode.", getId());
1283        }
1284
1285        if (!preCompletion) {
1286            // if not in pre-completion mode then check that at least one completion option has been configured
1287            if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
1288                    && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
1289                    && getCompletionSizeExpression() == null) {
1290                throw new IllegalStateException("At least one of the completion options"
1291                        + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
1292            }
1293        }
1294
1295        if (getCloseCorrelationKeyOnCompletion() != null) {
1296            if (getCloseCorrelationKeyOnCompletion() > 0) {
1297                LOG.info("Using ClosedCorrelationKeys with an LRUCache with a capacity of " + getCloseCorrelationKeyOnCompletion());
1298                closedCorrelationKeys = new LRUCache<String, String>(getCloseCorrelationKeyOnCompletion());
1299            } else {
1300                LOG.info("Using ClosedCorrelationKeys with unbounded capacity");
1301                closedCorrelationKeys = new ConcurrentHashMap<String, String>();
1302            }
1303        }
1304
1305        if (aggregationRepository == null) {
1306            aggregationRepository = new MemoryAggregationRepository(optimisticLocking);
1307            LOG.info("Defaulting to MemoryAggregationRepository");
1308        }
1309
1310        if (optimisticLocking) {
1311            if (!(aggregationRepository instanceof OptimisticLockingAggregationRepository)) {
1312                throw new IllegalArgumentException("Optimistic locking cannot be enabled without using an AggregationRepository that implements OptimisticLockingAggregationRepository");
1313            }
1314            LOG.info("Optimistic locking is enabled");
1315        }
1316
1317        ServiceHelper.startServices(aggregationStrategy, processor, aggregationRepository);
1318
1319        // should we use the recovery checker
1320        if (aggregationRepository instanceof RecoverableAggregationRepository) {
1321            RecoverableAggregationRepository recoverable = (RecoverableAggregationRepository) aggregationRepository;
1322            if (recoverable.isUseRecovery()) {
1323                long interval = recoverable.getRecoveryIntervalInMillis();
1324                if (interval <= 0) {
1325                    throw new IllegalArgumentException("AggregationRepository has recovery enabled and the RecoveryInterval option must be a positive number, was: " + interval);
1326                }
1327
1328                // create a background recover thread to check every interval
1329                recoverService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateRecoverChecker", 1);
1330                Runnable recoverTask = new RecoverTask(recoverable);
1331                LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every " + interval + " millis.");
1332                // use a fixed delay so the full interval elapses between each run; the first run starts after a 1 second initial delay
1333                recoverService.scheduleWithFixedDelay(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS);
1334
1335                if (recoverable.getDeadLetterUri() != null) {
1336                    int max = recoverable.getMaximumRedeliveries();
1337                    if (max <= 0) {
1338                        throw new IllegalArgumentException("Option maximumRedeliveries must be a positive number, was: " + max);
1339                    }
1340                    LOG.info("After " + max + " failed redelivery attempts Exchanges will be moved to deadLetterUri: " + recoverable.getDeadLetterUri());
1341
1342                    // dead letter uri must be a valid endpoint
1343                    Endpoint endpoint = camelContext.getEndpoint(recoverable.getDeadLetterUri());
1344                    if (endpoint == null) {
1345                        throw new NoSuchEndpointException(recoverable.getDeadLetterUri());
1346                    }
1347                    deadLetterProducerTemplate = camelContext.createProducerTemplate();
1348                }
1349            }
1350        }
1351
1352        if (getCompletionInterval() > 0 && getCompletionTimeout() > 0) {
1353            throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both.");
1354        }
1355        if (getCompletionInterval() > 0) {
1356            LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis.");
1357            if (getTimeoutCheckerExecutorService() == null) {
1358                setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
1359                shutdownTimeoutCheckerExecutorService = true;
1360            }
1361            // trigger completion based on interval
1362            getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS);
1363        }
1364
1365        // start the timeout service if it is in use
1366        if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
1367            LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout() + " millis of inactivity.");
1368            if (getTimeoutCheckerExecutorService() == null) {
1369                setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
1370                shutdownTimeoutCheckerExecutorService = true;
1371            }
1372            // check for timed out aggregated messages once every second
1373            timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L);
1374            // fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we
1375            // need to re-establish the timeout map so the timeouts can trigger
1376            restoreTimeoutMapFromAggregationRepository();
1377            ServiceHelper.startService(timeoutMap);
1378        }
1379
1380        if (aggregateController == null) {
1381            aggregateController = new DefaultAggregateController();
1382        }
1383        aggregateController.onStart(this);
1384    }
1385
1386    @Override
1387    protected void doStop() throws Exception {
1388        // note: we cannot do doForceCompletionOnStop from this doStop method,
1389        // as this is handled in the prepareShutdown method, which is also invoked when stopping a route
1390        // and is better suited for preparing to shut down than this doStop method
1391
1392        if (aggregateController != null) {
1393            aggregateController.onStop(this);
1394        }
1395
1396        if (recoverService != null) {
1397            camelContext.getExecutorServiceManager().shutdown(recoverService);
1398        }
1399        ServiceHelper.stopServices(timeoutMap, processor, deadLetterProducerTemplate);
1400
1401        if (closedCorrelationKeys != null) {
1402            // it may be a service so stop it as well
1403            ServiceHelper.stopService(closedCorrelationKeys);
1404            closedCorrelationKeys.clear();
1405        }
1406        batchConsumerCorrelationKeys.clear();
1407        redeliveryState.clear();
1408    }
1409
1410    @Override
1411    public void prepareShutdown(boolean suspendOnly, boolean forced) {
1412        // we are shutting down, so force completion if this option was enabled,
1413        // but only do this when forced=false, as that is when we have a chance to
1414        // send out new messages to be routed by Camel. When forced=true, then
1415        // we have to shut down in a hurry
1416        if (!forced && forceCompletionOnStop) {
1417            doForceCompletionOnStop();
1418        }
1419    }
1420
1421    @Override
1422    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
1423        // not in use
1424        return true;
1425    }
1426
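    // ShutdownAware: report how many aggregated exchanges should be regarded as pending during a
    // graceful shutdown; they are only counted when completeAllOnStop is enabled, otherwise pending
    // groups are left in the repository and not treated as inflight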
1427    @Override
1428    public int getPendingExchangesSize() {
1429        if (completeAllOnStop) {
1430            // we want to regard all pending exchanges in the repo as inflight
1431            Set<String> keys = getAggregationRepository().getKeys();
1432            return keys != null ? keys.size() : 0;
1433        } else {
1434            return 0;
1435        }
1436    }
1437
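    // triggers completion of all groups in the repository and then waits (polling every 100 millis)
    // until the in-progress completions have finished, so a graceful stop does not lose aggregated exchanges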
1438    private void doForceCompletionOnStop() {
1439        int expected = forceCompletionOfAllGroups();
1440
1441        StopWatch watch = new StopWatch();
1442        while (inProgressCompleteExchanges.size() > 0) {
1443            LOG.trace("Waiting for {} inflight exchanges to complete", inProgressCompleteExchanges.size());
1444            try {
1445                Thread.sleep(100);
1446            } catch (InterruptedException e) {
1447                // break out as we were interrupted, for example by the JVM terminating
1448                LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", inProgressCompleteExchanges.size());
1449                break;
1450            }
1451        }
1452
1453        if (expected > 0) {
1454            LOG.info("Forcing completion of all groups with {} exchanges completed in {}", expected, TimeUtils.printDuration(watch.stop()));
1455        }
1456    }
1457
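    // final shutdown phase: stop and shutdown the repository and strategy, clear the in-progress
    // bookkeeping, and release the thread pools owned by this processor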
1458    @Override
1459    protected void doShutdown() throws Exception {
1460        // shutdown aggregation repository and the strategy
1461        ServiceHelper.stopAndShutdownServices(aggregationRepository, aggregationStrategy);
1462
1463        // cleanup when shutting down
1464        inProgressCompleteExchanges.clear();
1465
1466        if (shutdownExecutorService) {
1467            camelContext.getExecutorServiceManager().shutdownNow(executorService);
1468        }
1469        if (shutdownTimeoutCheckerExecutorService) {
1470            camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService);
1471            timeoutCheckerExecutorService = null;
1472        }
1473
1474        super.doShutdown();
1475    }
1476
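    /**
     * Forces completion of the aggregated exchange with the given correlation key, if one exists
     * in the aggregation repository, marking it as completed by <tt>force</tt>.
     *
     * @param key the correlation key
     * @return the number of groups completed (either 0 or 1)
     */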
1477    public int forceCompletionOfGroup(String key) {
1478        // must acquire the shared aggregation lock to be able to trigger force completion
1479        int total = 0;
1480
1481        if (!optimisticLocking) {
1482            lock.lock();
1483        }
1484        try {
1485            Exchange exchange = aggregationRepository.get(camelContext, key);
1486            if (exchange != null) {
1487                total = 1;
1488                LOG.trace("Force completion triggered for correlation key: {}", key);
1489                // indicate it was completed by a force completion request
1490                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force");
1491                Exchange answer = onCompletion(key, exchange, exchange, false);
1492                if (answer != null) {
1493                    onSubmitCompletion(key, answer);
1494                }
1495            }
1496        } finally {
1497            if (!optimisticLocking) {
1498                lock.unlock();
1499            }
1500        }
1501        LOG.trace("Completed force completion of group {}", key);
1502
1503        if (total > 0) {
1504            LOG.debug("Forcing completion of group {} with {} exchanges", key, total);
1505        }
1506        return total;
1507    }
1508
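    /**
     * Forces completion of all groups currently stored in the aggregation repository, marking each
     * as completed by <tt>force</tt>. This only runs when the CamelContext has been started or is stopping.
     *
     * @return the number of correlation keys found in the repository when the force completion was triggered
     */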
1509    public int forceCompletionOfAllGroups() {
1510
1511        // only run if CamelContext has been fully started or is stopping
1512        boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping();
1513        if (!allow) {
1514            LOG.warn("Cannot start force completion of all groups because CamelContext({}) has not been started", camelContext.getName());
1515            return 0;
1516        }
1517
1518        LOG.trace("Starting force completion of all groups task");
1519
1520        // trigger completion for all in the repository
1521        Set<String> keys = aggregationRepository.getKeys();
1522
1523        int total = 0;
1524        if (keys != null && !keys.isEmpty()) {
1525            // must acquire the shared aggregation lock to be able to trigger force completion
1526            if (!optimisticLocking) {
1527                lock.lock();
1528            }
1529            total = keys.size();
1530            try {
1531                for (String key : keys) {
1532                    Exchange exchange = aggregationRepository.get(camelContext, key);
1533                    if (exchange != null) {
1534                        LOG.trace("Force completion triggered for correlation key: {}", key);
1535                        // indicate it was completed by a force completion request
1536                        exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force");
1537                        Exchange answer = onCompletion(key, exchange, exchange, false);
1538                        if (answer != null) {
1539                            onSubmitCompletion(key, answer);
1540                        }
1541                    }
1542                }
1543            } finally {
1544                if (!optimisticLocking) {
1545                    lock.unlock();
1546                }
1547            }
1548        }
1549        LOG.trace("Completed force completion of all groups task");
1550
1551        if (total > 0) {
1552            LOG.debug("Forcing completion of all groups with {} exchanges", total);
1553        }
1554        return total;
1555    }
1556
1557}