001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.Callable;
022import java.util.concurrent.CountDownLatch;
023import java.util.concurrent.RejectedExecutionException;
024import java.util.concurrent.ScheduledExecutorService;
025import java.util.concurrent.ThreadPoolExecutor;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicInteger;
028
029import org.apache.camel.AsyncCallback;
030import org.apache.camel.AsyncProcessor;
031import org.apache.camel.CamelContext;
032import org.apache.camel.Exchange;
033import org.apache.camel.LoggingLevel;
034import org.apache.camel.Message;
035import org.apache.camel.Navigate;
036import org.apache.camel.Predicate;
037import org.apache.camel.Processor;
038import org.apache.camel.model.OnExceptionDefinition;
039import org.apache.camel.spi.AsyncProcessorAwaitManager;
040import org.apache.camel.spi.ExchangeFormatter;
041import org.apache.camel.spi.ShutdownPrepared;
042import org.apache.camel.spi.SubUnitOfWorkCallback;
043import org.apache.camel.spi.UnitOfWork;
044import org.apache.camel.util.AsyncProcessorConverterHelper;
045import org.apache.camel.util.AsyncProcessorHelper;
046import org.apache.camel.util.CamelContextHelper;
047import org.apache.camel.util.CamelLogger;
048import org.apache.camel.util.EventHelper;
049import org.apache.camel.util.ExchangeHelper;
050import org.apache.camel.util.MessageHelper;
051import org.apache.camel.util.ObjectHelper;
052import org.apache.camel.util.ServiceHelper;
053import org.apache.camel.util.StopWatch;
054import org.apache.camel.util.URISupport;
055
056/**
057 * Base redeliverable error handler that also supports a final dead letter queue in case
058 * all redelivery attempts fail.
059 * <p/>
060 * This implementation should contain all the error handling logic and the sub classes
061 * should only configure it according to what they support.
062 *
063 * @version
064 */
065public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> {
066
067    protected final AtomicInteger redeliverySleepCounter = new AtomicInteger();
068    protected ScheduledExecutorService executorService;
069    protected final CamelContext camelContext;
070    protected final AsyncProcessorAwaitManager awaitManager;
071    protected final Processor deadLetter;
072    protected final String deadLetterUri;
073    protected final boolean deadLetterHandleNewException;
074    protected final Processor output;
075    protected final AsyncProcessor outputAsync;
076    protected final Processor redeliveryProcessor;
077    protected final RedeliveryPolicy redeliveryPolicy;
078    protected final Predicate retryWhilePolicy;
079    protected final CamelLogger logger;
080    protected final boolean useOriginalMessagePolicy;
081    protected boolean redeliveryEnabled;
082    protected volatile boolean preparingShutdown;
083    protected final ExchangeFormatter exchangeFormatter;
084    protected final boolean customExchangeFormatter;
085    protected final Processor onPrepareProcessor;
086    protected final Processor onExceptionProcessor;
087
088    /**
089     * Contains the current redelivery data
090     */
091    protected class RedeliveryData {
092        Exchange original;
093        boolean sync = true;
094        int redeliveryCounter;
095        long redeliveryDelay;
096        Predicate retryWhilePredicate;
097        boolean redeliverFromSync;
098
099        // default behavior which can be overloaded on a per exception basis
100        RedeliveryPolicy currentRedeliveryPolicy;
101        Processor deadLetterProcessor;
102        Processor failureProcessor;
103        Processor onRedeliveryProcessor;
104        Processor onPrepareProcessor;
105        Processor onExceptionProcessor;
106        Predicate handledPredicate;
107        Predicate continuedPredicate;
108        boolean useOriginalInMessage;
109        boolean handleNewException;
110
111        public RedeliveryData() {
112            // init with values from the error handler
113            this.retryWhilePredicate = retryWhilePolicy;
114            this.currentRedeliveryPolicy = redeliveryPolicy;
115            this.deadLetterProcessor = deadLetter;
116            this.onRedeliveryProcessor = redeliveryProcessor;
117            this.onPrepareProcessor = RedeliveryErrorHandler.this.onPrepareProcessor;
118            this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor;
119            this.handledPredicate = getDefaultHandledPredicate();
120            this.useOriginalInMessage = useOriginalMessagePolicy;
121            this.handleNewException = deadLetterHandleNewException;
122        }
123    }
124
125    /**
126     * Task for sleeping during redelivery attempts.
127     * <p/>
128     * This task is for the synchronous blocking. If using async delayed then a scheduled thread pool
129     * is used for sleeping and trigger redeliveries.
130     */
131    private final class RedeliverSleepTask {
132
133        private final RedeliveryPolicy policy;
134        private final long delay;
135
136        RedeliverSleepTask(RedeliveryPolicy policy, long delay) {
137            this.policy = policy;
138            this.delay = delay;
139        }
140
141        public boolean sleep() throws InterruptedException {
142            // for small delays then just sleep
143            if (delay < 1000) {
144                policy.sleep(delay);
145                return true;
146            }
147
148            StopWatch watch = new StopWatch();
149
150            log.debug("Sleeping for: {} millis until attempting redelivery", delay);
151            while (watch.taken() < delay) {
152                // sleep using 1 sec interval
153
154                long delta = delay - watch.taken();
155                long max = Math.min(1000, delta);
156                if (max > 0) {
157                    log.trace("Sleeping for: {} millis until waking up for re-check", max);
158                    Thread.sleep(max);
159                }
160
161                // are we preparing for shutdown then only do redelivery if allowed
162                if (preparingShutdown && !policy.isAllowRedeliveryWhileStopping()) {
163                    log.debug("Rejected redelivery while stopping");
164                    return false;
165                }
166            }
167
168            return true;
169        }
170    }
171
172    /**
173     * Tasks which performs asynchronous redelivery attempts, and being triggered by a
174     * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
175     * has to be delayed before a redelivery attempt is performed.
176     */
177    private final class AsyncRedeliveryTask implements Callable<Boolean> {
178
179        private final Exchange exchange;
180        private final AsyncCallback callback;
181        private final RedeliveryData data;
182
183        AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
184            this.exchange = exchange;
185            this.callback = callback;
186            this.data = data;
187        }
188
189        public Boolean call() throws Exception {
190            // prepare for redelivery
191            prepareExchangeForRedelivery(exchange, data);
192
193            // letting onRedeliver be executed at first
194            deliverToOnRedeliveryProcessor(exchange, data);
195
196            if (log.isTraceEnabled()) {
197                log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", new Object[]{exchange.getExchangeId(), outputAsync, exchange});
198            }
199
200            // emmit event we are doing redelivery
201            EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
202
203            // process the exchange (also redelivery)
204            boolean sync;
205            if (data.redeliverFromSync) {
206                // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from
207                // this error handler, which means we have to invoke the callback with false, to have the callback
208                // be notified when we are done
209                sync = outputAsync.process(exchange, new AsyncCallback() {
210                    public void done(boolean doneSync) {
211                        log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
212
213                        // mark we are in sync mode now
214                        data.sync = false;
215
216                        // only process if the exchange hasn't failed
217                        // and it has not been handled by the error processor
218                        if (isDone(exchange)) {
219                            callback.done(false);
220                            return;
221                        }
222
223                        // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
224                        processAsyncErrorHandler(exchange, callback, data);
225                    }
226                });
227            } else {
228                // this redelivery task was scheduled from asynchronous, which means we should only
229                // handle when the asynchronous task was done
230                sync = outputAsync.process(exchange, new AsyncCallback() {
231                    public void done(boolean doneSync) {
232                        log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
233
234                        // this callback should only handle the async case
235                        if (doneSync) {
236                            return;
237                        }
238
239                        // mark we are in async mode now
240                        data.sync = false;
241
242                        // only process if the exchange hasn't failed
243                        // and it has not been handled by the error processor
244                        if (isDone(exchange)) {
245                            callback.done(doneSync);
246                            return;
247                        }
248                        // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
249                        processAsyncErrorHandler(exchange, callback, data);
250                    }
251                });
252            }
253
254            return sync;
255        }
256    }
257
258    public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
259                                  Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter,
260                                  String deadLetterUri, boolean deadLetterHandleNewException, boolean useOriginalMessagePolicy,
261                                  Predicate retryWhile, ScheduledExecutorService executorService, Processor onPrepareProcessor, Processor onExceptionProcessor) {
262
263        ObjectHelper.notNull(camelContext, "CamelContext", this);
264        ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this);
265
266        this.camelContext = camelContext;
267        this.awaitManager = camelContext.getAsyncProcessorAwaitManager();
268        this.redeliveryProcessor = redeliveryProcessor;
269        this.deadLetter = deadLetter;
270        this.output = output;
271        this.outputAsync = AsyncProcessorConverterHelper.convert(output);
272        this.redeliveryPolicy = redeliveryPolicy;
273        this.logger = logger;
274        this.deadLetterUri = deadLetterUri;
275        this.deadLetterHandleNewException = deadLetterHandleNewException;
276        this.useOriginalMessagePolicy = useOriginalMessagePolicy;
277        this.retryWhilePolicy = retryWhile;
278        this.executorService = executorService;
279        this.onPrepareProcessor = onPrepareProcessor;
280        this.onExceptionProcessor = onExceptionProcessor;
281
282        if (ObjectHelper.isNotEmpty(redeliveryPolicy.getExchangeFormatterRef())) {
283            ExchangeFormatter formatter = camelContext.getRegistry().lookupByNameAndType(redeliveryPolicy.getExchangeFormatterRef(), ExchangeFormatter.class);
284            if (formatter != null) {
285                this.exchangeFormatter = formatter;
286                this.customExchangeFormatter = true;
287            } else {
288                throw new IllegalArgumentException("Cannot find the exchangeFormatter by using reference id " + redeliveryPolicy.getExchangeFormatterRef());
289            }
290        } else {
291            this.customExchangeFormatter = false;
292            // setup exchange formatter to be used for message history dump
293            DefaultExchangeFormatter formatter = new DefaultExchangeFormatter();
294            formatter.setShowExchangeId(true);
295            formatter.setMultiline(true);
296            formatter.setShowHeaders(true);
297            formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed);
298            try {
299                Integer maxChars = CamelContextHelper.parseInteger(camelContext, camelContext.getProperty(Exchange.LOG_DEBUG_BODY_MAX_CHARS));
300                if (maxChars != null) {
301                    formatter.setMaxChars(maxChars);
302                }
303            } catch (Exception e) {
304                throw ObjectHelper.wrapRuntimeCamelException(e);
305            }
306            this.exchangeFormatter = formatter;
307        }
308    }
309
310    public boolean supportTransacted() {
311        return false;
312    }
313
314    @Override
315    public boolean hasNext() {
316        return output != null;
317    }
318
319    @Override
320    public List<Processor> next() {
321        if (!hasNext()) {
322            return null;
323        }
324        List<Processor> answer = new ArrayList<Processor>(1);
325        answer.add(output);
326        return answer;
327    }
328
329    protected boolean isRunAllowed(RedeliveryData data) {
330        // if camel context is forcing a shutdown then do not allow running
331        boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
332        if (forceShutdown) {
333            log.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)");
334            return false;
335        }
336
337        // redelivery policy can control if redelivery is allowed during stopping/shutdown
338        // but this only applies during a redelivery (counter must > 0)
339        if (data.redeliveryCounter > 0) {
340            if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
341                log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)");
342                return true;
343            } else if (preparingShutdown) {
344                // we are preparing for shutdown, now determine if we can still run
345                boolean answer = isRunAllowedOnPreparingShutdown();
346                log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer);
347                return answer;
348            }
349        }
350
351        // we cannot run if we are stopping/stopped
352        boolean answer = !isStoppingOrStopped();
353        log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer);
354        return answer;
355    }
356
357    protected boolean isRunAllowedOnPreparingShutdown() {
358        return false;
359    }
360
361    protected boolean isRedeliveryAllowed(RedeliveryData data) {
362        // redelivery policy can control if redelivery is allowed during stopping/shutdown
363        // but this only applies during a redelivery (counter must > 0)
364        if (data.redeliveryCounter > 0) {
365            boolean stopping = isStoppingOrStopped();
366            if (!preparingShutdown && !stopping) {
367                log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)");
368                return true;
369            } else {
370                // we are stopping or preparing to shutdown
371                if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) {
372                    log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)");
373                    return true;
374                } else {
375                    log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)");
376                    return false;
377                }
378            }
379        }
380
381        return true;
382    }
383
384    @Override
385    public void prepareShutdown(boolean suspendOnly, boolean forced) {
386        // prepare for shutdown, eg do not allow redelivery if configured
387        log.trace("Prepare shutdown on error handler {}", this);
388        preparingShutdown = true;
389    }
390
391    public void process(Exchange exchange) throws Exception {
392        if (output == null) {
393            // no output then just return
394            return;
395        }
396
397        // inline org.apache.camel.util.AsyncProcessorHelper.process(org.apache.camel.AsyncProcessor, org.apache.camel.Exchange)
398        // to optimize and reduce stacktrace lengths
399        final CountDownLatch latch = new CountDownLatch(1);
400        boolean sync = process(exchange, new AsyncCallback() {
401            public void done(boolean doneSync) {
402                if (!doneSync) {
403                    awaitManager.countDown(exchange, latch);
404                }
405            }
406        });
407        if (!sync) {
408            awaitManager.await(exchange, latch);
409        }
410    }
411
412    /**
413     * Process the exchange using redelivery error handling.
414     */
415    public boolean process(final Exchange exchange, final AsyncCallback callback) {
416        final RedeliveryData data = new RedeliveryData();
417
418        // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
419        // original Exchange is being redelivered, and not a mutated Exchange
420        data.original = defensiveCopyExchangeIfNeeded(exchange);
421
422        // use looping to have redelivery attempts
423        while (true) {
424
425            // can we still run
426            if (!isRunAllowed(data)) {
427                log.trace("Run not allowed, will reject executing exchange: {}", exchange);
428                if (exchange.getException() == null) {
429                    exchange.setException(new RejectedExecutionException());
430                }
431                // we cannot process so invoke callback
432                callback.done(data.sync);
433                return data.sync;
434            }
435
436            // did previous processing cause an exception?
437            boolean handle = shouldHandleException(exchange);
438            if (handle) {
439                handleException(exchange, data, isDeadLetterChannel());
440                onExceptionOccurred(exchange, data);
441            }
442
443            // compute if we are exhausted, and whether redelivery is allowed
444            boolean exhausted = isExhausted(exchange, data);
445            boolean redeliverAllowed = isRedeliveryAllowed(data);
446
447            // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC)
448            if (!redeliverAllowed || exhausted) {
449                Processor target = null;
450                boolean deliver = true;
451
452                // the unit of work may have an optional callback associated we need to leverage
453                SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback();
454                if (uowCallback != null) {
455                    // signal to the callback we are exhausted
456                    uowCallback.onExhausted(exchange);
457                    // do not deliver to the failure processor as its been handled by the callback instead
458                    deliver = false;
459                }
460
461                if (deliver) {
462                    // should deliver to failure processor (either from onException or the dead letter channel)
463                    target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
464                }
465                // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
466                // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
467                boolean isDeadLetterChannel = isDeadLetterChannel() && (target == null || target == data.deadLetterProcessor);
468                boolean sync = deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
469                // we are breaking out
470                return sync;
471            }
472
473            if (data.redeliveryCounter > 0) {
474                // calculate delay
475                data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
476
477                if (data.redeliveryDelay > 0) {
478                    // okay there is a delay so create a scheduled task to have it executed in the future
479
480                    if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
481
482                        // we are doing a redelivery then a thread pool must be configured (see the doStart method)
483                        ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
484
485                        // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
486                        // have it being executed in the future, or immediately
487                        // we are continuing asynchronously
488
489                        // mark we are routing async from now and that this redelivery task came from a synchronous routing
490                        data.sync = false;
491                        data.redeliverFromSync = true;
492                        AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
493
494                        // schedule the redelivery task
495                        if (log.isTraceEnabled()) {
496                            log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
497                        }
498                        executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
499
500                        return false;
501                    } else {
502                        // async delayed redelivery was disabled or we are transacted so we must be synchronous
503                        // as the transaction manager requires to execute in the same thread context
504                        try {
505                            // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping
506                            redeliverySleepCounter.incrementAndGet();
507                            RedeliverSleepTask task = new RedeliverSleepTask(data.currentRedeliveryPolicy, data.redeliveryDelay);
508                            boolean complete = task.sleep();
509                            redeliverySleepCounter.decrementAndGet();
510                            if (!complete) {
511                                // the task was rejected
512                                exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping"));
513                                // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange
514                                exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
515                                // jump to start of loop which then detects that we are failed and exhausted
516                                continue;
517                            }
518                        } catch (InterruptedException e) {
519                            redeliverySleepCounter.decrementAndGet();
520                            // we was interrupted so break out
521                            exchange.setException(e);
522                            // mark the exchange to stop continue routing when interrupted
523                            // as we do not want to continue routing (for example a task has been cancelled)
524                            exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
525                            callback.done(data.sync);
526                            return data.sync;
527                        }
528                    }
529                }
530
531                // prepare for redelivery
532                prepareExchangeForRedelivery(exchange, data);
533
534                // letting onRedeliver be executed
535                deliverToOnRedeliveryProcessor(exchange, data);
536
537                // emmit event we are doing redelivery
538                EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
539            }
540
541            // process the exchange (also redelivery)
542            boolean sync = outputAsync.process(exchange, new AsyncCallback() {
543                public void done(boolean sync) {
544                    // this callback should only handle the async case
545                    if (sync) {
546                        return;
547                    }
548
549                    // mark we are in async mode now
550                    data.sync = false;
551
552                    // if we are done then notify callback and exit
553                    if (isDone(exchange)) {
554                        callback.done(sync);
555                        return;
556                    }
557
558                    // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
559                    // method which takes care of this in a asynchronous manner
560                    processAsyncErrorHandler(exchange, callback, data);
561                }
562            });
563
564            if (!sync) {
565                // the remainder of the Exchange is being processed asynchronously so we should return
566                return false;
567            }
568            // we continue to route synchronously
569
570            // if we are done then notify callback and exit
571            boolean done = isDone(exchange);
572            if (done) {
573                callback.done(true);
574                return true;
575            }
576
577            // error occurred so loop back around.....
578        }
579    }
580
581    /**
582     * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY}
583     * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p>
584     *
585     * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay}
586     * and {@link RedeliveryData#redeliveryCounter} are copied in.</p>
587     *
588     * @param exchange The current exchange in question.
589     * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation.
590     * @param redeliveryDelay The default redelivery delay from RedeliveryData
591     * @param redeliveryCounter The redeliveryCounter
592     * @return The time to wait before the next redelivery.
593     */
594    protected long determineRedeliveryDelay(Exchange exchange, RedeliveryPolicy redeliveryPolicy, long redeliveryDelay, int redeliveryCounter) {
595        Message message = exchange.getIn();
596        Long delay = message.getHeader(Exchange.REDELIVERY_DELAY, Long.class);
597        if (delay == null) {
598            delay = redeliveryPolicy.calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter);
599            log.debug("Redelivery delay calculated as {}", delay);
600        } else {
601            log.debug("Redelivery delay is {} from Message Header [{}]", delay, Exchange.REDELIVERY_DELAY);
602        }
603        return delay;
604    }
605
606    /**
607     * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback.
608     * <p/>
609     * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use
610     * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b>
611     * in terms of logic.
612     */
613    protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
614        // can we still run
615        if (!isRunAllowed(data)) {
616            log.trace("Run not allowed, will reject executing exchange: {}", exchange);
617            if (exchange.getException() == null) {
618                exchange.setException(new RejectedExecutionException());
619            }
620            callback.done(data.sync);
621            return;
622        }
623
624        // did previous processing cause an exception?
625        boolean handle = shouldHandleException(exchange);
626        if (handle) {
627            handleException(exchange, data, isDeadLetterChannel());
628            onExceptionOccurred(exchange, data);
629        }
630
631        // compute if we are exhausted or not
632        boolean exhausted = isExhausted(exchange, data);
633        if (exhausted) {
634            Processor target = null;
635            boolean deliver = true;
636
637            // the unit of work may have an optional callback associated we need to leverage
638            UnitOfWork uow = exchange.getUnitOfWork();
639            if (uow != null) {
640                SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback();
641                if (uowCallback != null) {
642                    // signal to the callback we are exhausted
643                    uowCallback.onExhausted(exchange);
644                    // do not deliver to the failure processor as its been handled by the callback instead
645                    deliver = false;
646                }
647            }
648
649            if (deliver) {
650                // should deliver to failure processor (either from onException or the dead letter channel)
651                target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
652            }
653            // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
654            // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
655            boolean isDeadLetterChannel = isDeadLetterChannel() && target == data.deadLetterProcessor;
656            deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback);
657            // we are breaking out
658            return;
659        }
660
661        if (data.redeliveryCounter > 0) {
662            // we are doing a redelivery then a thread pool must be configured (see the doStart method)
663            ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
664
665            // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
666            // have it being executed in the future, or immediately
667            // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously
668            // to ensure the callback will continue routing from where we left
669            AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
670
671            // calculate the redelivery delay
672            data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
673
674            if (data.redeliveryDelay > 0) {
675                // schedule the redelivery task
676                if (log.isTraceEnabled()) {
677                    log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
678                }
679                executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
680            } else {
681                // execute the task immediately
682                executorService.submit(task);
683            }
684        }
685    }
686
687    /**
688     * Performs a defensive copy of the exchange if needed
689     *
690     * @param exchange the exchange
691     * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled).
692     */
693    protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) {
694        // only do a defensive copy if redelivery is enabled
695        if (redeliveryEnabled) {
696            return ExchangeHelper.createCopy(exchange, true);
697        } else {
698            return null;
699        }
700    }
701
702    /**
703     * Strategy whether the exchange has an exception that we should try to handle.
704     * <p/>
705     * Standard implementations should just look for an exception.
706     */
707    protected boolean shouldHandleException(Exchange exchange) {
708        return exchange.getException() != null;
709    }
710
711    /**
712     * Strategy to determine if the exchange is done so we can continue
713     */
714    protected boolean isDone(Exchange exchange) {
715        boolean answer = isCancelledOrInterrupted(exchange);
716
717        // only done if the exchange hasn't failed
718        // and it has not been handled by the failure processor
719        // or we are exhausted
720        if (!answer) {
721            answer = exchange.getException() == null
722                || ExchangeHelper.isFailureHandled(exchange)
723                || ExchangeHelper.isRedeliveryExhausted(exchange);
724        }
725
726        log.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer);
727        return answer;
728    }
729
730    /**
731     * Strategy to determine if the exchange was cancelled or interrupted
732     */
733    protected boolean isCancelledOrInterrupted(Exchange exchange) {
734        boolean answer = false;
735
736        if (ExchangeHelper.isInterrupted(exchange)) {
737            // mark the exchange to stop continue routing when interrupted
738            // as we do not want to continue routing (for example a task has been cancelled)
739            exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
740            answer = true;
741        }
742
743        log.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer);
744        return answer;
745    }
746
747    /**
748     * Returns the output processor
749     */
750    public Processor getOutput() {
751        return output;
752    }
753
754    /**
755     * Returns the dead letter that message exchanges will be sent to if the
756     * redelivery attempts fail
757     */
758    public Processor getDeadLetter() {
759        return deadLetter;
760    }
761
762    public String getDeadLetterUri() {
763        return deadLetterUri;
764    }
765
766    public boolean isUseOriginalMessagePolicy() {
767        return useOriginalMessagePolicy;
768    }
769
770    public boolean isDeadLetterHandleNewException() {
771        return deadLetterHandleNewException;
772    }
773
774    public RedeliveryPolicy getRedeliveryPolicy() {
775        return redeliveryPolicy;
776    }
777
778    public CamelLogger getLogger() {
779        return logger;
780    }
781
782    protected Predicate getDefaultHandledPredicate() {
783        // Default is not not handle errors
784        return null;
785    }
786
787    protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
788        Exception caught = exchange.getException();
789
790        // we continue so clear any exceptions
791        exchange.setException(null);
792        // clear rollback flags
793        exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
794        // reset cached streams so they can be read again
795        MessageHelper.resetStreamCache(exchange.getIn());
796
797        // its continued then remove traces of redelivery attempted and caught exception
798        exchange.getIn().removeHeader(Exchange.REDELIVERED);
799        exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
800        exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
801        exchange.removeProperty(Exchange.FAILURE_HANDLED);
802        // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
803
804        // create log message
805        String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
806        msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
807        msg = msg + ". Handled and continue routing.";
808
809        // log that we failed but want to continue
810        logFailedDelivery(false, false, false, true, isDeadLetterChannel, exchange, msg, data, null);
811    }
812
813    protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) {
814        if (!redeliveryEnabled) {
815            throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly.");
816        }
817        // there must be a defensive copy of the exchange
818        ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this);
819
820        // okay we will give it another go so clear the exception so we can try again
821        exchange.setException(null);
822
823        // clear rollback flags
824        exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
825
826        // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange
827        // and then put these on the exchange when doing a redelivery / fault processor
828
829        // preserve these headers
830        Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
831        Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
832        Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class);
833
834        // we are redelivering so copy from original back to exchange
835        exchange.getIn().copyFrom(data.original.getIn());
836        exchange.setOut(null);
837        // reset cached streams so they can be read again
838        MessageHelper.resetStreamCache(exchange.getIn());
839
840        // put back headers
841        if (redeliveryCounter != null) {
842            exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
843        }
844        if (redeliveryMaxCounter != null) {
845            exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
846        }
847        if (redelivered != null) {
848            exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered);
849        }
850    }
851
852    protected void handleException(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) {
853        Exception e = exchange.getException();
854        // e is never null
855
856        Throwable previous = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
857        if (previous != null && previous != e) {
858            // a 2nd exception was thrown while handling a previous exception
859            // so we need to add the previous as suppressed by the new exception
860            // see also FatalFallbackErrorHandler
861            Throwable[] suppressed = e.getSuppressed();
862            boolean found = false;
863            for (Throwable t : suppressed) {
864                if (t == previous) {
865                    found = true;
866                }
867            }
868            if (!found) {
869                e.addSuppressed(previous);
870            }
871        }
872
873        // store the original caused exception in a property, so we can restore it later
874        exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
875
876        // find the error handler to use (if any)
877        OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
878        if (exceptionPolicy != null) {
879            data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
880            data.handledPredicate = exceptionPolicy.getHandledPolicy();
881            data.continuedPredicate = exceptionPolicy.getContinuedPolicy();
882            data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy();
883            data.useOriginalInMessage = exceptionPolicy.getUseOriginalMessagePolicy() != null && exceptionPolicy.getUseOriginalMessagePolicy();
884
885            // route specific failure handler?
886            Processor processor = null;
887            UnitOfWork uow = exchange.getUnitOfWork();
888            if (uow != null && uow.getRouteContext() != null) {
889                String routeId = uow.getRouteContext().getRoute().getId();
890                processor = exceptionPolicy.getErrorHandler(routeId);
891            } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) {
892                // note this should really not happen, but we have this code as a fail safe
893                // to be backwards compatible with the old behavior
894                log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId());
895                processor = exceptionPolicy.getErrorHandlers().iterator().next();
896            }
897            if (processor != null) {
898                data.failureProcessor = processor;
899            }
900
901            // route specific on redelivery?
902            processor = exceptionPolicy.getOnRedelivery();
903            if (processor != null) {
904                data.onRedeliveryProcessor = processor;
905            }
906            // route specific on exception occurred?
907            processor = exceptionPolicy.getOnExceptionOccurred();
908            if (processor != null) {
909                data.onExceptionProcessor = processor;
910            }
911        }
912
913        // only log if not failure handled or not an exhausted unit of work
914        if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) {
915            String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange)
916                    + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
917            logFailedDelivery(true, false, false, false, isDeadLetterChannel, exchange, msg, data, e);
918        }
919
920        data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data);
921    }
922
923    /**
924     * Gives an optional configured OnExceptionOccurred processor a chance to process just after an exception
925     * was thrown while processing the Exchange. This allows to execute the processor at the same time the exception was thrown.
926     */
927    protected void onExceptionOccurred(Exchange exchange, final RedeliveryData data) {
928        if (data.onExceptionProcessor == null) {
929            return;
930        }
931
932        // run this synchronously as its just a Processor
933        try {
934            if (log.isTraceEnabled()) {
935                log.trace("OnExceptionOccurred processor {} is processing Exchange: {} due exception occurred", data.onExceptionProcessor, exchange);
936            }
937            data.onExceptionProcessor.process(exchange);
938        } catch (Throwable e) {
939            // we dont not want new exception to override existing, so log it as a WARN
940            log.warn("Error during processing OnExceptionOccurred. This exception is ignored.", e);
941        }
942        log.trace("OnExceptionOccurred processor done");
943    }
944
945    /**
946     * Gives an optional configured redelivery processor a chance to process before the Exchange
947     * will be redelivered. This can be used to alter the Exchange.
948     */
949    protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
950        if (data.onRedeliveryProcessor == null) {
951            return;
952        }
953
954        if (log.isTraceEnabled()) {
955            log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered",
956                    data.onRedeliveryProcessor, exchange);
957        }
958
959        // run this synchronously as its just a Processor
960        try {
961            data.onRedeliveryProcessor.process(exchange);
962        } catch (Throwable e) {
963            exchange.setException(e);
964        }
965        log.trace("Redelivery processor done");
966    }
967
968    /**
969     * All redelivery attempts failed so move the exchange to the dead letter queue
970     */
971    protected boolean deliverToFailureProcessor(final Processor processor, final boolean isDeadLetterChannel, final Exchange exchange,
972                                                final RedeliveryData data, final AsyncCallback callback) {
973        boolean sync = true;
974
975        Exception caught = exchange.getException();
976
977        // we did not success with the redelivery so now we let the failure processor handle it
978        // clear exception as we let the failure processor handle it
979        exchange.setException(null);
980
981        final boolean shouldHandle = shouldHandle(exchange, data);
982        final boolean shouldContinue = shouldContinue(exchange, data);
983
984        // regard both handled or continued as being handled
985        boolean handled = false;
986
987        // always handle if dead letter channel
988        boolean handleOrContinue = isDeadLetterChannel || shouldHandle || shouldContinue;
989        if (handleOrContinue) {
990            // its handled then remove traces of redelivery attempted
991            exchange.getIn().removeHeader(Exchange.REDELIVERED);
992            exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
993            exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
994            exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
995
996            // and remove traces of rollback only and uow exhausted markers
997            exchange.removeProperty(Exchange.ROLLBACK_ONLY);
998            exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED);
999
1000            handled = true;
1001        } else {
1002            // must decrement the redelivery counter as we didn't process the redelivery but is
1003            // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
1004            decrementRedeliveryCounter(exchange);
1005        }
1006
1007        // we should allow using the failure processor if we should not continue
1008        // or in case of continue then the failure processor is NOT a dead letter channel
1009        // because you can continue and still let the failure processor do some routing
1010        // before continue in the main route.
1011        boolean allowFailureProcessor = !shouldContinue || !isDeadLetterChannel;
1012
1013        if (allowFailureProcessor && processor != null) {
1014
1015            // prepare original IN body if it should be moved instead of current body
1016            if (data.useOriginalInMessage) {
1017                log.trace("Using the original IN message instead of current");
1018                Message original = ExchangeHelper.getOriginalInMessage(exchange);
1019                exchange.setIn(original);
1020                if (exchange.hasOut()) {
1021                    log.trace("Removing the out message to avoid some uncertain behavior");
1022                    exchange.setOut(null);
1023                }
1024            }
1025
1026            // reset cached streams so they can be read again
1027            MessageHelper.resetStreamCache(exchange.getIn());
1028
1029            // invoke custom on prepare
1030            if (onPrepareProcessor != null) {
1031                try {
1032                    log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange);
1033                    onPrepareProcessor.process(exchange);
1034                } catch (Exception e) {
1035                    // a new exception was thrown during prepare
1036                    exchange.setException(e);
1037                }
1038            }
1039
1040            log.trace("Failure processor {} is processing Exchange: {}", processor, exchange);
1041
1042            // store the last to endpoint as the failure endpoint
1043            exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
1044            // and store the route id so we know in which route we failed
1045            UnitOfWork uow = exchange.getUnitOfWork();
1046            if (uow != null && uow.getRouteContext() != null) {
1047                exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
1048            }
1049
1050            // fire event as we had a failure processor to handle it, which there is a event for
1051            final boolean deadLetterChannel = processor == data.deadLetterProcessor;
1052
1053            EventHelper.notifyExchangeFailureHandling(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
1054
1055            // the failure processor could also be asynchronous
1056            AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor);
1057            sync = afp.process(exchange, new AsyncCallback() {
1058                public void done(boolean sync) {
1059                    log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange);
1060                    try {
1061                        prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue);
1062                        // fire event as we had a failure processor to handle it, which there is a event for
1063                        EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri);
1064                    } finally {
1065                        // if the fault was handled asynchronously, this should be reflected in the callback as well
1066                        data.sync &= sync;
1067                        callback.done(data.sync);
1068                    }
1069                }
1070            });
1071        } else {
1072            try {
1073                // invoke custom on prepare
1074                if (onPrepareProcessor != null) {
1075                    try {
1076                        log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange);
1077                        onPrepareProcessor.process(exchange);
1078                    } catch (Exception e) {
1079                        // a new exception was thrown during prepare
1080                        exchange.setException(e);
1081                    }
1082                }
1083                // no processor but we need to prepare after failure as well
1084                prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue);
1085            } finally {
1086                // callback we are done
1087                callback.done(data.sync);
1088            }
1089        }
1090
1091        // create log message
1092        String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
1093        msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
1094        if (processor != null) {
1095            if (isDeadLetterChannel && deadLetterUri != null) {
1096                msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]";
1097            } else {
1098                msg = msg + ". Processed by failure processor: " + processor;
1099            }
1100        }
1101
1102        // log that we failed delivery as we are exhausted
1103        logFailedDelivery(false, false, handled, false, isDeadLetterChannel, exchange, msg, data, null);
1104
1105        return sync;
1106    }
1107
1108    protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data, final boolean isDeadLetterChannel,
1109                                               final boolean shouldHandle, final boolean shouldContinue) {
1110
1111        Exception newException = exchange.getException();
1112
1113        // we could not process the exchange so we let the failure processor handled it
1114        ExchangeHelper.setFailureHandled(exchange);
1115
1116        // honor if already set a handling
1117        boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
1118        if (alreadySet) {
1119            boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
1120            log.trace("This exchange has already been marked for handling: {}", handled);
1121            if (!handled) {
1122                // exception not handled, put exception back in the exchange
1123                exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
1124                // and put failure endpoint back as well
1125                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
1126            }
1127            return;
1128        }
1129
1130        // dead letter channel is special
1131        if (shouldContinue) {
1132            log.trace("This exchange is continued: {}", exchange);
1133            // okay we want to continue then prepare the exchange for that as well
1134            prepareExchangeForContinue(exchange, data, isDeadLetterChannel);
1135        } else if (shouldHandle) {
1136            log.trace("This exchange is handled so its marked as not failed: {}", exchange);
1137            exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
1138        } else {
1139            // okay the redelivery policy are not explicit set to true, so we should allow to check for some
1140            // special situations when using dead letter channel
1141            if (isDeadLetterChannel) {
1142
1143                // DLC is always handling the first thrown exception,
1144                // but if its a new exception then use the configured option
1145                boolean handled = newException == null || data.handleNewException;
1146
1147                // when using DLC then log new exception whether its being handled or not, as otherwise it may appear as
1148                // the DLC swallow new exceptions by default (which is by design to ensure the DLC always complete,
1149                // to avoid causing endless poison messages that fails forever)
1150                if (newException != null && data.currentRedeliveryPolicy.isLogNewException()) {
1151                    String uri = URISupport.sanitizeUri(deadLetterUri);
1152                    String msg = "New exception occurred during processing by the DeadLetterChannel[" + uri + "] due " + newException.getMessage();
1153                    if (handled) {
1154                        msg += ". The new exception is being handled as deadLetterHandleNewException=true.";
1155                    } else {
1156                        msg += ". The new exception is not handled as deadLetterHandleNewException=false.";
1157                    }
1158                    logFailedDelivery(false, true, handled, false, true, exchange, msg, data, newException);
1159                }
1160
1161                if (handled) {
1162                    log.trace("This exchange is handled so its marked as not failed: {}", exchange);
1163                    exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
1164                    return;
1165                }
1166            }
1167
1168            // not handled by default
1169            prepareExchangeAfterFailureNotHandled(exchange);
1170        }
1171    }
1172
1173    private void prepareExchangeAfterFailureNotHandled(Exchange exchange) {
1174        log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange);
1175        // exception not handled, put exception back in the exchange
1176        exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
1177        exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
1178        // and put failure endpoint back as well
1179        exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
1180        // and store the route id so we know in which route we failed
1181        UnitOfWork uow = exchange.getUnitOfWork();
1182        if (uow != null && uow.getRouteContext() != null) {
1183            exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId());
1184        }
1185    }
1186
1187    private void logFailedDelivery(boolean shouldRedeliver, boolean newException, boolean handled, boolean continued, boolean isDeadLetterChannel,
1188                                   Exchange exchange, String message, RedeliveryData data, Throwable e) {
1189        if (logger == null) {
1190            return;
1191        }
1192
1193        if (!exchange.isRollbackOnly()) {
1194            if (newException && !data.currentRedeliveryPolicy.isLogNewException()) {
1195                // do not log new exception
1196                return;
1197            }
1198
1199            // if we should not rollback, then check whether logging is enabled
1200
1201            if (!newException && handled && !data.currentRedeliveryPolicy.isLogHandled()) {
1202                // do not log handled
1203                return;
1204            }
1205
1206            if (!newException && continued && !data.currentRedeliveryPolicy.isLogContinued()) {
1207                // do not log handled
1208                return;
1209            }
1210
1211            if (!newException && shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) {
1212                // do not log retry attempts
1213                return;
1214            }
1215
1216            if (!newException && !shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) {
1217                // do not log exhausted
1218                return;
1219            }
1220        }
1221
1222        LoggingLevel newLogLevel;
1223        boolean logStackTrace;
1224        if (exchange.isRollbackOnly()) {
1225            newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
1226            logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
1227        } else if (shouldRedeliver) {
1228            newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
1229            logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace();
1230        } else {
1231            newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
1232            logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
1233        }
1234        if (e == null) {
1235            e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
1236        }
1237
1238        if (newException) {
1239            // log at most WARN level
1240            if (newLogLevel == LoggingLevel.ERROR) {
1241                newLogLevel = LoggingLevel.WARN;
1242            }
1243            String msg = message;
1244            if (msg == null) {
1245                msg = "New exception " + ExchangeHelper.logIds(exchange);
1246                // special for logging the new exception
1247                Throwable cause = e;
1248                if (cause != null) {
1249                    msg = msg + " due: " + cause.getMessage();
1250                }
1251            }
1252
1253            if (e != null && logStackTrace) {
1254                logger.log(msg, e, newLogLevel);
1255            } else {
1256                logger.log(msg, newLogLevel);
1257            }
1258        } else if (exchange.isRollbackOnly()) {
1259            String msg = "Rollback " + ExchangeHelper.logIds(exchange);
1260            Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
1261            if (cause != null) {
1262                msg = msg + " due: " + cause.getMessage();
1263            }
1264
1265            // should we include message history
1266            if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
1267                // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
1268                ExchangeFormatter formatter = customExchangeFormatter
1269                    ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
1270                String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, false);
1271                if (routeStackTrace != null) {
1272                    msg = msg + "\n" + routeStackTrace;
1273                }
1274            }
1275
1276            if (newLogLevel == LoggingLevel.ERROR) {
1277                // log intended rollback on maximum WARN level (no ERROR)
1278                logger.log(msg, LoggingLevel.WARN);
1279            } else {
1280                // otherwise use the desired logging level
1281                logger.log(msg, newLogLevel);
1282            }
1283        } else {
1284            String msg = message;
1285            // should we include message history
1286            if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) {
1287                // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it)
1288                ExchangeFormatter formatter = customExchangeFormatter
1289                    ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null);
1290                String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, e != null && logStackTrace);
1291                if (routeStackTrace != null) {
1292                    msg = msg + "\n" + routeStackTrace;
1293                }
1294            }
1295
1296            if (e != null && logStackTrace) {
1297                logger.log(msg, e, newLogLevel);
1298            } else {
1299                logger.log(msg, newLogLevel);
1300            }
1301        }
1302    }
1303
1304    /**
1305     * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback).
1306     * <p/>
1307     * If the exchange is exhausted, then we will not continue processing, but let the
1308     * failure processor deal with the exchange.
1309     *
1310     * @param exchange the current exchange
1311     * @param data     the redelivery data
1312     * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust.
1313     */
1314    private boolean isExhausted(Exchange exchange, RedeliveryData data) {
1315        // if marked as rollback only then do not continue/redeliver
1316        boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
1317        if (exhausted) {
1318            log.trace("This exchange is marked as redelivery exhausted: {}", exchange);
1319            return true;
1320        }
1321
1322        // if marked as rollback only then do not continue/redeliver
1323        boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class);
1324        if (rollbackOnly) {
1325            log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange);
1326            return true;
1327        }
1328        // its the first original call so continue
1329        if (data.redeliveryCounter == 0) {
1330            return false;
1331        }
1332        // its a potential redelivery so determine if we should redeliver or not
1333        boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate);
1334        return !redeliver;
1335    }
1336
1337    /**
1338     * Determines whether or not to continue if we are exhausted.
1339     *
1340     * @param exchange the current exchange
1341     * @param data     the redelivery data
1342     * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust.
1343     */
1344    private boolean shouldContinue(Exchange exchange, RedeliveryData data) {
1345        if (data.continuedPredicate != null) {
1346            return data.continuedPredicate.matches(exchange);
1347        }
1348        // do not continue by default
1349        return false;
1350    }
1351
1352    /**
1353     * Determines whether or not to handle if we are exhausted.
1354     *
1355     * @param exchange the current exchange
1356     * @param data     the redelivery data
1357     * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust.
1358     */
1359    private boolean shouldHandle(Exchange exchange, RedeliveryData data) {
1360        if (data.handledPredicate != null) {
1361            return data.handledPredicate.matches(exchange);
1362        }
1363        // do not handle by default
1364        return false;
1365    }
1366
1367    /**
1368     * Increments the redelivery counter and adds the redelivered flag if the
1369     * message has been redelivered
1370     */
1371    private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) {
1372        Message in = exchange.getIn();
1373        Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
1374        int next = 1;
1375        if (counter != null) {
1376            next = counter + 1;
1377        }
1378        in.setHeader(Exchange.REDELIVERY_COUNTER, next);
1379        in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
1380        // if maximum redeliveries is used, then provide that information as well
1381        if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) {
1382            in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries());
1383        }
1384        return next;
1385    }
1386
1387    /**
1388     * Prepares the redelivery counter and boolean flag for the failure handle processor
1389     */
1390    private void decrementRedeliveryCounter(Exchange exchange) {
1391        Message in = exchange.getIn();
1392        Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
1393        if (counter != null) {
1394            int prev = counter - 1;
1395            in.setHeader(Exchange.REDELIVERY_COUNTER, prev);
1396            // set boolean flag according to counter
1397            in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
1398        } else {
1399            // not redelivered
1400            in.setHeader(Exchange.REDELIVERY_COUNTER, 0);
1401            in.setHeader(Exchange.REDELIVERED, Boolean.FALSE);
1402        }
1403    }
1404
1405    /**
1406     * Determines if redelivery is enabled by checking if any of the redelivery policy
1407     * settings may allow redeliveries.
1408     *
1409     * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise
1410     * @throws Exception can be thrown
1411     */
1412    private boolean determineIfRedeliveryIsEnabled() throws Exception {
1413        // determine if redeliver is enabled either on error handler
1414        if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) {
1415            // must check for != 0 as (-1 means redeliver forever)
1416            return true;
1417        }
1418        if (retryWhilePolicy != null) {
1419            return true;
1420        }
1421
1422        // or on the exception policies
1423        if (!exceptionPolicies.isEmpty()) {
1424            // walk them to see if any of them have a maximum redeliveries > 0 or retry until set
1425            for (OnExceptionDefinition def : exceptionPolicies.values()) {
1426
1427                String ref = def.getRedeliveryPolicyRef();
1428                if (ref != null) {
1429                    // lookup in registry if ref provided
1430                    RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class);
1431                    if (policy.getMaximumRedeliveries() != 0) {
1432                        // must check for != 0 as (-1 means redeliver forever)
1433                        return true;
1434                    }
1435                } else if (def.getRedeliveryPolicy() != null) {
1436                    Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries());
1437                    if (max != null && max != 0) {
1438                        // must check for != 0 as (-1 means redeliver forever)
1439                        return true;
1440                    }
1441                }
1442
1443                if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) {
1444                    return true;
1445                }
1446            }
1447        }
1448
1449        return false;
1450    }
1451
1452    /**
1453     * Gets the number of exchanges that are pending for redelivery
1454     */
1455    public int getPendingRedeliveryCount() {
1456        int answer = redeliverySleepCounter.get();
1457        if (executorService != null && executorService instanceof ThreadPoolExecutor) {
1458            answer += ((ThreadPoolExecutor) executorService).getQueue().size();
1459        }
1460
1461        return answer;
1462    }
1463
1464    @Override
1465    protected void doStart() throws Exception {
1466        ServiceHelper.startServices(output, outputAsync, deadLetter);
1467
1468        // determine if redeliver is enabled or not
1469        redeliveryEnabled = determineIfRedeliveryIsEnabled();
1470        if (log.isTraceEnabled()) {
1471            log.trace("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this);
1472        }
1473
1474        // we only need thread pool if redelivery is enabled
1475        if (redeliveryEnabled) {
1476            if (executorService == null) {
1477                // use default shared executor service
1478                executorService = camelContext.getErrorHandlerExecutorService();
1479            }
1480            if (log.isDebugEnabled()) {
1481                log.debug("Using ExecutorService: {} for redeliveries on error handler: {}", executorService, this);
1482            }
1483        }
1484
1485        // reset flag when starting
1486        preparingShutdown = false;
1487        redeliverySleepCounter.set(0);
1488    }
1489
1490    @Override
1491    protected void doStop() throws Exception {
1492        // noop, do not stop any services which we only do when shutting down
1493        // as the error handler can be context scoped, and should not stop in case
1494        // a route stops
1495    }
1496
1497    @Override
1498    protected void doShutdown() throws Exception {
1499        ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync);
1500    }
1501}