001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor.aggregate;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.LinkedHashSet;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025import java.util.concurrent.ConcurrentHashMap;
026import java.util.concurrent.ConcurrentSkipListSet;
027import java.util.concurrent.ExecutorService;
028import java.util.concurrent.ScheduledExecutorService;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.atomic.AtomicBoolean;
031import java.util.concurrent.atomic.AtomicInteger;
032import java.util.concurrent.atomic.AtomicLong;
033import java.util.concurrent.locks.Lock;
034import java.util.concurrent.locks.ReentrantLock;
035
036import org.apache.camel.AsyncCallback;
037import org.apache.camel.AsyncProcessor;
038import org.apache.camel.CamelContext;
039import org.apache.camel.CamelContextAware;
040import org.apache.camel.CamelExchangeException;
041import org.apache.camel.Endpoint;
042import org.apache.camel.Exchange;
043import org.apache.camel.Expression;
044import org.apache.camel.Navigate;
045import org.apache.camel.NoSuchEndpointException;
046import org.apache.camel.Predicate;
047import org.apache.camel.Processor;
048import org.apache.camel.ProducerTemplate;
049import org.apache.camel.ShutdownRunningTask;
050import org.apache.camel.TimeoutMap;
051import org.apache.camel.Traceable;
052import org.apache.camel.spi.AggregationRepository;
053import org.apache.camel.spi.ExceptionHandler;
054import org.apache.camel.spi.IdAware;
055import org.apache.camel.spi.OptimisticLockingAggregationRepository;
056import org.apache.camel.spi.RecoverableAggregationRepository;
057import org.apache.camel.spi.ShutdownAware;
058import org.apache.camel.spi.ShutdownPrepared;
059import org.apache.camel.spi.Synchronization;
060import org.apache.camel.support.DefaultTimeoutMap;
061import org.apache.camel.support.LoggingExceptionHandler;
062import org.apache.camel.support.ServiceSupport;
063import org.apache.camel.util.AsyncProcessorHelper;
064import org.apache.camel.util.ExchangeHelper;
065import org.apache.camel.util.LRUCacheFactory;
066import org.apache.camel.util.ObjectHelper;
067import org.apache.camel.util.ServiceHelper;
068import org.apache.camel.util.StopWatch;
069import org.apache.camel.util.TimeUtils;
070import org.slf4j.Logger;
071import org.slf4j.LoggerFactory;
072
073/**
074 * An implementation of the <a
075 * href="http://camel.apache.org/aggregator2.html">Aggregator</a>
076 * pattern where a batch of messages are processed (up to a maximum amount or
077 * until some timeout is reached) and messages for the same correlation key are
078 * combined together using some kind of {@link AggregationStrategy}
079 * (by default the latest message is used) to compress many message exchanges
080 * into a smaller number of exchanges.
081 * <p/>
082 * A good example of this is stock market data; you may be receiving 30,000
083 * messages/second and you may want to throttle it right down so that multiple
084 * messages for the same stock are combined (or just the latest message is used
085 * and older prices are discarded). Another idea is to combine line item messages
086 * together into a single invoice message.
087 */
088public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, ShutdownPrepared, ShutdownAware, IdAware {
089
    // name constant for the timeout checker; its usage is outside this view
    // (presumably a thread/executor name — TODO confirm)
    public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";

    private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class);

    // held around doAggregation in doProcess when optimistic locking is not enabled
    private final Lock lock = new ReentrantLock();
    // flipped once so the RecoverableAggregationRepository warning in doAggregation is logged a single time
    private final AtomicBoolean aggregateRepositoryWarned = new AtomicBoolean();
    private final CamelContext camelContext;
    // the downstream processor; also what next() exposes
    private final Processor processor;
    private String id;
    private AggregationStrategy aggregationStrategy;
    // when true doAggregation runs the pre-completion check before aggregating
    private boolean preCompletion;
    // evaluated per exchange in doProcess to obtain the correlation key
    private Expression correlationExpression;
    private AggregateController aggregateController;
    private final ExecutorService executorService;
    private final boolean shutdownExecutorService;
    // consulted by doProcess for the delay and retry decision after an OptimisticLockingException
    private OptimisticLockRetryPolicy optimisticLockRetryPolicy = new OptimisticLockRetryPolicy();
    private ScheduledExecutorService timeoutCheckerExecutorService;
    private boolean shutdownTimeoutCheckerExecutorService;
    private ScheduledExecutorService recoverService;
    // store correlation key -> exchange id in timeout map
    private TimeoutMap<String, String> timeoutMap;
    private ExceptionHandler exceptionHandler;
    private AggregationRepository aggregationRepository;
    // when non-null: keys are added on completion and doProcess rejects exchanges for a contained key
    private Map<String, String> closedCorrelationKeys;
    // keys seen by isCompleted in batch consumer mode; drained by doAggregationComplete
    private final Set<String> batchConsumerCorrelationKeys = new ConcurrentSkipListSet<String>();
    private final Set<String> inProgressCompleteExchanges = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
    private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>();

    // counters exposed through the Statistics inner class
    private final AggregateProcessorStatistics statistics = new Statistics();
    private final AtomicLong totalIn = new AtomicLong();
    private final AtomicLong totalCompleted = new AtomicLong();
    private final AtomicLong completedBySize = new AtomicLong();
    private final AtomicLong completedByStrategy = new AtomicLong();
    private final AtomicLong completedByInterval = new AtomicLong();
    private final AtomicLong completedByTimeout = new AtomicLong();
    private final AtomicLong completedByPredicate = new AtomicLong();
    private final AtomicLong completedByBatchConsumer = new AtomicLong();
    private final AtomicLong completedByForce = new AtomicLong();
128
    // bookkeeping about redelivery, stored per key in the redeliveryState map
    private class RedeliveryData {
        // redelivery counter; where it is incremented is outside this view
        int redeliveryCounter;
    }
133
134    private class Statistics implements AggregateProcessorStatistics {
135
136        private boolean statisticsEnabled = true;
137
138        public long getTotalIn() {
139            return totalIn.get();
140        }
141
142        public long getTotalCompleted() {
143            return totalCompleted.get();
144        }
145
146        public long getCompletedBySize() {
147            return completedBySize.get();
148        }
149
150        public long getCompletedByStrategy() {
151            return completedByStrategy.get();
152        }
153
154        public long getCompletedByInterval() {
155            return completedByInterval.get();
156        }
157
158        public long getCompletedByTimeout() {
159            return completedByTimeout.get();
160        }
161
162        public long getCompletedByPredicate() {
163            return completedByPredicate.get();
164        }
165
166        public long getCompletedByBatchConsumer() {
167            return completedByBatchConsumer.get();
168        }
169
170        public long getCompletedByForce() {
171            return completedByForce.get();
172        }
173
174        public void reset() {
175            totalIn.set(0);
176            totalCompleted.set(0);
177            completedBySize.set(0);
178            completedByStrategy.set(0);
179            completedByTimeout.set(0);
180            completedByPredicate.set(0);
181            completedByBatchConsumer.set(0);
182            completedByForce.set(0);
183        }
184
185        public boolean isStatisticsEnabled() {
186            return statisticsEnabled;
187        }
188
189        public void setStatisticsEnabled(boolean statisticsEnabled) {
190            this.statisticsEnabled = statisticsEnabled;
191        }
192    }
193
    // options
    // when true doProcess silently drops exchanges with an empty correlation key instead of failing
    private boolean ignoreInvalidCorrelationKeys;
    private Integer closeCorrelationKeyOnCompletion;
    private boolean parallelProcessing;
    // when true doProcess retries aggregation on OptimisticLockingException instead of taking the lock
    private boolean optimisticLocking;

    // different ways to have completion triggered
    private boolean eagerCheckCompletion;
    private Predicate completionPredicate;
    // fixed timeout; used by trackTimeout only when the expression yields no positive value
    private long completionTimeout;
    private Expression completionTimeoutExpression;
    private long completionInterval;
    // fixed size; used by isCompleted only when the expression yields no positive value
    private int completionSize;
    private Expression completionSizeExpression;
    private boolean completionFromBatchConsumer;
    private boolean completionOnNewCorrelationGroup;
    // incremented by isCompleted in batch consumer mode and reset once the batch size is reached
    private AtomicInteger batchConsumerCounter = new AtomicInteger();
    private boolean discardOnCompletionTimeout;
    private boolean forceCompletionOnStop;
    private boolean completeAllOnStop;
    // NOTE(review): unit is presumably milliseconds — confirm against the timeout checker setup
    private long completionTimeoutCheckerInterval = 1000;

    private ProducerTemplate deadLetterProducerTemplate;
