001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.aggregate;
018    
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Navigate;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.TimeoutMap;
import org.apache.camel.Traceable;
import org.apache.camel.impl.LoggingExceptionHandler;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.spi.ShutdownPrepared;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultTimeoutMap;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.LRUCache;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
062    
063    /**
064     * An implementation of the <a
065     * href="http://camel.apache.org/aggregator2.html">Aggregator</a>
066     * pattern where a batch of messages are processed (up to a maximum amount or
067     * until some timeout is reached) and messages for the same correlation key are
068     * combined together using some kind of {@link AggregationStrategy}
069     * (by default the latest message is used) to compress many message exchanges
070     * into a smaller number of exchanges.
071     * <p/>
072     * A good example of this is stock market data; you may be receiving 30,000
073     * messages/second and you may want to throttle it right down so that multiple
074     * messages for the same stock are combined (or just the latest message is used
075     * and older prices are discarded). Another idea is to combine line item messages
076     * together into a single invoice message.
077     */
078    public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable, ShutdownPrepared {
079    
080        public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
081    
082        private static final Logger LOG = LoggerFactory.getLogger(AggregateProcessor.class);
083    
084        private final Lock lock = new ReentrantLock();
085        private final CamelContext camelContext;
086        private final Processor processor;
087        private final AggregationStrategy aggregationStrategy;
088        private final Expression correlationExpression;
089        private final ExecutorService executorService;
090        private final boolean shutdownExecutorService;
091        private ScheduledExecutorService timeoutCheckerExecutorService;
092        private boolean shutdownTimeoutCheckerExecutorService;
093        private ScheduledExecutorService recoverService;
094        // store correlation key -> exchange id in timeout map
095        private TimeoutMap<String, String> timeoutMap;
096        private ExceptionHandler exceptionHandler = new LoggingExceptionHandler(getClass());
097        private AggregationRepository aggregationRepository = new MemoryAggregationRepository();
098        private Map<Object, Object> closedCorrelationKeys;
099        private Set<String> batchConsumerCorrelationKeys = new LinkedHashSet<String>();
100        private final Set<String> inProgressCompleteExchanges = new HashSet<String>();
101        private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>();
102    
103        // keep booking about redelivery
104        private class RedeliveryData {
105            int redeliveryCounter;
106        }
107    
    // options
    // when true, process() logs and ignores exchanges with an empty correlation key instead of throwing
    private boolean ignoreInvalidCorrelationKeys;
    // NOTE(review): not read in this part of the file; presumably controls the closed correlation key cache - confirm
    private Integer closeCorrelationKeyOnCompletion;
    // NOTE(review): not read in this part of the file - confirm how it is used
    private boolean parallelProcessing;

    // different ways to have completion triggered
    // when true the completion check runs on the incoming exchange before aggregation, otherwise on the aggregated result
    private boolean eagerCheckCompletion;
    private Predicate completionPredicate;
    // fixed timeout in millis; only used when the timeout expression is absent or yields no positive value
    private long completionTimeout;
    private Expression completionTimeoutExpression;
    // NOTE(review): not read in this part of the file; presumably the period of the interval task - confirm
    private long completionInterval;
    // fixed size; only used when the size expression is absent or yields no positive value
    private int completionSize;
    private Expression completionSizeExpression;
    private boolean completionFromBatchConsumer;
    // number of exchanges counted for the current batch; reset to 0 when the batch completes
    private AtomicInteger batchConsumerCounter = new AtomicInteger();
    // when true, an exchange completed by timeout is confirmed and dropped rather than sent out
    private boolean discardOnCompletionTimeout;
    // NOTE(review): not read in this part of the file - confirm how it is used
    private boolean forceCompletionOnStop;

    // NOTE(review): not used in this part of the file; presumably sends exhausted exchanges to a dead letter endpoint - confirm
    private ProducerTemplate deadLetterProducerTemplate;
127    
128        public AggregateProcessor(CamelContext camelContext, Processor processor,
129                                  Expression correlationExpression, AggregationStrategy aggregationStrategy,
130                                  ExecutorService executorService, boolean shutdownExecutorService) {
131            ObjectHelper.notNull(camelContext, "camelContext");
132            ObjectHelper.notNull(processor, "processor");
133            ObjectHelper.notNull(correlationExpression, "correlationExpression");
134            ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
135            ObjectHelper.notNull(executorService, "executorService");
136            this.camelContext = camelContext;
137            this.processor = processor;
138            this.correlationExpression = correlationExpression;
139            this.aggregationStrategy = aggregationStrategy;
140            this.executorService = executorService;
141            this.shutdownExecutorService = shutdownExecutorService;
142        }
143    
144        @Override
145        public String toString() {
146            return "AggregateProcessor[to: " + processor + "]";
147        }
148    
149        public String getTraceLabel() {
150            return "aggregate[" + correlationExpression + "]";
151        }
152    
153        public List<Processor> next() {
154            if (!hasNext()) {
155                return null;
156            }
157            List<Processor> answer = new ArrayList<Processor>(1);
158            answer.add(processor);
159            return answer;
160        }
161    
162        public boolean hasNext() {
163            return processor != null;
164        }
165    
166        public void process(Exchange exchange) throws Exception {
167    
168            //check for the special header to force completion of all groups (and ignore the exchange otherwise)
169            boolean completeAllGroups = exchange.getIn().getHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, false, boolean.class);
170            if (completeAllGroups) {
171                forceCompletionOfAllGroups();
172                return;
173            }
174    
175            // compute correlation expression
176            String key = correlationExpression.evaluate(exchange, String.class);
177            if (ObjectHelper.isEmpty(key)) {
178                // we have a bad correlation key
179                if (isIgnoreInvalidCorrelationKeys()) {
180                    LOG.debug("Invalid correlation key. This Exchange will be ignored: {}", exchange);
181                    return;
182                } else {
183                    throw new CamelExchangeException("Invalid correlation key", exchange);
184                }
185            }
186    
187            // is the correlation key closed?
188            if (closedCorrelationKeys != null && closedCorrelationKeys.containsKey(key)) {
189                throw new ClosedCorrelationKeyException(key, exchange);
190            }
191    
192            // copy exchange, and do not share the unit of work
193            // the aggregated output runs in another unit of work
194            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
195    
196            // when memory based then its fast using synchronized, but if the aggregation repository is IO
197            // bound such as JPA etc then concurrent aggregation per correlation key could
198            // improve performance as we can run aggregation repository get/add in parallel
199            lock.lock();
200            try {
201                doAggregation(key, copy);
202            } finally {
203                lock.unlock();
204            }
205        }
206    
207        /**
208         * Aggregates the exchange with the given correlation key
209         * <p/>
210         * This method <b>must</b> be run synchronized as we cannot aggregate the same correlation key
211         * in parallel.
212         *
213         * @param key      the correlation key
214         * @param exchange the exchange
215         * @return the aggregated exchange
216         * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating
217         */
218        private Exchange doAggregation(String key, Exchange exchange) throws CamelExchangeException {
219            LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
220    
221            Exchange answer;
222            Exchange oldExchange = aggregationRepository.get(exchange.getContext(), key);
223            Exchange newExchange = exchange;
224    
225            Integer size = 1;
226            if (oldExchange != null) {
227                size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class);
228                size++;
229            }
230    
231            // check if we are complete
232            String complete = null;
233            if (isEagerCheckCompletion()) {
234                // put the current aggregated size on the exchange so its avail during completion check
235                newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
236                complete = isCompleted(key, newExchange);
237                // remove it afterwards
238                newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
239            }
240    
241            // prepare the exchanges for aggregation and aggregate it
242            ExchangeHelper.prepareAggregation(oldExchange, newExchange);
243            // must catch any exception from aggregation
244            try {
245                answer = onAggregation(oldExchange, exchange);
246            } catch (Throwable e) {
247                throw new CamelExchangeException("Error occurred during aggregation", exchange, e);
248            }
249            if (answer == null) {
250                throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", exchange);
251            }
252    
253            // update the aggregated size
254            answer.setProperty(Exchange.AGGREGATED_SIZE, size);
255    
256            // maybe we should check completion after the aggregation
257            if (!isEagerCheckCompletion()) {
258                complete = isCompleted(key, answer);
259            }
260    
261            // only need to update aggregation repository if we are not complete
262            if (complete == null) {
263                LOG.trace("In progress aggregated exchange: {} with correlation key: {}", answer, key);
264                aggregationRepository.add(exchange.getContext(), key, answer);
265            } else {
266                // if batch consumer completion is enabled then we need to complete the group
267                if ("consumer".equals(complete)) {
268                    for (String batchKey : batchConsumerCorrelationKeys) {
269                        Exchange batchAnswer;
270                        if (batchKey.equals(key)) {
271                            // skip the current aggregated key as we have already aggregated it and have the answer
272                            batchAnswer = answer;
273                        } else {
274                            batchAnswer = aggregationRepository.get(camelContext, batchKey);
275                        }
276    
277                        if (batchAnswer != null) {
278                            batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
279                            onCompletion(batchKey, batchAnswer, false);
280                        }
281                    }
282                    batchConsumerCorrelationKeys.clear();
283                } else {
284                    // we are complete for this exchange
285                    answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
286                    onCompletion(key, answer, false);
287                }
288            }
289    
290            LOG.trace("onAggregation +++  end  +++ with correlation key: {}", key);
291    
292            return answer;
293        }
294    
295        /**
296         * Tests whether the given exchange is complete or not
297         *
298         * @param key      the correlation key
299         * @param exchange the incoming exchange
300         * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion
301         */
302        protected String isCompleted(String key, Exchange exchange) {
303            // batch consumer completion must always run first
304            if (isCompletionFromBatchConsumer()) {
305                batchConsumerCorrelationKeys.add(key);
306                batchConsumerCounter.incrementAndGet();
307                int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
308                if (size > 0 && batchConsumerCounter.intValue() >= size) {
309                    // batch consumer is complete then reset the counter
310                    batchConsumerCounter.set(0);
311                    return "consumer";
312                }
313            }
314    
315            if (getCompletionPredicate() != null) {
316                boolean answer = getCompletionPredicate().matches(exchange);
317                if (answer) {
318                    return "predicate";
319                }
320            }
321    
322            boolean sizeChecked = false;
323            if (getCompletionSizeExpression() != null) {
324                Integer value = getCompletionSizeExpression().evaluate(exchange, Integer.class);
325                if (value != null && value > 0) {
326                    // mark as already checked size as expression takes precedence over static configured
327                    sizeChecked = true;
328                    int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
329                    if (size >= value) {
330                        return "size";
331                    }
332                }
333            }
334            if (!sizeChecked && getCompletionSize() > 0) {
335                int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
336                if (size >= getCompletionSize()) {
337                    return "size";
338                }
339            }
340    
341            // timeout can be either evaluated based on an expression or from a fixed value
342            // expression takes precedence
343            boolean timeoutSet = false;
344            if (getCompletionTimeoutExpression() != null) {
345                Long value = getCompletionTimeoutExpression().evaluate(exchange, Long.class);
346                if (value != null && value > 0) {
347                    if (LOG.isTraceEnabled()) {
348                        LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}",
349                                new Object[]{key, value, exchange});
350                    }
351                    addExchangeToTimeoutMap(key, exchange, value);
352                    timeoutSet = true;
353                }
354            }
355            if (!timeoutSet && getCompletionTimeout() > 0) {
356                // timeout is used so use the timeout map to keep an eye on this
357                if (LOG.isTraceEnabled()) {
358                    LOG.trace("Updating correlation key {} to timeout after {} ms. as exchange received: {}",
359                            new Object[]{key, getCompletionTimeout(), exchange});
360                }
361                addExchangeToTimeoutMap(key, exchange, getCompletionTimeout());
362            }
363    
364            // not complete
365            return null;
366        }
367    
368        protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
369            return aggregationStrategy.aggregate(oldExchange, newExchange);
370        }
371    
372        protected void onCompletion(final String key, final Exchange exchange, boolean fromTimeout) {
373            // store the correlation key as property
374            exchange.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
375            // remove from repository as its completed
376            aggregationRepository.remove(exchange.getContext(), key, exchange);
377            if (!fromTimeout && timeoutMap != null) {
378                // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker)
379                timeoutMap.remove(key);
380            }
381    
382            // this key has been closed so add it to the closed map
383            if (closedCorrelationKeys != null) {
384                closedCorrelationKeys.put(key, key);
385            }
386    
387            if (fromTimeout) {
388                // invoke timeout if its timeout aware aggregation strategy,
389                // to allow any custom processing before discarding the exchange
390                if (aggregationStrategy instanceof TimeoutAwareAggregationStrategy) {
391                    long timeout = getCompletionTimeout() > 0 ? getCompletionTimeout() : -1;
392                    ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(exchange, -1, -1, timeout);
393                }
394            }
395    
396            if (fromTimeout && isDiscardOnCompletionTimeout()) {
397                // discard due timeout
398                LOG.debug("Aggregation for correlation key {} discarding aggregated exchange: ()", key, exchange);
399                // must confirm the discarded exchange
400                aggregationRepository.confirm(exchange.getContext(), exchange.getExchangeId());
401                // and remove redelivery state as well
402                redeliveryState.remove(exchange.getExchangeId());
403            } else {
404                // the aggregated exchange should be published (sent out)
405                onSubmitCompletion(key, exchange);
406            }
407        }
408    
409        private void onSubmitCompletion(final Object key, final Exchange exchange) {
410            LOG.debug("Aggregation complete for correlation key {} sending aggregated exchange: {}", key, exchange);
411    
412            // add this as in progress before we submit the task
413            inProgressCompleteExchanges.add(exchange.getExchangeId());
414    
415            // invoke the on completion callback
416            if (aggregationStrategy instanceof CompletionAwareAggregationStrategy) {
417                ((CompletionAwareAggregationStrategy) aggregationStrategy).onCompletion(exchange);
418            }
419    
420                // send this exchange
421            executorService.submit(new Runnable() {
422                public void run() {
423                    LOG.debug("Processing aggregated exchange: {}", exchange);
424    
425                    // add on completion task so we remember to update the inProgressCompleteExchanges
426                    exchange.addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId()));
427    
428                    try {
429                        processor.process(exchange);
430                    } catch (Throwable e) {
431                        exchange.setException(e);
432                    }
433    
434                    // log exception if there was a problem
435                    if (exchange.getException() != null) {
436                        // if there was an exception then let the exception handler handle it
437                        getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException());
438                    } else {
439                        LOG.trace("Processing aggregated exchange: {} complete.", exchange);
440                    }
441                }
442            });
443        }
444    
445        /**
446         * Restores the timeout map with timeout values from the aggregation repository.
447         * <p/>
448         * This is needed in case the aggregator has been stopped and started again (for example a server restart).
449         * Then the existing exchanges from the {@link AggregationRepository} must have its timeout conditions restored.
450         */
451        protected void restoreTimeoutMapFromAggregationRepository() throws Exception {
452            // grab the timeout value for each partly aggregated exchange
453            Set<String> keys = aggregationRepository.getKeys();
454            if (keys == null || keys.isEmpty()) {
455                return;
456            }
457    
458            StopWatch watch = new StopWatch();
459            LOG.trace("Starting restoring CompletionTimeout for {} existing exchanges from the aggregation repository...", keys.size());
460    
461            for (String key : keys) {
462                Exchange exchange = aggregationRepository.get(camelContext, key);
463                // grab the timeout value
464                long timeout = exchange.hasProperties() ? exchange.getProperty(Exchange.AGGREGATED_TIMEOUT, 0, long.class) : 0;
465                if (timeout > 0) {
466                    LOG.trace("Restoring CompletionTimeout for exchangeId: {} with timeout: {} millis.", exchange.getExchangeId(), timeout);
467                    addExchangeToTimeoutMap(key, exchange, timeout);
468                }
469            }
470    
471            // log duration of this task so end user can see how long it takes to pre-check this upon starting
472            LOG.info("Restored {} CompletionTimeout conditions in the AggregationTimeoutChecker in {}",
473                    timeoutMap.size(), TimeUtils.printDuration(watch.stop()));
474        }
475    
476        /**
477         * Adds the given exchange to the timeout map, which is used by the timeout checker task to trigger timeouts.
478         *
479         * @param key      the correlation key
480         * @param exchange the exchange
481         * @param timeout  the timeout value in millis
482         */
483        private void addExchangeToTimeoutMap(String key, Exchange exchange, long timeout) {
484            // store the timeout value on the exchange as well, in case we need it later
485            exchange.setProperty(Exchange.AGGREGATED_TIMEOUT, timeout);
486            timeoutMap.put(key, exchange.getExchangeId(), timeout);
487        }
488    
489        public Predicate getCompletionPredicate() {
490            return completionPredicate;
491        }
492    
    /**
     * Sets the predicate which completes a group when it matches the checked exchange.
     *
     * @param completionPredicate the predicate, or <tt>null</tt> to disable predicate based completion
     */
    public void setCompletionPredicate(Predicate completionPredicate) {
        this.completionPredicate = completionPredicate;
    }
