001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.Callable; 020 import java.util.concurrent.RejectedExecutionException; 021 import java.util.concurrent.ScheduledExecutorService; 022 import java.util.concurrent.TimeUnit; 023 024 import org.apache.camel.AsyncCallback; 025 import org.apache.camel.AsyncProcessor; 026 import org.apache.camel.CamelContext; 027 import org.apache.camel.Exchange; 028 import org.apache.camel.LoggingLevel; 029 import org.apache.camel.Message; 030 import org.apache.camel.Predicate; 031 import org.apache.camel.Processor; 032 import org.apache.camel.model.OnExceptionDefinition; 033 import org.apache.camel.spi.SubUnitOfWorkCallback; 034 import org.apache.camel.spi.UnitOfWork; 035 import org.apache.camel.util.AsyncProcessorConverterHelper; 036 import org.apache.camel.util.AsyncProcessorHelper; 037 import org.apache.camel.util.CamelContextHelper; 038 import org.apache.camel.util.CamelLogger; 039 import org.apache.camel.util.EventHelper; 040 import org.apache.camel.util.ExchangeHelper; 041 import org.apache.camel.util.MessageHelper; 042 import org.apache.camel.util.ObjectHelper; 043 import org.apache.camel.util.ServiceHelper; 044 045 /** 046 * Base redeliverable error handler that also supports a final dead letter queue in case 047 * all redelivery attempts fail. 048 * <p/> 049 * This implementation should contain all the error handling logic and the sub classes 050 * should only configure it according to what they support. 051 * 052 * @version 053 */ 054 public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor { 055 056 protected ScheduledExecutorService executorService; 057 protected final CamelContext camelContext; 058 protected final Processor deadLetter; 059 protected final String deadLetterUri; 060 protected final Processor output; 061 protected final AsyncProcessor outputAsync; 062 protected final Processor redeliveryProcessor; 063 protected final RedeliveryPolicy redeliveryPolicy; 064 protected final Predicate retryWhilePolicy; 065 protected final CamelLogger logger; 066 protected final boolean useOriginalMessagePolicy; 067 protected boolean redeliveryEnabled; 068 069 /** 070 * Contains the current redelivery data 071 */ 072 protected class RedeliveryData { 073 Exchange original; 074 boolean sync = true; 075 int redeliveryCounter; 076 long redeliveryDelay; 077 Predicate retryWhilePredicate = retryWhilePolicy; 078 boolean redeliverFromSync; 079 080 // default behavior which can be overloaded on a per exception basis 081 RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy; 082 Processor deadLetterProcessor = deadLetter; 083 Processor failureProcessor; 084 Processor onRedeliveryProcessor = redeliveryProcessor; 085 Predicate handledPredicate = getDefaultHandledPredicate(); 086 Predicate continuedPredicate; 087 boolean useOriginalInMessage = useOriginalMessagePolicy; 088 boolean asyncDelayedRedelivery = redeliveryPolicy.isAsyncDelayedRedelivery(); 089 } 090 091 /** 092 * Tasks which performs asynchronous redelivery attempts, and being triggered by a 093 * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task 094 * has to be delayed before a redelivery attempt is performed. 095 */ 096 private class AsyncRedeliveryTask implements Callable<Boolean> { 097 098 private final Exchange exchange; 099 private final AsyncCallback callback; 100 private final RedeliveryData data; 101 102 public AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) { 103 this.exchange = exchange; 104 this.callback = callback; 105 this.data = data; 106 } 107 108 public Boolean call() throws Exception { 109 // prepare for redelivery 110 prepareExchangeForRedelivery(exchange, data); 111 112 // letting onRedeliver be executed at first 113 deliverToOnRedeliveryProcessor(exchange, data); 114 115 if (log.isTraceEnabled()) { 116 log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", new Object[]{exchange.getExchangeId(), outputAsync, exchange}); 117 } 118 119 // emmit event we are doing redelivery 120 EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter); 121 122 // process the exchange (also redelivery) 123 boolean sync; 124 if (data.redeliverFromSync) { 125 // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from 126 // this error handler, which means we have to invoke the callback with false, to have the callback 127 // be notified when we are done 128 sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() { 129 public void done(boolean doneSync) { 130 log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); 131 132 // mark we are in sync mode now 133 data.sync = false; 134 135 // only process if the exchange hasn't failed 136 // and it has not been handled by the error processor 137 if (isDone(exchange)) { 138 callback.done(false); 139 return; 140 } 141 142 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 143 processAsyncErrorHandler(exchange, callback, data); 144 } 145 }); 146 } else { 147 // this redelivery task was scheduled from asynchronous, which means we should only 148 // handle when the asynchronous task was done 149 sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() { 150 public void done(boolean doneSync) { 151 log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); 152 153 // this callback should only handle the async case 154 if (doneSync) { 155 return; 156 } 157 158 // mark we are in async mode now 159 data.sync = false; 160 161 // only process if the exchange hasn't failed 162 // and it has not been handled by the error processor 163 if (isDone(exchange)) { 164 callback.done(doneSync); 165 return; 166 } 167 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 168 processAsyncErrorHandler(exchange, callback, data); 169 } 170 }); 171 } 172 173 return sync; 174 } 175 } 176 177 public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, 178 Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter, 179 String deadLetterUri, boolean useOriginalMessagePolicy, Predicate retryWhile, ScheduledExecutorService executorService) { 180 181 ObjectHelper.notNull(camelContext, "CamelContext", this); 182 ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this); 183 184 this.camelContext = camelContext; 185 this.redeliveryProcessor = redeliveryProcessor; 186 this.deadLetter = deadLetter; 187 this.output = output; 188 this.outputAsync = AsyncProcessorConverterHelper.convert(output); 189 this.redeliveryPolicy = redeliveryPolicy; 190 this.logger = logger; 191 this.deadLetterUri = deadLetterUri; 192 this.useOriginalMessagePolicy = useOriginalMessagePolicy; 193 this.retryWhilePolicy = retryWhile; 194 this.executorService = executorService; 195 } 196 197 public boolean supportTransacted() { 198 return false; 199 } 200 201 @Override 202 public boolean isRunAllowed() { 203 // determine if we can still run, or the camel context is forcing a shutdown 204 boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this); 205 if (forceShutdown) { 206 log.trace("Run not allowed as ShutdownStrategy is forcing shutting down"); 207 } 208 return !forceShutdown && super.isRunAllowed(); 209 } 210 211 public void process(Exchange exchange) throws Exception { 212 if (output == null) { 213 // no output then just return 214 return; 215 } 216 AsyncProcessorHelper.process(this, exchange); 217 } 218 219 public boolean process(Exchange exchange, final AsyncCallback callback) { 220 return processErrorHandler(exchange, callback, new RedeliveryData()); 221 } 222 223 /** 224 * Process the exchange using redelivery error handling. 225 */ 226 protected boolean processErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) { 227 228 // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the 229 // original Exchange is being redelivered, and not a mutated Exchange 230 data.original = defensiveCopyExchangeIfNeeded(exchange); 231 232 // use looping to have redelivery attempts 233 while (true) { 234 235 // can we still run 236 if (!isRunAllowed()) { 237 log.trace("Run not allowed, will reject executing exchange: {}", exchange); 238 if (exchange.getException() == null) { 239 exchange.setException(new RejectedExecutionException()); 240 } 241 // we cannot process so invoke callback 242 callback.done(data.sync); 243 return data.sync; 244 } 245 246 // did previous processing cause an exception? 247 boolean handle = shouldHandleException(exchange); 248 if (handle) { 249 handleException(exchange, data); 250 } 251 252 // compute if we are exhausted or not 253 boolean exhausted = isExhausted(exchange, data); 254 if (exhausted) { 255 Processor target = null; 256 boolean deliver = true; 257 258 // the unit of work may have an optional callback associated we need to leverage 259 SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback(); 260 if (uowCallback != null) { 261 // signal to the callback we are exhausted 262 uowCallback.onExhausted(exchange); 263 // do not deliver to the failure processor as its been handled by the callback instead 264 deliver = false; 265 } 266 267 if (deliver) { 268 // should deliver to failure processor (either from onException or the dead letter channel) 269 target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor; 270 } 271 // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair 272 // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) 273 boolean sync = deliverToFailureProcessor(target, exchange, data, callback); 274 // we are breaking out 275 return sync; 276 } 277 278 if (data.redeliveryCounter > 0) { 279 // calculate delay 280 data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter); 281 282 if (data.redeliveryDelay > 0) { 283 // okay there is a delay so create a scheduled task to have it executed in the future 284 285 if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) { 286 287 // we are doing a redelivery then a thread pool must be configured (see the doStart method) 288 ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this); 289 290 // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to 291 // have it being executed in the future, or immediately 292 // we are continuing asynchronously 293 294 // mark we are routing async from now and that this redelivery task came from a synchronous routing 295 data.sync = false; 296 data.redeliverFromSync = true; 297 AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); 298 299 // schedule the redelivery task 300 if (log.isTraceEnabled()) { 301 log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId()); 302 } 303 executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS); 304 305 return false; 306 } else { 307 // async delayed redelivery was disabled or we are transacted so we must be synchronous 308 // as the transaction manager requires to execute in the same thread context 309 try { 310 data.currentRedeliveryPolicy.sleep(data.redeliveryDelay); 311 } catch (InterruptedException e) { 312 // we was interrupted so break out 313 exchange.setException(e); 314 // mark the exchange to stop continue routing when interrupted 315 // as we do not want to continue routing (for example a task has been cancelled) 316 exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); 317 callback.done(data.sync); 318 return data.sync; 319 } 320 } 321 } 322 323 // prepare for redelivery 324 prepareExchangeForRedelivery(exchange, data); 325 326 // letting onRedeliver be executed 327 deliverToOnRedeliveryProcessor(exchange, data); 328 329 // emmit event we are doing redelivery 330 EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter); 331 } 332 333 // process the exchange (also redelivery) 334 boolean sync = AsyncProcessorHelper.process(outputAsync, exchange, new AsyncCallback() { 335 public void done(boolean sync) { 336 // this callback should only handle the async case 337 if (sync) { 338 return; 339 } 340 341 // mark we are in async mode now 342 data.sync = false; 343 344 // if we are done then notify callback and exit 345 if (isDone(exchange)) { 346 callback.done(sync); 347 return; 348 } 349 350 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 351 // method which takes care of this in a asynchronous manner 352 processAsyncErrorHandler(exchange, callback, data); 353 } 354 }); 355 356 if (!sync) { 357 // the remainder of the Exchange is being processed asynchronously so we should return 358 return false; 359 } 360 // we continue to route synchronously 361 362 // if we are done then notify callback and exit 363 boolean done = isDone(exchange); 364 if (done) { 365 callback.done(true); 366 return true; 367 } 368 369 // error occurred so loop back around..... 370 } 371 } 372 373 /** 374 * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY} 375 * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p> 376 * 377 * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay} 378 * and {@link RedeliveryData#redeliveryCounter} are copied in.</p> 379 * 380 * @param exchange The current exchange in question. 381 * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation. 382 * @param redeliveryDelay The default redelivery delay from RedeliveryData 383 * @param redeliveryCounter The redeliveryCounter 384 * @return The time to wait before the next redelivery. 385 */ 386 protected long determineRedeliveryDelay(Exchange exchange, RedeliveryPolicy redeliveryPolicy, long redeliveryDelay, int redeliveryCounter) { 387 Message message = exchange.getIn(); 388 Long delay = message.getHeader(Exchange.REDELIVERY_DELAY, Long.class); 389 if (delay == null) { 390 delay = redeliveryPolicy.calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter); 391 log.debug("Redelivery delay calculated as {}", delay); 392 } else { 393 log.debug("Redelivery delay is {} from Message Header [{}]", delay, Exchange.REDELIVERY_DELAY); 394 } 395 return delay; 396 } 397 398 /** 399 * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback. 400 * <p/> 401 * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use 402 * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b> 403 * in terms of logic. 404 */ 405 protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) { 406 // can we still run 407 if (!isRunAllowed()) { 408 log.trace("Run not allowed, will reject executing exchange: {}", exchange); 409 if (exchange.getException() == null) { 410 exchange.setException(new RejectedExecutionException()); 411 } 412 callback.done(data.sync); 413 return; 414 } 415 416 // did previous processing cause an exception? 417 boolean handle = shouldHandleException(exchange); 418 if (handle) { 419 handleException(exchange, data); 420 } 421 422 // compute if we are exhausted or not 423 boolean exhausted = isExhausted(exchange, data); 424 if (exhausted) { 425 Processor target = null; 426 boolean deliver = true; 427 428 // the unit of work may have an optional callback associated we need to leverage 429 SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback(); 430 if (uowCallback != null) { 431 // signal to the callback we are exhausted 432 uowCallback.onExhausted(exchange); 433 // do not deliver to the failure processor as its been handled by the callback instead 434 deliver = false; 435 } 436 437 if (deliver) { 438 // should deliver to failure processor (either from onException or the dead letter channel) 439 target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor; 440 } 441 // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair 442 // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) 443 deliverToFailureProcessor(target, exchange, data, callback); 444 // we are breaking out 445 return; 446 } 447 448 if (data.redeliveryCounter > 0) { 449 // we are doing a redelivery then a thread pool must be configured (see the doStart method) 450 ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this); 451 452 // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to 453 // have it being executed in the future, or immediately 454 // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously 455 // to ensure the callback will continue routing from where we left 456 AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); 457 458 // calculate the redelivery delay 459 data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter); 460 if (data.redeliveryDelay > 0) { 461 // schedule the redelivery task 462 if (log.isTraceEnabled()) { 463 log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId()); 464 } 465 executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS); 466 } else { 467 // execute the task immediately 468 executorService.submit(task); 469 } 470 } 471 } 472 473 /** 474 * Performs a defensive copy of the exchange if needed 475 * 476 * @param exchange the exchange 477 * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled). 478 */ 479 protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) { 480 // only do a defensive copy if redelivery is enabled 481 if (redeliveryEnabled) { 482 return ExchangeHelper.createCopy(exchange, true); 483 } else { 484 return null; 485 } 486 } 487 488 /** 489 * Strategy whether the exchange has an exception that we should try to handle. 490 * <p/> 491 * Standard implementations should just look for an exception. 492 */ 493 protected boolean shouldHandleException(Exchange exchange) { 494 return exchange.getException() != null; 495 } 496 497 /** 498 * Strategy to determine if the exchange is done so we can continue 499 */ 500 protected boolean isDone(Exchange exchange) { 501 boolean answer = isCancelledOrInterrupted(exchange); 502 503 // only done if the exchange hasn't failed 504 // and it has not been handled by the failure processor 505 // or we are exhausted 506 if (!answer) { 507 answer = exchange.getException() == null 508 || ExchangeHelper.isFailureHandled(exchange) 509 || ExchangeHelper.isRedeliveryExhausted(exchange); 510 } 511 512 log.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer); 513 return answer; 514 } 515 516 /** 517 * Strategy to determine if the exchange was cancelled or interrupted 518 */ 519 protected boolean isCancelledOrInterrupted(Exchange exchange) { 520 boolean answer = false; 521 522 if (ExchangeHelper.isInterrupted(exchange)) { 523 // mark the exchange to stop continue routing when interrupted 524 // as we do not want to continue routing (for example a task has been cancelled) 525 exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); 526 answer = true; 527 } 528 529 log.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer); 530 return answer; 531 } 532 533 /** 534 * Returns the output processor 535 */ 536 public Processor getOutput() { 537 return output; 538 } 539 540 /** 541 * Returns the dead letter that message exchanges will be sent to if the 542 * redelivery attempts fail 543 */ 544 public Processor getDeadLetter() { 545 return deadLetter; 546 } 547 548 public String getDeadLetterUri() { 549 return deadLetterUri; 550 } 551 552 public boolean isUseOriginalMessagePolicy() { 553 return useOriginalMessagePolicy; 554 } 555 556 public RedeliveryPolicy getRedeliveryPolicy() { 557 return redeliveryPolicy; 558 } 559 560 public CamelLogger getLogger() { 561 return logger; 562 } 563 564 protected Predicate getDefaultHandledPredicate() { 565 // Default is not not handle errors 566 return null; 567 } 568 569 protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data) { 570 Exception caught = exchange.getException(); 571 572 // we continue so clear any exceptions 573 exchange.setException(null); 574 // clear rollback flags 575 exchange.setProperty(Exchange.ROLLBACK_ONLY, null); 576 // reset cached streams so they can be read again 577 MessageHelper.resetStreamCache(exchange.getIn()); 578 579 // its continued then remove traces of redelivery attempted and caught exception 580 exchange.getIn().removeHeader(Exchange.REDELIVERED); 581 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); 582 exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); 583 exchange.removeProperty(Exchange.FAILURE_HANDLED); 584 // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception 585 586 // create log message 587 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); 588 msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; 589 msg = msg + ". Handled and continue routing."; 590 591 // log that we failed but want to continue 592 logFailedDelivery(false, false, true, exchange, msg, data, null); 593 } 594 595 protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) { 596 if (!redeliveryEnabled) { 597 throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly."); 598 } 599 // there must be a defensive copy of the exchange 600 ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this); 601 602 // okay we will give it another go so clear the exception so we can try again 603 exchange.setException(null); 604 605 // clear rollback flags 606 exchange.setProperty(Exchange.ROLLBACK_ONLY, null); 607 608 // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange 609 // and then put these on the exchange when doing a redelivery / fault processor 610 611 // preserve these headers 612 Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 613 Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class); 614 Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class); 615 616 // we are redelivering so copy from original back to exchange 617 exchange.getIn().copyFrom(data.original.getIn()); 618 exchange.setOut(null); 619 // reset cached streams so they can be read again 620 MessageHelper.resetStreamCache(exchange.getIn()); 621 622 // put back headers 623 if (redeliveryCounter != null) { 624 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter); 625 } 626 if (redeliveryMaxCounter != null) { 627 exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter); 628 } 629 if (redelivered != null) { 630 exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered); 631 } 632 } 633 634 protected void handleException(Exchange exchange, RedeliveryData data) { 635 Exception e = exchange.getException(); 636 637 // store the original caused exception in a property, so we can restore it later 638 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); 639 640 // find the error handler to use (if any) 641 OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e); 642 if (exceptionPolicy != null) { 643 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy); 644 data.handledPredicate = exceptionPolicy.getHandledPolicy(); 645 data.continuedPredicate = exceptionPolicy.getContinuedPolicy(); 646 data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy(); 647 data.useOriginalInMessage = exceptionPolicy.isUseOriginalMessage(); 648 data.asyncDelayedRedelivery = exceptionPolicy.isAsyncDelayedRedelivery(exchange.getContext()); 649 650 // route specific failure handler? 651 Processor processor = null; 652 UnitOfWork uow = exchange.getUnitOfWork(); 653 if (uow != null && uow.getRouteContext() != null) { 654 String routeId = uow.getRouteContext().getRoute().getId(); 655 processor = exceptionPolicy.getErrorHandler(routeId); 656 } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) { 657 // note this should really not happen, but we have this code as a fail safe 658 // to be backwards compatible with the old behavior 659 log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId()); 660 processor = exceptionPolicy.getErrorHandlers().iterator().next(); 661 } 662 if (processor != null) { 663 data.failureProcessor = processor; 664 } 665 666 // route specific on redelivery? 667 processor = exceptionPolicy.getOnRedelivery(); 668 if (processor != null) { 669 data.onRedeliveryProcessor = processor; 670 } 671 } 672 673 // only log if not failure handled or not an exhausted unit of work 674 if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) { 675 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange) 676 + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e; 677 logFailedDelivery(true, false, false, exchange, msg, data, e); 678 } 679 680 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data); 681 } 682 683 /** 684 * Gives an optional configure redelivery processor a chance to process before the Exchange 685 * will be redelivered. This can be used to alter the Exchange. 686 */ 687 protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) { 688 if (data.onRedeliveryProcessor == null) { 689 return; 690 } 691 692 if (log.isTraceEnabled()) { 693 log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered", 694 data.onRedeliveryProcessor, exchange); 695 } 696 697 // run this synchronously as its just a Processor 698 try { 699 data.onRedeliveryProcessor.process(exchange); 700 } catch (Throwable e) { 701 exchange.setException(e); 702 } 703 log.trace("Redelivery processor done"); 704 } 705 706 /** 707 * All redelivery attempts failed so move the exchange to the dead letter queue 708 */ 709 protected boolean deliverToFailureProcessor(final Processor processor, final Exchange exchange, 710 final RedeliveryData data, final AsyncCallback callback) { 711 boolean sync = true; 712 713 Exception caught = exchange.getException(); 714 715 // we did not success with the redelivery so now we let the failure processor handle it 716 // clear exception as we let the failure processor handle it 717 exchange.setException(null); 718 719 final boolean shouldHandle = shouldHandled(exchange, data); 720 final boolean shouldContinue = shouldContinue(exchange, data); 721 // regard both handled or continued as being handled 722 boolean handled = false; 723 724 if (shouldHandle || shouldContinue) { 725 // its handled then remove traces of redelivery attempted 726 exchange.getIn().removeHeader(Exchange.REDELIVERED); 727 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); 728 exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); 729 exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); 730 731 // and remove traces of rollback only and uow exhausted markers 732 exchange.removeProperty(Exchange.ROLLBACK_ONLY); 733 exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED); 734 735 handled = true; 736 } else { 737 // must decrement the redelivery counter as we didn't process the redelivery but is 738 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync 739 decrementRedeliveryCounter(exchange); 740 } 741 742 // is the a failure processor to process the Exchange 743 if (processor != null) { 744 745 // prepare original IN body if it should be moved instead of current body 746 if (data.useOriginalInMessage) { 747 log.trace("Using the original IN message instead of current"); 748 Message original = exchange.getUnitOfWork().getOriginalInMessage(); 749 exchange.setIn(original); 750 if (exchange.hasOut()) { 751 log.trace("Removing the out message to avoid some uncertain behavior"); 752 exchange.setOut(null); 753 } 754 } 755 756 // reset cached streams so they can be read again 757 MessageHelper.resetStreamCache(exchange.getIn()); 758 759 log.trace("Failure processor {} is processing Exchange: {}", processor, exchange); 760 761 // store the last to endpoint as the failure endpoint 762 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 763 // and store the route id so we know in which route we failed 764 if (exchange.getUnitOfWork().getRouteContext() != null) { 765 exchange.setProperty(Exchange.FAILURE_ROUTE_ID, exchange.getUnitOfWork().getRouteContext().getRoute().getId()); 766 } 767 768 // the failure processor could also be asynchronous 769 AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor); 770 sync = AsyncProcessorHelper.process(afp, exchange, new AsyncCallback() { 771 public void done(boolean sync) { 772 log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange); 773 try { 774 prepareExchangeAfterFailure(exchange, data, shouldHandle, shouldContinue); 775 // fire event as we had a failure processor to handle it, which there is a event for 776 boolean deadLetterChannel = processor == data.deadLetterProcessor && data.deadLetterProcessor != null; 777 EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel); 778 } finally { 779 // if the fault was handled asynchronously, this should be reflected in the callback as well 780 data.sync &= sync; 781 callback.done(data.sync); 782 } 783 } 784 }); 785 } else { 786 try { 787 // no processor but we need to prepare after failure as well 788 prepareExchangeAfterFailure(exchange, data, shouldHandle, shouldContinue); 789 } finally { 790 // callback we are done 791 callback.done(data.sync); 792 } 793 } 794 795 // create log message 796 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); 797 msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; 798 if (processor != null) { 799 msg = msg + ". Processed by failure processor: " + processor; 800 } 801 802 // log that we failed delivery as we are exhausted 803 logFailedDelivery(false, handled, false, exchange, msg, data, null); 804 805 return sync; 806 } 807 808 protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data, 809 final boolean shouldHandle, final boolean shouldContinue) { 810 // we could not process the exchange so we let the failure processor handled it 811 ExchangeHelper.setFailureHandled(exchange); 812 813 // honor if already set a handling 814 boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null; 815 if (alreadySet) { 816 boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class); 817 log.trace("This exchange has already been marked for handling: {}", handled); 818 if (handled) { 819 exchange.setException(null); 820 } else { 821 // exception not handled, put exception back in the exchange 822 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 823 // and put failure endpoint back as well 824 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 825 } 826 return; 827 } 828 829 if (shouldHandle) { 830 log.trace("This exchange is handled so its marked as not failed: {}", exchange); 831 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE); 832 } else if (shouldContinue) { 833 log.trace("This exchange is continued: {}", exchange); 834 // okay we want to continue then prepare the exchange for that as well 835 prepareExchangeForContinue(exchange, data); 836 } else { 837 log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange); 838 // exception not handled, put exception back in the exchange 839 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE); 840 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 841 // and put failure endpoint back as well 842 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 843 // and store the route id so we know in which route we failed 844 if (exchange.getUnitOfWork().getRouteContext() != null) { 845 exchange.setProperty(Exchange.FAILURE_ROUTE_ID, exchange.getUnitOfWork().getRouteContext().getRoute().getId()); 846 } 847 } 848 } 849 850 private void logFailedDelivery(boolean shouldRedeliver, boolean handled, boolean continued, Exchange exchange, String message, RedeliveryData data, Throwable e) { 851 if (logger == null) { 852 return; 853 } 854 855 if (!exchange.isRollbackOnly()) { 856 // if we should not rollback, then check whether logging is enabled 857 if (handled && !data.currentRedeliveryPolicy.isLogHandled()) { 858 // do not log handled 859 return; 860 } 861 862 if (continued && !data.currentRedeliveryPolicy.isLogContinued()) { 863 // do not log handled 864 return; 865 } 866 867 if (shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) { 868 // do not log retry attempts 869 return; 870 } 871 872 if (!shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) { 873 // do not log exhausted 874 return; 875 } 876 } 877 878 LoggingLevel newLogLevel; 879 boolean logStackTrace; 880 if (exchange.isRollbackOnly()) { 881 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 882 logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace(); 883 } else if (shouldRedeliver) { 884 newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel(); 885 logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace(); 886 } else { 887 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 888 logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace(); 889 } 890 if (e == null) { 891 e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); 892 } 893 894 if (exchange.isRollbackOnly()) { 895 String msg = "Rollback " + ExchangeHelper.logIds(exchange); 896 Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class); 897 if (cause != null) { 898 msg = msg + " due: " + cause.getMessage(); 899 } 900 if (newLogLevel == LoggingLevel.ERROR) { 901 // log intended rollback on maximum WARN level (no ERROR) 902 logger.log(msg, LoggingLevel.WARN); 903 } else { 904 // otherwise use the desired logging level 905 logger.log(msg, newLogLevel); 906 } 907 } else if (e != null && logStackTrace) { 908 logger.log(message, e, newLogLevel); 909 } else { 910 logger.log(message, newLogLevel); 911 } 912 } 913 914 /** 915 * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback). 916 * <p/> 917 * If the exchange is exhausted, then we will not continue processing, but let the 918 * failure processor deal with the exchange. 919 * 920 * @param exchange the current exchange 921 * @param data the redelivery data 922 * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust. 923 */ 924 private boolean isExhausted(Exchange exchange, RedeliveryData data) { 925 // if marked as rollback only then do not continue/redeliver 926 boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class); 927 if (exhausted) { 928 log.trace("This exchange is marked as redelivery exhausted: {}", exchange); 929 return true; 930 } 931 932 // if marked as rollback only then do not continue/redeliver 933 boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class); 934 if (rollbackOnly) { 935 log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange); 936 return true; 937 } 938 // its the first original call so continue 939 if (data.redeliveryCounter == 0) { 940 return false; 941 } 942 // its a potential redelivery so determine if we should redeliver or not 943 boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate); 944 return !redeliver; 945 } 946 947 /** 948 * Determines whether or not to continue if we are exhausted. 949 * 950 * @param exchange the current exchange 951 * @param data the redelivery data 952 * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust. 953 */ 954 private boolean shouldContinue(Exchange exchange, RedeliveryData data) { 955 if (data.continuedPredicate != null) { 956 return data.continuedPredicate.matches(exchange); 957 } 958 // do not continue by default 959 return false; 960 } 961 962 /** 963 * Determines whether or not to handle if we are exhausted. 964 * 965 * @param exchange the current exchange 966 * @param data the redelivery data 967 * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust. 968 */ 969 private boolean shouldHandled(Exchange exchange, RedeliveryData data) { 970 if (data.handledPredicate != null) { 971 return data.handledPredicate.matches(exchange); 972 } 973 // do not handle by default 974 return false; 975 } 976 977 /** 978 * Increments the redelivery counter and adds the redelivered flag if the 979 * message has been redelivered 980 */ 981 private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) { 982 Message in = exchange.getIn(); 983 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 984 int next = 1; 985 if (counter != null) { 986 next = counter + 1; 987 } 988 in.setHeader(Exchange.REDELIVERY_COUNTER, next); 989 in.setHeader(Exchange.REDELIVERED, Boolean.TRUE); 990 // if maximum redeliveries is used, then provide that information as well 991 if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) { 992 in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries()); 993 } 994 return next; 995 } 996 997 /** 998 * Prepares the redelivery counter and boolean flag for the failure handle processor 999 */ 1000 private void decrementRedeliveryCounter(Exchange exchange) { 1001 Message in = exchange.getIn(); 1002 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 1003 if (counter != null) { 1004 int prev = counter - 1; 1005 in.setHeader(Exchange.REDELIVERY_COUNTER, prev); 1006 // set boolean flag according to counter 1007 in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE); 1008 } else { 1009 // not redelivered 1010 in.setHeader(Exchange.REDELIVERY_COUNTER, 0); 1011 in.setHeader(Exchange.REDELIVERED, Boolean.FALSE); 1012 } 1013 } 1014 1015 /** 1016 * Determines if redelivery is enabled by checking if any of the redelivery policy 1017 * settings may allow redeliveries. 1018 * 1019 * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise 1020 * @throws Exception can be thrown 1021 */ 1022 private boolean determineIfRedeliveryIsEnabled() throws Exception { 1023 // determine if redeliver is enabled either on error handler 1024 if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) { 1025 // must check for != 0 as (-1 means redeliver forever) 1026 return true; 1027 } 1028 if (retryWhilePolicy != null) { 1029 return true; 1030 } 1031 1032 // or on the exception policies 1033 if (!exceptionPolicies.isEmpty()) { 1034 // walk them to see if any of them have a maximum redeliveries > 0 or retry until set 1035 for (OnExceptionDefinition def : exceptionPolicies.values()) { 1036 1037 String ref = def.getRedeliveryPolicyRef(); 1038 if (ref != null) { 1039 // lookup in registry if ref provided 1040 RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class); 1041 if (policy.getMaximumRedeliveries() != 0) { 1042 // must check for != 0 as (-1 means redeliver forever) 1043 return true; 1044 } 1045 } else if (def.getRedeliveryPolicy() != null) { 1046 Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries()); 1047 if (max != null && max != 0) { 1048 // must check for != 0 as (-1 means redeliver forever) 1049 return true; 1050 } 1051 } 1052 1053 if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) { 1054 return true; 1055 } 1056 } 1057 } 1058 1059 return false; 1060 } 1061 1062 @Override 1063 protected void doStart() throws Exception { 1064 ServiceHelper.startServices(output, outputAsync, deadLetter); 1065 1066 // determine if redeliver is enabled or not 1067 redeliveryEnabled = determineIfRedeliveryIsEnabled(); 1068 if (log.isDebugEnabled()) { 1069 log.debug("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this); 1070 } 1071 1072 // we only need thread pool if redelivery is enabled 1073 if (redeliveryEnabled) { 1074 if (executorService == null) { 1075 // use default shared executor service 1076 executorService = camelContext.getErrorHandlerExecutorService(); 1077 } 1078 if (log.isTraceEnabled()) { 1079 log.trace("Using ExecutorService: {} for redeliveries on error handler: {}", executorService, this); 1080 } 1081 } 1082 } 1083 1084 @Override 1085 protected void doStop() throws Exception { 1086 // noop, do not stop any services which we only do when shutting down 1087 // as the error handler can be context scoped, and should not stop in case 1088 // a route stops 1089 } 1090 1091 @Override 1092 protected void doShutdown() throws Exception { 1093 ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync); 1094 } 1095 }