001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.Callable; 022import java.util.concurrent.CountDownLatch; 023import java.util.concurrent.RejectedExecutionException; 024import java.util.concurrent.ScheduledExecutorService; 025import java.util.concurrent.ThreadPoolExecutor; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicInteger; 028 029import org.apache.camel.AsyncCallback; 030import org.apache.camel.AsyncProcessor; 031import org.apache.camel.CamelContext; 032import org.apache.camel.Exchange; 033import org.apache.camel.LoggingLevel; 034import org.apache.camel.Message; 035import org.apache.camel.Navigate; 036import org.apache.camel.Predicate; 037import org.apache.camel.Processor; 038import org.apache.camel.model.OnExceptionDefinition; 039import org.apache.camel.spi.AsyncProcessorAwaitManager; 040import org.apache.camel.spi.ExchangeFormatter; 041import org.apache.camel.spi.ShutdownPrepared; 042import org.apache.camel.spi.SubUnitOfWorkCallback; 043import org.apache.camel.spi.UnitOfWork; 044import org.apache.camel.util.AsyncProcessorConverterHelper; 045import org.apache.camel.util.AsyncProcessorHelper; 046import org.apache.camel.util.CamelContextHelper; 047import org.apache.camel.util.CamelLogger; 048import org.apache.camel.util.EventHelper; 049import org.apache.camel.util.ExchangeHelper; 050import org.apache.camel.util.MessageHelper; 051import org.apache.camel.util.ObjectHelper; 052import org.apache.camel.util.ServiceHelper; 053import org.apache.camel.util.StopWatch; 054import org.apache.camel.util.URISupport; 055 056/** 057 * Base redeliverable error handler that also supports a final dead letter queue in case 058 * all redelivery attempts fail. 059 * <p/> 060 * This implementation should contain all the error handling logic and the sub classes 061 * should only configure it according to what they support. 062 * 063 * @version 064 */ 065public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> { 066 067 protected final AtomicInteger redeliverySleepCounter = new AtomicInteger(); 068 protected ScheduledExecutorService executorService; 069 protected final CamelContext camelContext; 070 protected final AsyncProcessorAwaitManager awaitManager; 071 protected final Processor deadLetter; 072 protected final String deadLetterUri; 073 protected final boolean deadLetterHandleNewException; 074 protected final Processor output; 075 protected final AsyncProcessor outputAsync; 076 protected final Processor redeliveryProcessor; 077 protected final RedeliveryPolicy redeliveryPolicy; 078 protected final Predicate retryWhilePolicy; 079 protected final CamelLogger logger; 080 protected final boolean useOriginalMessagePolicy; 081 protected boolean redeliveryEnabled; 082 protected volatile boolean preparingShutdown; 083 protected final ExchangeFormatter exchangeFormatter; 084 protected final boolean customExchangeFormatter; 085 protected final Processor onPrepareProcessor; 086 protected final Processor onExceptionProcessor; 087 088 /** 089 * Contains the current redelivery data 090 */ 091 protected class RedeliveryData { 092 Exchange original; 093 boolean sync = true; 094 int redeliveryCounter; 095 long redeliveryDelay; 096 Predicate retryWhilePredicate; 097 boolean redeliverFromSync; 098 099 // default behavior which can be overloaded on a per exception basis 100 RedeliveryPolicy currentRedeliveryPolicy; 101 Processor deadLetterProcessor; 102 Processor failureProcessor; 103 Processor onRedeliveryProcessor; 104 Processor onPrepareProcessor; 105 Processor onExceptionProcessor; 106 Predicate handledPredicate; 107 Predicate continuedPredicate; 108 boolean useOriginalInMessage; 109 boolean handleNewException; 110 111 public RedeliveryData() { 112 // init with values from the error handler 113 this.retryWhilePredicate = retryWhilePolicy; 114 this.currentRedeliveryPolicy = redeliveryPolicy; 115 this.deadLetterProcessor = deadLetter; 116 this.onRedeliveryProcessor = redeliveryProcessor; 117 this.onPrepareProcessor = RedeliveryErrorHandler.this.onPrepareProcessor; 118 this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor; 119 this.handledPredicate = getDefaultHandledPredicate(); 120 this.useOriginalInMessage = useOriginalMessagePolicy; 121 this.handleNewException = deadLetterHandleNewException; 122 } 123 } 124 125 /** 126 * Task for sleeping during redelivery attempts. 127 * <p/> 128 * This task is for the synchronous blocking. If using async delayed then a scheduled thread pool 129 * is used for sleeping and trigger redeliveries. 130 */ 131 private final class RedeliverSleepTask { 132 133 private final RedeliveryPolicy policy; 134 private final long delay; 135 136 RedeliverSleepTask(RedeliveryPolicy policy, long delay) { 137 this.policy = policy; 138 this.delay = delay; 139 } 140 141 public boolean sleep() throws InterruptedException { 142 // for small delays then just sleep 143 if (delay < 1000) { 144 policy.sleep(delay); 145 return true; 146 } 147 148 StopWatch watch = new StopWatch(); 149 150 log.debug("Sleeping for: {} millis until attempting redelivery", delay); 151 while (watch.taken() < delay) { 152 // sleep using 1 sec interval 153 154 long delta = delay - watch.taken(); 155 long max = Math.min(1000, delta); 156 if (max > 0) { 157 log.trace("Sleeping for: {} millis until waking up for re-check", max); 158 Thread.sleep(max); 159 } 160 161 // are we preparing for shutdown then only do redelivery if allowed 162 if (preparingShutdown && !policy.isAllowRedeliveryWhileStopping()) { 163 log.debug("Rejected redelivery while stopping"); 164 return false; 165 } 166 } 167 168 return true; 169 } 170 } 171 172 /** 173 * Tasks which performs asynchronous redelivery attempts, and being triggered by a 174 * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task 175 * has to be delayed before a redelivery attempt is performed. 176 */ 177 private final class AsyncRedeliveryTask implements Callable<Boolean> { 178 179 private final Exchange exchange; 180 private final AsyncCallback callback; 181 private final RedeliveryData data; 182 183 AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) { 184 this.exchange = exchange; 185 this.callback = callback; 186 this.data = data; 187 } 188 189 public Boolean call() throws Exception { 190 // prepare for redelivery 191 prepareExchangeForRedelivery(exchange, data); 192 193 // letting onRedeliver be executed at first 194 deliverToOnRedeliveryProcessor(exchange, data); 195 196 if (log.isTraceEnabled()) { 197 log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", new Object[]{exchange.getExchangeId(), outputAsync, exchange}); 198 } 199 200 // emmit event we are doing redelivery 201 EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter); 202 203 // process the exchange (also redelivery) 204 boolean sync; 205 if (data.redeliverFromSync) { 206 // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from 207 // this error handler, which means we have to invoke the callback with false, to have the callback 208 // be notified when we are done 209 sync = outputAsync.process(exchange, new AsyncCallback() { 210 public void done(boolean doneSync) { 211 log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); 212 213 // mark we are in sync mode now 214 data.sync = false; 215 216 // only process if the exchange hasn't failed 217 // and it has not been handled by the error processor 218 if (isDone(exchange)) { 219 callback.done(false); 220 return; 221 } 222 223 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 224 processAsyncErrorHandler(exchange, callback, data); 225 } 226 }); 227 } else { 228 // this redelivery task was scheduled from asynchronous, which means we should only 229 // handle when the asynchronous task was done 230 sync = outputAsync.process(exchange, new AsyncCallback() { 231 public void done(boolean doneSync) { 232 log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); 233 234 // this callback should only handle the async case 235 if (doneSync) { 236 return; 237 } 238 239 // mark we are in async mode now 240 data.sync = false; 241 242 // only process if the exchange hasn't failed 243 // and it has not been handled by the error processor 244 if (isDone(exchange)) { 245 callback.done(doneSync); 246 return; 247 } 248 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 249 processAsyncErrorHandler(exchange, callback, data); 250 } 251 }); 252 } 253 254 return sync; 255 } 256 } 257 258 public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, 259 Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter, 260 String deadLetterUri, boolean deadLetterHandleNewException, boolean useOriginalMessagePolicy, 261 Predicate retryWhile, ScheduledExecutorService executorService, Processor onPrepareProcessor, Processor onExceptionProcessor) { 262 263 ObjectHelper.notNull(camelContext, "CamelContext", this); 264 ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this); 265 266 this.camelContext = camelContext; 267 this.awaitManager = camelContext.getAsyncProcessorAwaitManager(); 268 this.redeliveryProcessor = redeliveryProcessor; 269 this.deadLetter = deadLetter; 270 this.output = output; 271 this.outputAsync = AsyncProcessorConverterHelper.convert(output); 272 this.redeliveryPolicy = redeliveryPolicy; 273 this.logger = logger; 274 this.deadLetterUri = deadLetterUri; 275 this.deadLetterHandleNewException = deadLetterHandleNewException; 276 this.useOriginalMessagePolicy = useOriginalMessagePolicy; 277 this.retryWhilePolicy = retryWhile; 278 this.executorService = executorService; 279 this.onPrepareProcessor = onPrepareProcessor; 280 this.onExceptionProcessor = onExceptionProcessor; 281 282 if (ObjectHelper.isNotEmpty(redeliveryPolicy.getExchangeFormatterRef())) { 283 ExchangeFormatter formatter = camelContext.getRegistry().lookupByNameAndType(redeliveryPolicy.getExchangeFormatterRef(), ExchangeFormatter.class); 284 if (formatter != null) { 285 this.exchangeFormatter = formatter; 286 this.customExchangeFormatter = true; 287 } else { 288 throw new IllegalArgumentException("Cannot find the exchangeFormatter by using reference id " + redeliveryPolicy.getExchangeFormatterRef()); 289 } 290 } else { 291 this.customExchangeFormatter = false; 292 // setup exchange formatter to be used for message history dump 293 DefaultExchangeFormatter formatter = new DefaultExchangeFormatter(); 294 formatter.setShowExchangeId(true); 295 formatter.setMultiline(true); 296 formatter.setShowHeaders(true); 297 formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed); 298 try { 299 Integer maxChars = CamelContextHelper.parseInteger(camelContext, camelContext.getProperty(Exchange.LOG_DEBUG_BODY_MAX_CHARS)); 300 if (maxChars != null) { 301 formatter.setMaxChars(maxChars); 302 } 303 } catch (Exception e) { 304 throw ObjectHelper.wrapRuntimeCamelException(e); 305 } 306 this.exchangeFormatter = formatter; 307 } 308 } 309 310 public boolean supportTransacted() { 311 return false; 312 } 313 314 @Override 315 public boolean hasNext() { 316 return output != null; 317 } 318 319 @Override 320 public List<Processor> next() { 321 if (!hasNext()) { 322 return null; 323 } 324 List<Processor> answer = new ArrayList<Processor>(1); 325 answer.add(output); 326 return answer; 327 } 328 329 protected boolean isRunAllowed(RedeliveryData data) { 330 // if camel context is forcing a shutdown then do not allow running 331 boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this); 332 if (forceShutdown) { 333 log.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)"); 334 return false; 335 } 336 337 // redelivery policy can control if redelivery is allowed during stopping/shutdown 338 // but this only applies during a redelivery (counter must > 0) 339 if (data.redeliveryCounter > 0) { 340 if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) { 341 log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)"); 342 return true; 343 } else if (preparingShutdown) { 344 // we are preparing for shutdown, now determine if we can still run 345 boolean answer = isRunAllowedOnPreparingShutdown(); 346 log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer); 347 return answer; 348 } 349 } 350 351 // we cannot run if we are stopping/stopped 352 boolean answer = !isStoppingOrStopped(); 353 log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer); 354 return answer; 355 } 356 357 protected boolean isRunAllowedOnPreparingShutdown() { 358 return false; 359 } 360 361 protected boolean isRedeliveryAllowed(RedeliveryData data) { 362 // redelivery policy can control if redelivery is allowed during stopping/shutdown 363 // but this only applies during a redelivery (counter must > 0) 364 if (data.redeliveryCounter > 0) { 365 boolean stopping = isStoppingOrStopped(); 366 if (!preparingShutdown && !stopping) { 367 log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)"); 368 return true; 369 } else { 370 // we are stopping or preparing to shutdown 371 if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) { 372 log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)"); 373 return true; 374 } else { 375 log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)"); 376 return false; 377 } 378 } 379 } 380 381 return true; 382 } 383 384 @Override 385 public void prepareShutdown(boolean suspendOnly, boolean forced) { 386 // prepare for shutdown, eg do not allow redelivery if configured 387 log.trace("Prepare shutdown on error handler {}", this); 388 preparingShutdown = true; 389 } 390 391 public void process(Exchange exchange) throws Exception { 392 if (output == null) { 393 // no output then just return 394 return; 395 } 396 397 // inline org.apache.camel.util.AsyncProcessorHelper.process(org.apache.camel.AsyncProcessor, org.apache.camel.Exchange) 398 // to optimize and reduce stacktrace lengths 399 final CountDownLatch latch = new CountDownLatch(1); 400 boolean sync = process(exchange, new AsyncCallback() { 401 public void done(boolean doneSync) { 402 if (!doneSync) { 403 awaitManager.countDown(exchange, latch); 404 } 405 } 406 }); 407 if (!sync) { 408 awaitManager.await(exchange, latch); 409 } 410 } 411 412 /** 413 * Process the exchange using redelivery error handling. 414 */ 415 public boolean process(final Exchange exchange, final AsyncCallback callback) { 416 final RedeliveryData data = new RedeliveryData(); 417 418 // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the 419 // original Exchange is being redelivered, and not a mutated Exchange 420 data.original = defensiveCopyExchangeIfNeeded(exchange); 421 422 // use looping to have redelivery attempts 423 while (true) { 424 425 // can we still run 426 if (!isRunAllowed(data)) { 427 log.trace("Run not allowed, will reject executing exchange: {}", exchange); 428 if (exchange.getException() == null) { 429 exchange.setException(new RejectedExecutionException()); 430 } 431 // we cannot process so invoke callback 432 callback.done(data.sync); 433 return data.sync; 434 } 435 436 // did previous processing cause an exception? 437 boolean handle = shouldHandleException(exchange); 438 if (handle) { 439 handleException(exchange, data, isDeadLetterChannel()); 440 onExceptionOccurred(exchange, data); 441 } 442 443 // compute if we are exhausted, and whether redelivery is allowed 444 boolean exhausted = isExhausted(exchange, data); 445 boolean redeliverAllowed = isRedeliveryAllowed(data); 446 447 // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC) 448 if (!redeliverAllowed || exhausted) { 449 Processor target = null; 450 boolean deliver = true; 451 452 // the unit of work may have an optional callback associated we need to leverage 453 SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback(); 454 if (uowCallback != null) { 455 // signal to the callback we are exhausted 456 uowCallback.onExhausted(exchange); 457 // do not deliver to the failure processor as its been handled by the callback instead 458 deliver = false; 459 } 460 461 if (deliver) { 462 // should deliver to failure processor (either from onException or the dead letter channel) 463 target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor; 464 } 465 // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair 466 // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) 467 boolean isDeadLetterChannel = isDeadLetterChannel() && (target == null || target == data.deadLetterProcessor); 468 boolean sync = deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback); 469 // we are breaking out 470 return sync; 471 } 472 473 if (data.redeliveryCounter > 0) { 474 // calculate delay 475 data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter); 476 477 if (data.redeliveryDelay > 0) { 478 // okay there is a delay so create a scheduled task to have it executed in the future 479 480 if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) { 481 482 // we are doing a redelivery then a thread pool must be configured (see the doStart method) 483 ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this); 484 485 // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to 486 // have it being executed in the future, or immediately 487 // we are continuing asynchronously 488 489 // mark we are routing async from now and that this redelivery task came from a synchronous routing 490 data.sync = false; 491 data.redeliverFromSync = true; 492 AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); 493 494 // schedule the redelivery task 495 if (log.isTraceEnabled()) { 496 log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId()); 497 } 498 executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS); 499 500 return false; 501 } else { 502 // async delayed redelivery was disabled or we are transacted so we must be synchronous 503 // as the transaction manager requires to execute in the same thread context 504 try { 505 // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping 506 redeliverySleepCounter.incrementAndGet(); 507 RedeliverSleepTask task = new RedeliverSleepTask(data.currentRedeliveryPolicy, data.redeliveryDelay); 508 boolean complete = task.sleep(); 509 redeliverySleepCounter.decrementAndGet(); 510 if (!complete) { 511 // the task was rejected 512 exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping")); 513 // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange 514 exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); 515 // jump to start of loop which then detects that we are failed and exhausted 516 continue; 517 } 518 } catch (InterruptedException e) { 519 redeliverySleepCounter.decrementAndGet(); 520 // we was interrupted so break out 521 exchange.setException(e); 522 // mark the exchange to stop continue routing when interrupted 523 // as we do not want to continue routing (for example a task has been cancelled) 524 exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); 525 callback.done(data.sync); 526 return data.sync; 527 } 528 } 529 } 530 531 // prepare for redelivery 532 prepareExchangeForRedelivery(exchange, data); 533 534 // letting onRedeliver be executed 535 deliverToOnRedeliveryProcessor(exchange, data); 536 537 // emmit event we are doing redelivery 538 EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter); 539 } 540 541 // process the exchange (also redelivery) 542 boolean sync = outputAsync.process(exchange, new AsyncCallback() { 543 public void done(boolean sync) { 544 // this callback should only handle the async case 545 if (sync) { 546 return; 547 } 548 549 // mark we are in async mode now 550 data.sync = false; 551 552 // if we are done then notify callback and exit 553 if (isDone(exchange)) { 554 callback.done(sync); 555 return; 556 } 557 558 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 559 // method which takes care of this in a asynchronous manner 560 processAsyncErrorHandler(exchange, callback, data); 561 } 562 }); 563 564 if (!sync) { 565 // the remainder of the Exchange is being processed asynchronously so we should return 566 return false; 567 } 568 // we continue to route synchronously 569 570 // if we are done then notify callback and exit 571 boolean done = isDone(exchange); 572 if (done) { 573 callback.done(true); 574 return true; 575 } 576 577 // error occurred so loop back around..... 578 } 579 } 580 581 /** 582 * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY} 583 * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p> 584 * 585 * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay} 586 * and {@link RedeliveryData#redeliveryCounter} are copied in.</p> 587 * 588 * @param exchange The current exchange in question. 589 * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation. 590 * @param redeliveryDelay The default redelivery delay from RedeliveryData 591 * @param redeliveryCounter The redeliveryCounter 592 * @return The time to wait before the next redelivery. 593 */ 594 protected long determineRedeliveryDelay(Exchange exchange, RedeliveryPolicy redeliveryPolicy, long redeliveryDelay, int redeliveryCounter) { 595 Message message = exchange.getIn(); 596 Long delay = message.getHeader(Exchange.REDELIVERY_DELAY, Long.class); 597 if (delay == null) { 598 delay = redeliveryPolicy.calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter); 599 log.debug("Redelivery delay calculated as {}", delay); 600 } else { 601 log.debug("Redelivery delay is {} from Message Header [{}]", delay, Exchange.REDELIVERY_DELAY); 602 } 603 return delay; 604 } 605 606 /** 607 * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback. 608 * <p/> 609 * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use 610 * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b> 611 * in terms of logic. 612 */ 613 protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) { 614 // can we still run 615 if (!isRunAllowed(data)) { 616 log.trace("Run not allowed, will reject executing exchange: {}", exchange); 617 if (exchange.getException() == null) { 618 exchange.setException(new RejectedExecutionException()); 619 } 620 callback.done(data.sync); 621 return; 622 } 623 624 // did previous processing cause an exception? 625 boolean handle = shouldHandleException(exchange); 626 if (handle) { 627 handleException(exchange, data, isDeadLetterChannel()); 628 onExceptionOccurred(exchange, data); 629 } 630 631 // compute if we are exhausted or not 632 boolean exhausted = isExhausted(exchange, data); 633 if (exhausted) { 634 Processor target = null; 635 boolean deliver = true; 636 637 // the unit of work may have an optional callback associated we need to leverage 638 UnitOfWork uow = exchange.getUnitOfWork(); 639 if (uow != null) { 640 SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback(); 641 if (uowCallback != null) { 642 // signal to the callback we are exhausted 643 uowCallback.onExhausted(exchange); 644 // do not deliver to the failure processor as its been handled by the callback instead 645 deliver = false; 646 } 647 } 648 649 if (deliver) { 650 // should deliver to failure processor (either from onException or the dead letter channel) 651 target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor; 652 } 653 // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair 654 // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) 655 boolean isDeadLetterChannel = isDeadLetterChannel() && target == data.deadLetterProcessor; 656 deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback); 657 // we are breaking out 658 return; 659 } 660 661 if (data.redeliveryCounter > 0) { 662 // we are doing a redelivery then a thread pool must be configured (see the doStart method) 663 ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this); 664 665 // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to 666 // have it being executed in the future, or immediately 667 // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously 668 // to ensure the callback will continue routing from where we left 669 AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); 670 671 // calculate the redelivery delay 672 data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter); 673 674 if (data.redeliveryDelay > 0) { 675 // schedule the redelivery task 676 if (log.isTraceEnabled()) { 677 log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId()); 678 } 679 executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS); 680 } else { 681 // execute the task immediately 682 executorService.submit(task); 683 } 684 } 685 } 686 687 /** 688 * Performs a defensive copy of the exchange if needed 689 * 690 * @param exchange the exchange 691 * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled). 692 */ 693 protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) { 694 // only do a defensive copy if redelivery is enabled 695 if (redeliveryEnabled) { 696 return ExchangeHelper.createCopy(exchange, true); 697 } else { 698 return null; 699 } 700 } 701 702 /** 703 * Strategy whether the exchange has an exception that we should try to handle. 704 * <p/> 705 * Standard implementations should just look for an exception. 706 */ 707 protected boolean shouldHandleException(Exchange exchange) { 708 return exchange.getException() != null; 709 } 710 711 /** 712 * Strategy to determine if the exchange is done so we can continue 713 */ 714 protected boolean isDone(Exchange exchange) { 715 boolean answer = isCancelledOrInterrupted(exchange); 716 717 // only done if the exchange hasn't failed 718 // and it has not been handled by the failure processor 719 // or we are exhausted 720 if (!answer) { 721 answer = exchange.getException() == null 722 || ExchangeHelper.isFailureHandled(exchange) 723 || ExchangeHelper.isRedeliveryExhausted(exchange); 724 } 725 726 log.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer); 727 return answer; 728 } 729 730 /** 731 * Strategy to determine if the exchange was cancelled or interrupted 732 */ 733 protected boolean isCancelledOrInterrupted(Exchange exchange) { 734 boolean answer = false; 735 736 if (ExchangeHelper.isInterrupted(exchange)) { 737 // mark the exchange to stop continue routing when interrupted 738 // as we do not want to continue routing (for example a task has been cancelled) 739 exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); 740 answer = true; 741 } 742 743 log.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer); 744 return answer; 745 } 746 747 /** 748 * Returns the output processor 749 */ 750 public Processor getOutput() { 751 return output; 752 } 753 754 /** 755 * Returns the dead letter that message exchanges will be sent to if the 756 * redelivery attempts fail 757 */ 758 public Processor getDeadLetter() { 759 return deadLetter; 760 } 761 762 public String getDeadLetterUri() { 763 return deadLetterUri; 764 } 765 766 public boolean isUseOriginalMessagePolicy() { 767 return useOriginalMessagePolicy; 768 } 769 770 public boolean isDeadLetterHandleNewException() { 771 return deadLetterHandleNewException; 772 } 773 774 public RedeliveryPolicy getRedeliveryPolicy() { 775 return redeliveryPolicy; 776 } 777 778 public CamelLogger getLogger() { 779 return logger; 780 } 781 782 protected Predicate getDefaultHandledPredicate() { 783 // Default is not not handle errors 784 return null; 785 } 786 787 protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) { 788 Exception caught = exchange.getException(); 789 790 // we continue so clear any exceptions 791 exchange.setException(null); 792 // clear rollback flags 793 exchange.setProperty(Exchange.ROLLBACK_ONLY, null); 794 // reset cached streams so they can be read again 795 MessageHelper.resetStreamCache(exchange.getIn()); 796 797 // its continued then remove traces of redelivery attempted and caught exception 798 exchange.getIn().removeHeader(Exchange.REDELIVERED); 799 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); 800 exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); 801 exchange.removeProperty(Exchange.FAILURE_HANDLED); 802 // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception 803 804 // create log message 805 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); 806 msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; 807 msg = msg + ". Handled and continue routing."; 808 809 // log that we failed but want to continue 810 logFailedDelivery(false, false, false, true, isDeadLetterChannel, exchange, msg, data, null); 811 } 812 813 protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) { 814 if (!redeliveryEnabled) { 815 throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly."); 816 } 817 // there must be a defensive copy of the exchange 818 ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this); 819 820 // okay we will give it another go so clear the exception so we can try again 821 exchange.setException(null); 822 823 // clear rollback flags 824 exchange.setProperty(Exchange.ROLLBACK_ONLY, null); 825 826 // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange 827 // and then put these on the exchange when doing a redelivery / fault processor 828 829 // preserve these headers 830 Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 831 Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class); 832 Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class); 833 834 // we are redelivering so copy from original back to exchange 835 exchange.getIn().copyFrom(data.original.getIn()); 836 exchange.setOut(null); 837 // reset cached streams so they can be read again 838 MessageHelper.resetStreamCache(exchange.getIn()); 839 840 // put back headers 841 if (redeliveryCounter != null) { 842 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter); 843 } 844 if (redeliveryMaxCounter != null) { 845 exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter); 846 } 847 if (redelivered != null) { 848 exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered); 849 } 850 } 851 852 protected void handleException(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) { 853 Exception e = exchange.getException(); 854 // e is never null 855 856 Throwable previous = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class); 857 if (previous != null && previous != e) { 858 // a 2nd exception was thrown while handling a previous exception 859 // so we need to add the previous as suppressed by the new exception 860 // see also FatalFallbackErrorHandler 861 Throwable[] suppressed = e.getSuppressed(); 862 boolean found = false; 863 for (Throwable t : suppressed) { 864 if (t == previous) { 865 found = true; 866 } 867 } 868 if (!found) { 869 e.addSuppressed(previous); 870 } 871 } 872 873 // store the original caused exception in a property, so we can restore it later 874 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); 875 876 // find the error handler to use (if any) 877 OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e); 878 if (exceptionPolicy != null) { 879 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy); 880 data.handledPredicate = exceptionPolicy.getHandledPolicy(); 881 data.continuedPredicate = exceptionPolicy.getContinuedPolicy(); 882 data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy(); 883 data.useOriginalInMessage = exceptionPolicy.getUseOriginalMessagePolicy() != null && exceptionPolicy.getUseOriginalMessagePolicy(); 884 885 // route specific failure handler? 886 Processor processor = null; 887 UnitOfWork uow = exchange.getUnitOfWork(); 888 if (uow != null && uow.getRouteContext() != null) { 889 String routeId = uow.getRouteContext().getRoute().getId(); 890 processor = exceptionPolicy.getErrorHandler(routeId); 891 } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) { 892 // note this should really not happen, but we have this code as a fail safe 893 // to be backwards compatible with the old behavior 894 log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId()); 895 processor = exceptionPolicy.getErrorHandlers().iterator().next(); 896 } 897 if (processor != null) { 898 data.failureProcessor = processor; 899 } 900 901 // route specific on redelivery? 902 processor = exceptionPolicy.getOnRedelivery(); 903 if (processor != null) { 904 data.onRedeliveryProcessor = processor; 905 } 906 // route specific on exception occurred? 907 processor = exceptionPolicy.getOnExceptionOccurred(); 908 if (processor != null) { 909 data.onExceptionProcessor = processor; 910 } 911 } 912 913 // only log if not failure handled or not an exhausted unit of work 914 if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) { 915 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange) 916 + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e; 917 logFailedDelivery(true, false, false, false, isDeadLetterChannel, exchange, msg, data, e); 918 } 919 920 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data); 921 } 922 923 /** 924 * Gives an optional configured OnExceptionOccurred processor a chance to process just after an exception 925 * was thrown while processing the Exchange. This allows to execute the processor at the same time the exception was thrown. 926 */ 927 protected void onExceptionOccurred(Exchange exchange, final RedeliveryData data) { 928 if (data.onExceptionProcessor == null) { 929 return; 930 } 931 932 // run this synchronously as its just a Processor 933 try { 934 if (log.isTraceEnabled()) { 935 log.trace("OnExceptionOccurred processor {} is processing Exchange: {} due exception occurred", data.onExceptionProcessor, exchange); 936 } 937 data.onExceptionProcessor.process(exchange); 938 } catch (Throwable e) { 939 // we dont not want new exception to override existing, so log it as a WARN 940 log.warn("Error during processing OnExceptionOccurred. This exception is ignored.", e); 941 } 942 log.trace("OnExceptionOccurred processor done"); 943 } 944 945 /** 946 * Gives an optional configured redelivery processor a chance to process before the Exchange 947 * will be redelivered. This can be used to alter the Exchange. 948 */ 949 protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) { 950 if (data.onRedeliveryProcessor == null) { 951 return; 952 } 953 954 if (log.isTraceEnabled()) { 955 log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered", 956 data.onRedeliveryProcessor, exchange); 957 } 958 959 // run this synchronously as its just a Processor 960 try { 961 data.onRedeliveryProcessor.process(exchange); 962 } catch (Throwable e) { 963 exchange.setException(e); 964 } 965 log.trace("Redelivery processor done"); 966 } 967 968 /** 969 * All redelivery attempts failed so move the exchange to the dead letter queue 970 */ 971 protected boolean deliverToFailureProcessor(final Processor processor, final boolean isDeadLetterChannel, final Exchange exchange, 972 final RedeliveryData data, final AsyncCallback callback) { 973 boolean sync = true; 974 975 Exception caught = exchange.getException(); 976 977 // we did not success with the redelivery so now we let the failure processor handle it 978 // clear exception as we let the failure processor handle it 979 exchange.setException(null); 980 981 final boolean shouldHandle = shouldHandle(exchange, data); 982 final boolean shouldContinue = shouldContinue(exchange, data); 983 984 // regard both handled or continued as being handled 985 boolean handled = false; 986 987 // always handle if dead letter channel 988 boolean handleOrContinue = isDeadLetterChannel || shouldHandle || shouldContinue; 989 if (handleOrContinue) { 990 // its handled then remove traces of redelivery attempted 991 exchange.getIn().removeHeader(Exchange.REDELIVERED); 992 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); 993 exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); 994 exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); 995 996 // and remove traces of rollback only and uow exhausted markers 997 exchange.removeProperty(Exchange.ROLLBACK_ONLY); 998 exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED); 999 1000 handled = true; 1001 } else { 1002 // must decrement the redelivery counter as we didn't process the redelivery but is 1003 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync 1004 decrementRedeliveryCounter(exchange); 1005 } 1006 1007 // we should allow using the failure processor if we should not continue 1008 // or in case of continue then the failure processor is NOT a dead letter channel 1009 // because you can continue and still let the failure processor do some routing 1010 // before continue in the main route. 1011 boolean allowFailureProcessor = !shouldContinue || !isDeadLetterChannel; 1012 1013 if (allowFailureProcessor && processor != null) { 1014 1015 // prepare original IN body if it should be moved instead of current body 1016 if (data.useOriginalInMessage) { 1017 log.trace("Using the original IN message instead of current"); 1018 Message original = ExchangeHelper.getOriginalInMessage(exchange); 1019 exchange.setIn(original); 1020 if (exchange.hasOut()) { 1021 log.trace("Removing the out message to avoid some uncertain behavior"); 1022 exchange.setOut(null); 1023 } 1024 } 1025 1026 // reset cached streams so they can be read again 1027 MessageHelper.resetStreamCache(exchange.getIn()); 1028 1029 // invoke custom on prepare 1030 if (onPrepareProcessor != null) { 1031 try { 1032 log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange); 1033 onPrepareProcessor.process(exchange); 1034 } catch (Exception e) { 1035 // a new exception was thrown during prepare 1036 exchange.setException(e); 1037 } 1038 } 1039 1040 log.trace("Failure processor {} is processing Exchange: {}", processor, exchange); 1041 1042 // store the last to endpoint as the failure endpoint 1043 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 1044 // and store the route id so we know in which route we failed 1045 UnitOfWork uow = exchange.getUnitOfWork(); 1046 if (uow != null && uow.getRouteContext() != null) { 1047 exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId()); 1048 } 1049 1050 // fire event as we had a failure processor to handle it, which there is a event for 1051 final boolean deadLetterChannel = processor == data.deadLetterProcessor; 1052 1053 EventHelper.notifyExchangeFailureHandling(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri); 1054 1055 // the failure processor could also be asynchronous 1056 AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor); 1057 sync = afp.process(exchange, new AsyncCallback() { 1058 public void done(boolean sync) { 1059 log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange); 1060 try { 1061 prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue); 1062 // fire event as we had a failure processor to handle it, which there is a event for 1063 EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri); 1064 } finally { 1065 // if the fault was handled asynchronously, this should be reflected in the callback as well 1066 data.sync &= sync; 1067 callback.done(data.sync); 1068 } 1069 } 1070 }); 1071 } else { 1072 try { 1073 // invoke custom on prepare 1074 if (onPrepareProcessor != null) { 1075 try { 1076 log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange); 1077 onPrepareProcessor.process(exchange); 1078 } catch (Exception e) { 1079 // a new exception was thrown during prepare 1080 exchange.setException(e); 1081 } 1082 } 1083 // no processor but we need to prepare after failure as well 1084 prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue); 1085 } finally { 1086 // callback we are done 1087 callback.done(data.sync); 1088 } 1089 } 1090 1091 // create log message 1092 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); 1093 msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; 1094 if (processor != null) { 1095 if (isDeadLetterChannel && deadLetterUri != null) { 1096 msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]"; 1097 } else { 1098 msg = msg + ". Processed by failure processor: " + processor; 1099 } 1100 } 1101 1102 // log that we failed delivery as we are exhausted 1103 logFailedDelivery(false, false, handled, false, isDeadLetterChannel, exchange, msg, data, null); 1104 1105 return sync; 1106 } 1107 1108 protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data, final boolean isDeadLetterChannel, 1109 final boolean shouldHandle, final boolean shouldContinue) { 1110 1111 Exception newException = exchange.getException(); 1112 1113 // we could not process the exchange so we let the failure processor handled it 1114 ExchangeHelper.setFailureHandled(exchange); 1115 1116 // honor if already set a handling 1117 boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null; 1118 if (alreadySet) { 1119 boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class); 1120 log.trace("This exchange has already been marked for handling: {}", handled); 1121 if (!handled) { 1122 // exception not handled, put exception back in the exchange 1123 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 1124 // and put failure endpoint back as well 1125 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 1126 } 1127 return; 1128 } 1129 1130 // dead letter channel is special 1131 if (shouldContinue) { 1132 log.trace("This exchange is continued: {}", exchange); 1133 // okay we want to continue then prepare the exchange for that as well 1134 prepareExchangeForContinue(exchange, data, isDeadLetterChannel); 1135 } else if (shouldHandle) { 1136 log.trace("This exchange is handled so its marked as not failed: {}", exchange); 1137 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE); 1138 } else { 1139 // okay the redelivery policy are not explicit set to true, so we should allow to check for some 1140 // special situations when using dead letter channel 1141 if (isDeadLetterChannel) { 1142 1143 // DLC is always handling the first thrown exception, 1144 // but if its a new exception then use the configured option 1145 boolean handled = newException == null || data.handleNewException; 1146 1147 // when using DLC then log new exception whether its being handled or not, as otherwise it may appear as 1148 // the DLC swallow new exceptions by default (which is by design to ensure the DLC always complete, 1149 // to avoid causing endless poison messages that fails forever) 1150 if (newException != null && data.currentRedeliveryPolicy.isLogNewException()) { 1151 String uri = URISupport.sanitizeUri(deadLetterUri); 1152 String msg = "New exception occurred during processing by the DeadLetterChannel[" + uri + "] due " + newException.getMessage(); 1153 if (handled) { 1154 msg += ". The new exception is being handled as deadLetterHandleNewException=true."; 1155 } else { 1156 msg += ". The new exception is not handled as deadLetterHandleNewException=false."; 1157 } 1158 logFailedDelivery(false, true, handled, false, true, exchange, msg, data, newException); 1159 } 1160 1161 if (handled) { 1162 log.trace("This exchange is handled so its marked as not failed: {}", exchange); 1163 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE); 1164 return; 1165 } 1166 } 1167 1168 // not handled by default 1169 prepareExchangeAfterFailureNotHandled(exchange); 1170 } 1171 } 1172 1173 private void prepareExchangeAfterFailureNotHandled(Exchange exchange) { 1174 log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange); 1175 // exception not handled, put exception back in the exchange 1176 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE); 1177 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 1178 // and put failure endpoint back as well 1179 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 1180 // and store the route id so we know in which route we failed 1181 UnitOfWork uow = exchange.getUnitOfWork(); 1182 if (uow != null && uow.getRouteContext() != null) { 1183 exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId()); 1184 } 1185 } 1186 1187 private void logFailedDelivery(boolean shouldRedeliver, boolean newException, boolean handled, boolean continued, boolean isDeadLetterChannel, 1188 Exchange exchange, String message, RedeliveryData data, Throwable e) { 1189 if (logger == null) { 1190 return; 1191 } 1192 1193 if (!exchange.isRollbackOnly()) { 1194 if (newException && !data.currentRedeliveryPolicy.isLogNewException()) { 1195 // do not log new exception 1196 return; 1197 } 1198 1199 // if we should not rollback, then check whether logging is enabled 1200 1201 if (!newException && handled && !data.currentRedeliveryPolicy.isLogHandled()) { 1202 // do not log handled 1203 return; 1204 } 1205 1206 if (!newException && continued && !data.currentRedeliveryPolicy.isLogContinued()) { 1207 // do not log handled 1208 return; 1209 } 1210 1211 if (!newException && shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) { 1212 // do not log retry attempts 1213 return; 1214 } 1215 1216 if (!newException && !shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) { 1217 // do not log exhausted 1218 return; 1219 } 1220 } 1221 1222 LoggingLevel newLogLevel; 1223 boolean logStackTrace; 1224 if (exchange.isRollbackOnly()) { 1225 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 1226 logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace(); 1227 } else if (shouldRedeliver) { 1228 newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel(); 1229 logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace(); 1230 } else { 1231 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 1232 logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace(); 1233 } 1234 if (e == null) { 1235 e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); 1236 } 1237 1238 if (newException) { 1239 // log at most WARN level 1240 if (newLogLevel == LoggingLevel.ERROR) { 1241 newLogLevel = LoggingLevel.WARN; 1242 } 1243 String msg = message; 1244 if (msg == null) { 1245 msg = "New exception " + ExchangeHelper.logIds(exchange); 1246 // special for logging the new exception 1247 Throwable cause = e; 1248 if (cause != null) { 1249 msg = msg + " due: " + cause.getMessage(); 1250 } 1251 } 1252 1253 if (e != null && logStackTrace) { 1254 logger.log(msg, e, newLogLevel); 1255 } else { 1256 logger.log(msg, newLogLevel); 1257 } 1258 } else if (exchange.isRollbackOnly()) { 1259 String msg = "Rollback " + ExchangeHelper.logIds(exchange); 1260 Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class); 1261 if (cause != null) { 1262 msg = msg + " due: " + cause.getMessage(); 1263 } 1264 1265 // should we include message history 1266 if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) { 1267 // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it) 1268 ExchangeFormatter formatter = customExchangeFormatter 1269 ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null); 1270 String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, false); 1271 if (routeStackTrace != null) { 1272 msg = msg + "\n" + routeStackTrace; 1273 } 1274 } 1275 1276 if (newLogLevel == LoggingLevel.ERROR) { 1277 // log intended rollback on maximum WARN level (no ERROR) 1278 logger.log(msg, LoggingLevel.WARN); 1279 } else { 1280 // otherwise use the desired logging level 1281 logger.log(msg, newLogLevel); 1282 } 1283 } else { 1284 String msg = message; 1285 // should we include message history 1286 if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) { 1287 // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it) 1288 ExchangeFormatter formatter = customExchangeFormatter 1289 ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null); 1290 String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, e != null && logStackTrace); 1291 if (routeStackTrace != null) { 1292 msg = msg + "\n" + routeStackTrace; 1293 } 1294 } 1295 1296 if (e != null && logStackTrace) { 1297 logger.log(msg, e, newLogLevel); 1298 } else { 1299 logger.log(msg, newLogLevel); 1300 } 1301 } 1302 } 1303 1304 /** 1305 * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback). 1306 * <p/> 1307 * If the exchange is exhausted, then we will not continue processing, but let the 1308 * failure processor deal with the exchange. 1309 * 1310 * @param exchange the current exchange 1311 * @param data the redelivery data 1312 * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust. 1313 */ 1314 private boolean isExhausted(Exchange exchange, RedeliveryData data) { 1315 // if marked as rollback only then do not continue/redeliver 1316 boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class); 1317 if (exhausted) { 1318 log.trace("This exchange is marked as redelivery exhausted: {}", exchange); 1319 return true; 1320 } 1321 1322 // if marked as rollback only then do not continue/redeliver 1323 boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class); 1324 if (rollbackOnly) { 1325 log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange); 1326 return true; 1327 } 1328 // its the first original call so continue 1329 if (data.redeliveryCounter == 0) { 1330 return false; 1331 } 1332 // its a potential redelivery so determine if we should redeliver or not 1333 boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate); 1334 return !redeliver; 1335 } 1336 1337 /** 1338 * Determines whether or not to continue if we are exhausted. 1339 * 1340 * @param exchange the current exchange 1341 * @param data the redelivery data 1342 * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust. 1343 */ 1344 private boolean shouldContinue(Exchange exchange, RedeliveryData data) { 1345 if (data.continuedPredicate != null) { 1346 return data.continuedPredicate.matches(exchange); 1347 } 1348 // do not continue by default 1349 return false; 1350 } 1351 1352 /** 1353 * Determines whether or not to handle if we are exhausted. 1354 * 1355 * @param exchange the current exchange 1356 * @param data the redelivery data 1357 * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust. 1358 */ 1359 private boolean shouldHandle(Exchange exchange, RedeliveryData data) { 1360 if (data.handledPredicate != null) { 1361 return data.handledPredicate.matches(exchange); 1362 } 1363 // do not handle by default 1364 return false; 1365 } 1366 1367 /** 1368 * Increments the redelivery counter and adds the redelivered flag if the 1369 * message has been redelivered 1370 */ 1371 private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) { 1372 Message in = exchange.getIn(); 1373 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 1374 int next = 1; 1375 if (counter != null) { 1376 next = counter + 1; 1377 } 1378 in.setHeader(Exchange.REDELIVERY_COUNTER, next); 1379 in.setHeader(Exchange.REDELIVERED, Boolean.TRUE); 1380 // if maximum redeliveries is used, then provide that information as well 1381 if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) { 1382 in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries()); 1383 } 1384 return next; 1385 } 1386 1387 /** 1388 * Prepares the redelivery counter and boolean flag for the failure handle processor 1389 */ 1390 private void decrementRedeliveryCounter(Exchange exchange) { 1391 Message in = exchange.getIn(); 1392 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 1393 if (counter != null) { 1394 int prev = counter - 1; 1395 in.setHeader(Exchange.REDELIVERY_COUNTER, prev); 1396 // set boolean flag according to counter 1397 in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE); 1398 } else { 1399 // not redelivered 1400 in.setHeader(Exchange.REDELIVERY_COUNTER, 0); 1401 in.setHeader(Exchange.REDELIVERED, Boolean.FALSE); 1402 } 1403 } 1404 1405 /** 1406 * Determines if redelivery is enabled by checking if any of the redelivery policy 1407 * settings may allow redeliveries. 1408 * 1409 * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise 1410 * @throws Exception can be thrown 1411 */ 1412 private boolean determineIfRedeliveryIsEnabled() throws Exception { 1413 // determine if redeliver is enabled either on error handler 1414 if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) { 1415 // must check for != 0 as (-1 means redeliver forever) 1416 return true; 1417 } 1418 if (retryWhilePolicy != null) { 1419 return true; 1420 } 1421 1422 // or on the exception policies 1423 if (!exceptionPolicies.isEmpty()) { 1424 // walk them to see if any of them have a maximum redeliveries > 0 or retry until set 1425 for (OnExceptionDefinition def : exceptionPolicies.values()) { 1426 1427 String ref = def.getRedeliveryPolicyRef(); 1428 if (ref != null) { 1429 // lookup in registry if ref provided 1430 RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class); 1431 if (policy.getMaximumRedeliveries() != 0) { 1432 // must check for != 0 as (-1 means redeliver forever) 1433 return true; 1434 } 1435 } else if (def.getRedeliveryPolicy() != null) { 1436 Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries()); 1437 if (max != null && max != 0) { 1438 // must check for != 0 as (-1 means redeliver forever) 1439 return true; 1440 } 1441 } 1442 1443 if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) { 1444 return true; 1445 } 1446 } 1447 } 1448 1449 return false; 1450 } 1451 1452 /** 1453 * Gets the number of exchanges that are pending for redelivery 1454 */ 1455 public int getPendingRedeliveryCount() { 1456 int answer = redeliverySleepCounter.get(); 1457 if (executorService != null && executorService instanceof ThreadPoolExecutor) { 1458 answer += ((ThreadPoolExecutor) executorService).getQueue().size(); 1459 } 1460 1461 return answer; 1462 } 1463 1464 @Override 1465 protected void doStart() throws Exception { 1466 ServiceHelper.startServices(output, outputAsync, deadLetter); 1467 1468 // determine if redeliver is enabled or not 1469 redeliveryEnabled = determineIfRedeliveryIsEnabled(); 1470 if (log.isTraceEnabled()) { 1471 log.trace("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this); 1472 } 1473 1474 // we only need thread pool if redelivery is enabled 1475 if (redeliveryEnabled) { 1476 if (executorService == null) { 1477 // use default shared executor service 1478 executorService = camelContext.getErrorHandlerExecutorService(); 1479 } 1480 if (log.isDebugEnabled()) { 1481 log.debug("Using ExecutorService: {} for redeliveries on error handler: {}", executorService, this); 1482 } 1483 } 1484 1485 // reset flag when starting 1486 preparingShutdown = false; 1487 redeliverySleepCounter.set(0); 1488 } 1489 1490 @Override 1491 protected void doStop() throws Exception { 1492 // noop, do not stop any services which we only do when shutting down 1493 // as the error handler can be context scoped, and should not stop in case 1494 // a route stops 1495 } 1496 1497 @Override 1498 protected void doShutdown() throws Exception { 1499 ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync); 1500 } 1501}