001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.Callable;
022import java.util.concurrent.CountDownLatch;
023import java.util.concurrent.RejectedExecutionException;
024import java.util.concurrent.ScheduledExecutorService;
025import java.util.concurrent.ThreadPoolExecutor;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicInteger;
028
029import org.apache.camel.AsyncCallback;
030import org.apache.camel.AsyncProcessor;
031import org.apache.camel.CamelContext;
032import org.apache.camel.Exchange;
033import org.apache.camel.LoggingLevel;
034import org.apache.camel.Message;
035import org.apache.camel.Navigate;
036import org.apache.camel.Predicate;
037import org.apache.camel.Processor;
038import org.apache.camel.model.OnExceptionDefinition;
039import org.apache.camel.spi.AsyncProcessorAwaitManager;
040import org.apache.camel.spi.ExchangeFormatter;
041import org.apache.camel.spi.ShutdownPrepared;
042import org.apache.camel.spi.SubUnitOfWorkCallback;
043import org.apache.camel.spi.UnitOfWork;
044import org.apache.camel.util.AsyncProcessorConverterHelper;
045import org.apache.camel.util.AsyncProcessorHelper;
046import org.apache.camel.util.CamelContextHelper;
047import org.apache.camel.util.CamelLogger;
048import org.apache.camel.util.EventHelper;
049import org.apache.camel.util.ExchangeHelper;
050import org.apache.camel.util.MessageHelper;
051import org.apache.camel.util.ObjectHelper;
052import org.apache.camel.util.ServiceHelper;
053import org.apache.camel.util.StopWatch;
054import org.apache.camel.util.URISupport;
055
056/**
057 * Base redeliverable error handler that also supports a final dead letter queue in case
058 * all redelivery attempts fail.
059 * <p/>
060 * This implementation should contain all the error handling logic and the sub classes
061 * should only configure it according to what they support.
062 *
063 * @version
064 */
065public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
066
067    protected final AtomicInteger redeliverySleepCounter = new AtomicInteger();
068    protected ScheduledExecutorService executorService;
069    protected final CamelContext camelContext;
070    protected final AsyncProcessorAwaitManager awaitManager;
071    protected final Processor deadLetter;
072    protected final String deadLetterUri;
073    protected final boolean deadLetterHandleNewException;
074    protected final Processor output;
075    protected final AsyncProcessor outputAsync;
076    protected final Processor redeliveryProcessor;
077    protected final RedeliveryPolicy redeliveryPolicy;
078    protected final Predicate retryWhilePolicy;
079    protected final CamelLogger logger;
080    protected final boolean useOriginalMessagePolicy;
081    protected boolean redeliveryEnabled;
082    protected volatile boolean preparingShutdown;
083    protected final ExchangeFormatter exchangeFormatter;
084    protected final boolean customExchangeFormatter;
085    protected final Processor onPrepareProcessor;
086    protected final Processor onExceptionProcessor;
087
088    /**
089     * Contains the current redelivery data
090     */
091    protected class RedeliveryData {
092        Exchange original;
093        boolean sync = true;
094        int redeliveryCounter;
095        long redeliveryDelay;
096        Predicate retryWhilePredicate;
097        boolean redeliverFromSync;
098
099        // default behavior which can be overloaded on a per exception basis
100        RedeliveryPolicy currentRedeliveryPolicy;
101        Processor deadLetterProcessor;
102        Processor failureProcessor;
103        Processor onRedeliveryProcessor;
104        Processor onPrepareProcessor;
105        Processor onExceptionProcessor;
106        Predicate handledPredicate;
107        Predicate continuedPredicate;
108        boolean useOriginalInMessage;
109        boolean handleNewException;
110
111        public RedeliveryData() {
112            // init with values from the error handler
113            this.retryWhilePredicate = retryWhilePolicy;
114            this.currentRedeliveryPolicy = redeliveryPolicy;
115            this.deadLetterProcessor = deadLetter;
116            this.onRedeliveryProcessor = redeliveryProcessor;
117            this.onPrepareProcessor = RedeliveryErrorHandler.this.onPrepareProcessor;
118            this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor;
119            this.handledPredicate = getDefaultHandledPredicate();
120            this.useOriginalInMessage = useOriginalMessagePolicy;
121            this.handleNewException = deadLetterHandleNewException;
122        }
123    }
124
125    /**
126     * Task for sleeping during redelivery attempts.
127     * <p/>
128     * This task is for the synchronous blocking. If using async delayed then a scheduled thread pool
129     * is used for sleeping and trigger redeliveries.
130     */
131    private final class RedeliverSleepTask {
132
133        private final RedeliveryPolicy policy;
134        private final long delay;
135
136        RedeliverSleepTask(RedeliveryPolicy policy, long delay) {
137            this.policy = policy;
138            this.delay = delay;
139        }
140
141        public boolean sleep() throws InterruptedException {
142            // for small delays then just sleep
143            if (delay < 1000) {
144                policy.sleep(delay);
145                return true;
146            }
147
148            StopWatch watch = new StopWatch();
149
150            log.debug("Sleeping for: {} millis until attempting redelivery", delay);
151            while (watch.taken() < delay) {
152                // sleep using 1 sec interval
153
154                long delta = delay - watch.taken();
155                long max = Math.min(1000, delta);
156                if (max > 0) {
157                    log.trace("Sleeping for: {} millis until waking up for re-check", max);
158                    Thread.sleep(max);
159                }
160
161                // are we preparing for shutdown then only do redelivery if allowed
162                if (preparingShutdown && !policy.isAllowRedeliveryWhileStopping()) {
163                    log.debug("Rejected redelivery while stopping");
164                    return false;
165                }
166            }
167
168            return true;
169        }
170    }
171
172    /**
173     * Tasks which performs asynchronous redelivery attempts, and being triggered by a
174     * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
175     * has to be delayed before a redelivery attempt is performed.
176     */
177    private final class AsyncRedeliveryTask implements Callable<Boolean> {
178
179        private final Exchange exchange;
180        private final AsyncCallback callback;
181        private final RedeliveryData data;
182
183        AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
184            this.exchange = exchange;
185            this.callback = callback;
186            this.data = data;
187        }
188
189        public Boolean call() throws Exception {
190            // prepare for redelivery
191            prepareExchangeForRedelivery(exchange, data);
192
193            // letting onRedeliver be executed at first
194            deliverToOnRedeliveryProcessor(exchange, data);
195
196            if (log.isTraceEnabled()) {
197                log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", new Object[]{exchange.getExchangeId(), outputAsync, exchange});
198            }
199
200            // emmit event we are doing redelivery
201            EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
202
203            // process the exchange (also redelivery)
204            boolean sync;
205            if (data.redeliverFromSync) {
206                // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from
207                // this error handler, which means we have to invoke the callback with false, to have the callback
208                // be notified when we are done
209                sync = outputAsync.process(exchange, new AsyncCallback() {
210                    public void done(boolean doneSync) {
211                        log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
212
213                        // mark we are in sync mode now
214                        data.sync = false;
215
216                        // only process if the exchange hasn't failed
217                        // and it has not been handled by the error processor
218                        if (isDone(exchange)) {
219                            callback.done(false);
220                            return;
221                        }
222
223                        // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
224                        processAsyncErrorHandler(exchange, callback, data);
225                    }
226                });
227            } else {
228                // this redelivery task was scheduled from asynchronous, which means we should only
229                // handle when the asynchronous task was done
230                sync = outputAsync.process(exchange, new AsyncCallback() {
231                    public void done(boolean doneSync) {
232                        log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
233
234                        // this callback should only handle the async case
235                        if (doneSync) {
236                            return;
237                        }
238
239                        // mark we are in async mode now
240                        data.sync = false;
241
242                        // only process if the exchange hasn't failed
243                        // and it has not been handled by the error processor
244                        if (isDone(exchange)) {
245                            callback.done(doneSync);
246                            return;
247                        }
248                        // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
249                        processAsyncErrorHandler(exchange, callback, data);
250                    }
251                });
252            }
253
254            return sync;
255        }
256    }
257
258    public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
259                                  Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter,
260                                  String deadLetterUri, boolean deadLetterHandleNewException, boolean useOriginalMessagePolicy,
261                                  Predicate retryWhile, ScheduledExecutorService executorService, Processor onPrepareProcessor, Processor onExceptionProcessor) {
262
263        ObjectHelper.notNull(camelContext, "CamelContext", this);
264        ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this);
265
266        this.camelContext = camelContext;
267        this.awaitManager = camelContext.getAsyncProcessorAwaitManager();
268        this.redeliveryProcessor = redeliveryProcessor;
269        this.deadLetter = deadLetter;
270        this.output = output;
271        this.outputAsync = AsyncProcessorConverterHelper.convert(output);
272        this.redeliveryPolicy = redeliveryPolicy;
273        this.logger = logger;
274        this.deadLetterUri = deadLetterUri;
275        this.deadLetterHandleNewException = deadLetterHandleNewException;
276        this.useOriginalMessagePolicy = useOriginalMessagePolicy;
277        this.retryWhilePolicy = retryWhile;
278        this.executorService = executorService;
279        this.onPrepareProcessor = onPrepareProcessor;
280        this.onExceptionProcessor = onExceptionProcessor;
281
282        if (ObjectHelper.isNotEmpty(redeliveryPolicy.getExchangeFormatterRef())) {
283            ExchangeFormatter formatter = camelContext.getRegistry().lookupByNameAndType(redeliveryPolicy.getExchangeFormatterRef(), ExchangeFormatter.class);
284            if (formatter != null) {
285                this.exchangeFormatter = formatter;
286                this.customExchangeFormatter = true;
287            } else {
288                throw new IllegalArgumentException("Cannot find the exchangeFormatter by using reference id " + redeliveryPolicy.getExchangeFormatterRef());
289            }
290        } else {
291            this.customExchangeFormatter = false;
292            // setup exchange formatter to be used for message history dump
293            DefaultExchangeFormatter formatter = new DefaultExchangeFormatter();
294            formatter.setShowExchangeId(true);
295            formatter.setMultiline(true);
296            formatter.setShowHeaders(true);
297            formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed);
298            try {
299                Integer maxChars = CamelContextHelper.parseInteger(camelContext, camelContext.getProperty(Exchange.LOG_DEBUG_BODY_MAX_CHARS));
300                if (maxChars != null) {
301                    formatter.setMaxChars(maxChars);
302                }
303            } catch (Exception e) {
304                throw ObjectHelper.wrapRuntimeCamelException(e);
305            }
306            this.exchangeFormatter = formatter;
307        }
308    }
309
310    public boolean supportTransacted() {
311        return false;
312    }
313
314    @Override
315    public boolean hasNext() {
316        return output != null;
317    }
318
319    @Override
320    public List<Processor> next() {
321        if (!hasNext()) {
322            return null;
323        }
324        List<Processor> answer = new ArrayList<Processor>(1);
325        answer.add(output);
326        return answer;
327    }
328
329    protected boolean isRunAllowed(RedeliveryData data) {
330        // if camel context is forcing a shutdown then do not allow running
331        boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
332        if (forceShutdown) {
333            log.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)");
334            return false;
335        }
336
337        // redelivery policy can control if redelivery is allowed during stopping/shutdown
338        // but this only applies during a redelivery (counter must > 0)
339        if (data.redeliveryCounter > 0) {
340            if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
341                log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)");
342                return true;
343            } else if (preparingShutdown) {
344                // we are preparing for shutdown, now determine if we can still run
345                boolean answer = isRunAllowedOnPreparingShutdown();
346                log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer);
347                return answer;
348            }
349        }
350
351        // we cannot run if we are stopping/stopped
352        boolean answer = !isStoppingOrStopped();
353        log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer);
354        return answer;
355    }
356
357    protected boolean isRunAllowedOnPreparingShutdown() {
358        return false;
359    }
360
361    protected boolean isRedeliveryAllowed(RedeliveryData data) {
362        // redelivery policy can control if redelivery is allowed during stopping/shutdown
363        // but this only applies during a redelivery (counter must > 0)
364        if (data.redeliveryCounter > 0) {
365            boolean stopping = isStoppingOrStopped();
366            if (!preparingShutdown && !stopping) {
367                log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)");
368                return true;
369            } else {
370                // we are stopping or preparing to shutdown
371                if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
372                    log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)");
373                    return true;
374                } else {
375                    log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)");
376                    return false;
377                }
378            }
379        }
380
381        return true;
382    }
383
384    @Override
385    public void prepareShutdown(boolean suspendOnly, boolean forced) {
386        // prepare for shutdown, eg do not allow redelivery if configured
387        log.trace("Prepare shutdown on error handler {}", this);
388        preparingShutdown = true;
389    }
390
391    public void process(Exchange exchange) throws Exception {
392        if (output == null) {
393            // no output then just return
394            return;
395        }
396
397        // inline org.apache.camel.util.AsyncProcessorHelper.process(org.apache.camel.AsyncProcessor, org.apache.camel.Exchange)
398        // to optimize and reduce stacktrace lengths
399        final CountDownLatch latch = new CountDownLatch(1);
400        boolean sync = process(exchange, new AsyncCallback() {
401            public void done(boolean doneSync) {
402                if (!doneSync) {
403                    awaitManager.countDown(exchange, latch);
404                }
405            }
406        });
407        if (!sync) {
408            awaitManager.await(exchange, latch);
409        }
410    }
411
412    /**
413     * Process the exchange using redelivery error handling.
414     */
415    public boolean process(final Exchange exchange, final AsyncCallback callback) {
416        final RedeliveryData data = new RedeliveryData();
417
418        // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
419        // original Exchange is being redelivered, and not a mutated Exchange
420        data.original = defensiveCopyExchangeIfNeeded(exchange);
421
422        // use looping to have redelivery attempts
423        while (true) {
424
425            // can we still run
426            if (!isRunAllowed(data)) {
427                log.trace("Run not allowed, will reject executing exchange: {}", exchange);
428                if (exchange.getException() == null) {
429                    exchange.setException(new RejectedExecutionException());
430                }
431                // we cannot process so invoke callback
432                callback.done(data.sync);
433                return data.sync;
434            }
435
436            // did previous processing cause an exception?
437            boolean handle = shouldHandleException(exchange);
438            if (handle) {
439                handleException(exchange, data, isDeadLetterChannel());
440                onExceptionOccurred(exchange, data);
441            }
442
443            // compute if we are exhausted, and whether redelivery is allowed
444            boolean exhausted = isExhausted(exchange, data);
445            boolean redeliverAllowed = isRedeliveryAllowed(data);
446
447            // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC)
448            if (!redeliverAllowed || exhausted) {
449                Processor target = null;
450                boolean deliver = true;
451
452                // the unit of work may have an optional callback associated we need to leverage
453                SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback();
454                if (uowCallback != null) {
455                    // signal to the callback we are exhausted
456                    uowCallback.onExhausted(exchange);
457                    // do not deliver to the failure processor as its been handled by the callback instead
458                    deliver = false;
459                }
460
461                if (deliver) {
462                    // should deliver to failure processor (either from onException or the dead letter channel)
463                    target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
464                }
465                // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
466                // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
467                boolean isDeadLetterChannel = isDeadLetterChannel() && (target == null || target == data.deadLetterProcessor);
468                boolean sync = deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
469                // we are breaking out
470                return sync;
471            }
472
473            if (data.redeliveryCounter > 0) {
474                // calculate delay
475                data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
476
477                if (data.redeliveryDelay > 0) {
478                    // okay there is a delay so create a scheduled task to have it executed in the future
479
480                    if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
481
482                        // we are doing a redelivery then a thread pool must be configured (see the doStart method)
483                        ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
484
485                        // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
486                        // have it being executed in the future, or immediately
487                        // we are continuing asynchronously
488
489                        // mark we are routing async from now and that this redelivery task came from a synchronous routing
490                        data.sync = false;
491                        data.redeliverFromSync = true;
492                        AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
493
494                        // schedule the redelivery task
495                        if (log.isTraceEnabled()) {
496                            log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
497                        }
498                        executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
499
500                        return false;
501                    } else {
502                        // async delayed redelivery was disabled or we are transacted so we must be synchronous
503                        // as the transaction manager requires to execute in the same thread context
504                        try {
505                            // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping
506                            redeliverySleepCounter.incrementAndGet();
507                            RedeliverSleepTask task = new RedeliverSleepTask(data.currentRedeliveryPolicy, data.redeliveryDelay);
508                            boolean complete = task.sleep();
509                            redeliverySleepCounter.decrementAndGet();
510                            if (!complete) {
511                                // the task was rejected
512                                exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping"));
513                                // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
514                                exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
515                                // jump to start of loop which then detects that we are failed and exhausted
516                                continue;
517                            }
518                        } catch (InterruptedException e) {
519                            redeliverySleepCounter.decrementAndGet();
520                            // we was interrupted so break out
521                            exchange.setException(e);
522                            // mark the exchange to stop continue routing when interrupted
523                            // as we do not want to continue routing (for example a task has been cancelled)
524                            exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
525                            callback.done(data.sync);
526                            return data.sync;
527                        }
528                    }
529                }
530
531                // prepare for redelivery
532                prepareExchangeForRedelivery(exchange, data);
533
534                // letting onRedeliver be executed
535                deliverToOnRedeliveryProcessor(exchange, data);
536
537                // emmit event we are doing redelivery
538                EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
539            }
540
541            // process the exchange (also redelivery)
542            boolean sync = outputAsync.process(exchange, new AsyncCallback() {
543                public void done(boolean sync) {
544                    // this callback should only handle the async case
545                    if (sync) {
546                        return;
547                    }
548
549                    // mark we are in async mode now
550                    data.sync = false;
551
552                    // if we are done then notify callback and exit
553                    if (isDone(exchange)) {
554                        callback.done(sync);
555                        return;
556                    }
557
558                    // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
559                    // method which takes care of this in a asynchronous manner
560                    processAsyncErrorHandler(exchange, callback, data);
561                }
562            });
563
564            if (!sync) {
565                // the remainder of the Exchange is being processed asynchronously so we should return
566                return false;
567            }
568            // we continue to route synchronously
569
570            // if we are done then notify callback and exit
571            boolean done = isDone(exchange);
572            if (done) {
573                callback.done(true);
574                return true;
575            }
576
577            // error occurred so loop back around.....
578        }
579    }
580
581    /**
582     * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY}
583     * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p>
584     *
585     * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay}
586     * and {@link RedeliveryData#redeliveryCounter} are copied in.</p>
587     *
588     * @param exchange The current exchange in question.
589     * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation.
590     * @param redeliveryDelay The default redelivery delay from RedeliveryData
591     * @param redeliveryCounter The redeliveryCounter
592     * @return The time to wait before the next redelivery.
593     */
594    protected long determineRedeliveryDelay(Exchange exchange, RedeliveryPolicy redeliveryPolicy, long redeliveryDelay, int redeliveryCounter) {
595        Message message = exchange.getIn();
596        Long delay = message.getHeader(Exchange.REDELIVERY_DELAY, Long.class);
597        if (delay == null) {
598            delay = redeliveryPolicy.calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter);
599            log.debug("Redelivery delay calculated as {}", delay);
600        } else {
601            log.debug("Redelivery delay is {} from Message Header [{}]", delay, Exchange.REDELIVERY_DELAY);
602        }
603        return delay;
604    }
605
606    /**
607     * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback.
608     * <p/>
609     * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use
610     * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b>
611     * in terms of logic.
612     */
613    protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
614        // can we still run
615        if (!isRunAllowed(data)) {
616            log.trace("Run not allowed, will reject executing exchange: {}", exchange);
617            if (exchange.getException() == null) {
618                exchange.setException(new RejectedExecutionException());
619            }
620            callback.done(data.sync);
621            return;
622        }
623
624        // did previous processing cause an exception?
625        boolean handle = shouldHandleException(exchange);
626        if (handle) {
627            handleException(exchange, data, isDeadLetterChannel());
628            onExceptionOccurred(exchange, data);
629        }
630
631        // compute if we are exhausted or not
632        boolean exhausted = isExhausted(exchange, data);
633        if (exhausted) {
634            Processor target = null;
635            boolean deliver = true;
636
637            // the unit of work may have an optional callback associated we need to leverage
638            UnitOfWork uow = exchange.getUnitOfWork();
639            if (uow != null) {
640                SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback();
641                if (uowCallback != null) {
642                    // signal to the callback we are exhausted
643                    uowCallback.onExhausted(exchange);
644                    // do not deliver to the failure processor as its been handled by the callback instead
645                    deliver = false;
646                }
647            }
648
649            if (deliver) {
650                // should deliver to failure processor (either from onException or the dead letter channel)
651                target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
652            }
653            // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
654            // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
655            boolean isDeadLetterChannel = isDeadLetterChannel() && target == data.deadLetterProcessor;
656            deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
657            // we are breaking out
658            return;
659        }
660
661        if (data.redeliveryCounter > 0) {
662            // we are doing a redelivery then a thread pool must be configured (see the doStart method)
663            ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
664
665            // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
666            // have it being executed in the future, or immediately
667            // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously
668            // to ensure the callback will continue routing from where we left
669            AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
670
671            // calculate the redelivery delay
672            data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
673
674            if (data.redeliveryDelay > 0) {
675                // schedule the redelivery task
676                if (log.isTraceEnabled()) {
677                    log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
678                }
679                executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
680            } else {
681                // execute the task immediately
682                executorService.submit(task);
683            }
684        }
685    }
686
687    /**
688     * Performs a defensive copy of the exchange if needed
689     *
690     * @param exchange the exchange
691     * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled).
692     */
693    protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) {
694        // only do a defensive copy if redelivery is enabled
695        if (redeliveryEnabled) {
696            return ExchangeHelper.createCopy(exchange, true);
697        } else {
698            return null;
699        }
700    }
701
702    /**
703     * Strategy whether the exchange has an exception that we should try to handle.
704     * <p/>
705     * Standard implementations should just look for an exception.
706     */
707    protected boolean shouldHandleException(Exchange exchange) {
708        return exchange.getException() != null;
709    }
710
711    /**
712     * Strategy to determine if the exchange is done so we can continue
713     */
714    protected boolean isDone(Exchange exchange) {
715        boolean answer = isCancelledOrInterrupted(exchange);
716
717        // only done if the exchange hasn't failed
718        // and it has not been handled by the failure processor
719        // or we are exhausted
720        if (!answer) {
721            answer = exchange.getException() == null
722                || ExchangeHelper.isFailureHandled(exchange)
723                || ExchangeHelper.isRedeliveryExhausted(exchange);
724        }
725
726        log.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer);
727        return answer;
728    }
729
730    /**
731     * Strategy to determine if the exchange was cancelled or interrupted
732     */
733    protected boolean isCancelledOrInterrupted(Exchange exchange) {
734        boolean answer = false;
735
736        if (ExchangeHelper.isInterrupted(exchange)) {
737            // mark the exchange to stop continue routing when interrupted
738            // as we do not want to continue routing (for example a task has been cancelled)
739            exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
740            answer = true;
741        }
742
743        log.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer);
744        return answer;
745    }
746
747    /**
748     * Returns the output processor
749     */
750    public Processor getOutput() {
751        return output;
752    }
753
754    /**
755     * Returns the dead letter that message exchanges will be sent to if the
756     * redelivery attempts fail
757     */
758    public Processor getDeadLetter() {
759        return deadLetter;
760    }
761
762    public String getDeadLetterUri() {
763        return deadLetterUri;
764    }
765
766    public boolean isUseOriginalMessagePolicy() {
767        return useOriginalMessagePolicy;
768    }
769
770    public boolean isDeadLetterHandleNewException() {
771        return deadLetterHandleNewException;
772    }
773
774    public RedeliveryPolicy getRedeliveryPolicy() {
775        return redeliveryPolicy;
776    }
777
778    public CamelLogger getLogger() {
779        return logger;
780    }
781
782    protected Predicate getDefaultHandledPredicate() {
783        // Default is not not handle errors
784        return null;
785    }
786
787    protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
788        Exception caught = exchange.getException();
789
790        // we continue so clear any exceptions
791        exchange.setException(null);
792        // clear rollback flags
793        exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
794        // reset cached streams so they can be read again
795        MessageHelper.resetStreamCache(exchange.getIn());
796
797        // its continued then remove traces of redelivery attempted and caught exception
798        exchange.getIn().removeHeader(Exchange.REDELIVERED);
799        exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
800        exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
801        exchange.removeProperty(Exchange.FAILURE_HANDLED);
802        // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
803
804        // create log message
805        String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
806        msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
807        msg = msg + ". Handled and continue routing.";
808
809        // log that we failed but want to continue
810        logFailedDelivery(false, false, false, true, isDeadLetterChannel, exchange, msg, data, null);
811    }
812
813    protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) {
814        if (!redeliveryEnabled) {
815            throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly.");
816        }
817        // there must be a defensive copy of the exchange
818        ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this);
819
820        // okay we will give it another go so clear the exception so we can try again
821        exchange.setException(null);
822
823        // clear rollback flags
824        exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
825
826        // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange
827        // and then put these on the exchange when doing a redelivery / fault processor
828
829        // preserve these headers
830        Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
831        Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
832        Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class);
833
834        // we are redelivering so copy from original back to exchange
835        exchange.getIn().copyFrom(data.original.getIn());
836        exchange.setOut(null);
837        // reset cached streams so they can be read again
838        MessageHelper.resetStreamCache(exchange.getIn());
839
840        // put back headers
841        if (redeliveryCounter != null) {
842            exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
843        }
844        if (redeliveryMaxCounter != null) {
845            exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
846        }
847        if (redelivered != null) {
848            exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered);
849        }
850    }
851
852    protected void handleException(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
853        Exception e = exchange.getException();
854
855        // store the original caused exception in a property, so we can restore it later
856        exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
857
858        // find the error handler to use (if any)
859        OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
860        if (exceptionPolicy != null) {
861            data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
862            data.handledPredicate = exceptionPolicy.getHandledPolicy();
863            data.continuedPredicate = exceptionPolicy.getContinuedPolicy();
864            data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy();
865            data.useOriginalInMessage = exceptionPolicy.getUseOriginalMessagePolicy() != null && exceptionPolicy.getUseOriginalMessagePolicy();
866
867            // route specific failure handler?
868            Processor processor = null;
869            UnitOfWork uow = exchange.getUnitOfWork();
870            if (uow != null && uow.getRouteContext() != null) {
871                String routeId = uow.getRouteContext().getRoute().getId();
872                processor = exceptionPolicy.getErrorHandler(routeId);
873            } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) {
874                // note this should really not happen, but we have this code as a fail safe
875                // to be backwards compatible with the old behavior
876                log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId());
877                processor = exceptionPolicy.getErrorHandlers().iterator().next();
878            }
879            if (processor != null) {
880                data.failureProcessor = processor;
881            }
882
883            // route specific on redelivery?
884            processor = exceptionPolicy.getOnRedelivery();
885            if (processor != null) {
886                data.onRedeliveryProcessor = processor;
887            }
888            // route specific on exception occurred?
889            processor = exceptionPolicy.getOnExceptionOccurred();
890            if (processor != null) {
891                data.onExceptionProcessor = processor;
892            }
893        }
894
895        // only log if not failure handled or not an exhausted unit of work
896        if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) {
897            String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange)
898                    + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
899            logFailedDelivery(true, false, false, false, isDeadLetterChannel, exchange, msg, data, e);
900        }
901
902        data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data);
903    }
904
905    /**
906     * Gives an optional configured OnExceptionOccurred processor a chance to process just after an exception
907     * was thrown while processing the Exchange. This allows to execute the processor at the same time the exception was thrown.
908     */
909    protected void onExceptionOccurred(Exchange exchange, final RedeliveryData data) {
910        if (data.onExceptionProcessor == null) {
911            return;
912        }
913
914        // run this synchronously as its just a Processor
915        try {
916            if (log.isTraceEnabled()) {
917                log.trace("OnExceptionOccurred processor {} is processing Exchange: {} due exception occurred", data.onExceptionProcessor, exchange);
918            }
919            data.onExceptionProcessor.process(exchange);
920        } catch (Throwable e) {
921            // we dont not want new exception to override existing, so log it as a WARN
922            log.warn("Error during processing OnExceptionOccurred. This exception is ignored.", e);
923        }
924        log.trace("OnExceptionOccurred processor done");
925    }
926
927    /**
928     * Gives an optional configured redelivery processor a chance to process before the Exchange
929     * will be redelivered. This can be used to alter the Exchange.
930     */
931    protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
932        if (data.onRedeliveryProcessor == null) {
933            return;
934        }
935
936        if (log.isTraceEnabled()) {
937            log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered",
938                    data.onRedeliveryProcessor, exchange);
939        }
940
941        // run this synchronously as its just a Processor
942        try {
943            data.onRedeliveryProcessor.process(exchange);
944        } catch (Throwable e) {
945            exchange.setException(e);
946        }
947        log.trace("Redelivery processor done");
948    }
949
950    /**
951     * All redelivery attempts failed so move the exchange to the dead letter queue
952     */
953    protected boolean deliverToFailureProcessor(final Processor processor, final boolean isDeadLetterChannel, final Exchange exchange,
954                                                final RedeliveryData data, final AsyncCallback callback) {
955        boolean sync = true;
956
957        Exception caught = exchange.getException();
958
959        // we did not success with the redelivery so now we let the failure processor handle it
960        // clear exception as we let the failure processor handle it
961        exchange.setException(null);
962
963        final boolean shouldHandle = shouldHandle(exchange, data);
964        final boolean shouldContinue = shouldContinue(exchange, data);
965
966        // regard both handled or continued as being handled
967        boolean handled = false;
968
969        // always handle if dead letter channel
970        boolean handleOrContinue = isDeadLetterChannel || shouldHandle || shouldContinue;
971        if (handleOrContinue) {
972            // its handled then remove traces of redelivery attempted
973            exchange.getIn().removeHeader(Exchange.REDELIVERED);
974            exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
975            exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
976            exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
977
978            // and remove traces of rollback only and uow exhausted markers
979            exchange.removeProperty(Exchange.ROLLBACK_ONLY);
980            exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED);
981
982            handled = true;
983        } else {
984            // must decrement the redelivery counter as we didn't process the redelivery but is
985            // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
986            decrementRedeliveryCounter(exchange);
987        }
988
989        // we should allow using the failure processor if we should not continue
990        // or in case of continue then the failure processor is NOT a dead letter channel
991        // because you can continue and still let the failure processor do some routing
992        // before continue in the main route.
993        boolean allowFailureProcessor = !shouldContinue || !isDeadLetterChannel;
994
995        if (allowFailureProcessor && processor != null) {
996
997            // prepare original IN body if it should be moved instead of current body
998            if (data.useOriginalInMessage) {
999                log.trace("Using the original IN message instead of current");
1000                Message original = ExchangeHelper.getOriginalInMessage(exchange);
1001                exchange.setIn(original);
1002                if (exchange.hasOut()) {
1003                    log.trace("Removing the out message to avoid some uncertain behavior");
1004                    exchange.setOut(null);
1005                }
1006            }
1007
1008            // reset cached streams so they can be read again
1009            MessageHelper.resetStreamCache(exchange.getIn());
1010
1011            // invoke custom on prepare
1012            if (onPrepareProcessor != null) {
1013                try {
1014                    log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange);
1015                    onPrepareProcessor.process(exchange);
1016                } catch (Exception e) {
1017                    // a new exception was thrown during prepare
1018                    exchange.setException(e);
1019                }
1020            }
1021
1022            log.trace("Failure processor {} is processing Exchange: {}", processor, exchange);
1023
1024            // store the last to endpoint as the failure endpoint
1025            exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
1026            // and store the route id so we know in which route we failed
1027            UnitOfWork uow = exchange.getUnitOfWork();
1028            if (uow != null && uow.getRouteContext() != null) {
1029                exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
1030            }
1031
1032            // fire event as we had a failure processor to handle it, which there is a event for
1033            final boolean deadLetterChannel = processor == data.deadLetterProcessor;
1034
1035            EventHelper.notifyExchangeFailureHandling(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
1036
1037            // the failure processor could also be asynchronous
1038            AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor);
1039            sync = afp.process(exchange, new AsyncCallback() {
1040                public void done(boolean sync) {
1041                    log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange);
1042                    try {
1043                        prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue);
1044                        // fire event as we had a failure processor to handle it, which there is a event for
1045                        EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
1046                    } finally {
1047                        // if the fault was handled asynchronously, this should be reflected in the callback as well
1048                        data.sync &= sync;
1049                        callback.done(data.sync);
1050                    }
1051                }
1052            });
1053        } else {
1054            try {
1055                // invoke custom on prepare
1056                if (onPrepareProcessor != null) {
1057                    try {
1058                        log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange);
1059                        onPrepareProcessor.process(exchange);
1060                    } catch (Exception e) {
1061                        // a new exception was thrown during prepare
1062                        exchange.setException(e);
1063                    }
1064                }
1065                // no processor but we need to prepare after failure as well
1066                prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue);
1067            } finally {
1068                // callback we are done
1069                callback.done(data.sync);
1070            }
1071        }
1072
1073        // create log message
1074        String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
1075        msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
1076        if (processor != null) {
1077            if (isDeadLetterChannel && deadLetterUri != null) {
1078                msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]";
1079            } else {
1080                msg = msg + ". Processed by failure processor: " + processor;
1081            }
1082        }
1083
1084        // log that we failed delivery as we are exhausted
1085        logFailedDelivery(false, false, handled, false, isDeadLetterChannel, exchange, msg, data, null);
1086
1087        return sync;
1088    }
1089
1090    protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data, final boolean isDeadLetterChannel,
1091                                               final boolean shouldHandle, final boolean shouldContinue) {
1092
1093        Exception newException = exchange.getException();
1094
1095        // we could not process the exchange so we let the failure processor handled it
1096        ExchangeHelper.setFailureHandled(exchange);
1097
1098        // honor if already set a handling
1099        boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
1100        if (alreadySet) {
1101            boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
1102            log.trace("This exchange has already been marked for handling: {}", handled);
1103            if (!handled) {
1104                // exception not handled, put exception back in the exchange
1105                exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
1106                // and put failure endpoint back as well
1107                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
1108            }
1109            return;
1110        }
1111
1112        // dead letter channel is special
1113        if (shouldContinue) {
1114            log.trace("This exchange is continued: {}", exchange);
1115            // okay we want to continue then prepare the exchange for that as well
1116            prepareExchangeForContinue(exchange, data, isDeadLetterChannel);
1117        } else if (shouldHandle) {
1118            log.trace("This exchange is handled so its marked as not failed: {}", exchange);
1119            exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
1120        } else {
1121            // okay the redelivery policy are not explicit set to true, so we should allow to check for some
1122            // special situations when using dead letter channel
1123            if (isDeadLetterChannel) {
1124
1125                // DLC is always handling the first thrown exception,
1126                // but if its a new exception then use the configured option
1127                boolean handled = newException == null || data.handleNewException;
1128
1129                // when using DLC then log new exception whether its being handled or not, as otherwise it may appear as
1130                // the DLC swallow new exceptions by default (which is by design to ensure the DLC always complete,
1131                // to avoid causing endless poison messages that fails forever)
1132                if (newException != null && data.currentRedeliveryPolicy.isLogNewException()) {
1133                    String uri = URISupport.sanitizeUri(deadLetterUri);
1134                    String msg = "New exception occurred during processing by the DeadLetterChannel[" + uri + "] due " + newException.getMessage();
1135                    if (handled) {
1136                        msg += ". The new exception is being handled as deadLetterHandleNewException=true.";
1137                    } else {
1138                        msg += ". The new exception is not handled as deadLetterHandleNewException=false.";
1139                    }
1140                    logFailedDelivery(false, true, handled, false, true, exchange, msg, data, newException);
1141                }
1142
1143                if (handled) {
1144                    log.trace("This exchange is handled so its marked as not failed: {}", exchange);
1145                    exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
1146                    return;
1147                }
1148            }
1149
1150            // not handled by default
1151            prepareExchangeAfterFailureNotHandled(exchange);
1152        }
1153    }
1154
1155    private void prepareExchangeAfterFailureNotHandled(Exchange exchange) {
1156        log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange);
1157        // exception not handled, put exception back in the exchange
1158        exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
1159        exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
1160        // and put failure endpoint back as well
1161        exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
1162        // and store the route id so we know in which route we failed
1163        UnitOfWork uow = exchange.getUnitOfWork();
1164        if (uow != null && uow.getRouteContext() != null) {
1165            exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
1166        }
1167    }
1168
1169    private void logFailedDelivery(boolean shouldRedeliver, boolean newException, boolean handled, boolean continued, boolean isDeadLetterChannel,
1170                                   Exchange exchange, String message, RedeliveryData data, Throwable e) {
1171        if (logger == null) {
1172            return;
1173        }
1174
1175        if (!exchange.isRollbackOnly()) {
1176            if (newException && !data.currentRedeliveryPolicy.isLogNewException()) {
1177                // do not log new exception
1178                return;
1179            }
1180
1181            // if we should not rollback, then check whether logging is enabled
1182
1183            if (!newException && handled && !data.currentRedeliveryPolicy.isLogHandled()) {
1184                // do not log handled
1185                return;
1186            }
1187
1188            if (!newException && continued && !data.currentRedeliveryPolicy.isLogContinued()) {
1189                // do not log handled
1190                return;
1191            }
1192
1193            if (!newException && shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) {
1194                // do not log retry attempts
1195                return;
1196            }
1197
1198            if (!newException && !shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) {
1199                // do not log exhausted
1200                return;
1201            }
1202        }
1203
1204        LoggingLevel newLogLevel;
1205        boolean logStackTrace;
1206        if (exchange.isRollbackOnly()) {
1207            newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
1208            logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
1209        } else if (shouldRedeliver) {
1210            newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
1211            logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace();
1212        } else {
1213            newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
1214            logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
1215        }
1216        if (e == null) {
1217            e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
1218        }
1219
1220        if (newException) {
1221            // log at most WARN level
1222            if (newLogLevel == LoggingLevel.ERROR) {
1223                newLogLevel = LoggingLevel.WARN;
1224            }
1225            String msg = message;
1226            if (msg == null) {
1227                msg = "New exception " + ExchangeHelper.logIds(exchange);
1228                // special for logging the new exception
1229                Throwable cause = e;
1230                if (cause != null) {
1231                    msg = msg + " due: " + cause.getMessage();
1232                }
1233            }
1234
1235            if (e != null && logStackTrace) {
1236                logger.log(msg, e, newLogLevel);
1237            } else {
1238                logger.log(msg, newLogLevel);
1239            }
1240        } else if (exchange.isRollbackOnly()) {
1241            String msg = "Rollback " + ExchangeHelper.logIds(exchange);
1242            Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
1243            if (cause != null) {
1244                msg = msg + " due: " + cause.getMessage();
1245            }
1246
1247            // should we include message history
1248            if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
1249                // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
1250                ExchangeFormatter formatter = customExchangeFormatter
1251                    ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
1252                String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, false);
1253                if (routeStackTrace != null) {
1254                    msg = msg + "\n" + routeStackTrace;
1255                }
1256            }
1257
1258            if (newLogLevel == LoggingLevel.ERROR) {
1259                // log intended rollback on maximum WARN level (no ERROR)
1260                logger.log(msg, LoggingLevel.WARN);
1261            } else {
1262                // otherwise use the desired logging level
1263                logger.log(msg, newLogLevel);
1264            }
1265        } else {
1266            String msg = message;
1267            // should we include message history
1268            if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
1269                // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
1270                ExchangeFormatter formatter = customExchangeFormatter
1271                    ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
1272                String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, e != null && logStackTrace);
1273                if (routeStackTrace != null) {
1274                    msg = msg + "\n" + routeStackTrace;
1275                }
1276            }
1277
1278            if (e != null && logStackTrace) {
1279                logger.log(msg, e, newLogLevel);
1280            } else {
1281                logger.log(msg, newLogLevel);
1282            }
1283        }
1284    }
1285
1286    /**
1287     * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback).
1288     * <p/>
1289     * If the exchange is exhausted, then we will not continue processing, but let the
1290     * failure processor deal with the exchange.
1291     *
1292     * @param exchange the current exchange
1293     * @param data     the redelivery data
1294     * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust.
1295     */
1296    private boolean isExhausted(Exchange exchange, RedeliveryData data) {
1297        // if marked as rollback only then do not continue/redeliver
1298        boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
1299        if (exhausted) {
1300            log.trace("This exchange is marked as redelivery exhausted: {}", exchange);
1301            return true;
1302        }
1303
1304        // if marked as rollback only then do not continue/redeliver
1305        boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class);
1306        if (rollbackOnly) {
1307            log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange);
1308            return true;
1309        }
1310        // its the first original call so continue
1311        if (data.redeliveryCounter == 0) {
1312            return false;
1313        }
1314        // its a potential redelivery so determine if we should redeliver or not
1315        boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate);
1316        return !redeliver;
1317    }
1318
1319    /**
1320     * Determines whether or not to continue if we are exhausted.
1321     *
1322     * @param exchange the current exchange
1323     * @param data     the redelivery data
1324     * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust.
1325     */
1326    private boolean shouldContinue(Exchange exchange, RedeliveryData data) {
1327        if (data.continuedPredicate != null) {
1328            return data.continuedPredicate.matches(exchange);
1329        }
1330        // do not continue by default
1331        return false;
1332    }
1333
1334    /**
1335     * Determines whether or not to handle if we are exhausted.
1336     *
1337     * @param exchange the current exchange
1338     * @param data     the redelivery data
1339     * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust.
1340     */
1341    private boolean shouldHandle(Exchange exchange, RedeliveryData data) {
1342        if (data.handledPredicate != null) {
1343            return data.handledPredicate.matches(exchange);
1344        }
1345        // do not handle by default
1346        return false;
1347    }
1348
1349    /**
1350     * Increments the redelivery counter and adds the redelivered flag if the
1351     * message has been redelivered
1352     */
1353    private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) {
1354        Message in = exchange.getIn();
1355        Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
1356        int next = 1;
1357        if (counter != null) {
1358            next = counter + 1;
1359        }
1360        in.setHeader(Exchange.REDELIVERY_COUNTER, next);
1361        in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
1362        // if maximum redeliveries is used, then provide that information as well
1363        if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) {
1364            in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries());
1365        }
1366        return next;
1367    }
1368
1369    /**
1370     * Prepares the redelivery counter and boolean flag for the failure handle processor
1371     */
1372    private void decrementRedeliveryCounter(Exchange exchange) {
1373        Message in = exchange.getIn();
1374        Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
1375        if (counter != null) {
1376            int prev = counter - 1;
1377            in.setHeader(Exchange.REDELIVERY_COUNTER, prev);
1378            // set boolean flag according to counter
1379            in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
1380        } else {
1381            // not redelivered
1382            in.setHeader(Exchange.REDELIVERY_COUNTER, 0);
1383            in.setHeader(Exchange.REDELIVERED, Boolean.FALSE);
1384        }
1385    }
1386
1387    /**
1388     * Determines if redelivery is enabled by checking if any of the redelivery policy
1389     * settings may allow redeliveries.
1390     *
1391     * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise
1392     * @throws Exception can be thrown
1393     */
1394    private boolean determineIfRedeliveryIsEnabled() throws Exception {
1395        // determine if redeliver is enabled either on error handler
1396        if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) {
1397            // must check for != 0 as (-1 means redeliver forever)
1398            return true;
1399        }
1400        if (retryWhilePolicy != null) {
1401            return true;
1402        }
1403
1404        // or on the exception policies
1405        if (!exceptionPolicies.isEmpty()) {
1406            // walk them to see if any of them have a maximum redeliveries > 0 or retry until set
1407            for (OnExceptionDefinition def : exceptionPolicies.values()) {
1408
1409                String ref = def.getRedeliveryPolicyRef();
1410                if (ref != null) {
1411                    // lookup in registry if ref provided
1412                    RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class);
1413                    if (policy.getMaximumRedeliveries() != 0) {
1414                        // must check for != 0 as (-1 means redeliver forever)
1415                        return true;
1416                    }
1417                } else if (def.getRedeliveryPolicy() != null) {
1418                    Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries());
1419                    if (max != null && max != 0) {
1420                        // must check for != 0 as (-1 means redeliver forever)
1421                        return true;
1422                    }
1423                }
1424
1425                if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) {
1426                    return true;
1427                }
1428            }
1429        }
1430
1431        return false;
1432    }
1433
1434    /**
1435     * Gets the number of exchanges that are pending for redelivery
1436     */
1437    public int getPendingRedeliveryCount() {
1438        int answer = redeliverySleepCounter.get();
1439        if (executorService != null && executorService instanceof ThreadPoolExecutor) {
1440            answer += ((ThreadPoolExecutor) executorService).getQueue().size();
1441        }
1442
1443        return answer;
1444    }
1445
1446    @Override
1447    protected void doStart() throws Exception {
1448        ServiceHelper.startServices(output, outputAsync, deadLetter);
1449
1450        // determine if redeliver is enabled or not
1451        redeliveryEnabled = determineIfRedeliveryIsEnabled();
1452        if (log.isTraceEnabled()) {
1453            log.trace("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this);
1454        }
1455
1456        // we only need thread pool if redelivery is enabled
1457        if (redeliveryEnabled) {
1458            if (executorService == null) {
1459                // use default shared executor service
1460                executorService = camelContext.getErrorHandlerExecutorService();
1461            }
1462            if (log.isDebugEnabled()) {
1463                log.debug("Using ExecutorService: {} for redeliveries on error handler: {}", executorService, this);
1464            }
1465        }
1466
1467        // reset flag when starting
1468        preparingShutdown = false;
1469        redeliverySleepCounter.set(0);
1470    }
1471
1472    @Override
1473    protected void doStop() throws Exception {
1474        // noop, do not stop any services which we only do when shutting down
1475        // as the error handler can be context scoped, and should not stop in case
1476        // a route stops
1477    }
1478
1479    @Override
1480    protected void doShutdown() throws Exception {
1481        ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync);
1482    }
1483}