001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.Callable;
022import java.util.concurrent.RejectedExecutionException;
023import java.util.concurrent.ScheduledExecutorService;
024import java.util.concurrent.ThreadPoolExecutor;
025import java.util.concurrent.TimeUnit;
026import java.util.concurrent.atomic.AtomicInteger;
027
028import org.apache.camel.AsyncCallback;
029import org.apache.camel.AsyncProcessor;
030import org.apache.camel.CamelContext;
031import org.apache.camel.Exchange;
032import org.apache.camel.LoggingLevel;
033import org.apache.camel.Message;
034import org.apache.camel.Navigate;
035import org.apache.camel.Predicate;
036import org.apache.camel.Processor;
037import org.apache.camel.model.OnExceptionDefinition;
038import org.apache.camel.spi.ExchangeFormatter;
039import org.apache.camel.spi.ShutdownPrepared;
040import org.apache.camel.spi.SubUnitOfWorkCallback;
041import org.apache.camel.spi.UnitOfWork;
042import org.apache.camel.util.AsyncProcessorConverterHelper;
043import org.apache.camel.util.AsyncProcessorHelper;
044import org.apache.camel.util.CamelContextHelper;
045import org.apache.camel.util.CamelLogger;
046import org.apache.camel.util.EventHelper;
047import org.apache.camel.util.ExchangeHelper;
048import org.apache.camel.util.MessageHelper;
049import org.apache.camel.util.ObjectHelper;
050import org.apache.camel.util.ServiceHelper;
051import org.apache.camel.util.URISupport;
052
053/**
054 * Base redeliverable error handler that also supports a final dead letter queue in case
055 * all redelivery attempts fail.
056 * <p/>
057 * This implementation should contain all the error handling logic and the sub classes
058 * should only configure it according to what they support.
059 *
060 * @version
061 */
062public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
063
    // number of threads currently blocked in a synchronous redelivery delay (see the sleep in process)
    protected final AtomicInteger redeliverySleepCounter = new AtomicInteger();
    // scheduler used to run redelivery tasks when redelivery is delayed asynchronously
    protected ScheduledExecutorService executorService;
    protected final CamelContext camelContext;
    // default failure processor for exhausted exchanges, used when the redelivery data has no failure processor
    protected final Processor deadLetter;
    protected final String deadLetterUri;
    // default for RedeliveryData.handleNewException
    protected final boolean deadLetterHandleNewException;
    // the processor wrapped by this error handler, and its asynchronous view created in the constructor
    protected final Processor output;
    protected final AsyncProcessor outputAsync;
    // default processor invoked before each redelivery attempt
    protected final Processor redeliveryProcessor;
    // default policy, can be replaced per exchange through RedeliveryData.currentRedeliveryPolicy
    protected final RedeliveryPolicy redeliveryPolicy;
    // default for RedeliveryData.retryWhilePredicate
    protected final Predicate retryWhilePolicy;
    protected final CamelLogger logger;
    // default for RedeliveryData.useOriginalInMessage
    protected final boolean useOriginalMessagePolicy;
    // when true a defensive copy of the incoming exchange is kept for redelivery;
    // NOTE(review): assigned outside the visible part of this file
    protected boolean redeliveryEnabled;
    // set by prepareShutdown, read when deciding if running/redelivery is still allowed
    protected volatile boolean preparingShutdown;
    // formatter resolved in the constructor; customExchangeFormatter is true when it was looked up
    // from the registry by the reference configured on the redelivery policy
    protected final ExchangeFormatter exchangeFormatter;
    protected final boolean customExchangeFormatter;
    // defaults for RedeliveryData.onPrepareProcessor and RedeliveryData.onExceptionProcessor
    protected final Processor onPrepareProcessor;
    protected final Processor onExceptionProcessor;
