001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.Callable;
020    import java.util.concurrent.RejectedExecutionException;
021    import java.util.concurrent.ScheduledExecutorService;
022    import java.util.concurrent.TimeUnit;
023    
024    import org.apache.camel.AsyncCallback;
025    import org.apache.camel.AsyncProcessor;
026    import org.apache.camel.CamelContext;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.LoggingLevel;
029    import org.apache.camel.Message;
030    import org.apache.camel.Predicate;
031    import org.apache.camel.Processor;
032    import org.apache.camel.model.OnExceptionDefinition;
033    import org.apache.camel.spi.SubUnitOfWorkCallback;
034    import org.apache.camel.spi.UnitOfWork;
035    import org.apache.camel.util.AsyncProcessorConverterHelper;
036    import org.apache.camel.util.AsyncProcessorHelper;
037    import org.apache.camel.util.CamelContextHelper;
038    import org.apache.camel.util.CamelLogger;
039    import org.apache.camel.util.EventHelper;
040    import org.apache.camel.util.ExchangeHelper;
041    import org.apache.camel.util.MessageHelper;
042    import org.apache.camel.util.ObjectHelper;
043    import org.apache.camel.util.ServiceHelper;
044    
045    /**
046     * Base redeliverable error handler that also supports a final dead letter queue in case
047     * all redelivery attempts fail.
048     * <p/>
049     * This implementation should contain all the error handling logic and the sub classes
050     * should only configure it according to what they support.
051     *
052     * @version
053     */
054    public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor {
055    
056        protected ScheduledExecutorService executorService;
057        protected final CamelContext camelContext;
058        protected final Processor deadLetter;
059        protected final String deadLetterUri;
060        protected final Processor output;
061        protected final AsyncProcessor outputAsync;
062        protected final Processor redeliveryProcessor;
063        protected final RedeliveryPolicy redeliveryPolicy;
064        protected final Predicate retryWhilePolicy;
065        protected final CamelLogger logger;
066        protected final boolean useOriginalMessagePolicy;
067        protected boolean redeliveryEnabled;
068    
069        /**
070         * Contains the current redelivery data
071         */
072        protected class RedeliveryData {
073            Exchange original;
074            boolean sync = true;
075            int redeliveryCounter;
076            long redeliveryDelay;
077            Predicate retryWhilePredicate = retryWhilePolicy;
078            boolean redeliverFromSync;
079    
080            // default behavior which can be overloaded on a per exception basis
081            RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
082            Processor deadLetterProcessor = deadLetter;
083            Processor failureProcessor;
084            Processor onRedeliveryProcessor = redeliveryProcessor;
085            Predicate handledPredicate = getDefaultHandledPredicate();
086            Predicate continuedPredicate;
087            boolean useOriginalInMessage = useOriginalMessagePolicy;
088            boolean asyncDelayedRedelivery = redeliveryPolicy.isAsyncDelayedRedelivery();
089        }
090    
091        /**
092         * Tasks which performs asynchronous redelivery attempts, and being triggered by a
093         * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task
094         * has to be delayed before a redelivery attempt is performed.
095         */
096        private class AsyncRedeliveryTask implements Callable<Boolean> {
097    
098            private final Exchange exchange;
099            private final AsyncCallback callback;
100            private final RedeliveryData data;
101    
102            public AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
103                this.exchange = exchange;
104                this.callback = callback;
105                this.data = data;
106            }
107    
108            public Boolean call() throws Exception {
109                // prepare for redelivery
110                prepareExchangeForRedelivery(exchange, data);
111    
112                // letting onRedeliver be executed at first
113                deliverToOnRedeliveryProcessor(exchange, data);
114    
115                if (log.isTraceEnabled()) {
116                    log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", new Object[]{exchange.getExchangeId(), outputAsync, exchange});
117                }
118    
119                // emmit event we are doing redelivery
120                EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
121    
122                // process the exchange (also redelivery)
123                boolean sync;
124                if (data.redeliverFromSync) {
125                    // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from
126                    // this error handler, which means we have to invoke the callback with false, to have the callback
127                    // be notified when we are done
128                    sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() {
129                        public void done(boolean doneSync) {
130                            log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
131    
132                            // mark we are in sync mode now
133                            data.sync = false;
134    
135                            // only process if the exchange hasn't failed
136                            // and it has not been handled by the error processor
137                            if (isDone(exchange)) {
138                                callback.done(false);
139                                return;
140                            }
141    
142                            // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
143                            processAsyncErrorHandler(exchange, callback, data);
144                        }
145                    });
146                } else {
147                    // this redelivery task was scheduled from asynchronous, which means we should only
148                    // handle when the asynchronous task was done
149                    sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() {
150                        public void done(boolean doneSync) {
151                            log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync);
152    
153                            // this callback should only handle the async case
154                            if (doneSync) {
155                                return;
156                            }
157    
158                            // mark we are in async mode now
159                            data.sync = false;
160    
161                            // only process if the exchange hasn't failed
162                            // and it has not been handled by the error processor
163                            if (isDone(exchange)) {
164                                callback.done(doneSync);
165                                return;
166                            }
167                            // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
168                            processAsyncErrorHandler(exchange, callback, data);
169                        }
170                    });
171                }
172    
173                return sync;
174            }
175        }
176    
177        public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger,
178                Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter,
179                String deadLetterUri, boolean useOriginalMessagePolicy, Predicate retryWhile, ScheduledExecutorService executorService) {
180    
181            ObjectHelper.notNull(camelContext, "CamelContext", this);
182            ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this);
183    
184            this.camelContext = camelContext;
185            this.redeliveryProcessor = redeliveryProcessor;
186            this.deadLetter = deadLetter;
187            this.output = output;
188            this.outputAsync = AsyncProcessorConverterHelper.convert(output);
189            this.redeliveryPolicy = redeliveryPolicy;
190            this.logger = logger;
191            this.deadLetterUri = deadLetterUri;
192            this.useOriginalMessagePolicy = useOriginalMessagePolicy;
193            this.retryWhilePolicy = retryWhile;
194            this.executorService = executorService;
195        }
196    
197        public boolean supportTransacted() {
198            return false;
199        }
200    
201        @Override
202        public boolean isRunAllowed() {
203            // determine if we can still run, or the camel context is forcing a shutdown
204            boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
205            if (forceShutdown) {
206                log.trace("Run not allowed as ShutdownStrategy is forcing shutting down");
207            }
208            return !forceShutdown && super.isRunAllowed();
209        }
210    
211        public void process(Exchange exchange) throws Exception {
212            if (output == null) {
213                // no output then just return
214                return;
215            }
216            AsyncProcessorHelper.process(this, exchange);
217        }
218    
219        public boolean process(Exchange exchange, final AsyncCallback callback) {
220            return processErrorHandler(exchange, callback, new RedeliveryData());
221        }
222    
223        /**
224         * Process the exchange using redelivery error handling.
225         */
226        protected boolean processErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
227    
228            // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
229            // original Exchange is being redelivered, and not a mutated Exchange
230            data.original = defensiveCopyExchangeIfNeeded(exchange);
231    
232            // use looping to have redelivery attempts
233            while (true) {
234    
235                // can we still run
236                if (!isRunAllowed()) {
237                    log.trace("Run not allowed, will reject executing exchange: {}", exchange);
238                    if (exchange.getException() == null) {
239                        exchange.setException(new RejectedExecutionException());
240                    }
241                    // we cannot process so invoke callback
242                    callback.done(data.sync);
243                    return data.sync;
244                }
245    
246                // did previous processing cause an exception?
247                boolean handle = shouldHandleException(exchange);
248                if (handle) {
249                    handleException(exchange, data);
250                }
251    
252                // compute if we are exhausted or not
253                boolean exhausted = isExhausted(exchange, data);
254                if (exhausted) {
255                    Processor target = null;
256                    boolean deliver = true;
257    
258                    // the unit of work may have an optional callback associated we need to leverage
259                    SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback();
260                    if (uowCallback != null) {
261                        // signal to the callback we are exhausted
262                        uowCallback.onExhausted(exchange);
263                        // do not deliver to the failure processor as its been handled by the callback instead
264                        deliver = false;
265                    }
266    
267                    if (deliver) {
268                        // should deliver to failure processor (either from onException or the dead letter channel)
269                        target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
270                    }
271                    // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
272                    // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
273                    boolean sync = deliverToFailureProcessor(target, exchange, data, callback);
274                    // we are breaking out
275                    return sync;
276                }
277    
278                if (data.redeliveryCounter > 0) {
279                    // calculate delay
280                    data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter);
281    
282                    if (data.redeliveryDelay > 0) {
283                        // okay there is a delay so create a scheduled task to have it executed in the future
284    
285                        if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) {
286    
287                            // we are doing a redelivery then a thread pool must be configured (see the doStart method)
288                            ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
289    
290                            // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
291                            // have it being executed in the future, or immediately
292                            // we are continuing asynchronously
293    
294                            // mark we are routing async from now and that this redelivery task came from a synchronous routing
295                            data.sync = false;
296                            data.redeliverFromSync = true;
297                            AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
298    
299                            // schedule the redelivery task
300                            if (log.isTraceEnabled()) {
301                                log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
302                            }
303                            executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
304    
305                            return false;
306                        } else {
307                            // async delayed redelivery was disabled or we are transacted so we must be synchronous
308                            // as the transaction manager requires to execute in the same thread context
309                            try {
310                                data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
311                            } catch (InterruptedException e) {
312                                // we was interrupted so break out
313                                exchange.setException(e);
314                                // mark the exchange to stop continue routing when interrupted
315                                // as we do not want to continue routing (for example a task has been cancelled)
316                                exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
317                                callback.done(data.sync);
318                                return data.sync;
319                            }
320                        }
321                    }
322    
323                    // prepare for redelivery
324                    prepareExchangeForRedelivery(exchange, data);
325    
326                    // letting onRedeliver be executed
327                    deliverToOnRedeliveryProcessor(exchange, data);
328    
329                    // emmit event we are doing redelivery
330                    EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter);
331                }
332    
333                // process the exchange (also redelivery)
334                boolean sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() {
335                    public void done(boolean sync) {
336                        // this callback should only handle the async case
337                        if (sync) {
338                            return;
339                        }
340    
341                        // mark we are in async mode now
342                        data.sync = false;
343    
344                        // if we are done then notify callback and exit
345                        if (isDone(exchange)) {
346                            callback.done(sync);
347                            return;
348                        }
349    
350                        // error occurred so loop back around which we do by invoking the processAsyncErrorHandler
351                        // method which takes care of this in a asynchronous manner
352                        processAsyncErrorHandler(exchange, callback, data);
353                    }
354                });
355    
356                if (!sync) {
357                    // the remainder of the Exchange is being processed asynchronously so we should return
358                    return false;
359                }
360                // we continue to route synchronously
361    
362                // if we are done then notify callback and exit
363                boolean done = isDone(exchange);
364                if (done) {
365                    callback.done(true);
366                    return true;
367                }
368    
369                // error occurred so loop back around.....
370            }
371        }
372    
373        /**
374         * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY}
375         * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p>
376         *
377         * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay}
378         * and {@link RedeliveryData#redeliveryCounter} are copied in.</p>
379         *
380         * @param exchange The current exchange in question.
381         * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation.
382         * @param redeliveryDelay The default redelivery delay from RedeliveryData
383         * @param redeliveryCounter The redeliveryCounter
384         * @return The time to wait before the next redelivery.
385         */
386        protected long determineRedeliveryDelay(Exchange exchange, RedeliveryPolicy redeliveryPolicy, long redeliveryDelay, int redeliveryCounter) {
387            Message message = exchange.getIn();
388            Long delay = message.getHeader(Exchange.REDELIVERY_DELAY, Long.class);
389            if (delay == null) {
390                delay = redeliveryPolicy.calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter);
391                log.debug("Redelivery delay calculated as {}", delay);
392            } else {
393                log.debug("Redelivery delay is {} from Message Header [{}]", delay, Exchange.REDELIVERY_DELAY);
394            }
395            return delay;
396        }
397    
398        /**
399         * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback.
400         * <p/>
401         * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use
402         * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b>
403         * in terms of logic.
404         */
405        protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
406            // can we still run
407            if (!isRunAllowed()) {
408                log.trace("Run not allowed, will reject executing exchange: {}", exchange);
409                if (exchange.getException() == null) {
410                    exchange.setException(new RejectedExecutionException());
411                }
412                callback.done(data.sync);
413                return;
414            }
415    
416            // did previous processing cause an exception?
417            boolean handle = shouldHandleException(exchange);
418            if (handle) {
419                handleException(exchange, data);
420            }
421    
422            // compute if we are exhausted or not
423            boolean exhausted = isExhausted(exchange, data);
424            if (exhausted) {
425                Processor target = null;
426                boolean deliver = true;
427    
428                // the unit of work may have an optional callback associated we need to leverage
429                SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback();
430                if (uowCallback != null) {
431                    // signal to the callback we are exhausted
432                    uowCallback.onExhausted(exchange);
433                    // do not deliver to the failure processor as its been handled by the callback instead
434                    deliver = false;
435                }
436    
437                if (deliver) {
438                    // should deliver to failure processor (either from onException or the dead letter channel)
439                    target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor;
440                }
441                // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair
442                // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint)
443                deliverToFailureProcessor(target, exchange, data, callback);
444                // we are breaking out
445                return;
446            }
447    
448            if (data.redeliveryCounter > 0) {
449                // we are doing a redelivery then a thread pool must be configured (see the doStart method)
450                ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this);
451    
452                // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to
453                // have it being executed in the future, or immediately
454                // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously
455                // to ensure the callback will continue routing from where we left
456                AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data);
457    
458                // calculate the redelivery delay
459                data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
460                if (data.redeliveryDelay > 0) {
461                    // schedule the redelivery task
462                    if (log.isTraceEnabled()) {
463                        log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId());
464                    }
465                    executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS);
466                } else {
467                    // execute the task immediately
468                    executorService.submit(task);
469                }
470            }
471        }
472    
473        /**
474         * Performs a defensive copy of the exchange if needed
475         *
476         * @param exchange the exchange
477         * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled).
478         */
479        protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) {
480            // only do a defensive copy if redelivery is enabled
481            if (redeliveryEnabled) {
482                return ExchangeHelper.createCopy(exchange, true);
483            } else {
484                return null;
485            }
486        }
487    
488        /**
489         * Strategy whether the exchange has an exception that we should try to handle.
490         * <p/>
491         * Standard implementations should just look for an exception.
492         */
493        protected boolean shouldHandleException(Exchange exchange) {
494            return exchange.getException() != null;
495        }
496    
497        /**
498         * Strategy to determine if the exchange is done so we can continue
499         */
500        protected boolean isDone(Exchange exchange) {
501            boolean answer = isCancelledOrInterrupted(exchange);
502    
503            // only done if the exchange hasn't failed
504            // and it has not been handled by the failure processor
505            // or we are exhausted
506            if (!answer) {
507                answer = exchange.getException() == null
508                    || ExchangeHelper.isFailureHandled(exchange)
509                    || ExchangeHelper.isRedeliveryExhausted(exchange);
510            }
511    
512            log.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer);
513            return answer;
514        }
515    
516        /**
517         * Strategy to determine if the exchange was cancelled or interrupted
518         */
519        protected boolean isCancelledOrInterrupted(Exchange exchange) {
520            boolean answer = false;
521    
522            if (ExchangeHelper.isInterrupted(exchange)) {
523                // mark the exchange to stop continue routing when interrupted
524                // as we do not want to continue routing (for example a task has been cancelled)
525                exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
526                answer = true;
527            }
528    
529            log.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer);
530            return answer;
531        }
532    
533        /**
534         * Returns the output processor
535         */
536        public Processor getOutput() {
537            return output;
538        }
539    
540        /**
541         * Returns the dead letter that message exchanges will be sent to if the
542         * redelivery attempts fail
543         */
544        public Processor getDeadLetter() {
545            return deadLetter;
546        }
547    
548        public String getDeadLetterUri() {
549            return deadLetterUri;
550        }
551    
552        public boolean isUseOriginalMessagePolicy() {
553            return useOriginalMessagePolicy;
554        }
555    
556        public RedeliveryPolicy getRedeliveryPolicy() {
557            return redeliveryPolicy;
558        }
559    
560        public CamelLogger getLogger() {
561            return logger;
562        }
563    
564        protected Predicate getDefaultHandledPredicate() {
565            // Default is not not handle errors
566            return null;
567        }
568    
569        protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data) {
570            Exception caught = exchange.getException();
571    
572            // we continue so clear any exceptions
573            exchange.setException(null);
574            // clear rollback flags
575            exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
576            // reset cached streams so they can be read again
577            MessageHelper.resetStreamCache(exchange.getIn());
578    
579            // its continued then remove traces of redelivery attempted and caught exception
580            exchange.getIn().removeHeader(Exchange.REDELIVERED);
581            exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
582            exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
583            exchange.removeProperty(Exchange.FAILURE_HANDLED);
584            // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception
585    
586            // create log message
587            String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
588            msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
589            msg = msg + ". Handled and continue routing.";
590    
591            // log that we failed but want to continue
592            logFailedDelivery(false, false, true, exchange, msg, data, null);
593        }
594    
595        protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) {
596            if (!redeliveryEnabled) {
597                throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly.");
598            }
599            // there must be a defensive copy of the exchange
600            ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this);
601    
602            // okay we will give it another go so clear the exception so we can try again
603            exchange.setException(null);
604    
605            // clear rollback flags
606            exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
607    
608            // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange
609            // and then put these on the exchange when doing a redelivery / fault processor
610    
611            // preserve these headers
612            Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
613            Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
614            Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class);
615    
616            // we are redelivering so copy from original back to exchange
617            exchange.getIn().copyFrom(data.original.getIn());
618            exchange.setOut(null);
619            // reset cached streams so they can be read again
620            MessageHelper.resetStreamCache(exchange.getIn());
621    
622            // put back headers
623            if (redeliveryCounter != null) {
624                exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
625            }
626            if (redeliveryMaxCounter != null) {
627                exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
628            }
629            if (redelivered != null) {
630                exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered);
631            }
632        }
633    
634        protected void handleException(Exchange exchange, RedeliveryData data) {
635            Exception e = exchange.getException();
636    
637            // store the original caused exception in a property, so we can restore it later
638            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
639    
640            // find the error handler to use (if any)
641            OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e);
642            if (exceptionPolicy != null) {
643                data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy);
644                data.handledPredicate = exceptionPolicy.getHandledPolicy();
645                data.continuedPredicate = exceptionPolicy.getContinuedPolicy();
646                data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy();
647                data.useOriginalInMessage = exceptionPolicy.isUseOriginalMessage();
648                data.asyncDelayedRedelivery = exceptionPolicy.isAsyncDelayedRedelivery(exchange.getContext());
649    
650                // route specific failure handler?
651                Processor processor = null;
652                UnitOfWork uow = exchange.getUnitOfWork();
653                if (uow != null && uow.getRouteContext() != null) {
654                    String routeId = uow.getRouteContext().getRoute().getId();
655                    processor = exceptionPolicy.getErrorHandler(routeId);
656                } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) {
657                    // note this should really not happen, but we have this code as a fail safe
658                    // to be backwards compatible with the old behavior
659                    log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId());
660                    processor = exceptionPolicy.getErrorHandlers().iterator().next();
661                }
662                if (processor != null) {
663                    data.failureProcessor = processor;
664                }
665    
666                // route specific on redelivery?
667                processor = exceptionPolicy.getOnRedelivery();
668                if (processor != null) {
669                    data.onRedeliveryProcessor = processor;
670                }
671            }
672    
673            // only log if not failure handled or not an exhausted unit of work
674            if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) {
675                String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange)
676                        + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e;
677                logFailedDelivery(true, false, false, exchange, msg, data, e);
678            }
679    
680            data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data);
681        }
682    
683        /**
684         * Gives an optional configure redelivery processor a chance to process before the Exchange
685         * will be redelivered. This can be used to alter the Exchange.
686         */
687        protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
688            if (data.onRedeliveryProcessor == null) {
689                return;
690            }
691    
692            if (log.isTraceEnabled()) {
693                log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered",
694                        data.onRedeliveryProcessor, exchange);
695            }
696    
697            // run this synchronously as its just a Processor
698            try {
699                data.onRedeliveryProcessor.process(exchange);
700            } catch (Throwable e) {
701                exchange.setException(e);
702            }
703            log.trace("Redelivery processor done");
704        }
705    
706        /**
707         * All redelivery attempts failed so move the exchange to the dead letter queue
708         */
709        protected boolean deliverToFailureProcessor(final Processor processor, final Exchange exchange,
710                                                    final RedeliveryData data, final AsyncCallback callback) {
711            boolean sync = true;
712    
713            Exception caught = exchange.getException();
714    
715            // we did not success with the redelivery so now we let the failure processor handle it
716            // clear exception as we let the failure processor handle it
717            exchange.setException(null);
718    
719            final boolean shouldHandle = shouldHandled(exchange, data);
720            final boolean shouldContinue = shouldContinue(exchange, data);
721            // regard both handled or continued as being handled
722            boolean handled = false;
723    
724            if (shouldHandle || shouldContinue) {
725                // its handled then remove traces of redelivery attempted
726                exchange.getIn().removeHeader(Exchange.REDELIVERED);
727                exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
728                exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
729                exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
730    
731                // and remove traces of rollback only and uow exhausted markers
732                exchange.removeProperty(Exchange.ROLLBACK_ONLY);
733                exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED);
734    
735                handled = true;
736            } else {
737                // must decrement the redelivery counter as we didn't process the redelivery but is
738                // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
739                decrementRedeliveryCounter(exchange);
740            }
741    
742            // is the a failure processor to process the Exchange
743            if (processor != null) {
744    
745                // prepare original IN body if it should be moved instead of current body
746                if (data.useOriginalInMessage) {
747                    log.trace("Using the original IN message instead of current");
748                    Message original = exchange.getUnitOfWork().getOriginalInMessage();
749                    exchange.setIn(original);
750                    if (exchange.hasOut()) {
751                        log.trace("Removing the out message to avoid some uncertain behavior");
752                        exchange.setOut(null);
753                    }
754                }
755    
756                // reset cached streams so they can be read again
757                MessageHelper.resetStreamCache(exchange.getIn());
758    
759                log.trace("Failure processor {} is processing Exchange: {}", processor, exchange);
760    
761                // store the last to endpoint as the failure endpoint
762                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
763                // and store the route id so we know in which route we failed
764                if (exchange.getUnitOfWork().getRouteContext() != null) {
765                    exchange.setProperty(Exchange.FAILURE_ROUTE_ID, exchange.getUnitOfWork().getRouteContext().getRoute().getId());
766                }
767    
768                // the failure processor could also be asynchronous
769                AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor);
770                sync = AsyncProcessorHelper.process(afp, exchange, new AsyncCallback() {
771                    public void done(boolean sync) {
772                        log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange);
773                        try {
774                            prepareExchangeAfterFailure(exchange, data, shouldHandle, shouldContinue);
775                            // fire event as we had a failure processor to handle it, which there is a event for
776                            boolean deadLetterChannel = processor == data.deadLetterProcessor && data.deadLetterProcessor != null;
777                            EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel);
778                        } finally {
779                            // if the fault was handled asynchronously, this should be reflected in the callback as well
780                            data.sync &= sync;
781                            callback.done(data.sync);
782                        }
783                    }
784                });
785            } else {
786                try {
787                    // no processor but we need to prepare after failure as well
788                    prepareExchangeAfterFailure(exchange, data, shouldHandle, shouldContinue);
789                } finally {
790                    // callback we are done
791                    callback.done(data.sync);
792                }
793            }
794    
795            // create log message
796            String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange);
797            msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught;
798            if (processor != null) {
799                msg = msg + ". Processed by failure processor: " + processor;
800            }
801    
802            // log that we failed delivery as we are exhausted
803            logFailedDelivery(false, handled, false, exchange, msg, data, null);
804    
805            return sync;
806        }
807    
808        protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data,
809                                                   final boolean shouldHandle, final boolean shouldContinue) {
810            // we could not process the exchange so we let the failure processor handled it
811            ExchangeHelper.setFailureHandled(exchange);
812    
813            // honor if already set a handling
814            boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null;
815            if (alreadySet) {
816                boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class);
817                log.trace("This exchange has already been marked for handling: {}", handled);
818                if (handled) {
819                    exchange.setException(null);
820                } else {
821                    // exception not handled, put exception back in the exchange
822                    exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
823                    // and put failure endpoint back as well
824                    exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
825                }
826                return;
827            }
828    
829            if (shouldHandle) {
830                log.trace("This exchange is handled so its marked as not failed: {}", exchange);
831                exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE);
832            } else if (shouldContinue) {
833                log.trace("This exchange is continued: {}", exchange);
834                // okay we want to continue then prepare the exchange for that as well
835                prepareExchangeForContinue(exchange, data);
836            } else {
837                log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange);
838                // exception not handled, put exception back in the exchange
839                exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE);
840                exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
841                // and put failure endpoint back as well
842                exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
843                // and store the route id so we know in which route we failed
844                if (exchange.getUnitOfWork().getRouteContext() != null) {
845                    exchange.setProperty(Exchange.FAILURE_ROUTE_ID, exchange.getUnitOfWork().getRouteContext().getRoute().getId());
846                }
847            }
848        }
849    
850        private void logFailedDelivery(boolean shouldRedeliver, boolean handled, boolean continued, Exchange exchange, String message, RedeliveryData data, Throwable e) {
851            if (logger == null) {
852                return;
853            }
854    
855            if (!exchange.isRollbackOnly()) {
856                // if we should not rollback, then check whether logging is enabled
857                if (handled && !data.currentRedeliveryPolicy.isLogHandled()) {
858                    // do not log handled
859                    return;
860                }
861    
862                if (continued && !data.currentRedeliveryPolicy.isLogContinued()) {
863                    // do not log handled
864                    return;
865                }
866    
867                if (shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) {
868                    // do not log retry attempts
869                    return;
870                }
871    
872                if (!shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) {
873                    // do not log exhausted
874                    return;
875                }
876            }
877    
878            LoggingLevel newLogLevel;
879            boolean logStackTrace;
880            if (exchange.isRollbackOnly()) {
881                newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
882                logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
883            } else if (shouldRedeliver) {
884                newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
885                logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace();
886            } else {
887                newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
888                logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace();
889            }
890            if (e == null) {
891                e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
892            }
893    
894            if (exchange.isRollbackOnly()) {
895                String msg = "Rollback " + ExchangeHelper.logIds(exchange);
896                Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class);
897                if (cause != null) {
898                    msg = msg + " due: " + cause.getMessage();
899                }
900                if (newLogLevel == LoggingLevel.ERROR) {
901                    // log intended rollback on maximum WARN level (no ERROR)
902                    logger.log(msg, LoggingLevel.WARN);
903                } else {
904                    // otherwise use the desired logging level
905                    logger.log(msg, newLogLevel);
906                }
907            } else if (e != null && logStackTrace) {
908                logger.log(message, e, newLogLevel);
909            } else {
910                logger.log(message, newLogLevel);
911            }
912        }
913    
914        /**
915         * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback).
916         * <p/>
917         * If the exchange is exhausted, then we will not continue processing, but let the
918         * failure processor deal with the exchange.
919         *
920         * @param exchange the current exchange
921         * @param data     the redelivery data
922         * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust.
923         */
924        private boolean isExhausted(Exchange exchange, RedeliveryData data) {
925            // if marked as rollback only then do not continue/redeliver
926            boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
927            if (exhausted) {
928                log.trace("This exchange is marked as redelivery exhausted: {}", exchange);
929                return true;
930            }
931    
932            // if marked as rollback only then do not continue/redeliver
933            boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class);
934            if (rollbackOnly) {
935                log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange);
936                return true;
937            }
938            // its the first original call so continue
939            if (data.redeliveryCounter == 0) {
940                return false;
941            }
942            // its a potential redelivery so determine if we should redeliver or not
943            boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate);
944            return !redeliver;
945        }
946    
947        /**
948         * Determines whether or not to continue if we are exhausted.
949         *
950         * @param exchange the current exchange
951         * @param data     the redelivery data
952         * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust.
953         */
954        private boolean shouldContinue(Exchange exchange, RedeliveryData data) {
955            if (data.continuedPredicate != null) {
956                return data.continuedPredicate.matches(exchange);
957            }
958            // do not continue by default
959            return false;
960        }
961    
962        /**
963         * Determines whether or not to handle if we are exhausted.
964         *
965         * @param exchange the current exchange
966         * @param data     the redelivery data
967         * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust.
968         */
969        private boolean shouldHandled(Exchange exchange, RedeliveryData data) {
970            if (data.handledPredicate != null) {
971                return data.handledPredicate.matches(exchange);
972            }
973            // do not handle by default
974            return false;
975        }
976    
977        /**
978         * Increments the redelivery counter and adds the redelivered flag if the
979         * message has been redelivered
980         */
981        private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) {
982            Message in = exchange.getIn();
983            Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
984            int next = 1;
985            if (counter != null) {
986                next = counter + 1;
987            }
988            in.setHeader(Exchange.REDELIVERY_COUNTER, next);
989            in.setHeader(Exchange.REDELIVERED, Boolean.TRUE);
990            // if maximum redeliveries is used, then provide that information as well
991            if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) {
992                in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries());
993            }
994            return next;
995        }
996    
997        /**
998         * Prepares the redelivery counter and boolean flag for the failure handle processor
999         */
1000        private void decrementRedeliveryCounter(Exchange exchange) {
1001            Message in = exchange.getIn();
1002            Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
1003            if (counter != null) {
1004                int prev = counter - 1;
1005                in.setHeader(Exchange.REDELIVERY_COUNTER, prev);
1006                // set boolean flag according to counter
1007                in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
1008            } else {
1009                // not redelivered
1010                in.setHeader(Exchange.REDELIVERY_COUNTER, 0);
1011                in.setHeader(Exchange.REDELIVERED, Boolean.FALSE);
1012            }
1013        }
1014    
1015        /**
1016         * Determines if redelivery is enabled by checking if any of the redelivery policy
1017         * settings may allow redeliveries.
1018         *
1019         * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise
1020         * @throws Exception can be thrown
1021         */
1022        private boolean determineIfRedeliveryIsEnabled() throws Exception {
1023            // determine if redeliver is enabled either on error handler
1024            if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) {
1025                // must check for != 0 as (-1 means redeliver forever)
1026                return true;
1027            }
1028            if (retryWhilePolicy != null) {
1029                return true;
1030            }
1031    
1032            // or on the exception policies
1033            if (!exceptionPolicies.isEmpty()) {
1034                // walk them to see if any of them have a maximum redeliveries > 0 or retry until set
1035                for (OnExceptionDefinition def : exceptionPolicies.values()) {
1036    
1037                    String ref = def.getRedeliveryPolicyRef();
1038                    if (ref != null) {
1039                        // lookup in registry if ref provided
1040                        RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class);
1041                        if (policy.getMaximumRedeliveries() != 0) {
1042                            // must check for != 0 as (-1 means redeliver forever)
1043                            return true;
1044                        }
1045                    } else if (def.getRedeliveryPolicy() != null) {
1046                        Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries());
1047                        if (max != null && max != 0) {
1048                            // must check for != 0 as (-1 means redeliver forever)
1049                            return true;
1050                        }
1051                    }
1052    
1053                    if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) {
1054                        return true;
1055                    }
1056                }
1057            }
1058    
1059            return false;
1060        }
1061    
1062        @Override
1063        protected void doStart() throws Exception {
1064            ServiceHelper.startServices(output, outputAsync, deadLetter);
1065    
1066            // determine if redeliver is enabled or not
1067            redeliveryEnabled = determineIfRedeliveryIsEnabled();
1068            if (log.isDebugEnabled()) {
1069                log.debug("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this);
1070            }
1071    
1072            // we only need thread pool if redelivery is enabled
1073            if (redeliveryEnabled) {
1074                if (executorService == null) {
1075                    // use default shared executor service
1076                    executorService = camelContext.getErrorHandlerExecutorService();
1077                }
1078                if (log.isTraceEnabled()) {
1079                    log.trace("Using ExecutorService: {} for redeliveries on error handler: {}", executorService, this);
1080                }
1081            }
1082        }
1083    
1084        @Override
1085        protected void doStop() throws Exception {
1086            // noop, do not stop any services which we only do when shutting down
1087            // as the error handler can be context scoped, and should not stop in case
1088            // a route stops
1089        }
1090    
1091        @Override
1092        protected void doShutdown() throws Exception {
1093            ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync);
1094        }
1095    }