001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.Callable; 022import java.util.concurrent.RejectedExecutionException; 023import java.util.concurrent.ScheduledExecutorService; 024import java.util.concurrent.ThreadPoolExecutor; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicInteger; 027 028import org.apache.camel.AsyncCallback; 029import org.apache.camel.AsyncProcessor; 030import org.apache.camel.CamelContext; 031import org.apache.camel.Exchange; 032import org.apache.camel.LoggingLevel; 033import org.apache.camel.Message; 034import org.apache.camel.Navigate; 035import org.apache.camel.Predicate; 036import org.apache.camel.Processor; 037import org.apache.camel.model.OnExceptionDefinition; 038import org.apache.camel.spi.ExchangeFormatter; 039import org.apache.camel.spi.ShutdownPrepared; 040import org.apache.camel.spi.SubUnitOfWorkCallback; 041import org.apache.camel.spi.UnitOfWork; 042import org.apache.camel.util.AsyncProcessorConverterHelper; 043import org.apache.camel.util.AsyncProcessorHelper; 044import org.apache.camel.util.CamelContextHelper; 045import org.apache.camel.util.CamelLogger; 046import org.apache.camel.util.EventHelper; 047import org.apache.camel.util.ExchangeHelper; 048import org.apache.camel.util.MessageHelper; 049import org.apache.camel.util.ObjectHelper; 050import org.apache.camel.util.ServiceHelper; 051import org.apache.camel.util.URISupport; 052 053/** 054 * Base redeliverable error handler that also supports a final dead letter queue in case 055 * all redelivery attempts fail. 056 * <p/> 057 * This implementation should contain all the error handling logic and the sub classes 058 * should only configure it according to what they support. 059 * 060 * @version 061 */ 062public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> { 063 064 protected final AtomicInteger redeliverySleepCounter = new AtomicInteger(); 065 protected ScheduledExecutorService executorService; 066 protected final CamelContext camelContext; 067 protected final Processor deadLetter; 068 protected final String deadLetterUri; 069 protected final boolean deadLetterHandleNewException; 070 protected final Processor output; 071 protected final AsyncProcessor outputAsync; 072 protected final Processor redeliveryProcessor; 073 protected final RedeliveryPolicy redeliveryPolicy; 074 protected final Predicate retryWhilePolicy; 075 protected final CamelLogger logger; 076 protected final boolean useOriginalMessagePolicy; 077 protected boolean redeliveryEnabled; 078 protected volatile boolean preparingShutdown; 079 protected final ExchangeFormatter exchangeFormatter; 080 protected final boolean customExchangeFormatter; 081 protected final Processor onPrepareProcessor; 082 protected final Processor onExceptionProcessor; 083 084 /** 085 * Contains the current redelivery data 086 */ 087 protected class RedeliveryData { 088 Exchange original; 089 boolean sync = true; 090 int redeliveryCounter; 091 long redeliveryDelay; 092 Predicate retryWhilePredicate; 093 boolean redeliverFromSync; 094 095 // default behavior which can be overloaded on a per exception basis 096 RedeliveryPolicy currentRedeliveryPolicy; 097 Processor deadLetterProcessor; 098 Processor failureProcessor; 099 Processor onRedeliveryProcessor; 100 Processor onPrepareProcessor; 101 Processor onExceptionProcessor; 102 Predicate handledPredicate; 103 Predicate continuedPredicate; 104 boolean useOriginalInMessage; 105 boolean handleNewException; 106 107 public RedeliveryData() { 108 // init with values from the error handler 109 this.retryWhilePredicate = retryWhilePolicy; 110 this.currentRedeliveryPolicy = redeliveryPolicy; 111 this.deadLetterProcessor = deadLetter; 112 this.onRedeliveryProcessor = redeliveryProcessor; 113 this.onPrepareProcessor = RedeliveryErrorHandler.this.onPrepareProcessor; 114 this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor; 115 this.handledPredicate = getDefaultHandledPredicate(); 116 this.useOriginalInMessage = useOriginalMessagePolicy; 117 this.handleNewException = deadLetterHandleNewException; 118 } 119 } 120 121 /** 122 * Tasks which performs asynchronous redelivery attempts, and being triggered by a 123 * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task 124 * has to be delayed before a redelivery attempt is performed. 125 */ 126 private class AsyncRedeliveryTask implements Callable<Boolean> { 127 128 private final Exchange exchange; 129 private final AsyncCallback callback; 130 private final RedeliveryData data; 131 132 AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) { 133 this.exchange = exchange; 134 this.callback = callback; 135 this.data = data; 136 } 137 138 public Boolean call() throws Exception { 139 // prepare for redelivery 140 prepareExchangeForRedelivery(exchange, data); 141 142 // letting onRedeliver be executed at first 143 deliverToOnRedeliveryProcessor(exchange, data); 144 145 if (log.isTraceEnabled()) { 146 log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", new Object[]{exchange.getExchangeId(), outputAsync, exchange}); 147 } 148 149 // emmit event we are doing redelivery 150 EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter); 151 152 // process the exchange (also redelivery) 153 boolean sync; 154 if (data.redeliverFromSync) { 155 // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from 156 // this error handler, which means we have to invoke the callback with false, to have the callback 157 // be notified when we are done 158 sync = outputAsync.process(exchange, new AsyncCallback() { 159 public void done(boolean doneSync) { 160 log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); 161 162 // mark we are in sync mode now 163 data.sync = false; 164 165 // only process if the exchange hasn't failed 166 // and it has not been handled by the error processor 167 if (isDone(exchange)) { 168 callback.done(false); 169 return; 170 } 171 172 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 173 processAsyncErrorHandler(exchange, callback, data); 174 } 175 }); 176 } else { 177 // this redelivery task was scheduled from asynchronous, which means we should only 178 // handle when the asynchronous task was done 179 sync = outputAsync.process(exchange, new AsyncCallback() { 180 public void done(boolean doneSync) { 181 log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); 182 183 // this callback should only handle the async case 184 if (doneSync) { 185 return; 186 } 187 188 // mark we are in async mode now 189 data.sync = false; 190 191 // only process if the exchange hasn't failed 192 // and it has not been handled by the error processor 193 if (isDone(exchange)) { 194 callback.done(doneSync); 195 return; 196 } 197 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 198 processAsyncErrorHandler(exchange, callback, data); 199 } 200 }); 201 } 202 203 return sync; 204 } 205 } 206 207 public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, 208 Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter, 209 String deadLetterUri, boolean deadLetterHandleNewException, boolean useOriginalMessagePolicy, 210 Predicate retryWhile, ScheduledExecutorService executorService, Processor onPrepareProcessor, Processor onExceptionProcessor) { 211 212 ObjectHelper.notNull(camelContext, "CamelContext", this); 213 ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this); 214 215 this.camelContext = camelContext; 216 this.redeliveryProcessor = redeliveryProcessor; 217 this.deadLetter = deadLetter; 218 this.output = output; 219 this.outputAsync = AsyncProcessorConverterHelper.convert(output); 220 this.redeliveryPolicy = redeliveryPolicy; 221 this.logger = logger; 222 this.deadLetterUri = deadLetterUri; 223 this.deadLetterHandleNewException = deadLetterHandleNewException; 224 this.useOriginalMessagePolicy = useOriginalMessagePolicy; 225 this.retryWhilePolicy = retryWhile; 226 this.executorService = executorService; 227 this.onPrepareProcessor = onPrepareProcessor; 228 this.onExceptionProcessor = onExceptionProcessor; 229 230 if (ObjectHelper.isNotEmpty(redeliveryPolicy.getExchangeFormatterRef())) { 231 ExchangeFormatter formatter = camelContext.getRegistry().lookupByNameAndType(redeliveryPolicy.getExchangeFormatterRef(), ExchangeFormatter.class); 232 if (formatter != null) { 233 this.exchangeFormatter = formatter; 234 this.customExchangeFormatter = true; 235 } else { 236 throw new IllegalArgumentException("Cannot find the exchangeFormatter by using reference id " + redeliveryPolicy.getExchangeFormatterRef()); 237 } 238 } else { 239 this.customExchangeFormatter = false; 240 // setup exchange formatter to be used for message history dump 241 DefaultExchangeFormatter formatter = new DefaultExchangeFormatter(); 242 formatter.setShowExchangeId(true); 243 formatter.setMultiline(true); 244 formatter.setShowHeaders(true); 245 formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed); 246 try { 247 Integer maxChars = CamelContextHelper.parseInteger(camelContext, camelContext.getProperty(Exchange.LOG_DEBUG_BODY_MAX_CHARS)); 248 if (maxChars != null) { 249 formatter.setMaxChars(maxChars); 250 } 251 } catch (Exception e) { 252 throw ObjectHelper.wrapRuntimeCamelException(e); 253 } 254 this.exchangeFormatter = formatter; 255 } 256 } 257 258 public boolean supportTransacted() { 259 return false; 260 } 261 262 @Override 263 public boolean hasNext() { 264 return output != null; 265 } 266 267 @Override 268 public List<Processor> next() { 269 if (!hasNext()) { 270 return null; 271 } 272 List<Processor> answer = new ArrayList<Processor>(1); 273 answer.add(output); 274 return answer; 275 } 276 277 protected boolean isRunAllowed(RedeliveryData data) { 278 // if camel context is forcing a shutdown then do not allow running 279 boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this); 280 if (forceShutdown) { 281 log.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)"); 282 return false; 283 } 284 285 // redelivery policy can control if redelivery is allowed during stopping/shutdown 286 // but this only applies during a redelivery (counter must > 0) 287 if (data.redeliveryCounter > 0) { 288 if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) { 289 log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)"); 290 return true; 291 } else if (preparingShutdown) { 292 // we are preparing for shutdown, now determine if we can still run 293 boolean answer = isRunAllowedOnPreparingShutdown(); 294 log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer); 295 return answer; 296 } 297 } 298 299 // we cannot run if we are stopping/stopped 300 boolean answer = !isStoppingOrStopped(); 301 log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer); 302 return answer; 303 } 304 305 protected boolean isRunAllowedOnPreparingShutdown() { 306 return false; 307 } 308 309 protected boolean isRedeliveryAllowed(RedeliveryData data) { 310 // redelivery policy can control if redelivery is allowed during stopping/shutdown 311 // but this only applies during a redelivery (counter must > 0) 312 if (data.redeliveryCounter > 0) { 313 boolean stopping = isStoppingOrStopped(); 314 if (!preparingShutdown && !stopping) { 315 log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)"); 316 return true; 317 } else { 318 // we are stopping or preparing to shutdown 319 if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) { 320 log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)"); 321 return true; 322 } else { 323 log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)"); 324 return false; 325 } 326 } 327 } 328 329 return true; 330 } 331 332 @Override 333 public void prepareShutdown(boolean suspendOnly, boolean forced) { 334 // prepare for shutdown, eg do not allow redelivery if configured 335 log.trace("Prepare shutdown on error handler {}", this); 336 preparingShutdown = true; 337 } 338 339 public void process(Exchange exchange) throws Exception { 340 if (output == null) { 341 // no output then just return 342 return; 343 } 344 AsyncProcessorHelper.process(this, exchange); 345 } 346 347 /** 348 * Process the exchange using redelivery error handling. 349 */ 350 public boolean process(final Exchange exchange, final AsyncCallback callback) { 351 final RedeliveryData data = new RedeliveryData(); 352 353 // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the 354 // original Exchange is being redelivered, and not a mutated Exchange 355 data.original = defensiveCopyExchangeIfNeeded(exchange); 356 357 // use looping to have redelivery attempts 358 while (true) { 359 360 // can we still run 361 if (!isRunAllowed(data)) { 362 log.trace("Run not allowed, will reject executing exchange: {}", exchange); 363 if (exchange.getException() == null) { 364 exchange.setException(new RejectedExecutionException()); 365 } 366 // we cannot process so invoke callback 367 callback.done(data.sync); 368 return data.sync; 369 } 370 371 // did previous processing cause an exception? 372 boolean handle = shouldHandleException(exchange); 373 if (handle) { 374 handleException(exchange, data, isDeadLetterChannel()); 375 onExceptionOccurred(exchange, data); 376 } 377 378 // compute if we are exhausted, and whether redelivery is allowed 379 boolean exhausted = isExhausted(exchange, data); 380 boolean redeliverAllowed = isRedeliveryAllowed(data); 381 382 // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC) 383 if (!redeliverAllowed || exhausted) { 384 Processor target = null; 385 boolean deliver = true; 386 387 // the unit of work may have an optional callback associated we need to leverage 388 SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback(); 389 if (uowCallback != null) { 390 // signal to the callback we are exhausted 391 uowCallback.onExhausted(exchange); 392 // do not deliver to the failure processor as its been handled by the callback instead 393 deliver = false; 394 } 395 396 if (deliver) { 397 // should deliver to failure processor (either from onException or the dead letter channel) 398 target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor; 399 } 400 // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair 401 // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) 402 boolean isDeadLetterChannel = isDeadLetterChannel() && (target == null || target == data.deadLetterProcessor); 403 boolean sync = deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback); 404 // we are breaking out 405 return sync; 406 } 407 408 if (data.redeliveryCounter > 0) { 409 // calculate delay 410 data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter); 411 412 if (data.redeliveryDelay > 0) { 413 // okay there is a delay so create a scheduled task to have it executed in the future 414 415 if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) { 416 417 // we are doing a redelivery then a thread pool must be configured (see the doStart method) 418 ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this); 419 420 // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to 421 // have it being executed in the future, or immediately 422 // we are continuing asynchronously 423 424 // mark we are routing async from now and that this redelivery task came from a synchronous routing 425 data.sync = false; 426 data.redeliverFromSync = true; 427 AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); 428 429 // schedule the redelivery task 430 if (log.isTraceEnabled()) { 431 log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId()); 432 } 433 executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS); 434 435 return false; 436 } else { 437 // async delayed redelivery was disabled or we are transacted so we must be synchronous 438 // as the transaction manager requires to execute in the same thread context 439 try { 440 // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping 441 redeliverySleepCounter.incrementAndGet(); 442 data.currentRedeliveryPolicy.sleep(data.redeliveryDelay); 443 redeliverySleepCounter.decrementAndGet(); 444 } catch (InterruptedException e) { 445 redeliverySleepCounter.decrementAndGet(); 446 // we was interrupted so break out 447 exchange.setException(e); 448 // mark the exchange to stop continue routing when interrupted 449 // as we do not want to continue routing (for example a task has been cancelled) 450 exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); 451 callback.done(data.sync); 452 return data.sync; 453 } 454 } 455 } 456 457 // prepare for redelivery 458 prepareExchangeForRedelivery(exchange, data); 459 460 // letting onRedeliver be executed 461 deliverToOnRedeliveryProcessor(exchange, data); 462 463 // emmit event we are doing redelivery 464 EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter); 465 } 466 467 // process the exchange (also redelivery) 468 boolean sync = outputAsync.process(exchange, new AsyncCallback() { 469 public void done(boolean sync) { 470 // this callback should only handle the async case 471 if (sync) { 472 return; 473 } 474 475 // mark we are in async mode now 476 data.sync = false; 477 478 // if we are done then notify callback and exit 479 if (isDone(exchange)) { 480 callback.done(sync); 481 return; 482 } 483 484 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 485 // method which takes care of this in a asynchronous manner 486 processAsyncErrorHandler(exchange, callback, data); 487 } 488 }); 489 490 if (!sync) { 491 // the remainder of the Exchange is being processed asynchronously so we should return 492 return false; 493 } 494 // we continue to route synchronously 495 496 // if we are done then notify callback and exit 497 boolean done = isDone(exchange); 498 if (done) { 499 callback.done(true); 500 return true; 501 } 502 503 // error occurred so loop back around..... 504 } 505 } 506 507 /** 508 * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY} 509 * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p> 510 * 511 * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay} 512 * and {@link RedeliveryData#redeliveryCounter} are copied in.</p> 513 * 514 * @param exchange The current exchange in question. 515 * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation. 516 * @param redeliveryDelay The default redelivery delay from RedeliveryData 517 * @param redeliveryCounter The redeliveryCounter 518 * @return The time to wait before the next redelivery. 519 */ 520 protected long determineRedeliveryDelay(Exchange exchange, RedeliveryPolicy redeliveryPolicy, long redeliveryDelay, int redeliveryCounter) { 521 Message message = exchange.getIn(); 522 Long delay = message.getHeader(Exchange.REDELIVERY_DELAY, Long.class); 523 if (delay == null) { 524 delay = redeliveryPolicy.calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter); 525 log.debug("Redelivery delay calculated as {}", delay); 526 } else { 527 log.debug("Redelivery delay is {} from Message Header [{}]", delay, Exchange.REDELIVERY_DELAY); 528 } 529 return delay; 530 } 531 532 /** 533 * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback. 534 * <p/> 535 * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use 536 * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b> 537 * in terms of logic. 538 */ 539 protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) { 540 // can we still run 541 if (!isRunAllowed(data)) { 542 log.trace("Run not allowed, will reject executing exchange: {}", exchange); 543 if (exchange.getException() == null) { 544 exchange.setException(new RejectedExecutionException()); 545 } 546 callback.done(data.sync); 547 return; 548 } 549 550 // did previous processing cause an exception? 551 boolean handle = shouldHandleException(exchange); 552 if (handle) { 553 handleException(exchange, data, isDeadLetterChannel()); 554 onExceptionOccurred(exchange, data); 555 } 556 557 // compute if we are exhausted or not 558 boolean exhausted = isExhausted(exchange, data); 559 if (exhausted) { 560 Processor target = null; 561 boolean deliver = true; 562 563 // the unit of work may have an optional callback associated we need to leverage 564 UnitOfWork uow = exchange.getUnitOfWork(); 565 if (uow != null) { 566 SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback(); 567 if (uowCallback != null) { 568 // signal to the callback we are exhausted 569 uowCallback.onExhausted(exchange); 570 // do not deliver to the failure processor as its been handled by the callback instead 571 deliver = false; 572 } 573 } 574 575 if (deliver) { 576 // should deliver to failure processor (either from onException or the dead letter channel) 577 target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor; 578 } 579 // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair 580 // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) 581 boolean isDeadLetterChannel = isDeadLetterChannel() && target == data.deadLetterProcessor; 582 deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback); 583 // we are breaking out 584 return; 585 } 586 587 if (data.redeliveryCounter > 0) { 588 // we are doing a redelivery then a thread pool must be configured (see the doStart method) 589 ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this); 590 591 // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to 592 // have it being executed in the future, or immediately 593 // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously 594 // to ensure the callback will continue routing from where we left 595 AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); 596 597 // calculate the redelivery delay 598 data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter); 599 600 if (data.redeliveryDelay > 0) { 601 // schedule the redelivery task 602 if (log.isTraceEnabled()) { 603 log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId()); 604 } 605 executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS); 606 } else { 607 // execute the task immediately 608 executorService.submit(task); 609 } 610 } 611 } 612 613 /** 614 * Performs a defensive copy of the exchange if needed 615 * 616 * @param exchange the exchange 617 * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled). 618 */ 619 protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) { 620 // only do a defensive copy if redelivery is enabled 621 if (redeliveryEnabled) { 622 return ExchangeHelper.createCopy(exchange, true); 623 } else { 624 return null; 625 } 626 } 627 628 /** 629 * Strategy whether the exchange has an exception that we should try to handle. 630 * <p/> 631 * Standard implementations should just look for an exception. 632 */ 633 protected boolean shouldHandleException(Exchange exchange) { 634 return exchange.getException() != null; 635 } 636 637 /** 638 * Strategy to determine if the exchange is done so we can continue 639 */ 640 protected boolean isDone(Exchange exchange) { 641 boolean answer = isCancelledOrInterrupted(exchange); 642 643 // only done if the exchange hasn't failed 644 // and it has not been handled by the failure processor 645 // or we are exhausted 646 if (!answer) { 647 answer = exchange.getException() == null 648 || ExchangeHelper.isFailureHandled(exchange) 649 || ExchangeHelper.isRedeliveryExhausted(exchange); 650 } 651 652 log.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer); 653 return answer; 654 } 655 656 /** 657 * Strategy to determine if the exchange was cancelled or interrupted 658 */ 659 protected boolean isCancelledOrInterrupted(Exchange exchange) { 660 boolean answer = false; 661 662 if (ExchangeHelper.isInterrupted(exchange)) { 663 // mark the exchange to stop continue routing when interrupted 664 // as we do not want to continue routing (for example a task has been cancelled) 665 exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); 666 answer = true; 667 } 668 669 log.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer); 670 return answer; 671 } 672 673 /** 674 * Returns the output processor 675 */ 676 public Processor getOutput() { 677 return output; 678 } 679 680 /** 681 * Returns the dead letter that message exchanges will be sent to if the 682 * redelivery attempts fail 683 */ 684 public Processor getDeadLetter() { 685 return deadLetter; 686 } 687 688 public String getDeadLetterUri() { 689 return deadLetterUri; 690 } 691 692 public boolean isUseOriginalMessagePolicy() { 693 return useOriginalMessagePolicy; 694 } 695 696 public boolean isDeadLetterHandleNewException() { 697 return deadLetterHandleNewException; 698 } 699 700 public RedeliveryPolicy getRedeliveryPolicy() { 701 return redeliveryPolicy; 702 } 703 704 public CamelLogger getLogger() { 705 return logger; 706 } 707 708 protected Predicate getDefaultHandledPredicate() { 709 // Default is not not handle errors 710 return null; 711 } 712 713 protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) { 714 Exception caught = exchange.getException(); 715 716 // we continue so clear any exceptions 717 exchange.setException(null); 718 // clear rollback flags 719 exchange.setProperty(Exchange.ROLLBACK_ONLY, null); 720 // reset cached streams so they can be read again 721 MessageHelper.resetStreamCache(exchange.getIn()); 722 723 // its continued then remove traces of redelivery attempted and caught exception 724 exchange.getIn().removeHeader(Exchange.REDELIVERED); 725 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); 726 exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); 727 exchange.removeProperty(Exchange.FAILURE_HANDLED); 728 // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception 729 730 // create log message 731 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); 732 msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; 733 msg = msg + ". Handled and continue routing."; 734 735 // log that we failed but want to continue 736 logFailedDelivery(false, false, false, true, isDeadLetterChannel, exchange, msg, data, null); 737 } 738 739 protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) { 740 if (!redeliveryEnabled) { 741 throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly."); 742 } 743 // there must be a defensive copy of the exchange 744 ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this); 745 746 // okay we will give it another go so clear the exception so we can try again 747 exchange.setException(null); 748 749 // clear rollback flags 750 exchange.setProperty(Exchange.ROLLBACK_ONLY, null); 751 752 // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange 753 // and then put these on the exchange when doing a redelivery / fault processor 754 755 // preserve these headers 756 Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 757 Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class); 758 Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class); 759 760 // we are redelivering so copy from original back to exchange 761 exchange.getIn().copyFrom(data.original.getIn()); 762 exchange.setOut(null); 763 // reset cached streams so they can be read again 764 MessageHelper.resetStreamCache(exchange.getIn()); 765 766 // put back headers 767 if (redeliveryCounter != null) { 768 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter); 769 } 770 if (redeliveryMaxCounter != null) { 771 exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter); 772 } 773 if (redelivered != null) { 774 exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered); 775 } 776 } 777 778 protected void handleException(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) { 779 Exception e = exchange.getException(); 780 781 // store the original caused exception in a property, so we can restore it later 782 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); 783 784 // find the error handler to use (if any) 785 OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e); 786 if (exceptionPolicy != null) { 787 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy); 788 data.handledPredicate = exceptionPolicy.getHandledPolicy(); 789 data.continuedPredicate = exceptionPolicy.getContinuedPolicy(); 790 data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy(); 791 data.useOriginalInMessage = exceptionPolicy.getUseOriginalMessagePolicy() != null && exceptionPolicy.getUseOriginalMessagePolicy(); 792 793 // route specific failure handler? 794 Processor processor = null; 795 UnitOfWork uow = exchange.getUnitOfWork(); 796 if (uow != null && uow.getRouteContext() != null) { 797 String routeId = uow.getRouteContext().getRoute().getId(); 798 processor = exceptionPolicy.getErrorHandler(routeId); 799 } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) { 800 // note this should really not happen, but we have this code as a fail safe 801 // to be backwards compatible with the old behavior 802 log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId()); 803 processor = exceptionPolicy.getErrorHandlers().iterator().next(); 804 } 805 if (processor != null) { 806 data.failureProcessor = processor; 807 } 808 809 // route specific on redelivery? 810 processor = exceptionPolicy.getOnRedelivery(); 811 if (processor != null) { 812 data.onRedeliveryProcessor = processor; 813 } 814 // route specific on exception occurred? 815 processor = exceptionPolicy.getOnExceptionOccurred(); 816 if (processor != null) { 817 data.onExceptionProcessor = processor; 818 } 819 } 820 821 // only log if not failure handled or not an exhausted unit of work 822 if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) { 823 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange) 824 + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e; 825 logFailedDelivery(true, false, false, false, isDeadLetterChannel, exchange, msg, data, e); 826 } 827 828 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data); 829 } 830 831 /** 832 * Gives an optional configured OnExceptionOccurred processor a chance to process just after an exception 833 * was thrown while processing the Exchange. This allows to execute the processor at the same time the exception was thrown. 834 */ 835 protected void onExceptionOccurred(Exchange exchange, final RedeliveryData data) { 836 if (data.onExceptionProcessor == null) { 837 return; 838 } 839 840 // run this synchronously as its just a Processor 841 try { 842 if (log.isTraceEnabled()) { 843 log.trace("OnExceptionOccurred processor {} is processing Exchange: {} due exception occurred", data.onExceptionProcessor, exchange); 844 } 845 data.onExceptionProcessor.process(exchange); 846 } catch (Throwable e) { 847 // we dont not want new exception to override existing, so log it as a WARN 848 log.warn("Error during processing OnExceptionOccurred. This exception is ignored.", e); 849 } 850 log.trace("OnExceptionOccurred processor done"); 851 } 852 853 /** 854 * Gives an optional configured redelivery processor a chance to process before the Exchange 855 * will be redelivered. This can be used to alter the Exchange. 856 */ 857 protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) { 858 if (data.onRedeliveryProcessor == null) { 859 return; 860 } 861 862 if (log.isTraceEnabled()) { 863 log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered", 864 data.onRedeliveryProcessor, exchange); 865 } 866 867 // run this synchronously as its just a Processor 868 try { 869 data.onRedeliveryProcessor.process(exchange); 870 } catch (Throwable e) { 871 exchange.setException(e); 872 } 873 log.trace("Redelivery processor done"); 874 } 875 876 /** 877 * All redelivery attempts failed so move the exchange to the dead letter queue 878 */ 879 protected boolean deliverToFailureProcessor(final Processor processor, final boolean isDeadLetterChannel, final Exchange exchange, 880 final RedeliveryData data, final AsyncCallback callback) { 881 boolean sync = true; 882 883 Exception caught = exchange.getException(); 884 885 // we did not success with the redelivery so now we let the failure processor handle it 886 // clear exception as we let the failure processor handle it 887 exchange.setException(null); 888 889 final boolean shouldHandle = shouldHandle(exchange, data); 890 final boolean shouldContinue = shouldContinue(exchange, data); 891 892 // regard both handled or continued as being handled 893 boolean handled = false; 894 895 // always handle if dead letter channel 896 boolean handleOrContinue = isDeadLetterChannel || shouldHandle || shouldContinue; 897 if (handleOrContinue) { 898 // its handled then remove traces of redelivery attempted 899 exchange.getIn().removeHeader(Exchange.REDELIVERED); 900 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); 901 exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); 902 exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); 903 904 // and remove traces of rollback only and uow exhausted markers 905 exchange.removeProperty(Exchange.ROLLBACK_ONLY); 906 exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED); 907 908 handled = true; 909 } else { 910 // must decrement the redelivery counter as we didn't process the redelivery but is 911 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync 912 decrementRedeliveryCounter(exchange); 913 } 914 915 // we should allow using the failure processor if we should not continue 916 // or in case of continue then the failure processor is NOT a dead letter channel 917 // because you can continue and still let the failure processor do some routing 918 // before continue in the main route. 919 boolean allowFailureProcessor = !shouldContinue || !isDeadLetterChannel; 920 921 if (allowFailureProcessor && processor != null) { 922 923 // prepare original IN body if it should be moved instead of current body 924 if (data.useOriginalInMessage) { 925 log.trace("Using the original IN message instead of current"); 926 Message original = ExchangeHelper.getOriginalInMessage(exchange); 927 exchange.setIn(original); 928 if (exchange.hasOut()) { 929 log.trace("Removing the out message to avoid some uncertain behavior"); 930 exchange.setOut(null); 931 } 932 } 933 934 // reset cached streams so they can be read again 935 MessageHelper.resetStreamCache(exchange.getIn()); 936 937 // invoke custom on prepare 938 if (onPrepareProcessor != null) { 939 try { 940 log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange); 941 onPrepareProcessor.process(exchange); 942 } catch (Exception e) { 943 // a new exception was thrown during prepare 944 exchange.setException(e); 945 } 946 } 947 948 log.trace("Failure processor {} is processing Exchange: {}", processor, exchange); 949 950 // store the last to endpoint as the failure endpoint 951 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 952 // and store the route id so we know in which route we failed 953 UnitOfWork uow = exchange.getUnitOfWork(); 954 if (uow != null && uow.getRouteContext() != null) { 955 exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId()); 956 } 957 958 // fire event as we had a failure processor to handle it, which there is a event for 959 final boolean deadLetterChannel = processor == data.deadLetterProcessor; 960 961 EventHelper.notifyExchangeFailureHandling(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri); 962 963 // the failure processor could also be asynchronous 964 AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor); 965 sync = afp.process(exchange, new AsyncCallback() { 966 public void done(boolean sync) { 967 log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange); 968 try { 969 prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue); 970 // fire event as we had a failure processor to handle it, which there is a event for 971 EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri); 972 } finally { 973 // if the fault was handled asynchronously, this should be reflected in the callback as well 974 data.sync &= sync; 975 callback.done(data.sync); 976 } 977 } 978 }); 979 } else { 980 try { 981 // invoke custom on prepare 982 if (onPrepareProcessor != null) { 983 try { 984 log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange); 985 onPrepareProcessor.process(exchange); 986 } catch (Exception e) { 987 // a new exception was thrown during prepare 988 exchange.setException(e); 989 } 990 } 991 // no processor but we need to prepare after failure as well 992 prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue); 993 } finally { 994 // callback we are done 995 callback.done(data.sync); 996 } 997 } 998 999 // create log message 1000 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); 1001 msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; 1002 if (processor != null) { 1003 if (isDeadLetterChannel && deadLetterUri != null) { 1004 msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]"; 1005 } else { 1006 msg = msg + ". Processed by failure processor: " + processor; 1007 } 1008 } 1009 1010 // log that we failed delivery as we are exhausted 1011 logFailedDelivery(false, false, handled, false, isDeadLetterChannel, exchange, msg, data, null); 1012 1013 return sync; 1014 } 1015 1016 protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data, final boolean isDeadLetterChannel, 1017 final boolean shouldHandle, final boolean shouldContinue) { 1018 1019 Exception newException = exchange.getException(); 1020 1021 // we could not process the exchange so we let the failure processor handled it 1022 ExchangeHelper.setFailureHandled(exchange); 1023 1024 // honor if already set a handling 1025 boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null; 1026 if (alreadySet) { 1027 boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class); 1028 log.trace("This exchange has already been marked for handling: {}", handled); 1029 if (!handled) { 1030 // exception not handled, put exception back in the exchange 1031 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 1032 // and put failure endpoint back as well 1033 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 1034 } 1035 return; 1036 } 1037 1038 // dead letter channel is special 1039 if (shouldContinue) { 1040 log.trace("This exchange is continued: {}", exchange); 1041 // okay we want to continue then prepare the exchange for that as well 1042 prepareExchangeForContinue(exchange, data, isDeadLetterChannel); 1043 } else if (shouldHandle) { 1044 log.trace("This exchange is handled so its marked as not failed: {}", exchange); 1045 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE); 1046 } else { 1047 // okay the redelivery policy are not explicit set to true, so we should allow to check for some 1048 // special situations when using dead letter channel 1049 if (isDeadLetterChannel) { 1050 1051 // DLC is always handling the first thrown exception, 1052 // but if its a new exception then use the configured option 1053 boolean handled = newException == null || data.handleNewException; 1054 1055 // when using DLC then log new exception whether its being handled or not, as otherwise it may appear as 1056 // the DLC swallow new exceptions by default (which is by design to ensure the DLC always complete, 1057 // to avoid causing endless poison messages that fails forever) 1058 if (newException != null && data.currentRedeliveryPolicy.isLogNewException()) { 1059 String uri = URISupport.sanitizeUri(deadLetterUri); 1060 String msg = "New exception occurred during processing by the DeadLetterChannel[" + uri + "] due " + newException.getMessage(); 1061 if (handled) { 1062 msg += ". The new exception is being handled as deadLetterHandleNewException=true."; 1063 } else { 1064 msg += ". The new exception is not handled as deadLetterHandleNewException=false."; 1065 } 1066 logFailedDelivery(false, true, handled, false, true, exchange, msg, data, newException); 1067 } 1068 1069 if (handled) { 1070 log.trace("This exchange is handled so its marked as not failed: {}", exchange); 1071 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE); 1072 return; 1073 } 1074 } 1075 1076 // not handled by default 1077 prepareExchangeAfterFailureNotHandled(exchange); 1078 } 1079 } 1080 1081 private void prepareExchangeAfterFailureNotHandled(Exchange exchange) { 1082 log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange); 1083 // exception not handled, put exception back in the exchange 1084 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE); 1085 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 1086 // and put failure endpoint back as well 1087 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 1088 // and store the route id so we know in which route we failed 1089 UnitOfWork uow = exchange.getUnitOfWork(); 1090 if (uow != null && uow.getRouteContext() != null) { 1091 exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId()); 1092 } 1093 } 1094 1095 private void logFailedDelivery(boolean shouldRedeliver, boolean newException, boolean handled, boolean continued, boolean isDeadLetterChannel, 1096 Exchange exchange, String message, RedeliveryData data, Throwable e) { 1097 if (logger == null) { 1098 return; 1099 } 1100 1101 if (!exchange.isRollbackOnly()) { 1102 if (newException && !data.currentRedeliveryPolicy.isLogNewException()) { 1103 // do not log new exception 1104 return; 1105 } 1106 1107 // if we should not rollback, then check whether logging is enabled 1108 1109 if (!newException && handled && !data.currentRedeliveryPolicy.isLogHandled()) { 1110 // do not log handled 1111 return; 1112 } 1113 1114 if (!newException && continued && !data.currentRedeliveryPolicy.isLogContinued()) { 1115 // do not log handled 1116 return; 1117 } 1118 1119 if (!newException && shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) { 1120 // do not log retry attempts 1121 return; 1122 } 1123 1124 if (!newException && !shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) { 1125 // do not log exhausted 1126 return; 1127 } 1128 } 1129 1130 LoggingLevel newLogLevel; 1131 boolean logStackTrace; 1132 if (exchange.isRollbackOnly()) { 1133 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 1134 logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace(); 1135 } else if (shouldRedeliver) { 1136 newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel(); 1137 logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace(); 1138 } else { 1139 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 1140 logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace(); 1141 } 1142 if (e == null) { 1143 e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); 1144 } 1145 1146 if (newException) { 1147 // log at most WARN level 1148 if (newLogLevel == LoggingLevel.ERROR) { 1149 newLogLevel = LoggingLevel.WARN; 1150 } 1151 String msg = message; 1152 if (msg == null) { 1153 msg = "New exception " + ExchangeHelper.logIds(exchange); 1154 // special for logging the new exception 1155 Throwable cause = e; 1156 if (cause != null) { 1157 msg = msg + " due: " + cause.getMessage(); 1158 } 1159 } 1160 1161 if (e != null && logStackTrace) { 1162 logger.log(msg, e, newLogLevel); 1163 } else { 1164 logger.log(msg, newLogLevel); 1165 } 1166 } else if (exchange.isRollbackOnly()) { 1167 String msg = "Rollback " + ExchangeHelper.logIds(exchange); 1168 Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class); 1169 if (cause != null) { 1170 msg = msg + " due: " + cause.getMessage(); 1171 } 1172 1173 // should we include message history 1174 if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) { 1175 // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it) 1176 ExchangeFormatter formatter = customExchangeFormatter ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() ? exchangeFormatter : null); 1177 String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, false); 1178 if (routeStackTrace != null) { 1179 msg = msg + "\n" + routeStackTrace; 1180 } 1181 } 1182 1183 if (newLogLevel == LoggingLevel.ERROR) { 1184 // log intended rollback on maximum WARN level (no ERROR) 1185 logger.log(msg, LoggingLevel.WARN); 1186 } else { 1187 // otherwise use the desired logging level 1188 logger.log(msg, newLogLevel); 1189 } 1190 } else { 1191 String msg = message; 1192 // should we include message history 1193 if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) { 1194 // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it) 1195 ExchangeFormatter formatter = customExchangeFormatter ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() ? exchangeFormatter : null); 1196 String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, e != null && logStackTrace); 1197 if (routeStackTrace != null) { 1198 msg = msg + "\n" + routeStackTrace; 1199 } 1200 } 1201 1202 if (e != null && logStackTrace) { 1203 logger.log(msg, e, newLogLevel); 1204 } else { 1205 logger.log(msg, newLogLevel); 1206 } 1207 } 1208 } 1209 1210 /** 1211 * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback). 1212 * <p/> 1213 * If the exchange is exhausted, then we will not continue processing, but let the 1214 * failure processor deal with the exchange. 1215 * 1216 * @param exchange the current exchange 1217 * @param data the redelivery data 1218 * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust. 1219 */ 1220 private boolean isExhausted(Exchange exchange, RedeliveryData data) { 1221 // if marked as rollback only then do not continue/redeliver 1222 boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class); 1223 if (exhausted) { 1224 log.trace("This exchange is marked as redelivery exhausted: {}", exchange); 1225 return true; 1226 } 1227 1228 // if marked as rollback only then do not continue/redeliver 1229 boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class); 1230 if (rollbackOnly) { 1231 log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange); 1232 return true; 1233 } 1234 // its the first original call so continue 1235 if (data.redeliveryCounter == 0) { 1236 return false; 1237 } 1238 // its a potential redelivery so determine if we should redeliver or not 1239 boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate); 1240 return !redeliver; 1241 } 1242 1243 /** 1244 * Determines whether or not to continue if we are exhausted. 1245 * 1246 * @param exchange the current exchange 1247 * @param data the redelivery data 1248 * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust. 1249 */ 1250 private boolean shouldContinue(Exchange exchange, RedeliveryData data) { 1251 if (data.continuedPredicate != null) { 1252 return data.continuedPredicate.matches(exchange); 1253 } 1254 // do not continue by default 1255 return false; 1256 } 1257 1258 /** 1259 * Determines whether or not to handle if we are exhausted. 1260 * 1261 * @param exchange the current exchange 1262 * @param data the redelivery data 1263 * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust. 1264 */ 1265 private boolean shouldHandle(Exchange exchange, RedeliveryData data) { 1266 if (data.handledPredicate != null) { 1267 return data.handledPredicate.matches(exchange); 1268 } 1269 // do not handle by default 1270 return false; 1271 } 1272 1273 /** 1274 * Increments the redelivery counter and adds the redelivered flag if the 1275 * message has been redelivered 1276 */ 1277 private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) { 1278 Message in = exchange.getIn(); 1279 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 1280 int next = 1; 1281 if (counter != null) { 1282 next = counter + 1; 1283 } 1284 in.setHeader(Exchange.REDELIVERY_COUNTER, next); 1285 in.setHeader(Exchange.REDELIVERED, Boolean.TRUE); 1286 // if maximum redeliveries is used, then provide that information as well 1287 if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) { 1288 in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries()); 1289 } 1290 return next; 1291 } 1292 1293 /** 1294 * Prepares the redelivery counter and boolean flag for the failure handle processor 1295 */ 1296 private void decrementRedeliveryCounter(Exchange exchange) { 1297 Message in = exchange.getIn(); 1298 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 1299 if (counter != null) { 1300 int prev = counter - 1; 1301 in.setHeader(Exchange.REDELIVERY_COUNTER, prev); 1302 // set boolean flag according to counter 1303 in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE); 1304 } else { 1305 // not redelivered 1306 in.setHeader(Exchange.REDELIVERY_COUNTER, 0); 1307 in.setHeader(Exchange.REDELIVERED, Boolean.FALSE); 1308 } 1309 } 1310 1311 /** 1312 * Determines if redelivery is enabled by checking if any of the redelivery policy 1313 * settings may allow redeliveries. 1314 * 1315 * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise 1316 * @throws Exception can be thrown 1317 */ 1318 private boolean determineIfRedeliveryIsEnabled() throws Exception { 1319 // determine if redeliver is enabled either on error handler 1320 if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) { 1321 // must check for != 0 as (-1 means redeliver forever) 1322 return true; 1323 } 1324 if (retryWhilePolicy != null) { 1325 return true; 1326 } 1327 1328 // or on the exception policies 1329 if (!exceptionPolicies.isEmpty()) { 1330 // walk them to see if any of them have a maximum redeliveries > 0 or retry until set 1331 for (OnExceptionDefinition def : exceptionPolicies.values()) { 1332 1333 String ref = def.getRedeliveryPolicyRef(); 1334 if (ref != null) { 1335 // lookup in registry if ref provided 1336 RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class); 1337 if (policy.getMaximumRedeliveries() != 0) { 1338 // must check for != 0 as (-1 means redeliver forever) 1339 return true; 1340 } 1341 } else if (def.getRedeliveryPolicy() != null) { 1342 Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries()); 1343 if (max != null && max != 0) { 1344 // must check for != 0 as (-1 means redeliver forever) 1345 return true; 1346 } 1347 } 1348 1349 if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) { 1350 return true; 1351 } 1352 } 1353 } 1354 1355 return false; 1356 } 1357 1358 /** 1359 * Gets the number of exchanges that are pending for redelivery 1360 */ 1361 public int getPendingRedeliveryCount() { 1362 int answer = redeliverySleepCounter.get(); 1363 if (executorService != null && executorService instanceof ThreadPoolExecutor) { 1364 answer += ((ThreadPoolExecutor) executorService).getQueue().size(); 1365 } 1366 1367 return answer; 1368 } 1369 1370 @Override 1371 protected void doStart() throws Exception { 1372 ServiceHelper.startServices(output, outputAsync, deadLetter); 1373 1374 // determine if redeliver is enabled or not 1375 redeliveryEnabled = determineIfRedeliveryIsEnabled(); 1376 if (log.isTraceEnabled()) { 1377 log.trace("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this); 1378 } 1379 1380 // we only need thread pool if redelivery is enabled 1381 if (redeliveryEnabled) { 1382 if (executorService == null) { 1383 // use default shared executor service 1384 executorService = camelContext.getErrorHandlerExecutorService(); 1385 } 1386 if (log.isDebugEnabled()) { 1387 log.debug("Using ExecutorService: {} for redeliveries on error handler: {}", executorService, this); 1388 } 1389 } 1390 1391 // reset flag when starting 1392 preparingShutdown = false; 1393 redeliverySleepCounter.set(0); 1394 } 1395 1396 @Override 1397 protected void doStop() throws Exception { 1398 // noop, do not stop any services which we only do when shutting down 1399 // as the error handler can be context scoped, and should not stop in case 1400 // a route stops 1401 } 1402 1403 @Override 1404 protected void doShutdown() throws Exception { 1405 ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync); 1406 } 1407}