083
084    /**
085     * Contains the current redelivery data
086     */
087    protected class RedeliveryData {
088        Exchange original;
089        boolean sync = true;
090        int redeliveryCounter;
091        long redeliveryDelay;
092        Predicate retryWhilePredicate;
093        boolean redeliverFromSync;
094
095        // default behavior which can be overloaded on a per exception basis
096        RedeliveryPolicy currentRedeliveryPolicy;
097        Processor deadLetterProcessor;
098        Processor failureProcessor;
099        Processor onRedeliveryProcessor;
100        Processor onPrepareProcessor;
101        Processor onExceptionProcessor;
102        Predicate handledPredicate;
103        Predicate continuedPredicate;
104        boolean useOriginalInMessage;
105        boolean handleNewException;
106
107        public RedeliveryData() {
108            // init with values from the error handler
109            this.retryWhilePredicate = retryWhilePolicy;
110            this.currentRedeliveryPolicy = redeliveryPolicy;
111            this.deadLetterProcessor = deadLetter;
112            this.onRedeliveryProcessor = redeliveryProcessor;
113            this.onPrepareProcessor = RedeliveryErrorHandler.this.onPrepareProcessor;
114            this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor;
115            this.handledPredicate = getDefaultHandledPredicate();
116            this.useOriginalInMessage = useOriginalMessagePolicy;
117            this.handleNewException = deadLetterHandleNewException;
118        }
119    }
120
121    /**
122     * Tasks which performs asynchronous redelivery attempts, and being triggered by a
123     * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
124     * has to be delayed before a redelivery attempt is performed.
125     */
126    private class AsyncRedeliveryTask implements Callable<Boolean> {
127
128        private final Exchange exchange;
129        private final AsyncCallback callback;
130        private final RedeliveryData data;
131
132        AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
133            this.exchange = exchange;
134            this.callback = callback;
135            this.data = data;
136        }
137
138        public Boolean call() throws Exception {
139            // prepare for redelivery
140            prepareExchangeForRedelivery(exchange, data);
141
142            // letting onRedeliver be executed at first
143            deliverToOnRedeliveryProcessor(exchange, data);
144
145            if (log.isTraceEnabled()) {
146                log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", new Object[]{exchange.getExchangeId(), outputAsync, exchange});
147            }
148
149            // emmit event we are doing redelivery
150            EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
151
152            // process the exchange (also redelivery)
153            boolean sync;
154            if (data.redeliverFromSync) {
155                // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from
156                // this error handler, which means we have to invoke the callback with false, to have the callback
157                // be notified when we are done
158                sync = outputAsync.process(exchange, new AsyncCallback() {
159                    public void done(boolean doneSync) {
160                        log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
161
162                        // mark we are in sync mode now
163                        data.sync = false;
164
165                        // only process if the exchange hasn't failed
166                        // and it has not been handled by the error processor
167                        if (isDone(exchange)) {
168                            callback.done(false);
169                            return;
170                        }
171
172                        // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
173                        processAsyncErrorHandler(exchange, callback, data);
174                    }
175                });
176            } else {
177                // this redelivery task was scheduled from asynchronous, which means we should only
178                // handle when the asynchronous task was done
179                sync = outputAsync.process(exchange, new AsyncCallback() {
180                    public void done(boolean doneSync) {
181                        log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
182
183                        // this callback should only handle the async case
184                        if (doneSync) {
185                            return;
186                        }
187
188                        // mark we are in async mode now
189                        data.sync = false;
190
191                        // only process if the exchange hasn't failed
192                        // and it has not been handled by the error processor
193                        if (isDone(exchange)) {
194                            callback.done(doneSync);
195                            return;
196                        }
197                        // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
198                        processAsyncErrorHandler(exchange, callback, data);
199                    }
200                });
201            }
202
203            return sync;
204        }
205    }
206
207    public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
208                                  Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter,
209                                  String deadLetterUri, boolean deadLetterHandleNewException, boolean useOriginalMessagePolicy,
210                                  Predicate retryWhile, ScheduledExecutorService executorService, Processor onPrepareProcessor, Processor onExceptionProcessor) {
211
212        ObjectHelper.notNull(camelContext, "CamelContext", this);
213        ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this);
214
215        this.camelContext = camelContext;
216        this.redeliveryProcessor = redeliveryProcessor;
217        this.deadLetter = deadLetter;
218        this.output = output;
219        this.outputAsync = AsyncProcessorConverterHelper.convert(output);
220        this.redeliveryPolicy = redeliveryPolicy;
221        this.logger = logger;
222        this.deadLetterUri = deadLetterUri;
223        this.deadLetterHandleNewException = deadLetterHandleNewException;
224        this.useOriginalMessagePolicy = useOriginalMessagePolicy;
225        this.retryWhilePolicy = retryWhile;
226        this.executorService = executorService;
227        this.onPrepareProcessor = onPrepareProcessor;
228        this.onExceptionProcessor = onExceptionProcessor;
229
230        if (ObjectHelper.isNotEmpty(redeliveryPolicy.getExchangeFormatterRef())) {
231            ExchangeFormatter formatter = camelContext.getRegistry().lookupByNameAndType(redeliveryPolicy.getExchangeFormatterRef(), ExchangeFormatter.class);
232            if (formatter != null) {
233                this.exchangeFormatter = formatter;
234                this.customExchangeFormatter = true;
235            } else {
236                throw new IllegalArgumentException("Cannot find the exchangeFormatter by using reference id " + redeliveryPolicy.getExchangeFormatterRef());
237            }
238        } else {
239            this.customExchangeFormatter = false;
240            // setup exchange formatter to be used for message history dump
241            DefaultExchangeFormatter formatter = new DefaultExchangeFormatter();
242            formatter.setShowExchangeId(true);
243            formatter.setMultiline(true);
244            formatter.setShowHeaders(true);
245            formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed);
246            try {
247                Integer maxChars = CamelContextHelper.parseInteger(camelContext, camelContext.getProperty(Exchange.LOG_DEBUG_BODY_MAX_CHARS));
248                if (maxChars != null) {
249                    formatter.setMaxChars(maxChars);
250                }
251            } catch (Exception e) {
252                throw ObjectHelper.wrapRuntimeCamelException(e);
253            }
254            this.exchangeFormatter = formatter;
255        }
256    }
257
    /**
     * Whether this error handler supports transacted exchanges.
     *
     * @return always <tt>false</tt> in this base implementation
     */
    public boolean supportTransacted() {
        return false;
    }
261
262    @Override
263    public boolean hasNext() {
264        return output != null;
265    }
266
267    @Override
268    public List<Processor> next() {
269        if (!hasNext()) {
270            return null;
271        }
272        List<Processor> answer = new ArrayList<Processor>(1);
273        answer.add(output);
274        return answer;
275    }
276
277    protected boolean isRunAllowed(RedeliveryData data) {
278        // if camel context is forcing a shutdown then do not allow running
279        boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
280        if (forceShutdown) {
281            log.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)");
282            return false;
283        }
284
285        // redelivery policy can control if redelivery is allowed during stopping/shutdown
286        // but this only applies during a redelivery (counter must > 0)
287        if (data.redeliveryCounter > 0) {
288            if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
289                log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)");
290                return true;
291            } else if (preparingShutdown) {
292                // we are preparing for shutdown, now determine if we can still run
293                boolean answer = isRunAllowedOnPreparingShutdown();
294                log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer);
295                return answer;
296            }
297        }
298
299        // we cannot run if we are stopping/stopped
300        boolean answer = !isStoppingOrStopped();
301        log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer);
302        return answer;
303    }
304
    /**
     * Strategy consulted by {@link #isRunAllowed(RedeliveryData)} during a redelivery while this
     * error handler is preparing for shutdown.
     *
     * @return <tt>false</tt> in this base implementation, ie running is not allowed
     */
    protected boolean isRunAllowedOnPreparingShutdown() {
        return false;
    }