217
218    public AggregateProcessor(CamelContext camelContext, Processor processor,
219                              Expression correlationExpression, AggregationStrategy aggregationStrategy,
220                              ExecutorService executorService, boolean shutdownExecutorService) {
221        ObjectHelper.notNull(camelContext, "camelContext");
222        ObjectHelper.notNull(processor, "processor");
223        ObjectHelper.notNull(correlationExpression, "correlationExpression");
224        ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
225        ObjectHelper.notNull(executorService, "executorService");
226        this.camelContext = camelContext;
227        this.processor = processor;
228        this.correlationExpression = correlationExpression;
229        this.aggregationStrategy = aggregationStrategy;
230        this.executorService = executorService;
231        this.shutdownExecutorService = shutdownExecutorService;
232        this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
233    }
234
235    @Override
236    public String toString() {
237        return "AggregateProcessor[to: " + processor + "]";
238    }
239
240    public String getTraceLabel() {
241        return "aggregate[" + correlationExpression + "]";
242    }
243
244    public List<Processor> next() {
245        if (!hasNext()) {
246            return null;
247        }
248        List<Processor> answer = new ArrayList<Processor>(1);
249        answer.add(processor);
250        return answer;
251    }
252
253    public boolean hasNext() {
254        return processor != null;
255    }
256
257    public String getId() {
258        return id;
259    }
260
261    public void setId(String id) {
262        this.id = id;
263    }
264
265    public void process(Exchange exchange) throws Exception {
266        AsyncProcessorHelper.process(this, exchange);
267    }
268
269    public boolean process(Exchange exchange, AsyncCallback callback) {
270        try {
271            doProcess(exchange);
272        } catch (Throwable e) {
273            exchange.setException(e);
274        }
275        callback.done(true);
276        return true;
277    }
278
279    protected void doProcess(Exchange exchange) throws Exception {
280
281        if (getStatistics().isStatisticsEnabled()) {
282            totalIn.incrementAndGet();
283        }
284
285        //check for the special header to force completion of all groups (and ignore the exchange otherwise)
286        boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
287        if (completeAllGroups) {
288            forceCompletionOfAllGroups();
289            return;
290        }
291
292        // compute correlation expression
293        String key = correlationExpression.evaluate(exchange, String.class);
294        if (ObjectHelper.isEmpty(key)) {
295            // we have a bad correlation key
296            if (isIgnoreInvalidCorrelationKeys()) {
297                LOG.debug("Invalid correlation key. This Exchange will be ignored: {}", exchange);
298                return;
299            } else {
300                throw new CamelExchangeException("Invalid correlation key", exchange);
301            }
302        }
303
304        // is the correlation key closed?
305        if (closedCorrelationKeys != null && closedCorrelationKeys.containsKey(key)) {
306            throw new ClosedCorrelationKeyException(key, exchange);
307        }
308
309        // when optimist locking is enabled we keep trying until we succeed
310        if (optimisticLocking) {
311            List<Exchange> aggregated = null;
312            boolean exhaustedRetries = true;
313            int attempt = 0;
314            do {
315                attempt++;
316                // copy exchange, and do not share the unit of work
317                // the aggregated output runs in another unit of work
318                Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
319                try {
320                    aggregated = doAggregation(key, copy);
321                    exhaustedRetries = false;
322                    break;
323                } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
324                    LOG.trace("On attempt {} OptimisticLockingAggregationRepository: {} threw OptimisticLockingException while trying to add() key: {} and exchange: {}",
325                              new Object[]{attempt, aggregationRepository, key, copy, e});
326                    optimisticLockRetryPolicy.doDelay(attempt);
327                }
328            } while (optimisticLockRetryPolicy.shouldRetry(attempt));
329
330            if (exhaustedRetries) {
331                throw new CamelExchangeException("Exhausted optimistic locking retry attempts, tried " + attempt + " times", exchange,
332                        new OptimisticLockingAggregationRepository.OptimisticLockingException());
333            } else if (aggregated != null) {
334                // we are completed so submit to completion
335                for (Exchange agg : aggregated) {
336                    onSubmitCompletion(key, agg);
337                }
338            }
339        } else {
340            // copy exchange, and do not share the unit of work
341            // the aggregated output runs in another unit of work
342            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
343
344            // when memory based then its fast using synchronized, but if the aggregation repository is IO
345            // bound such as JPA etc then concurrent aggregation per correlation key could
346            // improve performance as we can run aggregation repository get/add in parallel
347            List<Exchange> aggregated = null;
348            lock.lock();
349            try {
350                aggregated = doAggregation(key, copy);
351            } finally {
352                lock.unlock();
353            }
354            // we are completed so do that work outside the lock
355            if (aggregated != null) {
356                for (Exchange agg : aggregated) {
357                    onSubmitCompletion(key, agg);
358                }
359            }
360        }
361
362        // check for the special header to force completion of all groups (inclusive of the message)
363        boolean completeAllGroupsInclusive = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, false, boolean.class);
364        if (completeAllGroupsInclusive) {
365            forceCompletionOfAllGroups();
366        }
367    }
368
    /**
     * Aggregates the exchange with the given correlation key.
     * <p/>
     * This method <b>must</b> be run synchronized as we cannot aggregate the same correlation key
     * in parallel.
     * <p/>
     * The returned {@link Exchange}s should be sent downstream using the {@link #onSubmitCompletion(String, org.apache.camel.Exchange)}
     * method which sends out the aggregated and completed {@link Exchange}.
     *
     * @param key         the correlation key
     * @param newExchange the exchange
     * @return the list of completed aggregated exchange(s); the list is created up front and
     *         is only filled by {@link #doAggregationComplete}, so it stays empty when nothing completed
     * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating
     */
    private List<Exchange> doAggregation(String key, Exchange newExchange) throws CamelExchangeException {
        LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);

        List<Exchange> list = new ArrayList<Exchange>();
        // non-null once a completion trigger fired; holds the name of the trigger
        String complete = null;

        Exchange answer;
        // what the repository currently holds for the key (null for a new group)
        Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key);
        Exchange oldExchange = originalExchange;

        // a new group starts at size 1, otherwise the stored size plus this exchange
        Integer size = 1;
        if (oldExchange != null) {
            // hack to support legacy AggregationStrategy implementations that modify and return the oldExchange;
            // these will not work with an identity based approach for optimistic locking like the MemoryAggregationRepository.
            if (optimisticLocking && aggregationRepository instanceof MemoryAggregationRepository) {
                oldExchange = originalExchange.copy();
            }
            size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class);
            size++;
        }

        // prepare the exchanges for aggregation
        ExchangeHelper.prepareAggregation(oldExchange, newExchange);

        // check if we are pre complete
        if (preCompletion) {
            try {
                // put the current aggregated size on the exchange so it is available during the completion check
                newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
                complete = isPreCompleted(key, oldExchange, newExchange);
                // make sure to track timeouts if not complete
                if (complete == null) {
                    trackTimeout(key, newExchange);
                }
                // remove it afterwards
                newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
            } catch (Throwable e) {
                // must catch any exception from the pre-completion check
                throw new CamelExchangeException("Error occurred during preComplete", newExchange, e);
            }
        } else if (isEagerCheckCompletion()) {
            // put the current aggregated size on the exchange so it is available during the completion check
            newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
            complete = isCompleted(key, newExchange);
            // make sure to track timeouts if not complete
            if (complete == null) {
                trackTimeout(key, newExchange);
            }
            // remove it afterwards
            newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
        }

        if (preCompletion && complete != null) {
            // need to pre complete the current group before we aggregate
            doAggregationComplete(complete, list, key, originalExchange, oldExchange);
            // as we complete the current group eagerly, we should indicate the new group is not complete
            complete = null;
            // and clear old/original exchange as we start on a new group
            oldExchange = null;
            originalExchange = null;
            // and reset the size to 1
            size = 1;
            // make sure to track timeout as we just restart the correlation group when we are in pre completion mode
            trackTimeout(key, newExchange);
        }

        // aggregate the exchanges
        try {
            answer = onAggregation(oldExchange, newExchange);
        } catch (Throwable e) {
            // must catch any exception from aggregation
            throw new CamelExchangeException("Error occurred during aggregation", newExchange, e);
        }
        if (answer == null) {
            throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", newExchange);
        }

        // check for the special exchange property to force completion of all groups
        boolean completeAllGroups = answer.getProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
        if (completeAllGroups) {
            // remove the exchange property so we do not complete again
            answer.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
            forceCompletionOfAllGroups();
        } else if (isCompletionOnNewCorrelationGroup() && originalExchange == null) {
            // it is a new group so force completion of all existing groups
            forceCompletionOfAllGroups();
        }

        // special for some repository implementations
        if (aggregationRepository instanceof RecoverableAggregationRepository) {
            // valid means: first exchange of the group, or the strategy kept the old exchange's id
            boolean valid = oldExchange == null || answer.getExchangeId().equals(oldExchange.getExchangeId());
            // compareAndSet ensures the warning is logged only once
            if (!valid && aggregateRepositoryWarned.compareAndSet(false, true)) {
                LOG.warn("AggregationStrategy should return the oldExchange instance instead of the newExchange whenever possible"
                    + " as otherwise this can lead to unexpected behavior with some RecoverableAggregationRepository implementations");
            }
        }

        // update the aggregated size
        answer.setProperty(Exchange.AGGREGATED_SIZE, size);

        // maybe we should check completion after the aggregation
        if (!preCompletion && !isEagerCheckCompletion()) {
            complete = isCompleted(key, answer);
            // make sure to track timeouts if not complete
            if (complete == null) {
                trackTimeout(key, newExchange);
            }
        }

        if (complete == null) {
            // only need to update aggregation repository if we are not complete
            doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
        } else {
            // if we are complete then add the answer to the list
            doAggregationComplete(complete, list, key, originalExchange, answer);
        }

        LOG.trace("onAggregation +++  end  +++ with correlation key: {}", key);
        return list;
    }