496    
497        public boolean isEagerCheckCompletion() {
498            return eagerCheckCompletion;
499        }
500    
    /**
     * Sets whether completion is checked on the incoming exchange before it is aggregated,
     * instead of on the aggregated exchange.
     *
     * @param eagerCheckCompletion <tt>true</tt> to check before aggregation
     */
    public void setEagerCheckCompletion(boolean eagerCheckCompletion) {
        this.eagerCheckCompletion = eagerCheckCompletion;
    }
504    
505        public long getCompletionTimeout() {
506            return completionTimeout;
507        }
508    
    /**
     * Sets the fixed completion timeout. It only applies when the timeout expression
     * is not set or does not yield a positive value.
     *
     * @param completionTimeout the timeout in millis, only used when greater than 0
     */
    public void setCompletionTimeout(long completionTimeout) {
        this.completionTimeout = completionTimeout;
    }
512    
513        public Expression getCompletionTimeoutExpression() {
514            return completionTimeoutExpression;
515        }
516    
    /**
     * Sets the expression which computes the completion timeout from the checked exchange.
     * A positive result takes precedence over the fixed timeout.
     *
     * @param completionTimeoutExpression the expression, evaluated as a <tt>Long</tt>
     */
    public void setCompletionTimeoutExpression(Expression completionTimeoutExpression) {
        this.completionTimeoutExpression = completionTimeoutExpression;
    }