308
309    protected boolean isRedeliveryAllowed(RedeliveryData data) {
310        // redelivery policy can control if redelivery is allowed during stopping/shutdown
311        // but this only applies during a redelivery (counter must > 0)
312        if (data.redeliveryCounter > 0) {
313            boolean stopping = isStoppingOrStopped();
314            if (!preparingShutdown && !stopping) {
315                log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)");
316                return true;
317            } else {
318                // we are stopping or preparing to shutdown
319                if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
320                    log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)");
321                    return true;
322                } else {
323                    log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)");
324                    return false;
325                }
326            }
327        }
328
329        return true;
330    }
331
332    @Override
333    public void prepareShutdown(boolean suspendOnly, boolean forced) {
334        // prepare for shutdown, eg do not allow redelivery if configured
335        log.trace("Prepare shutdown on error handler {}", this);
336        preparingShutdown = true;
337    }
338
339    public void process(Exchange exchange) throws Exception {
340        if (output == null) {
341            // no output then just return
342            return;
343        }
344        AsyncProcessorHelper.process(this, exchange);
345    }
346
347    /**
348     * Process the exchange using redelivery error handling.
349     */
350    public boolean process(final Exchange exchange, final AsyncCallback callback) {
351        final RedeliveryData data = new RedeliveryData();
352
353        // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
354        // original Exchange is being redelivered, and not a mutated Exchange
355        data.original = defensiveCopyExchangeIfNeeded(exchange);
356
357        // use looping to have redelivery attempts
358        while (true) {
359
360            // can we still run
361            if (!isRunAllowed(data)) {
362                log.trace("Run not allowed, will reject executing exchange: {}", exchange);
363                if (exchange.getException() == null) {
364                    exchange.setException(new RejectedExecutionException());
365                }
366                // we cannot process so invoke callback
367                callback.done(data.sync);
368                return data.sync;
369            }
370
371            // did previous processing cause an exception?
372            boolean handle = shouldHandleException(exchange);
373            if (handle) {
374                handleException(exchange, data, isDeadLetterChannel());
375                onExceptionOccurred(exchange, data);
376            }
377
378            // compute if we are exhausted, and whether redelivery is allowed
379            boolean exhausted = isExhausted(exchange, data);
380            boolean redeliverAllowed = isRedeliveryAllowed(data);
381
382            // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC)
383            if (!redeliverAllowed || exhausted) {
384                Processor target = null;
385                boolean deliver = true;
386
387                // the unit of work may have an optional callback associated we need to leverage
388                SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback();
389                if (uowCallback != null) {
390                    // signal to the callback we are exhausted
391                    uowCallback.onExhausted(exchange);
392                    // do not deliver to the failure processor as its been handled by the callback instead
393                    deliver = false;
394                }
395
396                if (deliver) {
397                    // should deliver to failure processor (either from onException or the dead letter channel)
398                    target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
399                }
400                // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
401                // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
402                boolean isDeadLetterChannel = isDeadLetterChannel() && (target == null || target == data.deadLetterProcessor);
403                boolean sync = deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
404                // we are breaking out
405                return sync;
406            }
407
408            if (data.redeliveryCounter > 0) {
409                // calculate delay
410                data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
411
412                if (data.redeliveryDelay > 0) {
413                    // okay there is a delay so create a scheduled task to have it executed in the future
414
415                    if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
416
417                        // we are doing a redelivery then a thread pool must be configured (see the doStart method)
418                        ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
419
420                        // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
421                        // have it being executed in the future, or immediately
422                        // we are continuing asynchronously
423
424                        // mark we are routing async from now and that this redelivery task came from a synchronous routing
425                        data.sync = false;
426                        data.redeliverFromSync = true;
427                        AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
428
429                        // schedule the redelivery task
430                        if (log.isTraceEnabled()) {
431                            log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
432                        }
433                        executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
434
435                        return false;
436                    } else {
437                        // async delayed redelivery was disabled or we are transacted so we must be synchronous
438                        // as the transaction manager requires to execute in the same thread context
439                        try {
440                            // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping
441                            redeliverySleepCounter.incrementAndGet();
442                            data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
443                            redeliverySleepCounter.decrementAndGet();
444                        } catch (InterruptedException e) {
445                            redeliverySleepCounter.decrementAndGet();
446                            // we was interrupted so break out
447                            exchange.setException(e);
448                            // mark the exchange to stop continue routing when interrupted
449                            // as we do not want to continue routing (for example a task has been cancelled)
450                            exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
451                            callback.done(data.sync);
452                            return data.sync;
453                        }
454                    }
455                }
456
457                // prepare for redelivery
458                prepareExchangeForRedelivery(exchange, data);
459
460                // letting onRedeliver be executed
461                deliverToOnRedeliveryProcessor(exchange, data);
462
463                // emmit event we are doing redelivery
464                EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
465            }
466
467            // process the exchange (also redelivery)
468            boolean sync = outputAsync.process(exchange, new AsyncCallback() {
469                public void done(boolean sync) {
470                    // this callback should only handle the async case
471                    if (sync) {
472                        return;
473                    }
474
475                    // mark we are in async mode now
476                    data.sync = false;
477
478                    // if we are done then notify callback and exit
479                    if (isDone(exchange)) {
480                        callback.done(sync);
481                        return;
482                    }
483
484                    // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
485                    // method which takes care of this in a asynchronous manner
486                    processAsyncErrorHandler(exchange, callback, data);
487                }
488            });
489
490            if (!sync) {
491                // the remainder of the Exchange is being processed asynchronously so we should return
492                return false;
493            }
494            // we continue to route synchronously
495
496            // if we are done then notify callback and exit
497            boolean done = isDone(exchange);
498            if (done) {
499                callback.done(true);
500                return true;
501            }
502
503            // error occurred so loop back around.....
504        }
505    }
506
507    /**
508     * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY}
509     * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p>
510     *
511     * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay}
512     * and {@link RedeliveryData#redeliveryCounter} are copied in.</p>
513     *
514     * @param exchange The current exchange in question.
515     * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation.
516     * @param redeliveryDelay The default redelivery delay from RedeliveryData
517     * @param redeliveryCounter The redeliveryCounter
518     * @return The time to wait before the next redelivery.
519     */
520    protected long determineRedeliveryDelay(Exchange exchange, RedeliveryPolicy redeliveryPolicy, long redeliveryDelay, int redeliveryCounter) {
521        Message message = exchange.getIn();
522        Long delay = message.getHeader(Exchange.REDELIVERY_DELAY, Long.class);
523        if (delay == null) {
524            delay = redeliveryPolicy.calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter);
525            log.debug("Redelivery delay calculated as {}", delay);
526        } else {
527            log.debug("Redelivery delay is {} from Message Header [{}]", delay, Exchange.REDELIVERY_DELAY);
528        }
529        return delay;
530    }
531
532    /**
533     * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback.
534     * <p/>
535     * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use
536     * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b>
537     * in terms of logic.
538     */
539    protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
540        // can we still run
541        if (!isRunAllowed(data)) {
542            log.trace("Run not allowed, will reject executing exchange: {}", exchange);
543            if (exchange.getException() == null) {
544                exchange.setException(new RejectedExecutionException());
545            }
546            callback.done(data.sync);
547            return;
548        }
549
550        // did previous processing cause an exception?
551        boolean handle = shouldHandleException(exchange);
552        if (handle) {
553            handleException(exchange, data, isDeadLetterChannel());
554            onExceptionOccurred(exchange, data);
555        }
556
557        // compute if we are exhausted or not
558        boolean exhausted = isExhausted(exchange, data);
559        if (exhausted) {
560            Processor target = null;
561            boolean deliver = true;
562
563            // the unit of work may have an optional callback associated we need to leverage
564            UnitOfWork uow = exchange.getUnitOfWork();
565            if (uow != null) {
566                SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback();
567                if (uowCallback != null) {
568                    // signal to the callback we are exhausted
569                    uowCallback.onExhausted(exchange);
570                    // do not deliver to the failure processor as its been handled by the callback instead
571                    deliver = false;
572                }
573            }
574
575            if (deliver) {
576                // should deliver to failure processor (either from onException or the dead letter channel)
577                target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
578            }
579            // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
580            // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
581            boolean isDeadLetterChannel = isDeadLetterChannel() && target == data.deadLetterProcessor;
582            deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
583            // we are breaking out
584            return;
585        }
586
587        if (data.redeliveryCounter > 0) {
588            // we are doing a redelivery then a thread pool must be configured (see the doStart method)
589            ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
590
591            // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
592            // have it being executed in the future, or immediately
593            // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously
594            // to ensure the callback will continue routing from where we left
595            AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
596
597            // calculate the redelivery delay
598            data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
599
600            if (data.redeliveryDelay > 0) {
601                // schedule the redelivery task
602                if (log.isTraceEnabled()) {
603                    log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
604                }
605                executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
606            } else {
607                // execute the task immediately
608                executorService.submit(task);
609            }
610        }
611    }
612
613    /**
614     * Performs a defensive copy of the exchange if needed
615     *
616     * @param exchange the exchange
617     * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled).
618     */
619    protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) {
620        // only do a defensive copy if redelivery is enabled
621        if (redeliveryEnabled) {
622            return ExchangeHelper.createCopy(exchange, true);
623        } else {
624            return null;
625        }
626    }
627
628    /**
629     * Strategy whether the exchange has an exception that we should try to handle.
630     * <p/>
631     * Standard implementations should just look for an exception.
632     */
633    protected boolean shouldHandleException(Exchange exchange) {
634        return exchange.getException() != null;
635    }
636
637    /**
638     * Strategy to determine if the exchange is done so we can continue
639     */
640    protected boolean isDone(Exchange exchange) {
641        boolean answer = isCancelledOrInterrupted(exchange);
642
643        // only done if the exchange hasn't failed
644        // and it has not been handled by the failure processor
645        // or we are exhausted
646        if (!answer) {
647            answer = exchange.getException() == null
648                || ExchangeHelper.isFailureHandled(exchange)
649                || ExchangeHelper.isRedeliveryExhausted(exchange);
650        }
651
652        log.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer);
653        return answer;
654    }
655
656    /**
657     * Strategy to determine if the exchange was cancelled or interrupted
658     */
659    protected boolean isCancelledOrInterrupted(Exchange exchange) {
660        boolean answer = false;
661
662        if (ExchangeHelper.isInterrupted(exchange)) {
663            // mark the exchange to stop continue routing when interrupted
664            // as we do not want to continue routing (for example a task has been cancelled)
665            exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
666            answer = true;
667        }
668
669        log.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer);
670        return answer;
671    }
672
673    /**
674     * Returns the output processor
675     */
676    public Processor getOutput() {
677        return output;
678    }
679
680    /**
681     * Returns the dead letter that message exchanges will be sent to if the
682     * redelivery attempts fail
683     */
684    public Processor getDeadLetter() {
685        return deadLetter;
686    }
687
688    public String getDeadLetterUri() {
689        return deadLetterUri;
690    }
691
692    public boolean isUseOriginalMessagePolicy() {
693        return useOriginalMessagePolicy;
694    }
695
696    public boolean isDeadLetterHandleNewException() {
697        return deadLetterHandleNewException;
698    }
699
700    public RedeliveryPolicy getRedeliveryPolicy() {
701        return redeliveryPolicy;
702    }
703
704    public CamelLogger getLogger() {
705        return logger;
706    }
707
708    protected Predicate getDefaultHandledPredicate() {
709        // Default is not not handle errors
710        return null;
711    }
712
713    protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
714        Exception caught = exchange.getException();
715
716        // we continue so clear any exceptions
717        exchange.setException(null);
718        // clear rollback flags
719        exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
720        // reset cached streams so they can be read again
721        MessageHelper.resetStreamCache(exchange.getIn());
722
723        // its continued then remove traces of redelivery attempted and caught exception
724        exchange.getIn().removeHeader(Exchange.REDELIVERED);
725        exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
726        exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
727        exchange.removeProperty(Exchange.FAILURE_HANDLED);
728        // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
729
730        // create log message
731        String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
732        msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
733        msg = msg + ". Handled and continue routing.";
734
735        // log that we failed but want to continue
736        logFailedDelivery(false, false, false, true, isDeadLetterChannel, exchange, msg, data, null);
737    }
738
739    protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) {
740        if (!redeliveryEnabled) {
741            throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly.");
742        }
743        // there must be a defensive copy of the exchange
744        ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this);
745
746        // okay we will give it another go so clear the exception so we can try again
747        exchange.setException(null);
748
749        // clear rollback flags
750        exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
751
752        // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange
753        // and then put these on the exchange when doing a redelivery / fault processor
754
755        // preserve these headers
756        Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
757        Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
758        Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class);
759
760        // we are redelivering so copy from original back to exchange
761        exchange.getIn().copyFrom(data.original.getIn());
762        exchange.setOut(null);
763        // reset cached streams so they can be read again
764        MessageHelper.resetStreamCache(exchange.getIn());
765
766        // put back headers
767        if (redeliveryCounter != null) {
768            exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
769        }
770        if (redeliveryMaxCounter != null) {
771            exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
772        }
773        if (redelivered != null) {
774            exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered);
775        }
776    }
777
778    protected void handleException(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
779        Exception e = exchange.getException();
780
781        // store the original caused exception in a property, so we can restore it later
782        exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
783
784        // find the error handler to use (if any)
785        OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
786        if (exceptionPolicy != null) {
787            data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
788            data.handledPredicate = exceptionPolicy.getHandledPolicy();
789            data.continuedPredicate = exceptionPolicy.getContinuedPolicy();
790            data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy();
791            data.useOriginalInMessage = exceptionPolicy.getUseOriginalMessagePolicy() != null && exceptionPolicy.getUseOriginalMessagePolicy();
792
793            // route specific failure handler?
794            Processor processor = null;
795            UnitOfWork uow = exchange.getUnitOfWork();
796            if (uow != null && uow.getRouteContext() != null) {
797                String routeId = uow.getRouteContext().getRoute().getId();
798                processor = exceptionPolicy.getErrorHandler(routeId);
799            } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) {
800                // note this should really not happen, but we have this code as a fail safe
801                // to be backwards compatible with the old behavior
802                log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId());
803                processor = exceptionPolicy.getErrorHandlers().iterator().next();
804            }
805            if (processor != null) {
806                data.failureProcessor = processor;
807            }
808
809            // route specific on redelivery?
810            processor = exceptionPolicy.getOnRedelivery();
811            if (processor != null) {
812                data.onRedeliveryProcessor = processor;
813            }
814            // route specific on exception occurred?
815            processor = exceptionPolicy.getOnExceptionOccurred();
816            if (processor != null) {
817                data.onExceptionProcessor = processor;
818            }
819        }
820
821        // only log if not failure handled or not an exhausted unit of work
822        if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) {
823            String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange)
824                    + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
825            logFailedDelivery(true, false, false, false, isDeadLetterChannel, exchange, msg, data, e);
826        }
827
828        data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data);
829    }
830
831    /**
832     * Gives an optional configured OnExceptionOccurred processor a chance to process just after an exception
833     * was thrown while processing the Exchange. This allows to execute the processor at the same time the exception was thrown.
834     */
835    protected void onExceptionOccurred(Exchange exchange, final RedeliveryData data) {
836        if (data.onExceptionProcessor == null) {
837            return;
838        }
839
840        // run this synchronously as its just a Processor
841        try {
842            if (log.isTraceEnabled()) {
843                log.trace("OnExceptionOccurred processor {} is processing Exchange: {} due exception occurred", data.onExceptionProcessor, exchange);
844            }
845            data.onExceptionProcessor.process(exchange);
846        } catch (Throwable e) {
847            // we dont not want new exception to override existing, so log it as a WARN
848            log.warn("Error during processing OnExceptionOccurred. This exception is ignored.", e);
849        }
850        log.trace("OnExceptionOccurred processor done");
851    }
852
853    /**
854     * Gives an optional configured redelivery processor a chance to process before the Exchange
855     * will be redelivered. This can be used to alter the Exchange.
856     */
857    protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
858        if (data.onRedeliveryProcessor == null) {
859            return;
860        }
861
862        if (log.isTraceEnabled()) {
863            log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered",
864                    data.onRedeliveryProcessor, exchange);
865        }
866
867        // run this synchronously as its just a Processor
868        try {
869            data.onRedeliveryProcessor.process(exchange);
870        } catch (Throwable e) {
871            exchange.setException(e);
872        }
873        log.trace("Redelivery processor done");
874    }
875
876    /**
877     * All redelivery attempts failed so move the exchange to the dead letter queue
878     */
879    protected boolean deliverToFailureProcessor(final Processor processor, final boolean isDeadLetterChannel, final Exchange exchange,
880                                                final RedeliveryData data, final AsyncCallback callback) {
881        boolean sync = true;
882
883        Exception caught = exchange.getException();
884
885        // we did not success with the redelivery so now we let the failure processor handle it
886        // clear exception as we let the failure processor handle it
887        exchange.setException(null);
888
889        final boolean shouldHandle = shouldHandle(exchange, data);
890        final boolean shouldContinue = shouldContinue(exchange, data);
891
892        // regard both handled or continued as being handled
893        boolean handled = false;
894
895        // always handle if dead letter channel
896        boolean handleOrContinue = isDeadLetterChannel || shouldHandle || shouldContinue;
897        if (handleOrContinue) {
898            // its handled then remove traces of redelivery attempted
899            exchange.getIn().removeHeader(Exchange.REDELIVERED);
900            exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
901            exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
902            exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
903
904            // and remove traces of rollback only and uow exhausted markers
905            exchange.removeProperty(Exchange.ROLLBACK_ONLY);
906            exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED);
907
908            handled = true;
909        } else {
910            // must decrement the redelivery counter as we didn't process the redelivery but is
911            // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
912            decrementRedeliveryCounter(exchange);
913        }
914
915        // we should allow using the failure processor if we should not continue
916        // or in case of continue then the failure processor is NOT a dead letter channel
917        // because you can continue and still let the failure processor do some routing
918        // before continue in the main route.
919        boolean allowFailureProcessor = !shouldContinue || !isDeadLetterChannel;
920
921        if (allowFailureProcessor && processor != null) {
922
923            // prepare original IN body if it should be moved instead of current body
924            if (data.useOriginalInMessage) {
925                log.trace("Using the original IN message instead of current");
926                Message original = ExchangeHelper.getOriginalInMessage(exchange);
927                exchange.setIn(original);
928                if (exchange.hasOut()) {
929                    log.trace("Removing the out message to avoid some uncertain behavior");
930                    exchange.setOut(null);
931                }
932            }
933
934            // reset cached streams so they can be read again
935            MessageHelper.resetStreamCache(exchange.getIn());
936
937            // invoke custom on prepare
938            if (onPrepareProcessor != null) {
939                try {
940                    log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange);
941                    onPrepareProcessor.process(exchange);
942                } catch (Exception e) {
943                    // a new exception was thrown during prepare
944                    exchange.setException(e);
945                }
946            }
947
948            log.trace("Failure processor {} is processing Exchange: {}", processor, exchange);
949
950            // store the last to endpoint as the failure endpoint
951            exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
952            // and store the route id so we know in which route we failed
953            UnitOfWork uow = exchange.getUnitOfWork();
954            if (uow != null && uow.getRouteContext() != null) {
955                exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
956            }
957
958            // fire event as we had a failure processor to handle it, which there is a event for
959            final boolean deadLetterChannel = processor == data.deadLetterProcessor;
960
961            EventHelper.notifyExchangeFailureHandling(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
962
963            // the failure processor could also be asynchronous
964            AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor);
965            sync = afp.process(exchange, new AsyncCallback() {
966                public void done(boolean sync) {
967                    log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange);
968                    try {
969                        prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue);
970                        // fire event as we had a failure processor to handle it, which there is a event for
971                        EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
972                    } finally {
973                        // if the fault was handled asynchronously, this should be reflected in the callback as well
974                        data.sync &= sync;
975                        callback.done(data.sync);
976                    }
977                }
978            });
979        } else {
980            try {
981                // invoke custom on prepare
982                if (onPrepareProcessor != null) {
983                    try {
984                        log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange);
985                        onPrepareProcessor.process(exchange);
986                    } catch (Exception e) {
987                        // a new exception was thrown during prepare
988                        exchange.setException(e);
989                    }
990                }
991                // no processor but we need to prepare after failure as well
992                prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue);
993            } finally {
994                // callback we are done
995                callback.done(data.sync);
996            }
997        }
998
999        // create log message
1000        String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
1001        msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
1002        if (processor != null) {
1003            if (isDeadLetterChannel && deadLetterUri != null) {
1004                msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]";
1005            } else {
1006                msg = msg + ". Processed by failure processor: " + processor;
1007            }
1008        }
1009
1010        // log that we failed delivery as we are exhausted
1011        logFailedDelivery(false, false, handled, false, isDeadLetterChannel, exchange, msg, data, null);
1012
1013        return sync;
1014    }
1015
1016    protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data, final boolean isDeadLetterChannel,
1017                                               final boolean shouldHandle, final boolean shouldContinue) {
1018
1019        Exception newException = exchange.getException();
1020
1021        // we could not process the exchange so we let the failure processor handled it
1022        ExchangeHelper.setFailureHandled(exchange);
1023
1024        // honor if already set a handling
1025        boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
1026        if (alreadySet) {
1027            boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
1028            log.trace("This exchange has already been marked for handling: {}", handled);
1029            if (!handled) {
1030                // exception not handled, put exception back in the exchange
1031                exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
1032                // and put failure endpoint back as well
1033                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
1034            }
1035            return;
1036        }
1037
1038        // dead letter channel is special
1039        if (shouldContinue) {
1040            log.trace("This exchange is continued: {}", exchange);
1041            // okay we want to continue then prepare the exchange for that as well
1042            prepareExchangeForContinue(exchange, data, isDeadLetterChannel);
1043        } else if (shouldHandle) {
1044            log.trace("This exchange is handled so its marked as not failed: {}", exchange);
1045            exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
1046        } else {
1047            // okay the redelivery policy are not explicit set to true, so we should allow to check for some
1048            // special situations when using dead letter channel
1049            if (isDeadLetterChannel) {
1050
1051                // DLC is always handling the first thrown exception,
1052                // but if its a new exception then use the configured option
1053                boolean handled = newException == null || data.handleNewException;
1054
1055                // when using DLC then log new exception whether its being handled or not, as otherwise it may appear as
1056                // the DLC swallow new exceptions by default (which is by design to ensure the DLC always complete,
1057                // to avoid causing endless poison messages that fails forever)
1058                if (newException != null && data.currentRedeliveryPolicy.isLogNewException()) {
1059                    String uri = URISupport.sanitizeUri(deadLetterUri);
1060                    String msg = "New exception occurred during processing by the DeadLetterChannel[" + uri + "] due " + newException.getMessage();
1061                    if (handled) {
1062                        msg += ". The new exception is being handled as deadLetterHandleNewException=true.";
1063                    } else {
1064                        msg += ". The new exception is not handled as deadLetterHandleNewException=false.";
1065                    }
1066                    logFailedDelivery(false, true, handled, false, true, exchange, msg, data, newException);
1067                }
1068
1069                if (handled) {
1070                    log.trace("This exchange is handled so its marked as not failed: {}", exchange);
1071                    exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
1072                    return;
1073                }
1074            }
1075
1076            // not handled by default
1077            prepareExchangeAfterFailureNotHandled(exchange);
1078        }
1079    }
1080
1081    private void prepareExchangeAfterFailureNotHandled(Exchange exchange) {
1082        log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange);
1083        // exception not handled, put exception back in the exchange
1084        exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
1085        exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
1086        // and put failure endpoint back as well
1087        exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
1088        // and store the route id so we know in which route we failed
1089        UnitOfWork uow = exchange.getUnitOfWork();
1090        if (uow != null && uow.getRouteContext() != null) {
1091            exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
1092        }
1093    }
1094
1095    private void logFailedDelivery(boolean shouldRedeliver, boolean newException, boolean handled, boolean continued, boolean isDeadLetterChannel,
1096                                   Exchange exchange, String message, RedeliveryData data, Throwable e) {
1097        if (logger == null) {
1098            return;
1099        }
1100
1101        if (!exchange.isRollbackOnly()) {
1102            if (newException && !data.currentRedeliveryPolicy.isLogNewException()) {
1103                // do not log new exception
1104                return;
1105            }
1106
1107            // if we should not rollback, then check whether logging is enabled
1108
1109            if (!newException && handled && !data.currentRedeliveryPolicy.isLogHandled()) {
1110                // do not log handled
1111                return;
1112            }
1113
1114            if (!newException && continued && !data.currentRedeliveryPolicy.isLogContinued()) {
1115                // do not log handled
1116                return;
1117            }
1118
1119            if (!newException && shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) {
1120                // do not log retry attempts
1121                return;
1122            }
1123
1124            if (!newException && !shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) {
1125                // do not log exhausted
1126                return;
1127            }
1128        }
1129
1130        LoggingLevel newLogLevel;
1131        boolean logStackTrace;
1132        if (exchange.isRollbackOnly()) {
1133            newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
1134            logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
1135        } else if (shouldRedeliver) {
1136            newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
1137            logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace();
1138        } else {
1139            newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
1140            logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
1141        }
1142        if (e == null) {
1143            e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
1144        }
1145
1146        if (newException) {
1147            // log at most WARN level
1148            if (newLogLevel == LoggingLevel.ERROR) {
1149                newLogLevel = LoggingLevel.WARN;
1150            }
1151            String msg = message;
1152            if (msg == null) {
1153                msg = "New exception " + ExchangeHelper.logIds(exchange);
1154                // special for logging the new exception
1155                Throwable cause = e;
1156                if (cause != null) {
1157                    msg = msg + " due: " + cause.getMessage();
1158                }
1159            }
1160
1161            if (e != null && logStackTrace) {
1162                logger.log(msg, e, newLogLevel);
1163            } else {
1164                logger.log(msg, newLogLevel);
1165            }
1166        } else if (exchange.isRollbackOnly()) {
1167            String msg = "Rollback " + ExchangeHelper.logIds(exchange);
1168            Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
1169            if (cause != null) {
1170                msg = msg + " due: " + cause.getMessage();
1171            }
1172
1173            // should we include message history
1174            if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
1175                // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
1176                ExchangeFormatter formatter = customExchangeFormatter ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() ? exchangeFormatter : null);
1177                String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, false);
1178                if (routeStackTrace != null) {
1179                    msg = msg + "\n" + routeStackTrace;
1180                }
1181            }
1182
1183            if (newLogLevel == LoggingLevel.ERROR) {
1184                // log intended rollback on maximum WARN level (no ERROR)
1185                logger.log(msg, LoggingLevel.WARN);
1186            } else {
1187                // otherwise use the desired logging level
1188                logger.log(msg, newLogLevel);
1189            }
1190        } else {
1191            String msg = message;
1192            // should we include message history
1193            if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
1194                // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
1195                ExchangeFormatter formatter = customExchangeFormatter ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() ? exchangeFormatter : null);
1196                String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, e != null && logStackTrace);
1197                if (routeStackTrace != null) {
1198                    msg = msg + "\n" + routeStackTrace;
1199                }
1200            }
1201
1202            if (e != null && logStackTrace) {
1203                logger.log(msg, e, newLogLevel);
1204            } else {
1205                logger.log(msg, newLogLevel);
1206            }
1207        }
1208    }
1209
1210    /**
1211     * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback).
1212     * <p/>
1213     * If the exchange is exhausted, then we will not continue processing, but let the
1214     * failure processor deal with the exchange.
1215     *
1216     * @param exchange the current exchange
1217     * @param data     the redelivery data
1218     * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust.
1219     */
1220    private boolean isExhausted(Exchange exchange, RedeliveryData data) {
1221        // if marked as rollback only then do not continue/redeliver
1222        boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
1223        if (exhausted) {
1224            log.trace("This exchange is marked as redelivery exhausted: {}", exchange);
1225            return true;
1226        }
1227
1228        // if marked as rollback only then do not continue/redeliver
1229        boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class);
1230        if (rollbackOnly) {
1231            log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange);
1232            return true;
1233        }
1234        // its the first original call so continue
1235        if (data.redeliveryCounter == 0) {
1236            return false;
1237        }
1238        // its a potential redelivery so determine if we should redeliver or not
1239        boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate);
1240        return !redeliver;
1241    }
1242
1243    /**
1244     * Determines whether or not to continue if we are exhausted.
1245     *
1246     * @param exchange the current exchange
1247     * @param data     the redelivery data
1248     * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust.
1249     */
1250    private boolean shouldContinue(Exchange exchange, RedeliveryData data) {
1251        if (data.continuedPredicate != null) {
1252            return data.continuedPredicate.matches(exchange);
1253        }
1254        // do not continue by default
1255        return false;
1256    }
1257
1258    /**
1259     * Determines whether or not to handle if we are exhausted.
1260     *
1261     * @param exchange the current exchange
1262     * @param data     the redelivery data
1263     * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust.
1264     */
1265    private boolean shouldHandle(Exchange exchange, RedeliveryData data) {
1266        if (data.handledPredicate != null) {
1267            return data.handledPredicate.matches(exchange);
1268        }
1269        // do not handle by default
1270        return false;
1271    }
1272
1273    /**
1274     * Increments the redelivery counter and adds the redelivered flag if the
1275     * message has been redelivered
1276     */
1277    private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) {
1278        Message in = exchange.getIn();
1279        Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
1280        int next = 1;
1281        if (counter != null) {
1282            next = counter + 1;
1283        }
1284        in.setHeader(Exchange.REDELIVERY_COUNTER, next);
1285        in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
1286        // if maximum redeliveries is used, then provide that information as well
1287        if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) {
1288            in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries());
1289        }
1290        return next;
1291    }
1292
1293    /**
1294     * Prepares the redelivery counter and boolean flag for the failure handle processor
1295     */
1296    private void decrementRedeliveryCounter(Exchange exchange) {
1297        Message in = exchange.getIn();
1298        Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
1299        if (counter != null) {
1300            int prev = counter - 1;
1301            in.setHeader(Exchange.REDELIVERY_COUNTER, prev);
1302            // set boolean flag according to counter
1303            in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
1304        } else {
1305            // not redelivered
1306            in.setHeader(Exchange.REDELIVERY_COUNTER, 0);
1307            in.setHeader(Exchange.REDELIVERED, Boolean.FALSE);
1308        }
1309    }
1310
1311    /**
1312     * Determines if redelivery is enabled by checking if any of the redelivery policy
1313     * settings may allow redeliveries.
1314     *
1315     * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise
1316     * @throws Exception can be thrown
1317     */
1318    private boolean determineIfRedeliveryIsEnabled() throws Exception {
1319        // determine if redeliver is enabled either on error handler
1320        if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) {
1321            // must check for != 0 as (-1 means redeliver forever)
1322            return true;
1323        }
1324        if (retryWhilePolicy != null) {
1325            return true;
1326        }
1327
1328        // or on the exception policies
1329        if (!exceptionPolicies.isEmpty()) {
1330            // walk them to see if any of them have a maximum redeliveries > 0 or retry until set
1331            for (OnExceptionDefinition def : exceptionPolicies.values()) {
1332
1333                String ref = def.getRedeliveryPolicyRef();
1334                if (ref != null) {
1335                    // lookup in registry if ref provided
1336                    RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class);
1337                    if (policy.getMaximumRedeliveries() != 0) {
1338                        // must check for != 0 as (-1 means redeliver forever)
1339                        return true;
1340                    }
1341                } else if (def.getRedeliveryPolicy() != null) {
1342                    Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries());
1343                    if (max != null && max != 0) {
1344                        // must check for != 0 as (-1 means redeliver forever)
1345                        return true;
1346                    }
1347                }
1348
1349                if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) {
1350                    return true;
1351                }
1352            }
1353        }
1354
1355        return false;
1356    }
1357
1358    /**
1359     * Gets the number of exchanges that are pending for redelivery
1360     */
1361    public int getPendingRedeliveryCount() {
1362        int answer = redeliverySleepCounter.get();
1363        if (executorService != null && executorService instanceof ThreadPoolExecutor) {
1364            answer += ((ThreadPoolExecutor) executorService).getQueue().size();
1365        }
1366
1367        return answer;
1368    }
1369
1370    @Override
1371    protected void doStart() throws Exception {
1372        ServiceHelper.startServices(output, outputAsync, deadLetter);
1373
1374        // determine if redeliver is enabled or not
1375        redeliveryEnabled = determineIfRedeliveryIsEnabled();
1376        if (log.isTraceEnabled()) {
1377            log.trace("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this);
1378        }
1379
1380        // we only need thread pool if redelivery is enabled
1381        if (redeliveryEnabled) {
1382            if (executorService == null) {
1383                // use default shared executor service
1384                executorService = camelContext.getErrorHandlerExecutorService();
1385            }
1386            if (log.isDebugEnabled()) {
1387                log.debug("Using ExecutorService: {} for redeliveries on error handler: {}", executorService, this);
1388            }
1389        }
1390
1391        // reset flag when starting
1392        preparingShutdown = false;
1393        redeliverySleepCounter.set(0);
1394    }
1395
1396    @Override
1397    protected void doStop() throws Exception {
1398        // noop, do not stop any services which we only do when shutting down
1399        // as the error handler can be context scoped, and should not stop in case
1400        // a route stops
1401    }
1402
1403    @Override
1404    protected void doShutdown() throws Exception {
1405        ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync);
1406    }
1407}