503
504    protected void doAggregationComplete(String complete, List<Exchange> list, String key, Exchange originalExchange, Exchange answer) {
505        if ("consumer".equals(complete)) {
506            for (String batchKey : batchConsumerCorrelationKeys) {
507                Exchange batchAnswer;
508                if (batchKey.equals(key)) {
509                    // skip the current aggregated key as we have already aggregated it and have the answer
510                    batchAnswer = answer;
511                } else {
512                    batchAnswer = aggregationRepository.get(camelContext, batchKey);
513                }
514
515                if (batchAnswer != null) {
516                    batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
517                    onCompletion(batchKey, originalExchange, batchAnswer, false);
518                    list.add(batchAnswer);
519                }
520            }
521            batchConsumerCorrelationKeys.clear();
522            // we have already submitted to completion, so answer should be null
523            answer = null;
524        } else if (answer != null) {
525            // we are complete for this exchange
526            answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
527            answer = onCompletion(key, originalExchange, answer, false);
528        }
529
530        if (answer != null) {
531            list.add(answer);
532        }
533    }
534
535    protected void doAggregationRepositoryAdd(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) {
536        LOG.trace("In progress aggregated oldExchange: {}, newExchange: {} with correlation key: {}", new Object[]{oldExchange, newExchange, key});
537        if (optimisticLocking) {
538            try {
539                ((OptimisticLockingAggregationRepository)aggregationRepository).add(camelContext, key, oldExchange, newExchange);
540            } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
541                onOptimisticLockingFailure(oldExchange, newExchange);
542                throw e;
543            }
544        } else {
545            aggregationRepository.add(camelContext, key, newExchange);
546        }
547    }
548
549    protected void onOptimisticLockingFailure(Exchange oldExchange, Exchange newExchange) {
550        AggregationStrategy strategy = aggregationStrategy;
551        if (strategy instanceof DelegateAggregationStrategy) {
552            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
553        }
554        if (strategy instanceof OptimisticLockingAwareAggregationStrategy) {
555            LOG.trace("onOptimisticLockFailure with AggregationStrategy: {}, oldExchange: {}, newExchange: {}",
556                      new Object[]{strategy, oldExchange, newExchange});
557            ((OptimisticLockingAwareAggregationStrategy)strategy).onOptimisticLockFailure(oldExchange, newExchange);
558        }
559    }
560
561    /**
562     * Tests whether the given exchanges is pre-complete or not
563     *
564     * @param key      the correlation key
565     * @param oldExchange   the existing exchange
566     * @param newExchange the incoming exchange
567     * @return <tt>null</tt> if not pre-completed, otherwise a String with the type that triggered the pre-completion
568     */
569    protected String isPreCompleted(String key, Exchange oldExchange, Exchange newExchange) {
570        boolean complete = false;
571        AggregationStrategy strategy = aggregationStrategy;
572        if (strategy instanceof DelegateAggregationStrategy) {
573            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
574        }
575        if (strategy instanceof PreCompletionAwareAggregationStrategy) {
576            complete = ((PreCompletionAwareAggregationStrategy) strategy).preComplete(oldExchange, newExchange);
577        }
578        return complete ? "strategy" : null;
579    }
580
581    /**
582     * Tests whether the given exchange is complete or not
583     *
584     * @param key      the correlation key
585     * @param exchange the incoming exchange
586     * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion
587     */
588    protected String isCompleted(String key, Exchange exchange) {
589        // batch consumer completion must always run first
590        if (isCompletionFromBatchConsumer()) {
591            batchConsumerCorrelationKeys.add(key);
592            batchConsumerCounter.incrementAndGet();
593            int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
594            if (size > 0 && batchConsumerCounter.intValue() >= size) {
595                // batch consumer is complete then reset the counter
596                batchConsumerCounter.set(0);
597                return "consumer";
598            }
599        }
600
601        if (exchange.getProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, false, boolean.class)) {
602            return "strategy";
603        }
604
605        if (getCompletionPredicate() != null) {
606            boolean answer = getCompletionPredicate().matches(exchange);
607            if (answer) {
608                return "predicate";
609            }
610        }
611
612        boolean sizeChecked = false;
613        if (getCompletionSizeExpression() != null) {
614            Integer value = getCompletionSizeExpression().evaluate(exchange, Integer.class);
615            if (value != null && value > 0) {
616                // mark as already checked size as expression takes precedence over static configured
617                sizeChecked = true;
618                int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
619                if (size >= value) {
620                    return "size";
621                }
622            }
623        }
624        if (!sizeChecked && getCompletionSize() > 0) {
625            int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
626            if (size >= getCompletionSize()) {
627                return "size";
628            }
629        }
630
631        // not complete
632        return null;
633    }
634
635    protected void trackTimeout(String key, Exchange exchange) {
636        // timeout can be either evaluated based on an expression or from a fixed value
637        // expression takes precedence
638        boolean timeoutSet = false;
639        if (getCompletionTimeoutExpression() != null) {
640            Long value = getCompletionTimeoutExpression().evaluate(exchange, Long.class);
641            if (value != null && value > 0) {
642                if (LOG.isTraceEnabled()) {
643                    LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}",
644                            new Object[]{key, value, exchange});
645                }
646                addExchangeToTimeoutMap(key, exchange, value);
647                timeoutSet = true;
648            }
649        }
650        if (!timeoutSet && getCompletionTimeout() > 0) {
651            // timeout is used so use the timeout map to keep an eye on this
652            if (LOG.isTraceEnabled()) {
653                LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}",
654                        new Object[]{key, getCompletionTimeout(), exchange});
655            }
656            addExchangeToTimeoutMap(key, exchange, getCompletionTimeout());
657        }
658    }
659
660    protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
661        return aggregationStrategy.aggregate(oldExchange, newExchange);
662    }
663
664    protected boolean onPreCompletionAggregation(Exchange oldExchange, Exchange newExchange) {
665        AggregationStrategy strategy = aggregationStrategy;
666        if (strategy instanceof DelegateAggregationStrategy) {
667            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
668        }
669        if (strategy instanceof PreCompletionAwareAggregationStrategy) {
670            return ((PreCompletionAwareAggregationStrategy) strategy).preComplete(oldExchange, newExchange);
671        }
672        return false;
673    }
674
675    protected Exchange onCompletion(final String key, final Exchange original, final Exchange aggregated, boolean fromTimeout) {
676        // store the correlation key as property before we remove so the repository has that information
677        if (original != null) {
678            original.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
679        }
680        aggregated.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
681
682        // only remove if we have previous added (as we could potentially complete with only 1 exchange)
683        // (if we have previous added then we have that as the original exchange)
684        if (original != null) {
685            // remove from repository as its completed, we do this first as to trigger any OptimisticLockingException's
686            aggregationRepository.remove(aggregated.getContext(), key, original);
687        }
688
689        if (!fromTimeout && timeoutMap != null) {
690            // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker)
691            LOG.trace("Removing correlation key {} from timeout", key);
692            timeoutMap.remove(key);
693        }
694
695        // this key has been closed so add it to the closed map
696        if (closedCorrelationKeys != null) {
697            closedCorrelationKeys.put(key, key);
698        }
699
700        if (fromTimeout) {
701            // invoke timeout if its timeout aware aggregation strategy,
702            // to allow any custom processing before discarding the exchange
703            AggregationStrategy strategy = aggregationStrategy;
704            if (strategy instanceof DelegateAggregationStrategy) {
705                strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
706            }
707            if (strategy instanceof TimeoutAwareAggregationStrategy) {
708                long timeout = getCompletionTimeout() > 0 ? getCompletionTimeout() : -1;
709                ((TimeoutAwareAggregationStrategy) strategy).timeout(aggregated, -1, -1, timeout);
710            }
711        }
712
713        Exchange answer;
714        if (fromTimeout && isDiscardOnCompletionTimeout()) {
715            // discard due timeout
716            LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: {}", key, aggregated);
717            // must confirm the discarded exchange
718            aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
719            // and remove redelivery state as well
720            redeliveryState.remove(aggregated.getExchangeId());
721            // the completion was from timeout and we should just discard it
722            answer = null;
723        } else {
724            // the aggregated exchange should be published (sent out)
725            answer = aggregated;
726        }
727
728        return answer;
729    }
730
731    private void onSubmitCompletion(final String key, final Exchange exchange) {
732        LOG.debug("Aggregation complete for correlation key {} sending aggregated exchange: {}", key, exchange);
733
734        // add this as in progress before we submit the task
735        inProgressCompleteExchanges.add(exchange.getExchangeId());
736
737        // invoke the on completion callback
738        AggregationStrategy target = aggregationStrategy;
739        if (target instanceof DelegateAggregationStrategy) {
740            target = ((DelegateAggregationStrategy) target).getDelegate();
741        }
742        if (target instanceof CompletionAwareAggregationStrategy) {
743            ((CompletionAwareAggregationStrategy) target).onCompletion(exchange);
744        }
745
746        if (getStatistics().isStatisticsEnabled()) {
747            totalCompleted.incrementAndGet();
748
749            String completedBy = exchange.getProperty(Exchange.AGGREGATED_COMPLETED_BY, String.class);
750            if ("interval".equals(completedBy)) {
751                completedByInterval.incrementAndGet();
752            } else if ("timeout".equals(completedBy)) {
753                completedByTimeout.incrementAndGet();
754            } else if ("force".equals(completedBy)) {
755                completedByForce.incrementAndGet();
756            } else if ("consumer".equals(completedBy)) {
757                completedByBatchConsumer.incrementAndGet();
758            } else if ("predicate".equals(completedBy)) {
759                completedByPredicate.incrementAndGet();
760            } else if ("size".equals(completedBy)) {
761                completedBySize.incrementAndGet();
762            } else if ("strategy".equals(completedBy)) {
763                completedByStrategy.incrementAndGet();
764            }
765        }
766
767        // send this exchange
768        executorService.submit(new Runnable() {
769            public void run() {
770                LOG.debug("Processing aggregated exchange: {}", exchange);
771
772                // add on completion task so we remember to update the inProgressCompleteExchanges
773                exchange.addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId()));
774
775                try {
776                    processor.process(exchange);
777                } catch (Throwable e) {
778                    exchange.setException(e);
779                }
780
781                // log exception if there was a problem
782                if (exchange.getException() != null) {
783                    // if there was an exception then let the exception handler handle it
784                    getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException());
785                } else {
786                    LOG.trace("Processing aggregated exchange: {} complete.", exchange);
787                }
788            }
789        });
790    }
791
792    /**
793     * Restores the timeout map with timeout values from the aggregation repository.
794     * <p/>
795     * This is needed in case the aggregator has been stopped and started again (for example a server restart).
796     * Then the existing exchanges from the {@link AggregationRepository} must have their timeout conditions restored.
797     */
798    protected void restoreTimeoutMapFromAggregationRepository() throws Exception {
799        // grab the timeout value for each partly aggregated exchange
800        Set<String> keys = aggregationRepository.getKeys();
801        if (keys == null || keys.isEmpty()) {
802            return;
803        }
804
805        StopWatch watch = new StopWatch();
806        LOG.trace("Starting restoring CompletionTimeout for {} existing exchanges from the aggregation repository...", keys.size());
807
808        for (String key : keys) {
809            Exchange exchange = aggregationRepository.get(camelContext, key);
810            // grab the timeout value
811            long timeout = exchange.hasProperties() ? exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0;
812            if (timeout > 0) {
813                LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout);
814                addExchangeToTimeoutMap(key, exchange, timeout);
815            }
816        }
817
818        // log duration of this task so end user can see how long it takes to pre-check this upon starting
819        LOG.info("Restored {} CompletionTimeout conditions in the AggregationTimeoutChecker in {}",
820                timeoutMap.size(), TimeUtils.printDuration(watch.taken()));
821    }
822
823    /**
824     * Adds the given exchange to the timeout map, which is used by the timeout checker task to trigger timeouts.
825     *
826     * @param key      the correlation key
827     * @param exchange the exchange
828     * @param timeout  the timeout value in millis
829     */
830    private void addExchangeToTimeoutMap(String key, Exchange exchange, long timeout) {
831        // store the timeout value on the exchange as well, in case we need it later
832        exchange.setProperty(Exchange.AGGREGATED_TIMEOUT, timeout);
833        timeoutMap.put(key, exchange.getExchangeId(), timeout);
834    }
835
836    /**
837     * Current number of closed correlation keys in the memory cache
838     */
839    public int getClosedCorrelationKeysCacheSize() {
840        if (closedCorrelationKeys != null) {
841            return closedCorrelationKeys.size();
842        } else {
843            return 0;
844        }
845    }
846
847    /**
848     * Clear all the closed correlation keys stored in the cache
849     */
850    public void clearClosedCorrelationKeysCache() {
851        if (closedCorrelationKeys != null) {
852            closedCorrelationKeys.clear();
853        }
854    }
855
856    public AggregateProcessorStatistics getStatistics() {
857        return statistics;
858    }
859
860    public int getInProgressCompleteExchanges() {
861        return inProgressCompleteExchanges.size();
862    }
863
864    public Predicate getCompletionPredicate() {
865        return completionPredicate;
866    }
867
868    public void setCompletionPredicate(Predicate completionPredicate) {
869        this.completionPredicate = completionPredicate;
870    }
871
872    public boolean isEagerCheckCompletion() {
873        return eagerCheckCompletion;
874    }
875
876    public void setEagerCheckCompletion(boolean eagerCheckCompletion) {
877        this.eagerCheckCompletion = eagerCheckCompletion;
878    }
879
880    public long getCompletionTimeout() {
881        return completionTimeout;
882    }
883
884    public void setCompletionTimeout(long completionTimeout) {
885        this.completionTimeout = completionTimeout;
886    }
887
888    public Expression getCompletionTimeoutExpression() {
889        return completionTimeoutExpression;
890    }
891
892    public void setCompletionTimeoutExpression(Expression completionTimeoutExpression) {
893        this.completionTimeoutExpression = completionTimeoutExpression;
894    }
895
896    public long getCompletionInterval() {
897        return completionInterval;
898    }
899
900    public void setCompletionInterval(long completionInterval) {
901        this.completionInterval = completionInterval;
902    }
903
904    public int getCompletionSize() {
905        return completionSize;
906    }
907
908    public void setCompletionSize(int completionSize) {
909        this.completionSize = completionSize;
910    }
911
912    public Expression getCompletionSizeExpression() {
913        return completionSizeExpression;
914    }
915
916    public void setCompletionSizeExpression(Expression completionSizeExpression) {
917        this.completionSizeExpression = completionSizeExpression;
918    }
919
920    public boolean isIgnoreInvalidCorrelationKeys() {
921        return ignoreInvalidCorrelationKeys;
922    }
923
924    public void setIgnoreInvalidCorrelationKeys(boolean ignoreInvalidCorrelationKeys) {
925        this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys;
926    }
927
928    public Integer getCloseCorrelationKeyOnCompletion() {
929        return closeCorrelationKeyOnCompletion;
930    }
931
932    public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
933        this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
934    }
935
936    public boolean isCompletionFromBatchConsumer() {
937        return completionFromBatchConsumer;
938    }
939
940    public void setCompletionFromBatchConsumer(boolean completionFromBatchConsumer) {
941        this.completionFromBatchConsumer = completionFromBatchConsumer;
942    }
943
944    public boolean isCompletionOnNewCorrelationGroup() {
945        return completionOnNewCorrelationGroup;
946    }
947
948    public void setCompletionOnNewCorrelationGroup(boolean completionOnNewCorrelationGroup) {
949        this.completionOnNewCorrelationGroup = completionOnNewCorrelationGroup;
950    }
951
952    public boolean isCompleteAllOnStop() {
953        return completeAllOnStop;
954    }
955
956    public long getCompletionTimeoutCheckerInterval() {
957        return completionTimeoutCheckerInterval;
958    }
959
960    public void setCompletionTimeoutCheckerInterval(long completionTimeoutCheckerInterval) {
961        this.completionTimeoutCheckerInterval = completionTimeoutCheckerInterval;
962    }
963
964    public ExceptionHandler getExceptionHandler() {
965        return exceptionHandler;
966    }
967
968    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
969        this.exceptionHandler = exceptionHandler;
970    }
971
972    public boolean isParallelProcessing() {
973        return parallelProcessing;
974    }
975
976    public void setParallelProcessing(boolean parallelProcessing) {
977        this.parallelProcessing = parallelProcessing;
978    }
979
980    public boolean isOptimisticLocking() {
981        return optimisticLocking;
982    }
983
984    public void setOptimisticLocking(boolean optimisticLocking) {
985        this.optimisticLocking = optimisticLocking;
986    }
987
988    public AggregationRepository getAggregationRepository() {
989        return aggregationRepository;
990    }
991
992    public void setAggregationRepository(AggregationRepository aggregationRepository) {
993        this.aggregationRepository = aggregationRepository;
994    }
995
996    public boolean isDiscardOnCompletionTimeout() {
997        return discardOnCompletionTimeout;
998    }
999
1000    public void setDiscardOnCompletionTimeout(boolean discardOnCompletionTimeout) {
1001        this.discardOnCompletionTimeout = discardOnCompletionTimeout;
1002    }
1003
1004    public void setForceCompletionOnStop(boolean forceCompletionOnStop) {
1005        this.forceCompletionOnStop = forceCompletionOnStop;
1006    }
1007
1008    public void setCompleteAllOnStop(boolean completeAllOnStop) {
1009        this.completeAllOnStop = completeAllOnStop;
1010    }
1011
1012    public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
1013        this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
1014    }
1015
1016    public ScheduledExecutorService getTimeoutCheckerExecutorService() {
1017        return timeoutCheckerExecutorService;
1018    }
1019
1020    public boolean isShutdownTimeoutCheckerExecutorService() {
1021        return shutdownTimeoutCheckerExecutorService;
1022    }
1023
1024    public void setShutdownTimeoutCheckerExecutorService(boolean shutdownTimeoutCheckerExecutorService) {
1025        this.shutdownTimeoutCheckerExecutorService = shutdownTimeoutCheckerExecutorService;
1026    }
1027
1028    public void setOptimisticLockRetryPolicy(OptimisticLockRetryPolicy optimisticLockRetryPolicy) {
1029        this.optimisticLockRetryPolicy = optimisticLockRetryPolicy;
1030    }
1031
1032    public OptimisticLockRetryPolicy getOptimisticLockRetryPolicy() {
1033        return optimisticLockRetryPolicy;
1034    }
1035
1036    public AggregationStrategy getAggregationStrategy() {
1037        return aggregationStrategy;
1038    }
1039
1040    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
1041        this.aggregationStrategy = aggregationStrategy;
1042    }
1043
1044    public Expression getCorrelationExpression() {
1045        return correlationExpression;
1046    }
1047
1048    public void setCorrelationExpression(Expression correlationExpression) {
1049        this.correlationExpression = correlationExpression;
1050    }
1051
1052    public AggregateController getAggregateController() {
1053        return aggregateController;
1054    }
1055
1056    public void setAggregateController(AggregateController aggregateController) {
1057        this.aggregateController = aggregateController;
1058    }
1059
1060    /**
     * On completion task which keeps the bookkeeping of the in progress exchanges up to date
1062     */
1063    private final class AggregateOnCompletion implements Synchronization {
1064        private final String exchangeId;
1065
1066        private AggregateOnCompletion(String exchangeId) {
1067            // must use the original exchange id as it could potentially change if send over SEDA etc.
1068            this.exchangeId = exchangeId;
1069        }
1070
1071        public void onFailure(Exchange exchange) {
1072            LOG.trace("Aggregated exchange onFailure: {}", exchange);
1073
1074            // must remember to remove in progress when we failed
1075            inProgressCompleteExchanges.remove(exchangeId);
1076            // do not remove redelivery state as we need it when we redeliver again later
1077        }
1078
1079        public void onComplete(Exchange exchange) {
1080            LOG.trace("Aggregated exchange onComplete: {}", exchange);
1081
1082            // only confirm if we processed without a problem
1083            try {
1084                aggregationRepository.confirm(exchange.getContext(), exchangeId);
1085                // and remove redelivery state as well
1086                redeliveryState.remove(exchangeId);
1087            } finally {
1088                // must remember to remove in progress when we are complete
1089                inProgressCompleteExchanges.remove(exchangeId);
1090            }
1091        }
1092
1093        @Override
1094        public String toString() {
1095            return "AggregateOnCompletion";
1096        }
1097    }
1098
1099    /**
     * Background task that looks for aggregated exchanges which are to be completed due to completion timeout.
1101     */
1102    private final class AggregationTimeoutMap extends DefaultTimeoutMap<String, String> {
1103
1104        private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
1105            // do NOT use locking on the timeout map as this aggregator has its own shared lock we will use instead
1106            super(executor, requestMapPollTimeMillis, optimisticLocking);
1107        }
1108
1109        @Override
1110        public void purge() {
1111            // must acquire the shared aggregation lock to be able to purge
1112            if (!optimisticLocking) {
1113                lock.lock();
1114            }
1115            try {
1116                super.purge();
1117            } finally {
1118                if (!optimisticLocking) {
1119                    lock.unlock();
1120                }
1121            }
1122        }
1123
1124        @Override
1125        public boolean onEviction(String key, String exchangeId) {
1126            log.debug("Completion timeout triggered for correlation key: {}", key);
1127
1128            boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
1129            if (inProgress) {
1130                LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
1131                return true;
1132            }
1133
1134            // get the aggregated exchange
1135            boolean evictionStolen = false;
1136            Exchange answer = aggregationRepository.get(camelContext, key);
1137            if (answer == null) {
1138                evictionStolen = true;
1139            } else {
1140                // indicate it was completed by timeout
1141                answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
1142                try {
1143                    answer = onCompletion(key, answer, answer, true);
1144                    if (answer != null) {
1145                        onSubmitCompletion(key, answer);
1146                    }
1147                } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
1148                    evictionStolen = true;
1149                }
1150            }
1151
1152            if (optimisticLocking && evictionStolen) {
1153                LOG.debug("Another Camel instance has already successfully correlated or processed this timeout eviction "
1154                          + "for exchange with id: {} and correlation id: {}", exchangeId, key);
1155            }
1156            return true;
1157        }
1158    }
1159
1160    /**
1161     * Background task that triggers completion based on interval.
1162     */
1163    private final class AggregationIntervalTask implements Runnable {
1164
1165        public void run() {
1166            // only run if CamelContext has been fully started
1167            if (!camelContext.getStatus().isStarted()) {
1168                LOG.trace("Completion interval task cannot start due CamelContext({}) has not been started yet", camelContext.getName());
1169                return;
1170            }
1171
1172            LOG.trace("Starting completion interval task");
1173
1174            // trigger completion for all in the repository
1175            Set<String> keys = aggregationRepository.getKeys();
1176
1177            if (keys != null && !keys.isEmpty()) {
1178                // must acquire the shared aggregation lock to be able to trigger interval completion
1179                if (!optimisticLocking) {
1180                    lock.lock();
1181                }
1182                try {
1183                    for (String key : keys) {
1184                        boolean stolenInterval = false;
1185                        Exchange exchange = aggregationRepository.get(camelContext, key);
1186                        if (exchange == null) {
1187                            stolenInterval = true;
1188                        } else {
1189                            LOG.trace("Completion interval triggered for correlation key: {}", key);
1190                            // indicate it was completed by interval
1191                            exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
1192                            try {
1193                                Exchange answer = onCompletion(key, exchange, exchange, false);
1194                                if (answer != null) {
1195                                    onSubmitCompletion(key, answer);
1196                                }
1197                            } catch (OptimisticLockingAggregationRepository.OptimisticLockingException e) {
1198                                stolenInterval = true;
1199                            }
1200                        }
1201                        if (optimisticLocking && stolenInterval) {
1202                            LOG.debug("Another Camel instance has already processed this interval aggregation for exchange with correlation id: {}", key);
1203                        }
1204                    }
1205                } finally {
1206                    if (!optimisticLocking) {
1207                        lock.unlock();
1208                    }
1209                }
1210            }
1211
1212            LOG.trace("Completion interval task complete");
1213        }
1214    }
1215
1216    /**
1217     * Background task that looks for aggregated exchanges to recover.
1218     */
1219    private final class RecoverTask implements Runnable {
1220        private final RecoverableAggregationRepository recoverable;
1221
1222        private RecoverTask(RecoverableAggregationRepository recoverable) {
1223            this.recoverable = recoverable;
1224        }
1225
1226        public void run() {
1227            // only run if CamelContext has been fully started
1228            if (!camelContext.getStatus().isStarted()) {
1229                LOG.trace("Recover check cannot start due CamelContext({}) has not been started yet", camelContext.getName());
1230                return;
1231            }
1232
1233            LOG.trace("Starting recover check");
1234
1235            // copy the current in progress before doing scan
1236            final Set<String> copyOfInProgress = new LinkedHashSet<String>(inProgressCompleteExchanges);
1237
1238            Set<String> exchangeIds = recoverable.scan(camelContext);
1239            for (String exchangeId : exchangeIds) {
1240
1241                // we may shutdown while doing recovery
1242                if (!isRunAllowed()) {
1243                    LOG.info("We are shutting down so stop recovering");
1244                    return;
1245                }
1246                if (!optimisticLocking) {
1247                    lock.lock();
1248                }
1249                try {
1250                    // consider in progress if it was in progress before we did the scan, or currently after we did the scan
1251                    // its safer to consider it in progress than risk duplicates due both in progress + recovered
1252                    boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId);
1253                    if (inProgress) {
1254                        LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
1255                    } else {
1256                        LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
1257                        Exchange exchange = recoverable.recover(camelContext, exchangeId);
1258                        if (exchange != null) {
1259                            // get the correlation key
1260                            String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
1261                            // and mark it as redelivered
1262                            exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
1263
1264                            // get the current redelivery data
1265                            RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
1266
1267                            // if we are exhausted, then move to dead letter channel
1268                            if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
1269                                LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries()
1270                                        + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri());
1271
1272                                // send to DLC
1273                                try {
1274                                    // set redelivery counter
1275                                    exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
1276                                    exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
1277                                    deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
1278                                } catch (Throwable e) {
1279                                    exchange.setException(e);
1280                                }
1281
1282                                // handle if failed
1283                                if (exchange.getException() != null) {
1284                                    getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException());
1285                                } else {
1286                                    // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again
1287                                    recoverable.confirm(camelContext, exchangeId);
1288                                }
1289                            } else {
1290                                // update current redelivery state
1291                                if (data == null) {
1292                                    // create new data
1293                                    data = new RedeliveryData();
1294                                    redeliveryState.put(exchange.getExchangeId(), data);
1295                                }
1296                                data.redeliveryCounter++;
1297
1298                                // set redelivery counter
1299                                exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
1300                                if (recoverable.getMaximumRedeliveries() > 0) {
1301                                    exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries());
1302                                }
1303
1304                                LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId);
1305
1306                                // not exhaust so resubmit the recovered exchange
1307                                onSubmitCompletion(key, exchange);
1308                            }
1309                        }
1310                    }
1311                } finally {
1312                    if (!optimisticLocking) {
1313                        lock.unlock();
1314                    }
1315                }
1316            }
1317
1318            LOG.trace("Recover check complete");
1319        }
1320    }
1321
1322    @Override
1323    @SuppressWarnings("unchecked")
1324    protected void doStart() throws Exception {
1325        AggregationStrategy strategy = aggregationStrategy;
1326        if (strategy instanceof DelegateAggregationStrategy) {
1327            strategy = ((DelegateAggregationStrategy) strategy).getDelegate();
1328        }
1329        if (strategy instanceof CamelContextAware) {
1330            ((CamelContextAware) strategy).setCamelContext(camelContext);
1331        }
1332        if (strategy instanceof PreCompletionAwareAggregationStrategy) {
1333            preCompletion = true;
1334            LOG.info("PreCompletionAwareAggregationStrategy detected. Aggregator {} is in pre-completion mode.", getId());
1335        }
1336
1337        if (!preCompletion) {
1338            // if not in pre completion mode then check we configured the completion required
1339            if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
1340                    && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
1341                    && getCompletionSizeExpression() == null) {
1342                throw new IllegalStateException("At least one of the completions options"
1343                        + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
1344            }
1345        }
1346
1347        if (getCloseCorrelationKeyOnCompletion() != null) {
1348            if (getCloseCorrelationKeyOnCompletion() > 0) {
1349                LOG.info("Using ClosedCorrelationKeys with a LRUCache with a capacity of {}", getCloseCorrelationKeyOnCompletion());
1350                closedCorrelationKeys = LRUCacheFactory.newLRUCache(getCloseCorrelationKeyOnCompletion());
1351            } else {
1352                LOG.info("Using ClosedCorrelationKeys with unbounded capacity");
1353                closedCorrelationKeys = new ConcurrentHashMap<String, String>();
1354            }
1355        }
1356
1357        if (aggregationRepository == null) {
1358            aggregationRepository = new MemoryAggregationRepository(optimisticLocking);
1359            LOG.info("Defaulting to MemoryAggregationRepository");
1360        }
1361
1362        if (optimisticLocking) {
1363            if (!(aggregationRepository instanceof OptimisticLockingAggregationRepository)) {
1364                throw new IllegalArgumentException("Optimistic locking cannot be enabled without using an AggregationRepository that implements OptimisticLockingAggregationRepository");
1365            }
1366            LOG.info("Optimistic locking is enabled");
1367        }
1368
1369        ServiceHelper.startServices(aggregationStrategy, processor, aggregationRepository);
1370
1371        // should we use recover checker
1372        if (aggregationRepository instanceof RecoverableAggregationRepository) {
1373            RecoverableAggregationRepository recoverable = (RecoverableAggregationRepository) aggregationRepository;
1374            if (recoverable.isUseRecovery()) {
1375                long interval = recoverable.getRecoveryIntervalInMillis();
1376                if (interval <= 0) {
1377                    throw new IllegalArgumentException("AggregationRepository has recovery enabled and the RecoveryInterval option must be a positive number, was: " + interval);
1378                }
1379
1380                // create a background recover thread to check every interval
1381                recoverService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateRecoverChecker", 1);
1382                Runnable recoverTask = new RecoverTask(recoverable);
1383                LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every {} millis.", interval);
1384                // use fixed delay so there is X interval between each run
1385                recoverService.scheduleWithFixedDelay(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS);
1386
1387                if (recoverable.getDeadLetterUri() != null) {
1388                    int max = recoverable.getMaximumRedeliveries();
1389                    if (max <= 0) {
1390                        throw new IllegalArgumentException("Option maximumRedeliveries must be a positive number, was: " + max);
1391                    }
1392                    LOG.info("After {} failed redelivery attempts Exchanges will be moved to deadLetterUri: {}", max, recoverable.getDeadLetterUri());
1393
1394                    // dead letter uri must be a valid endpoint
1395                    Endpoint endpoint = camelContext.getEndpoint(recoverable.getDeadLetterUri());
1396                    if (endpoint == null) {
1397                        throw new NoSuchEndpointException(recoverable.getDeadLetterUri());
1398                    }
1399                    deadLetterProducerTemplate = camelContext.createProducerTemplate();
1400                }
1401            }
1402        }
1403
1404        if (getCompletionInterval() > 0 && getCompletionTimeout() > 0) {
1405            throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both.");
1406        }
1407        if (getCompletionInterval() > 0) {
1408            LOG.info("Using CompletionInterval to run every {} millis.", getCompletionInterval());
1409            if (getTimeoutCheckerExecutorService() == null) {
1410                setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
1411                shutdownTimeoutCheckerExecutorService = true;
1412            }
1413            // trigger completion based on interval
1414            getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS);
1415        }
1416
1417        // start timeout service if its in use
1418        if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
1419            LOG.info("Using CompletionTimeout to trigger after {} millis of inactivity.", getCompletionTimeout());
1420            if (getTimeoutCheckerExecutorService() == null) {
1421                setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
1422                shutdownTimeoutCheckerExecutorService = true;
1423            }
1424            // check for timed out aggregated messages once every second
1425            timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), getCompletionTimeoutCheckerInterval());
1426            // fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we
1427            // need to re-establish the timeout map so timeout can trigger
1428            restoreTimeoutMapFromAggregationRepository();
1429            ServiceHelper.startService(timeoutMap);
1430        }
1431
1432        if (aggregateController == null) {
1433            aggregateController = new DefaultAggregateController();
1434        }
1435        aggregateController.onStart(this);
1436    }
1437
    /**
     * Stops this processor.
     * <p/>
     * In order: notifies the aggregate controller (when one is set), shuts down the recover
     * checker thread pool (when one was created), stops the timeout map, the processor and the
     * dead letter producer template, and finally stops and clears the closed correlation keys
     * (when in use) and clears the batch consumer correlation keys and the redelivery state.
     * <p/>
     * The aggregation repository and the aggregation strategy are not stopped here.
     *
     * @throws Exception if stopping any of the services fails
     */
    @Override
    protected void doStop() throws Exception {
        // note: we cannot do doForceCompletionOnStop from this doStop method
        // as this is handled in the prepareShutdown method which is also invoked when stopping a route
        // and is better suited for preparing to shutdown than this doStop method is

        // let the controller know first, while the rest of this processor is still intact
        if (aggregateController != null) {
            aggregateController.onStop(this);
        }

        // the recover checker pool only exists when recovery was enabled during start
        if (recoverService != null) {
            camelContext.getExecutorServiceManager().shutdown(recoverService);
        }
        // NOTE(review): timeoutMap and deadLetterProducerTemplate may be null when the corresponding
        // options are not in use - presumably ServiceHelper tolerates null arguments; confirm
        ServiceHelper.stopServices(timeoutMap, processor, deadLetterProducerTemplate);

        if (closedCorrelationKeys != null) {
            // it may be a service so stop it as well
            ServiceHelper.stopService(closedCorrelationKeys);
            closedCorrelationKeys.clear();
        }
        // drop the remaining in-memory bookkeeping
        batchConsumerCorrelationKeys.clear();
        redeliveryState.clear();
    }