520    
521        public long getCompletionInterval() {
522            return completionInterval;
523        }
524    
    /**
     * Sets the completion interval.
     *
     * @param completionInterval the interval; NOTE(review): presumably millis, the value is not read in this part of the file - confirm
     */
    public void setCompletionInterval(long completionInterval) {
        this.completionInterval = completionInterval;
    }
528    
529        public int getCompletionSize() {
530            return completionSize;
531        }
532    
    /**
     * Sets the fixed number of aggregated exchanges at which a group completes.
     * It only applies when the size expression is not set or does not yield a positive value.
     *
     * @param completionSize the size, only used when greater than 0
     */
    public void setCompletionSize(int completionSize) {
        this.completionSize = completionSize;
    }
536    
537        public Expression getCompletionSizeExpression() {
538            return completionSizeExpression;
539        }
540    
    /**
     * Sets the expression which computes the completion size from the checked exchange.
     * A positive result takes precedence over the fixed size.
     *
     * @param completionSizeExpression the expression, evaluated as an <tt>Integer</tt>
     */
    public void setCompletionSizeExpression(Expression completionSizeExpression) {
        this.completionSizeExpression = completionSizeExpression;
    }
544    
545        public boolean isIgnoreInvalidCorrelationKeys() {
546            return ignoreInvalidCorrelationKeys;
547        }
548    
    /**
     * Sets whether exchanges with an empty correlation key are logged and ignored
     * rather than rejected with a {@link CamelExchangeException}.
     *
     * @param ignoreInvalidCorrelationKeys <tt>true</tt> to ignore such exchanges
     */
    public void setIgnoreInvalidCorrelationKeys(boolean ignoreInvalidCorrelationKeys) {
        this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys;
    }
552    
553        public Integer getCloseCorrelationKeyOnCompletion() {
554            return closeCorrelationKeyOnCompletion;
555        }
556    
    /**
     * Sets the close-correlation-key-on-completion value.
     *
     * @param closeCorrelationKeyOnCompletion the value; NOTE(review): not read in this part of the file,
     *                                        presumably configures tracking of closed correlation keys - confirm
     */
    public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
        this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
    }
560    
561        public boolean isCompletionFromBatchConsumer() {
562            return completionFromBatchConsumer;
563        }
564    
    /**
     * Sets whether completion is driven by the {@link Exchange#BATCH_SIZE} property of the checked exchanges.
     *
     * @param completionFromBatchConsumer <tt>true</tt> to enable batch consumer completion
     */
    public void setCompletionFromBatchConsumer(boolean completionFromBatchConsumer) {
        this.completionFromBatchConsumer = completionFromBatchConsumer;
    }
568    
569        public ExceptionHandler getExceptionHandler() {
570            return exceptionHandler;
571        }
572    
    /**
     * Sets the handler which is given exceptions from processing completed exchanges.
     *
     * @param exceptionHandler the exception handler
     */
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }
576    
577        public boolean isParallelProcessing() {
578            return parallelProcessing;
579        }
580    
    /**
     * Sets the parallel processing flag.
     *
     * @param parallelProcessing the flag; NOTE(review): not read in this part of the file - confirm its effect
     */
    public void setParallelProcessing(boolean parallelProcessing) {
        this.parallelProcessing = parallelProcessing;
    }