1461
1462    @Override
1463    public void prepareShutdown(boolean suspendOnly, boolean forced) {
1464        // we are shutting down, so force completion if this option was enabled
1465        // but only do this when forced=false, as that is when we have chance to
1466        // send out new messages to be routed by Camel. When forced=true, then
1467        // we have to shutdown in a hurry
1468        if (!forced && forceCompletionOnStop) {
1469            doForceCompletionOnStop();
1470        }
1471    }
1472
    /**
     * Always answers <tt>true</tt>; the given task option is ignored.
     *
     * @param shutdownRunningTask ignored by this implementation
     * @return always <tt>true</tt>
     */
    @Override
    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
        // not in use
        return true;
    }
1478
1479    @Override
1480    public int getPendingExchangesSize() {
1481        if (completeAllOnStop) {
1482            // we want to regard all pending exchanges in the repo as inflight
1483            Set<String> keys = getAggregationRepository().getKeys();
1484            return keys != null ? keys.size() : 0;
1485        } else {
1486            return 0;
1487        }
1488    }
1489
1490    private void doForceCompletionOnStop() {
1491        int expected = forceCompletionOfAllGroups();
1492
1493        StopWatch watch = new StopWatch();
1494        while (inProgressCompleteExchanges.size() > 0) {
1495            LOG.trace("Waiting for {} inflight exchanges to complete", getInProgressCompleteExchanges());
1496            try {
1497                Thread.sleep(100);
1498            } catch (InterruptedException e) {
1499                // break out as we got interrupted such as the JVM terminating
1500                LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", getInProgressCompleteExchanges());
1501                break;
1502            }
1503        }
1504
1505        if (expected > 0) {
1506            LOG.info("Forcing completion of all groups with {} exchanges completed in {}", expected, TimeUtils.printDuration(watch.taken()));
1507        }
1508    }
1509
    /**
     * Shuts down this processor.
     * <p/>
     * Stops and shuts down the aggregation repository and the aggregation strategy, clears the
     * in-progress completed exchanges, and shuts down the thread pools flagged for shutdown by
     * this processor, before delegating to the super class.
     *
     * @throws Exception if shutting down any of the services fails
     */
    @Override
    protected void doShutdown() throws Exception {
        // shutdown aggregation repository and the strategy
        ServiceHelper.stopAndShutdownServices(aggregationRepository, aggregationStrategy);

        // cleanup when shutting down
        inProgressCompleteExchanges.clear();

        // NOTE(review): presumably this flag is only set when this processor created the
        // executor service itself - it is assigned outside this method; confirm
        if (shutdownExecutorService) {
            camelContext.getExecutorServiceManager().shutdownNow(executorService);
        }
        // this flag is set during start when no timeout checker pool was configured
        // and this processor had to create one
        if (shutdownTimeoutCheckerExecutorService) {
            camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService);
            // forget the pool so a terminated pool is not kept around
            timeoutCheckerExecutorService = null;
        }

        super.doShutdown();
    }
1528
1529    public int forceCompletionOfGroup(String key) {
1530        // must acquire the shared aggregation lock to be able to trigger force completion
1531        int total = 0;
1532
1533        if (!optimisticLocking) {
1534            lock.lock();
1535        }
1536        try {
1537            Exchange exchange = aggregationRepository.get(camelContext, key);
1538            if (exchange != null) {
1539                total = 1;
1540                LOG.trace("Force completion triggered for correlation key: {}", key);
1541                // indicate it was completed by a force completion request
1542                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force");
1543                Exchange answer = onCompletion(key, exchange, exchange, false);
1544                if (answer != null) {
1545                    onSubmitCompletion(key, answer);
1546                }
1547            }
1548        } finally {
1549            if (!optimisticLocking) {
1550                lock.unlock(); 
1551            }
1552        }
1553        LOG.trace("Completed force completion of group {}", key);
1554
1555        if (total > 0) {
1556            LOG.debug("Forcing completion of group {} with {} exchanges", key, total);
1557        }
1558        return total;
1559    }
1560
1561    public int forceCompletionOfAllGroups() {
1562
1563        // only run if CamelContext has been fully started or is stopping
1564        boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping();
1565        if (!allow) {
1566            LOG.warn("Cannot start force completion of all groups because CamelContext({}) has not been started", camelContext.getName());
1567            return 0;
1568        }
1569
1570        LOG.trace("Starting force completion of all groups task");
1571
1572        // trigger completion for all in the repository
1573        Set<String> keys = aggregationRepository.getKeys();
1574
1575        int total = 0;
1576        if (keys != null && !keys.isEmpty()) {
1577            // must acquire the shared aggregation lock to be able to trigger force completion
1578            if (!optimisticLocking) {
1579                lock.lock(); 
1580            }
1581            total = keys.size();
1582            try {
1583                for (String key : keys) {
1584                    Exchange exchange = aggregationRepository.get(camelContext, key);
1585                    if (exchange != null) {
1586                        LOG.trace("Force completion triggered for correlation key: {}", key);
1587                        // indicate it was completed by a force completion request
1588                        exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "force");
1589                        Exchange answer = onCompletion(key, exchange, exchange, false);
1590                        if (answer != null) {
1591                            onSubmitCompletion(key, answer);
1592                        }
1593                    }
1594                }
1595            } finally {
1596                if (!optimisticLocking) {
1597                    lock.unlock();
1598                }
1599            }
1600        }
1601        LOG.trace("Completed force completion of all groups task");
1602
1603        if (total > 0) {
1604            LOG.debug("Forcing completion of all groups with {} exchanges", total);
1605        }
1606        return total;
1607    }
1608
1609}