584    
585        public AggregationRepository getAggregationRepository() {
586            return aggregationRepository;
587        }
588    
    /**
     * Sets the repository which holds the in progress aggregated exchanges.
     * A {@link MemoryAggregationRepository} is used unless another one is set.
     *
     * @param aggregationRepository the repository
     */
    public void setAggregationRepository(AggregationRepository aggregationRepository) {
        this.aggregationRepository = aggregationRepository;
    }
592    
593        public boolean isDiscardOnCompletionTimeout() {
594            return discardOnCompletionTimeout;
595        }
596    
    /**
     * Sets whether an exchange completed by timeout is confirmed and discarded instead of being sent out.
     *
     * @param discardOnCompletionTimeout <tt>true</tt> to discard on timeout
     */
    public void setDiscardOnCompletionTimeout(boolean discardOnCompletionTimeout) {
        this.discardOnCompletionTimeout = discardOnCompletionTimeout;
    }
600    
    /**
     * Sets the force-completion-on-stop flag.
     *
     * @param forceCompletionOnStop the flag; NOTE(review): not read in this part of the file,
     *                              presumably completes all groups when the processor stops - confirm
     */
    public void setForceCompletionOnStop(boolean forceCompletionOnStop) {
        this.forceCompletionOnStop = forceCompletionOnStop;
    }
604    
    /**
     * Sets the scheduled executor for the timeout checker.
     *
     * @param timeoutCheckerExecutorService the executor; NOTE(review): where it is used is outside this part of the file - confirm
     */
    public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
        this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
    }
608    
609        public ScheduledExecutorService getTimeoutCheckerExecutorService() {
610            return timeoutCheckerExecutorService;
611        }
612    
613        public boolean isShutdownTimeoutCheckerExecutorService() {
614            return shutdownTimeoutCheckerExecutorService;
615        }
616    
    /**
     * Sets the shutdown flag of the timeout checker executor service.
     *
     * @param shutdownTimeoutCheckerExecutorService the flag; NOTE(review): not read in this part of the file,
     *                                              presumably whether the executor is shut down on stop - confirm
     */
    public void setShutdownTimeoutCheckerExecutorService(boolean shutdownTimeoutCheckerExecutorService) {
        this.shutdownTimeoutCheckerExecutorService = shutdownTimeoutCheckerExecutorService;
    }
620    
621        /**
622         * On completion task which keeps the booking of the in progress up to date
623         */
624        private final class AggregateOnCompletion implements Synchronization {
625            private final String exchangeId;
626    
627            private AggregateOnCompletion(String exchangeId) {
628                // must use the original exchange id as it could potentially change if send over SEDA etc.
629                this.exchangeId = exchangeId;
630            }
631    
632            public void onFailure(Exchange exchange) {
633                LOG.trace("Aggregated exchange onFailure: {}", exchange);
634    
635                // must remember to remove in progress when we failed
636                inProgressCompleteExchanges.remove(exchangeId);
637                // do not remove redelivery state as we need it when we redeliver again later
638            }
639    
640            public void onComplete(Exchange exchange) {
641                LOG.trace("Aggregated exchange onComplete: {}", exchange);
642    
643                // only confirm if we processed without a problem
644                try {
645                    aggregationRepository.confirm(exchange.getContext(), exchangeId);
646                    // and remove redelivery state as well
647                    redeliveryState.remove(exchangeId);
648                } finally {
649                    // must remember to remove in progress when we are complete
650                    inProgressCompleteExchanges.remove(exchangeId);
651                }
652            }
653    
654            @Override
655            public String toString() {
656                return "AggregateOnCompletion";
657            }
658        }
659    
660        /**
661         * Background task that looks for aggregated exchanges which is triggered by completion timeouts.
662         */
663        private final class AggregationTimeoutMap extends DefaultTimeoutMap<String, String> {
664    
665            private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
666                // do NOT use locking on the timeout map as this aggregator has its own shared lock we will use instead
667                super(executor, requestMapPollTimeMillis, false);
668            }
669    
670            @Override
671            public void purge() {
672                // must acquire the shared aggregation lock to be able to purge
673                lock.lock();
674                try {
675                    super.purge();
676                } finally {
677                    lock.unlock();
678                }
679            }
680    
681            @Override
682            public boolean onEviction(String key, String exchangeId) {
683                log.debug("Completion timeout triggered for correlation key: {}", key);
684    
685                boolean inProgress = inProgressCompleteExchanges.contains(exchangeId);
686                if (inProgress) {
687                    LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
688                    return true;
689                }
690    
691                // get the aggregated exchange
692                Exchange answer = aggregationRepository.get(camelContext, key);
693                if (answer != null) {
694                    // indicate it was completed by timeout
695                    answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
696                    onCompletion(key, answer, true);
697                }
698                return true;
699            }
700        }
701    
702        /**
703         * Background task that triggers completion based on interval.
704         */
705        private final class AggregationIntervalTask implements Runnable {
706    
707            public void run() {
708                // only run if CamelContext has been fully started
709                if (!camelContext.getStatus().isStarted()) {
710                    LOG.trace("Completion interval task cannot start due CamelContext({}) has not been started yet", camelContext.getName());
711                    return;
712                }
713    
714                LOG.trace("Starting completion interval task");
715    
716                // trigger completion for all in the repository
717                Set<String> keys = aggregationRepository.getKeys();
718    
719                if (keys != null && !keys.isEmpty()) {
720                    // must acquire the shared aggregation lock to be able to trigger interval completion
721                    lock.lock();
722                    try {
723                        for (String key : keys) {
724                            Exchange exchange = aggregationRepository.get(camelContext, key);
725                            if (exchange != null) {
726                                LOG.trace("Completion interval triggered for correlation key: {}", key);
727                                // indicate it was completed by interval
728                                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
729                                onCompletion(key, exchange, false);
730                            }
731                        }
732                    } finally {
733                        lock.unlock();
734                    }
735                }
736    
737                LOG.trace("Completion interval task complete");
738            }
739        }
740    
741        /**
742         * Background task that looks for aggregated exchanges to recover.
743         */
744        private final class RecoverTask implements Runnable {
745            private final RecoverableAggregationRepository recoverable;
746    
747            private RecoverTask(RecoverableAggregationRepository recoverable) {
748                this.recoverable = recoverable;
749            }
750    
751            public void run() {
752                // only run if CamelContext has been fully started
753                if (!camelContext.getStatus().isStarted()) {
754                    LOG.trace("Recover check cannot start due CamelContext({}) has not been started yet", camelContext.getName());
755                    return;
756                }
757    
758                LOG.trace("Starting recover check");
759    
760                // copy the current in progress before doing scan
761                final Set<String> copyOfInProgress = new LinkedHashSet<String>(inProgressCompleteExchanges);
762    
763                Set<String> exchangeIds = recoverable.scan(camelContext);
764                for (String exchangeId : exchangeIds) {
765    
766                    // we may shutdown while doing recovery
767                    if (!isRunAllowed()) {
768                        LOG.info("We are shutting down so stop recovering");
769                        return;
770                    }
771    
772                    // consider in progress if it was in progress before we did the scan, or currently after we did the scan
773                    // its safer to consider it in progress than risk duplicates due both in progress + recovered
774                    boolean inProgress = copyOfInProgress.contains(exchangeId) || inProgressCompleteExchanges.contains(exchangeId);
775                    if (inProgress) {
776                        LOG.trace("Aggregated exchange with id: {} is already in progress.", exchangeId);
777                    } else {
778                        LOG.debug("Loading aggregated exchange with id: {} to be recovered.", exchangeId);
779                        Exchange exchange = recoverable.recover(camelContext, exchangeId);
780                        if (exchange != null) {
781                            // get the correlation key
782                            String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class);
783                            // and mark it as redelivered
784                            exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE);
785    
786                            // get the current redelivery data
787                            RedeliveryData data = redeliveryState.get(exchange.getExchangeId());
788    
789                            // if we are exhausted, then move to dead letter channel
790                            if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) {
791                                LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries()
792                                        + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri());
793    
794                                // send to DLC
795                                try {
796                                    // set redelivery counter
797                                    exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
798                                    exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
799                                    deadLetterProducerTemplate.send(recoverable.getDeadLetterUri(), exchange);
800                                } catch (Throwable e) {
801                                    exchange.setException(e);
802                                }
803    
804                                // handle if failed
805                                if (exchange.getException() != null) {
806                                    getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException());
807                                } else {
808                                    // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again
809                                    recoverable.confirm(camelContext, exchangeId);
810                                }
811                            } else {
812                                // update current redelivery state
813                                if (data == null) {
814                                    // create new data
815                                    data = new RedeliveryData();
816                                    redeliveryState.put(exchange.getExchangeId(), data);
817                                }
818                                data.redeliveryCounter++;
819    
820                                // set redelivery counter
821                                exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter);
822                                if (recoverable.getMaximumRedeliveries() > 0) {
823                                    exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, recoverable.getMaximumRedeliveries());
824                                }
825    
826                                LOG.debug("Delivery attempt: {} to recover aggregated exchange with id: {}", data.redeliveryCounter, exchangeId);
827    
828                                // not exhaust so resubmit the recovered exchange
829                                onSubmitCompletion(key, exchange);
830                            }
831                        }
832                    }
833                }
834    
835                LOG.trace("Recover check complete");
836            }
837        }
838    
839        @Override
840        protected void doStart() throws Exception {
841            if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
842                    && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
843                    && getCompletionSizeExpression() == null) {
844                throw new IllegalStateException("At least one of the completions options"
845                        + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
846            }
847    
848            if (getCloseCorrelationKeyOnCompletion() != null) {
849                if (getCloseCorrelationKeyOnCompletion() > 0) {
850                    LOG.info("Using ClosedCorrelationKeys with a LRUCache with a capacity of " + getCloseCorrelationKeyOnCompletion());
851                    closedCorrelationKeys = new LRUCache<Object, Object>(getCloseCorrelationKeyOnCompletion());
852                } else {
853                    LOG.info("Using ClosedCorrelationKeys with unbounded capacity");
854                    closedCorrelationKeys = new HashMap<Object, Object>();
855                }
856            }
857    
858            ServiceHelper.startServices(processor, aggregationRepository);
859    
860            // should we use recover checker
861            if (aggregationRepository instanceof RecoverableAggregationRepository) {
862                RecoverableAggregationRepository recoverable = (RecoverableAggregationRepository) aggregationRepository;
863                if (recoverable.isUseRecovery()) {
864                    long interval = recoverable.getRecoveryIntervalInMillis();
865                    if (interval <= 0) {
866                        throw new IllegalArgumentException("AggregationRepository has recovery enabled and the RecoveryInterval option must be a positive number, was: " + interval);
867                    }
868    
869                    // create a background recover thread to check every interval
870                    recoverService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "AggregateRecoverChecker", 1);
871                    Runnable recoverTask = new RecoverTask(recoverable);
872                    LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every " + interval + " millis.");
873                    // use fixed delay so there is X interval between each run
874                    recoverService.scheduleWithFixedDelay(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS);
875    
876                    if (recoverable.getDeadLetterUri() != null) {
877                        int max = recoverable.getMaximumRedeliveries();
878                        if (max <= 0) {
879                            throw new IllegalArgumentException("Option maximumRedeliveries must be a positive number, was: " + max);
880                        }
881                        LOG.info("After " + max + " failed redelivery attempts Exchanges will be moved to deadLetterUri: " + recoverable.getDeadLetterUri());
882    
883                        // dead letter uri must be a valid endpoint
884                        Endpoint endpoint = camelContext.getEndpoint(recoverable.getDeadLetterUri());
885                        if (endpoint == null) {
886                            throw new NoSuchEndpointException(recoverable.getDeadLetterUri());
887                        }
888                        deadLetterProducerTemplate = camelContext.createProducerTemplate();
889                    }
890                }
891            }
892    
893            if (getCompletionInterval() > 0 && getCompletionTimeout() > 0) {
894                throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both.");
895            }
896            if (getCompletionInterval() > 0) {
897                LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis.");
898                if (getTimeoutCheckerExecutorService() == null) {
899                    setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
900                    shutdownTimeoutCheckerExecutorService = true;
901                }
902                // trigger completion based on interval
903                getTimeoutCheckerExecutorService().scheduleAtFixedRate(new AggregationIntervalTask(), getCompletionInterval(), getCompletionInterval(), TimeUnit.MILLISECONDS);
904            }
905    
906            // start timeout service if its in use
907            if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
908                LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout() + " millis of inactivity.");
909                if (getTimeoutCheckerExecutorService() == null) {
910                    setTimeoutCheckerExecutorService(camelContext.getExecutorServiceManager().newScheduledThreadPool(this, AGGREGATE_TIMEOUT_CHECKER, 1));
911                    shutdownTimeoutCheckerExecutorService = true;
912                }
913                // check for timed out aggregated messages once every second
914                timeoutMap = new AggregationTimeoutMap(getTimeoutCheckerExecutorService(), 1000L);
915                // fill in existing timeout values from the aggregation repository, for example if a restart occurred, then we
916                // need to re-establish the timeout map so timeout can trigger
917                restoreTimeoutMapFromAggregationRepository();
918                ServiceHelper.startService(timeoutMap);
919            }
920        }
921    
922        @Override
923        protected void doStop() throws Exception {
924            // note: we cannot do doForceCompletionOnStop from this doStop method
925            // as this is handled in the prepareShutdown method which is also invoked when stopping a route
926            // and is better suited for preparing to shutdown than this doStop method is
927    
928            if (recoverService != null) {
929                camelContext.getExecutorServiceManager().shutdownNow(recoverService);
930            }
931            ServiceHelper.stopServices(timeoutMap, processor, deadLetterProducerTemplate);
932    
933            if (closedCorrelationKeys != null) {
934                // it may be a service so stop it as well
935                ServiceHelper.stopService(closedCorrelationKeys);
936                closedCorrelationKeys.clear();
937            }
938            batchConsumerCorrelationKeys.clear();
939            redeliveryState.clear();
940        }
941    
942        @Override
943        public void prepareShutdown(boolean forced) {
944            // we are shutting down, so force completion if this option was enabled
945            // but only do this when forced=false, as that is when we have chance to
946            // send out new messages to be routed by Camel. When forced=true, then
947            // we have to shutdown in a hurry
948            if (!forced && forceCompletionOnStop) {
949                doForceCompletionOnStop();
950            }
951        }
952    
953        private void doForceCompletionOnStop() {
954            int expected = forceCompletionOfAllGroups();
955    
956            StopWatch watch = new StopWatch();
957            while (inProgressCompleteExchanges.size() > 0) {
958                LOG.trace("Waiting for {} inflight exchanges to complete", inProgressCompleteExchanges.size());
959                try {
960                    Thread.sleep(100);
961                } catch (InterruptedException e) {
962                    // break out as we got interrupted such as the JVM terminating
963                    LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", inProgressCompleteExchanges.size());
964                    break;
965                }
966            }
967    
968            if (expected > 0) {
969                LOG.info("Forcing completion of all groups with {} exchanges completed in {}", expected, TimeUtils.printDuration(watch.stop()));
970            }
971        }
972    
973        @Override
974        protected void doShutdown() throws Exception {
975            // shutdown aggregation repository
976            ServiceHelper.stopService(aggregationRepository);
977    
978            // cleanup when shutting down
979            inProgressCompleteExchanges.clear();
980    
981            if (shutdownExecutorService) {
982                camelContext.getExecutorServiceManager().shutdownNow(executorService);
983            }
984            if (shutdownTimeoutCheckerExecutorService) {
985                camelContext.getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService);
986                timeoutCheckerExecutorService = null;
987            }
988    
989            super.doShutdown();
990        }
991    
992        public int forceCompletionOfAllGroups() {
993    
994            // only run if CamelContext has been fully started or is stopping
995            boolean allow = camelContext.getStatus().isStarted() || camelContext.getStatus().isStopping();
996            if (!allow) {
997                LOG.warn("Cannot start force completion of all groups because CamelContext({}) has not been started", camelContext.getName());
998                return 0;
999            }
1000    
1001            LOG.trace("Starting force completion of all groups task");
1002    
1003            // trigger completion for all in the repository
1004            Set<String> keys = aggregationRepository.getKeys();
1005    
1006            int total = 0;
1007            if (keys != null && !keys.isEmpty()) {
1008                // must acquire the shared aggregation lock to be able to trigger force completion
1009                lock.lock();
1010                total = keys.size();
1011                try {
1012                    for (String key : keys) {
1013                        Exchange exchange = aggregationRepository.get(camelContext, key);
1014                        if (exchange != null) {
1015                            LOG.trace("Force completion triggered for correlation key: {}", key);
1016                            // indicate it was completed by a force completion request
1017                            exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
1018                            onCompletion(key, exchange, false);
1019                        }
1020                    }
1021                } finally {
1022                    lock.unlock();
1023                }
1024            }
1025            LOG.trace("Completed force completion of all groups task");
1026    
1027            if (total > 0) {
1028                LOG.debug("Forcing completion of all groups with {} exchanges", total);
1029            }
1030            return total;
1031        }
1032    
1033    }