001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.Callable; 022import java.util.concurrent.CountDownLatch; 023import java.util.concurrent.RejectedExecutionException; 024import java.util.concurrent.ScheduledExecutorService; 025import java.util.concurrent.ThreadPoolExecutor; 026import java.util.concurrent.TimeUnit; 027import java.util.concurrent.atomic.AtomicInteger; 028 029import org.apache.camel.AsyncCallback; 030import org.apache.camel.AsyncProcessor; 031import org.apache.camel.CamelContext; 032import org.apache.camel.Exchange; 033import org.apache.camel.LoggingLevel; 034import org.apache.camel.Message; 035import org.apache.camel.Navigate; 036import org.apache.camel.Predicate; 037import org.apache.camel.Processor; 038import org.apache.camel.model.OnExceptionDefinition; 039import org.apache.camel.spi.AsyncProcessorAwaitManager; 040import org.apache.camel.spi.ExchangeFormatter; 041import org.apache.camel.spi.ShutdownPrepared; 042import org.apache.camel.spi.SubUnitOfWorkCallback; 043import org.apache.camel.spi.UnitOfWork; 044import org.apache.camel.util.AsyncProcessorConverterHelper; 045import org.apache.camel.util.AsyncProcessorHelper; 046import org.apache.camel.util.CamelContextHelper; 047import org.apache.camel.util.CamelLogger; 048import org.apache.camel.util.EventHelper; 049import org.apache.camel.util.ExchangeHelper; 050import org.apache.camel.util.MessageHelper; 051import org.apache.camel.util.ObjectHelper; 052import org.apache.camel.util.ServiceHelper; 053import org.apache.camel.util.StopWatch; 054import org.apache.camel.util.URISupport; 055 056/** 057 * Base redeliverable error handler that also supports a final dead letter queue in case 058 * all redelivery attempts fail. 059 * <p/> 060 * This implementation should contain all the error handling logic and the sub classes 061 * should only configure it according to what they support. 062 * 063 * @version 064 */ 065public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport implements AsyncProcessor, ShutdownPrepared, Navigate<Processor> { 066 067 protected final AtomicInteger redeliverySleepCounter = new AtomicInteger(); 068 protected ScheduledExecutorService executorService; 069 protected final CamelContext camelContext; 070 protected final AsyncProcessorAwaitManager awaitManager; 071 protected final Processor deadLetter; 072 protected final String deadLetterUri; 073 protected final boolean deadLetterHandleNewException; 074 protected final Processor output; 075 protected final AsyncProcessor outputAsync; 076 protected final Processor redeliveryProcessor; 077 protected final RedeliveryPolicy redeliveryPolicy; 078 protected final Predicate retryWhilePolicy; 079 protected final CamelLogger logger; 080 protected final boolean useOriginalMessagePolicy; 081 protected boolean redeliveryEnabled; 082 protected volatile boolean preparingShutdown; 083 protected final ExchangeFormatter exchangeFormatter; 084 protected final boolean customExchangeFormatter; 085 protected final Processor onPrepareProcessor; 086 protected final Processor onExceptionProcessor; 087 088 /** 089 * Contains the current redelivery data 090 */ 091 protected class RedeliveryData { 092 Exchange original; 093 boolean sync = true; 094 int redeliveryCounter; 095 long redeliveryDelay; 096 Predicate retryWhilePredicate; 097 boolean redeliverFromSync; 098 099 // default behavior which can be overloaded on a per exception basis 100 RedeliveryPolicy currentRedeliveryPolicy; 101 Processor deadLetterProcessor; 102 Processor failureProcessor; 103 Processor onRedeliveryProcessor; 104 Processor onPrepareProcessor; 105 Processor onExceptionProcessor; 106 Predicate handledPredicate; 107 Predicate continuedPredicate; 108 boolean useOriginalInMessage; 109 boolean handleNewException; 110 111 public RedeliveryData() { 112 // init with values from the error handler 113 this.retryWhilePredicate = retryWhilePolicy; 114 this.currentRedeliveryPolicy = redeliveryPolicy; 115 this.deadLetterProcessor = deadLetter; 116 this.onRedeliveryProcessor = redeliveryProcessor; 117 this.onPrepareProcessor = RedeliveryErrorHandler.this.onPrepareProcessor; 118 this.onExceptionProcessor = RedeliveryErrorHandler.this.onExceptionProcessor; 119 this.handledPredicate = getDefaultHandledPredicate(); 120 this.useOriginalInMessage = useOriginalMessagePolicy; 121 this.handleNewException = deadLetterHandleNewException; 122 } 123 } 124 125 /** 126 * Task for sleeping during redelivery attempts. 127 * <p/> 128 * This task is for the synchronous blocking. If using async delayed then a scheduled thread pool 129 * is used for sleeping and trigger redeliveries. 130 */ 131 private final class RedeliverSleepTask { 132 133 private final RedeliveryPolicy policy; 134 private final long delay; 135 136 RedeliverSleepTask(RedeliveryPolicy policy, long delay) { 137 this.policy = policy; 138 this.delay = delay; 139 } 140 141 public boolean sleep() throws InterruptedException { 142 // for small delays then just sleep 143 if (delay < 1000) { 144 policy.sleep(delay); 145 return true; 146 } 147 148 StopWatch watch = new StopWatch(); 149 150 log.debug("Sleeping for: {} millis until attempting redelivery", delay); 151 while (watch.taken() < delay) { 152 // sleep using 1 sec interval 153 154 long delta = delay - watch.taken(); 155 long max = Math.min(1000, delta); 156 if (max > 0) { 157 log.trace("Sleeping for: {} millis until waking up for re-check", max); 158 Thread.sleep(max); 159 } 160 161 // are we preparing for shutdown then only do redelivery if allowed 162 if (preparingShutdown && !policy.isAllowRedeliveryWhileStopping()) { 163 log.debug("Rejected redelivery while stopping"); 164 return false; 165 } 166 } 167 168 return true; 169 } 170 } 171 172 /** 173 * Tasks which performs asynchronous redelivery attempts, and being triggered by a 174 * {@link java.util.concurrent.ScheduledExecutorService} to avoid having any threads blocking if a task 175 * has to be delayed before a redelivery attempt is performed. 176 */ 177 private final class AsyncRedeliveryTask implements Callable<Boolean> { 178 179 private final Exchange exchange; 180 private final AsyncCallback callback; 181 private final RedeliveryData data; 182 183 AsyncRedeliveryTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) { 184 this.exchange = exchange; 185 this.callback = callback; 186 this.data = data; 187 } 188 189 public Boolean call() throws Exception { 190 // prepare for redelivery 191 prepareExchangeForRedelivery(exchange, data); 192 193 // letting onRedeliver be executed at first 194 deliverToOnRedeliveryProcessor(exchange, data); 195 196 if (log.isTraceEnabled()) { 197 log.trace("Redelivering exchangeId: {} -> {} for Exchange: {}", new Object[]{exchange.getExchangeId(), outputAsync, exchange}); 198 } 199 200 // emmit event we are doing redelivery 201 EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter); 202 203 // process the exchange (also redelivery) 204 boolean sync; 205 if (data.redeliverFromSync) { 206 // this redelivery task was scheduled from synchronous, which we forced to be asynchronous from 207 // this error handler, which means we have to invoke the callback with false, to have the callback 208 // be notified when we are done 209 sync = outputAsync.process(exchange, new AsyncCallback() { 210 public void done(boolean doneSync) { 211 log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); 212 213 // mark we are in sync mode now 214 data.sync = false; 215 216 // only process if the exchange hasn't failed 217 // and it has not been handled by the error processor 218 if (isDone(exchange)) { 219 callback.done(false); 220 return; 221 } 222 223 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 224 processAsyncErrorHandler(exchange, callback, data); 225 } 226 }); 227 } else { 228 // this redelivery task was scheduled from asynchronous, which means we should only 229 // handle when the asynchronous task was done 230 sync = outputAsync.process(exchange, new AsyncCallback() { 231 public void done(boolean doneSync) { 232 log.trace("Redelivering exchangeId: {} done sync: {}", exchange.getExchangeId(), doneSync); 233 234 // this callback should only handle the async case 235 if (doneSync) { 236 return; 237 } 238 239 // mark we are in async mode now 240 data.sync = false; 241 242 // only process if the exchange hasn't failed 243 // and it has not been handled by the error processor 244 if (isDone(exchange)) { 245 callback.done(doneSync); 246 return; 247 } 248 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 249 processAsyncErrorHandler(exchange, callback, data); 250 } 251 }); 252 } 253 254 return sync; 255 } 256 } 257 258 public RedeliveryErrorHandler(CamelContext camelContext, Processor output, CamelLogger logger, 259 Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Processor deadLetter, 260 String deadLetterUri, boolean deadLetterHandleNewException, boolean useOriginalMessagePolicy, 261 Predicate retryWhile, ScheduledExecutorService executorService, Processor onPrepareProcessor, Processor onExceptionProcessor) { 262 263 ObjectHelper.notNull(camelContext, "CamelContext", this); 264 ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this); 265 266 this.camelContext = camelContext; 267 this.awaitManager = camelContext.getAsyncProcessorAwaitManager(); 268 this.redeliveryProcessor = redeliveryProcessor; 269 this.deadLetter = deadLetter; 270 this.output = output; 271 this.outputAsync = AsyncProcessorConverterHelper.convert(output); 272 this.redeliveryPolicy = redeliveryPolicy; 273 this.logger = logger; 274 this.deadLetterUri = deadLetterUri; 275 this.deadLetterHandleNewException = deadLetterHandleNewException; 276 this.useOriginalMessagePolicy = useOriginalMessagePolicy; 277 this.retryWhilePolicy = retryWhile; 278 this.executorService = executorService; 279 this.onPrepareProcessor = onPrepareProcessor; 280 this.onExceptionProcessor = onExceptionProcessor; 281 282 if (ObjectHelper.isNotEmpty(redeliveryPolicy.getExchangeFormatterRef())) { 283 ExchangeFormatter formatter = camelContext.getRegistry().lookupByNameAndType(redeliveryPolicy.getExchangeFormatterRef(), ExchangeFormatter.class); 284 if (formatter != null) { 285 this.exchangeFormatter = formatter; 286 this.customExchangeFormatter = true; 287 } else { 288 throw new IllegalArgumentException("Cannot find the exchangeFormatter by using reference id " + redeliveryPolicy.getExchangeFormatterRef()); 289 } 290 } else { 291 this.customExchangeFormatter = false; 292 // setup exchange formatter to be used for message history dump 293 DefaultExchangeFormatter formatter = new DefaultExchangeFormatter(); 294 formatter.setShowExchangeId(true); 295 formatter.setMultiline(true); 296 formatter.setShowHeaders(true); 297 formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed); 298 try { 299 Integer maxChars = CamelContextHelper.parseInteger(camelContext, camelContext.getProperty(Exchange.LOG_DEBUG_BODY_MAX_CHARS)); 300 if (maxChars != null) { 301 formatter.setMaxChars(maxChars); 302 } 303 } catch (Exception e) { 304 throw ObjectHelper.wrapRuntimeCamelException(e); 305 } 306 this.exchangeFormatter = formatter; 307 } 308 } 309 310 public boolean supportTransacted() { 311 return false; 312 } 313 314 @Override 315 public boolean hasNext() { 316 return output != null; 317 } 318 319 @Override 320 public List<Processor> next() { 321 if (!hasNext()) { 322 return null; 323 } 324 List<Processor> answer = new ArrayList<Processor>(1); 325 answer.add(output); 326 return answer; 327 } 328 329 protected boolean isRunAllowed(RedeliveryData data) { 330 // if camel context is forcing a shutdown then do not allow running 331 boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this); 332 if (forceShutdown) { 333 log.trace("isRunAllowed() -> false (Run not allowed as ShutdownStrategy is forcing shutting down)"); 334 return false; 335 } 336 337 // redelivery policy can control if redelivery is allowed during stopping/shutdown 338 // but this only applies during a redelivery (counter must > 0) 339 if (data.redeliveryCounter > 0) { 340 if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) { 341 log.trace("isRunAllowed() -> true (Run allowed as RedeliverWhileStopping is enabled)"); 342 return true; 343 } else if (preparingShutdown) { 344 // we are preparing for shutdown, now determine if we can still run 345 boolean answer = isRunAllowedOnPreparingShutdown(); 346 log.trace("isRunAllowed() -> {} (Run not allowed as we are preparing for shutdown)", answer); 347 return answer; 348 } 349 } 350 351 // we cannot run if we are stopping/stopped 352 boolean answer = !isStoppingOrStopped(); 353 log.trace("isRunAllowed() -> {} (Run allowed if we are not stopped/stopping)", answer); 354 return answer; 355 } 356 357 protected boolean isRunAllowedOnPreparingShutdown() { 358 return false; 359 } 360 361 protected boolean isRedeliveryAllowed(RedeliveryData data) { 362 // redelivery policy can control if redelivery is allowed during stopping/shutdown 363 // but this only applies during a redelivery (counter must > 0) 364 if (data.redeliveryCounter > 0) { 365 boolean stopping = isStoppingOrStopped(); 366 if (!preparingShutdown && !stopping) { 367 log.trace("isRedeliveryAllowed() -> true (we are not stopping/stopped)"); 368 return true; 369 } else { 370 // we are stopping or preparing to shutdown 371 if (data.currentRedeliveryPolicy.allowRedeliveryWhileStopping) { 372 log.trace("isRedeliveryAllowed() -> true (Redelivery allowed as RedeliverWhileStopping is enabled)"); 373 return true; 374 } else { 375 log.trace("isRedeliveryAllowed() -> false (Redelivery not allowed as RedeliverWhileStopping is disabled)"); 376 return false; 377 } 378 } 379 } 380 381 return true; 382 } 383 384 @Override 385 public void prepareShutdown(boolean suspendOnly, boolean forced) { 386 // prepare for shutdown, eg do not allow redelivery if configured 387 log.trace("Prepare shutdown on error handler {}", this); 388 preparingShutdown = true; 389 } 390 391 public void process(Exchange exchange) throws Exception { 392 if (output == null) { 393 // no output then just return 394 return; 395 } 396 397 // inline org.apache.camel.util.AsyncProcessorHelper.process(org.apache.camel.AsyncProcessor, org.apache.camel.Exchange) 398 // to optimize and reduce stacktrace lengths 399 final CountDownLatch latch = new CountDownLatch(1); 400 boolean sync = process(exchange, new AsyncCallback() { 401 public void done(boolean doneSync) { 402 if (!doneSync) { 403 awaitManager.countDown(exchange, latch); 404 } 405 } 406 }); 407 if (!sync) { 408 awaitManager.await(exchange, latch); 409 } 410 } 411 412 /** 413 * Process the exchange using redelivery error handling. 414 */ 415 public boolean process(final Exchange exchange, final AsyncCallback callback) { 416 final RedeliveryData data = new RedeliveryData(); 417 418 // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the 419 // original Exchange is being redelivered, and not a mutated Exchange 420 data.original = defensiveCopyExchangeIfNeeded(exchange); 421 422 // use looping to have redelivery attempts 423 while (true) { 424 425 // can we still run 426 if (!isRunAllowed(data)) { 427 log.trace("Run not allowed, will reject executing exchange: {}", exchange); 428 if (exchange.getException() == null) { 429 exchange.setException(new RejectedExecutionException()); 430 } 431 // we cannot process so invoke callback 432 callback.done(data.sync); 433 return data.sync; 434 } 435 436 // did previous processing cause an exception? 437 boolean handle = shouldHandleException(exchange); 438 if (handle) { 439 handleException(exchange, data, isDeadLetterChannel()); 440 onExceptionOccurred(exchange, data); 441 } 442 443 // compute if we are exhausted, and whether redelivery is allowed 444 boolean exhausted = isExhausted(exchange, data); 445 boolean redeliverAllowed = isRedeliveryAllowed(data); 446 447 // if we are exhausted or redelivery is not allowed, then deliver to failure processor (eg such as DLC) 448 if (!redeliverAllowed || exhausted) { 449 Processor target = null; 450 boolean deliver = true; 451 452 // the unit of work may have an optional callback associated we need to leverage 453 SubUnitOfWorkCallback uowCallback = exchange.getUnitOfWork().getSubUnitOfWorkCallback(); 454 if (uowCallback != null) { 455 // signal to the callback we are exhausted 456 uowCallback.onExhausted(exchange); 457 // do not deliver to the failure processor as its been handled by the callback instead 458 deliver = false; 459 } 460 461 if (deliver) { 462 // should deliver to failure processor (either from onException or the dead letter channel) 463 target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor; 464 } 465 // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair 466 // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) 467 boolean isDeadLetterChannel = isDeadLetterChannel() && (target == null || target == data.deadLetterProcessor); 468 boolean sync = deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback); 469 // we are breaking out 470 return sync; 471 } 472 473 if (data.redeliveryCounter > 0) { 474 // calculate delay 475 data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter); 476 477 if (data.redeliveryDelay > 0) { 478 // okay there is a delay so create a scheduled task to have it executed in the future 479 480 if (data.currentRedeliveryPolicy.isAsyncDelayedRedelivery() && !exchange.isTransacted()) { 481 482 // we are doing a redelivery then a thread pool must be configured (see the doStart method) 483 ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this); 484 485 // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to 486 // have it being executed in the future, or immediately 487 // we are continuing asynchronously 488 489 // mark we are routing async from now and that this redelivery task came from a synchronous routing 490 data.sync = false; 491 data.redeliverFromSync = true; 492 AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); 493 494 // schedule the redelivery task 495 if (log.isTraceEnabled()) { 496 log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId()); 497 } 498 executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS); 499 500 return false; 501 } else { 502 // async delayed redelivery was disabled or we are transacted so we must be synchronous 503 // as the transaction manager requires to execute in the same thread context 504 try { 505 // we are doing synchronous redelivery and use thread sleep, so we keep track using a counter how many are sleeping 506 redeliverySleepCounter.incrementAndGet(); 507 RedeliverSleepTask task = new RedeliverSleepTask(data.currentRedeliveryPolicy, data.redeliveryDelay); 508 boolean complete = task.sleep(); 509 redeliverySleepCounter.decrementAndGet(); 510 if (!complete) { 511 // the task was rejected 512 exchange.setException(new RejectedExecutionException("Redelivery not allowed while stopping")); 513 // mark the exchange as redelivery exhausted so the failure processor / dead letter channel can process the exchange 514 exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); 515 // jump to start of loop which then detects that we are failed and exhausted 516 continue; 517 } 518 } catch (InterruptedException e) { 519 redeliverySleepCounter.decrementAndGet(); 520 // we was interrupted so break out 521 exchange.setException(e); 522 // mark the exchange to stop continue routing when interrupted 523 // as we do not want to continue routing (for example a task has been cancelled) 524 exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); 525 callback.done(data.sync); 526 return data.sync; 527 } 528 } 529 } 530 531 // prepare for redelivery 532 prepareExchangeForRedelivery(exchange, data); 533 534 // letting onRedeliver be executed 535 deliverToOnRedeliveryProcessor(exchange, data); 536 537 // emmit event we are doing redelivery 538 EventHelper.notifyExchangeRedelivery(exchange.getContext(), exchange, data.redeliveryCounter); 539 } 540 541 // process the exchange (also redelivery) 542 boolean sync = outputAsync.process(exchange, new AsyncCallback() { 543 public void done(boolean sync) { 544 // this callback should only handle the async case 545 if (sync) { 546 return; 547 } 548 549 // mark we are in async mode now 550 data.sync = false; 551 552 // if we are done then notify callback and exit 553 if (isDone(exchange)) { 554 callback.done(sync); 555 return; 556 } 557 558 // error occurred so loop back around which we do by invoking the processAsyncErrorHandler 559 // method which takes care of this in a asynchronous manner 560 processAsyncErrorHandler(exchange, callback, data); 561 } 562 }); 563 564 if (!sync) { 565 // the remainder of the Exchange is being processed asynchronously so we should return 566 return false; 567 } 568 // we continue to route synchronously 569 570 // if we are done then notify callback and exit 571 boolean done = isDone(exchange); 572 if (done) { 573 callback.done(true); 574 return true; 575 } 576 577 // error occurred so loop back around..... 578 } 579 } 580 581 /** 582 * <p>Determines the redelivery delay time by first inspecting the Message header {@link Exchange#REDELIVERY_DELAY} 583 * and if not present, defaulting to {@link RedeliveryPolicy#calculateRedeliveryDelay(long, int)}</p> 584 * 585 * <p>In order to prevent manipulation of the RedeliveryData state, the values of {@link RedeliveryData#redeliveryDelay} 586 * and {@link RedeliveryData#redeliveryCounter} are copied in.</p> 587 * 588 * @param exchange The current exchange in question. 589 * @param redeliveryPolicy The RedeliveryPolicy to use in the calculation. 590 * @param redeliveryDelay The default redelivery delay from RedeliveryData 591 * @param redeliveryCounter The redeliveryCounter 592 * @return The time to wait before the next redelivery. 593 */ 594 protected long determineRedeliveryDelay(Exchange exchange, RedeliveryPolicy redeliveryPolicy, long redeliveryDelay, int redeliveryCounter) { 595 Message message = exchange.getIn(); 596 Long delay = message.getHeader(Exchange.REDELIVERY_DELAY, Long.class); 597 if (delay == null) { 598 delay = redeliveryPolicy.calculateRedeliveryDelay(redeliveryDelay, redeliveryCounter); 599 log.debug("Redelivery delay calculated as {}", delay); 600 } else { 601 log.debug("Redelivery delay is {} from Message Header [{}]", delay, Exchange.REDELIVERY_DELAY); 602 } 603 return delay; 604 } 605 606 /** 607 * This logic is only executed if we have to retry redelivery asynchronously, which have to be done from the callback. 608 * <p/> 609 * And therefore the logic is a bit different than the synchronous <tt>processErrorHandler</tt> method which can use 610 * a loop based redelivery technique. However this means that these two methods in overall have to be in <b>sync</b> 611 * in terms of logic. 612 */ 613 protected void processAsyncErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) { 614 // can we still run 615 if (!isRunAllowed(data)) { 616 log.trace("Run not allowed, will reject executing exchange: {}", exchange); 617 if (exchange.getException() == null) { 618 exchange.setException(new RejectedExecutionException()); 619 } 620 callback.done(data.sync); 621 return; 622 } 623 624 // did previous processing cause an exception? 625 boolean handle = shouldHandleException(exchange); 626 if (handle) { 627 handleException(exchange, data, isDeadLetterChannel()); 628 onExceptionOccurred(exchange, data); 629 } 630 631 // compute if we are exhausted or not 632 boolean exhausted = isExhausted(exchange, data); 633 if (exhausted) { 634 Processor target = null; 635 boolean deliver = true; 636 637 // the unit of work may have an optional callback associated we need to leverage 638 UnitOfWork uow = exchange.getUnitOfWork(); 639 if (uow != null) { 640 SubUnitOfWorkCallback uowCallback = uow.getSubUnitOfWorkCallback(); 641 if (uowCallback != null) { 642 // signal to the callback we are exhausted 643 uowCallback.onExhausted(exchange); 644 // do not deliver to the failure processor as its been handled by the callback instead 645 deliver = false; 646 } 647 } 648 649 if (deliver) { 650 // should deliver to failure processor (either from onException or the dead letter channel) 651 target = data.failureProcessor != null ? data.failureProcessor : data.deadLetterProcessor; 652 } 653 // we should always invoke the deliverToFailureProcessor as it prepares, logs and does a fair 654 // bit of work for exhausted exchanges (its only the target processor which may be null if handled by a savepoint) 655 boolean isDeadLetterChannel = isDeadLetterChannel() && target == data.deadLetterProcessor; 656 deliverToFailureProcessor(target, isDeadLetterChannel, exchange, data, callback); 657 // we are breaking out 658 return; 659 } 660 661 if (data.redeliveryCounter > 0) { 662 // we are doing a redelivery then a thread pool must be configured (see the doStart method) 663 ObjectHelper.notNull(executorService, "Redelivery is enabled but ExecutorService has not been configured.", this); 664 665 // let the RedeliverTask be the logic which tries to redeliver the Exchange which we can used a scheduler to 666 // have it being executed in the future, or immediately 667 // Note: the data.redeliverFromSync should be kept as is, in case it was enabled previously 668 // to ensure the callback will continue routing from where we left 669 AsyncRedeliveryTask task = new AsyncRedeliveryTask(exchange, callback, data); 670 671 // calculate the redelivery delay 672 data.redeliveryDelay = determineRedeliveryDelay(exchange, data.currentRedeliveryPolicy, data.redeliveryDelay, data.redeliveryCounter); 673 674 if (data.redeliveryDelay > 0) { 675 // schedule the redelivery task 676 if (log.isTraceEnabled()) { 677 log.trace("Scheduling redelivery task to run in {} millis for exchangeId: {}", data.redeliveryDelay, exchange.getExchangeId()); 678 } 679 executorService.schedule(task, data.redeliveryDelay, TimeUnit.MILLISECONDS); 680 } else { 681 // execute the task immediately 682 executorService.submit(task); 683 } 684 } 685 } 686 687 /** 688 * Performs a defensive copy of the exchange if needed 689 * 690 * @param exchange the exchange 691 * @return the defensive copy, or <tt>null</tt> if not needed (redelivery is not enabled). 692 */ 693 protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) { 694 // only do a defensive copy if redelivery is enabled 695 if (redeliveryEnabled) { 696 return ExchangeHelper.createCopy(exchange, true); 697 } else { 698 return null; 699 } 700 } 701 702 /** 703 * Strategy whether the exchange has an exception that we should try to handle. 704 * <p/> 705 * Standard implementations should just look for an exception. 706 */ 707 protected boolean shouldHandleException(Exchange exchange) { 708 return exchange.getException() != null; 709 } 710 711 /** 712 * Strategy to determine if the exchange is done so we can continue 713 */ 714 protected boolean isDone(Exchange exchange) { 715 boolean answer = isCancelledOrInterrupted(exchange); 716 717 // only done if the exchange hasn't failed 718 // and it has not been handled by the failure processor 719 // or we are exhausted 720 if (!answer) { 721 answer = exchange.getException() == null 722 || ExchangeHelper.isFailureHandled(exchange) 723 || ExchangeHelper.isRedeliveryExhausted(exchange); 724 } 725 726 log.trace("Is exchangeId: {} done? {}", exchange.getExchangeId(), answer); 727 return answer; 728 } 729 730 /** 731 * Strategy to determine if the exchange was cancelled or interrupted 732 */ 733 protected boolean isCancelledOrInterrupted(Exchange exchange) { 734 boolean answer = false; 735 736 if (ExchangeHelper.isInterrupted(exchange)) { 737 // mark the exchange to stop continue routing when interrupted 738 // as we do not want to continue routing (for example a task has been cancelled) 739 exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE); 740 answer = true; 741 } 742 743 log.trace("Is exchangeId: {} interrupted? {}", exchange.getExchangeId(), answer); 744 return answer; 745 } 746 747 /** 748 * Returns the output processor 749 */ 750 public Processor getOutput() { 751 return output; 752 } 753 754 /** 755 * Returns the dead letter that message exchanges will be sent to if the 756 * redelivery attempts fail 757 */ 758 public Processor getDeadLetter() { 759 return deadLetter; 760 } 761 762 public String getDeadLetterUri() { 763 return deadLetterUri; 764 } 765 766 public boolean isUseOriginalMessagePolicy() { 767 return useOriginalMessagePolicy; 768 } 769 770 public boolean isDeadLetterHandleNewException() { 771 return deadLetterHandleNewException; 772 } 773 774 public RedeliveryPolicy getRedeliveryPolicy() { 775 return redeliveryPolicy; 776 } 777 778 public CamelLogger getLogger() { 779 return logger; 780 } 781 782 protected Predicate getDefaultHandledPredicate() { 783 // Default is not not handle errors 784 return null; 785 } 786 787 protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) { 788 Exception caught = exchange.getException(); 789 790 // we continue so clear any exceptions 791 exchange.setException(null); 792 // clear rollback flags 793 exchange.setProperty(Exchange.ROLLBACK_ONLY, null); 794 // reset cached streams so they can be read again 795 MessageHelper.resetStreamCache(exchange.getIn()); 796 797 // its continued then remove traces of redelivery attempted and caught exception 798 exchange.getIn().removeHeader(Exchange.REDELIVERED); 799 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); 800 exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); 801 exchange.removeProperty(Exchange.FAILURE_HANDLED); 802 // keep the Exchange.EXCEPTION_CAUGHT as property so end user knows the caused exception 803 804 // create log message 805 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); 806 msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; 807 msg = msg + ". Handled and continue routing."; 808 809 // log that we failed but want to continue 810 logFailedDelivery(false, false, false, true, isDeadLetterChannel, exchange, msg, data, null); 811 } 812 813 protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) { 814 if (!redeliveryEnabled) { 815 throw new IllegalStateException("Redelivery is not enabled on " + this + ". Make sure you have configured the error handler properly."); 816 } 817 // there must be a defensive copy of the exchange 818 ObjectHelper.notNull(data.original, "Defensive copy of Exchange is null", this); 819 820 // okay we will give it another go so clear the exception so we can try again 821 exchange.setException(null); 822 823 // clear rollback flags 824 exchange.setProperty(Exchange.ROLLBACK_ONLY, null); 825 826 // TODO: We may want to store these as state on RedeliveryData so we keep them in case end user messes with Exchange 827 // and then put these on the exchange when doing a redelivery / fault processor 828 829 // preserve these headers 830 Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 831 Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class); 832 Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class); 833 834 // we are redelivering so copy from original back to exchange 835 exchange.getIn().copyFrom(data.original.getIn()); 836 exchange.setOut(null); 837 // reset cached streams so they can be read again 838 MessageHelper.resetStreamCache(exchange.getIn()); 839 840 // put back headers 841 if (redeliveryCounter != null) { 842 exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter); 843 } 844 if (redeliveryMaxCounter != null) { 845 exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter); 846 } 847 if (redelivered != null) { 848 exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered); 849 } 850 } 851 852 protected void handleException(Exchange exchange, RedeliveryData data, boolean isDeadLetterChannel) { 853 Exception e = exchange.getException(); 854 855 // store the original caused exception in a property, so we can restore it later 856 exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); 857 858 // find the error handler to use (if any) 859 OnExceptionDefinition exceptionPolicy = getExceptionPolicy(exchange, e); 860 if (exceptionPolicy != null) { 861 data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), data.currentRedeliveryPolicy); 862 data.handledPredicate = exceptionPolicy.getHandledPolicy(); 863 data.continuedPredicate = exceptionPolicy.getContinuedPolicy(); 864 data.retryWhilePredicate = exceptionPolicy.getRetryWhilePolicy(); 865 data.useOriginalInMessage = exceptionPolicy.getUseOriginalMessagePolicy() != null && exceptionPolicy.getUseOriginalMessagePolicy(); 866 867 // route specific failure handler? 868 Processor processor = null; 869 UnitOfWork uow = exchange.getUnitOfWork(); 870 if (uow != null && uow.getRouteContext() != null) { 871 String routeId = uow.getRouteContext().getRoute().getId(); 872 processor = exceptionPolicy.getErrorHandler(routeId); 873 } else if (!exceptionPolicy.getErrorHandlers().isEmpty()) { 874 // note this should really not happen, but we have this code as a fail safe 875 // to be backwards compatible with the old behavior 876 log.warn("Cannot determine current route from Exchange with id: {}, will fallback and use first error handler.", exchange.getExchangeId()); 877 processor = exceptionPolicy.getErrorHandlers().iterator().next(); 878 } 879 if (processor != null) { 880 data.failureProcessor = processor; 881 } 882 883 // route specific on redelivery? 884 processor = exceptionPolicy.getOnRedelivery(); 885 if (processor != null) { 886 data.onRedeliveryProcessor = processor; 887 } 888 // route specific on exception occurred? 889 processor = exceptionPolicy.getOnExceptionOccurred(); 890 if (processor != null) { 891 data.onExceptionProcessor = processor; 892 } 893 } 894 895 // only log if not failure handled or not an exhausted unit of work 896 if (!ExchangeHelper.isFailureHandled(exchange) && !ExchangeHelper.isUnitOfWorkExhausted(exchange)) { 897 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange) 898 + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e; 899 logFailedDelivery(true, false, false, false, isDeadLetterChannel, exchange, msg, data, e); 900 } 901 902 data.redeliveryCounter = incrementRedeliveryCounter(exchange, e, data); 903 } 904 905 /** 906 * Gives an optional configured OnExceptionOccurred processor a chance to process just after an exception 907 * was thrown while processing the Exchange. This allows to execute the processor at the same time the exception was thrown. 908 */ 909 protected void onExceptionOccurred(Exchange exchange, final RedeliveryData data) { 910 if (data.onExceptionProcessor == null) { 911 return; 912 } 913 914 // run this synchronously as its just a Processor 915 try { 916 if (log.isTraceEnabled()) { 917 log.trace("OnExceptionOccurred processor {} is processing Exchange: {} due exception occurred", data.onExceptionProcessor, exchange); 918 } 919 data.onExceptionProcessor.process(exchange); 920 } catch (Throwable e) { 921 // we dont not want new exception to override existing, so log it as a WARN 922 log.warn("Error during processing OnExceptionOccurred. This exception is ignored.", e); 923 } 924 log.trace("OnExceptionOccurred processor done"); 925 } 926 927 /** 928 * Gives an optional configured redelivery processor a chance to process before the Exchange 929 * will be redelivered. This can be used to alter the Exchange. 930 */ 931 protected void deliverToOnRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) { 932 if (data.onRedeliveryProcessor == null) { 933 return; 934 } 935 936 if (log.isTraceEnabled()) { 937 log.trace("Redelivery processor {} is processing Exchange: {} before its redelivered", 938 data.onRedeliveryProcessor, exchange); 939 } 940 941 // run this synchronously as its just a Processor 942 try { 943 data.onRedeliveryProcessor.process(exchange); 944 } catch (Throwable e) { 945 exchange.setException(e); 946 } 947 log.trace("Redelivery processor done"); 948 } 949 950 /** 951 * All redelivery attempts failed so move the exchange to the dead letter queue 952 */ 953 protected boolean deliverToFailureProcessor(final Processor processor, final boolean isDeadLetterChannel, final Exchange exchange, 954 final RedeliveryData data, final AsyncCallback callback) { 955 boolean sync = true; 956 957 Exception caught = exchange.getException(); 958 959 // we did not success with the redelivery so now we let the failure processor handle it 960 // clear exception as we let the failure processor handle it 961 exchange.setException(null); 962 963 final boolean shouldHandle = shouldHandle(exchange, data); 964 final boolean shouldContinue = shouldContinue(exchange, data); 965 966 // regard both handled or continued as being handled 967 boolean handled = false; 968 969 // always handle if dead letter channel 970 boolean handleOrContinue = isDeadLetterChannel || shouldHandle || shouldContinue; 971 if (handleOrContinue) { 972 // its handled then remove traces of redelivery attempted 973 exchange.getIn().removeHeader(Exchange.REDELIVERED); 974 exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); 975 exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); 976 exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); 977 978 // and remove traces of rollback only and uow exhausted markers 979 exchange.removeProperty(Exchange.ROLLBACK_ONLY); 980 exchange.removeProperty(Exchange.UNIT_OF_WORK_EXHAUSTED); 981 982 handled = true; 983 } else { 984 // must decrement the redelivery counter as we didn't process the redelivery but is 985 // handling by the failure handler. So we must -1 to not let the counter be out-of-sync 986 decrementRedeliveryCounter(exchange); 987 } 988 989 // we should allow using the failure processor if we should not continue 990 // or in case of continue then the failure processor is NOT a dead letter channel 991 // because you can continue and still let the failure processor do some routing 992 // before continue in the main route. 993 boolean allowFailureProcessor = !shouldContinue || !isDeadLetterChannel; 994 995 if (allowFailureProcessor && processor != null) { 996 997 // prepare original IN body if it should be moved instead of current body 998 if (data.useOriginalInMessage) { 999 log.trace("Using the original IN message instead of current"); 1000 Message original = ExchangeHelper.getOriginalInMessage(exchange); 1001 exchange.setIn(original); 1002 if (exchange.hasOut()) { 1003 log.trace("Removing the out message to avoid some uncertain behavior"); 1004 exchange.setOut(null); 1005 } 1006 } 1007 1008 // reset cached streams so they can be read again 1009 MessageHelper.resetStreamCache(exchange.getIn()); 1010 1011 // invoke custom on prepare 1012 if (onPrepareProcessor != null) { 1013 try { 1014 log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange); 1015 onPrepareProcessor.process(exchange); 1016 } catch (Exception e) { 1017 // a new exception was thrown during prepare 1018 exchange.setException(e); 1019 } 1020 } 1021 1022 log.trace("Failure processor {} is processing Exchange: {}", processor, exchange); 1023 1024 // store the last to endpoint as the failure endpoint 1025 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 1026 // and store the route id so we know in which route we failed 1027 UnitOfWork uow = exchange.getUnitOfWork(); 1028 if (uow != null && uow.getRouteContext() != null) { 1029 exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId()); 1030 } 1031 1032 // fire event as we had a failure processor to handle it, which there is a event for 1033 final boolean deadLetterChannel = processor == data.deadLetterProcessor; 1034 1035 EventHelper.notifyExchangeFailureHandling(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri); 1036 1037 // the failure processor could also be asynchronous 1038 AsyncProcessor afp = AsyncProcessorConverterHelper.convert(processor); 1039 sync = afp.process(exchange, new AsyncCallback() { 1040 public void done(boolean sync) { 1041 log.trace("Failure processor done: {} processing Exchange: {}", processor, exchange); 1042 try { 1043 prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue); 1044 // fire event as we had a failure processor to handle it, which there is a event for 1045 EventHelper.notifyExchangeFailureHandled(exchange.getContext(), exchange, processor, deadLetterChannel, deadLetterUri); 1046 } finally { 1047 // if the fault was handled asynchronously, this should be reflected in the callback as well 1048 data.sync &= sync; 1049 callback.done(data.sync); 1050 } 1051 } 1052 }); 1053 } else { 1054 try { 1055 // invoke custom on prepare 1056 if (onPrepareProcessor != null) { 1057 try { 1058 log.trace("OnPrepare processor {} is processing Exchange: {}", onPrepareProcessor, exchange); 1059 onPrepareProcessor.process(exchange); 1060 } catch (Exception e) { 1061 // a new exception was thrown during prepare 1062 exchange.setException(e); 1063 } 1064 } 1065 // no processor but we need to prepare after failure as well 1066 prepareExchangeAfterFailure(exchange, data, isDeadLetterChannel, shouldHandle, shouldContinue); 1067 } finally { 1068 // callback we are done 1069 callback.done(data.sync); 1070 } 1071 } 1072 1073 // create log message 1074 String msg = "Failed delivery for " + ExchangeHelper.logIds(exchange); 1075 msg = msg + ". Exhausted after delivery attempt: " + data.redeliveryCounter + " caught: " + caught; 1076 if (processor != null) { 1077 if (isDeadLetterChannel && deadLetterUri != null) { 1078 msg = msg + ". Handled by DeadLetterChannel: [" + URISupport.sanitizeUri(deadLetterUri) + "]"; 1079 } else { 1080 msg = msg + ". Processed by failure processor: " + processor; 1081 } 1082 } 1083 1084 // log that we failed delivery as we are exhausted 1085 logFailedDelivery(false, false, handled, false, isDeadLetterChannel, exchange, msg, data, null); 1086 1087 return sync; 1088 } 1089 1090 protected void prepareExchangeAfterFailure(final Exchange exchange, final RedeliveryData data, final boolean isDeadLetterChannel, 1091 final boolean shouldHandle, final boolean shouldContinue) { 1092 1093 Exception newException = exchange.getException(); 1094 1095 // we could not process the exchange so we let the failure processor handled it 1096 ExchangeHelper.setFailureHandled(exchange); 1097 1098 // honor if already set a handling 1099 boolean alreadySet = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED) != null; 1100 if (alreadySet) { 1101 boolean handled = exchange.getProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.class); 1102 log.trace("This exchange has already been marked for handling: {}", handled); 1103 if (!handled) { 1104 // exception not handled, put exception back in the exchange 1105 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 1106 // and put failure endpoint back as well 1107 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 1108 } 1109 return; 1110 } 1111 1112 // dead letter channel is special 1113 if (shouldContinue) { 1114 log.trace("This exchange is continued: {}", exchange); 1115 // okay we want to continue then prepare the exchange for that as well 1116 prepareExchangeForContinue(exchange, data, isDeadLetterChannel); 1117 } else if (shouldHandle) { 1118 log.trace("This exchange is handled so its marked as not failed: {}", exchange); 1119 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE); 1120 } else { 1121 // okay the redelivery policy are not explicit set to true, so we should allow to check for some 1122 // special situations when using dead letter channel 1123 if (isDeadLetterChannel) { 1124 1125 // DLC is always handling the first thrown exception, 1126 // but if its a new exception then use the configured option 1127 boolean handled = newException == null || data.handleNewException; 1128 1129 // when using DLC then log new exception whether its being handled or not, as otherwise it may appear as 1130 // the DLC swallow new exceptions by default (which is by design to ensure the DLC always complete, 1131 // to avoid causing endless poison messages that fails forever) 1132 if (newException != null && data.currentRedeliveryPolicy.isLogNewException()) { 1133 String uri = URISupport.sanitizeUri(deadLetterUri); 1134 String msg = "New exception occurred during processing by the DeadLetterChannel[" + uri + "] due " + newException.getMessage(); 1135 if (handled) { 1136 msg += ". The new exception is being handled as deadLetterHandleNewException=true."; 1137 } else { 1138 msg += ". The new exception is not handled as deadLetterHandleNewException=false."; 1139 } 1140 logFailedDelivery(false, true, handled, false, true, exchange, msg, data, newException); 1141 } 1142 1143 if (handled) { 1144 log.trace("This exchange is handled so its marked as not failed: {}", exchange); 1145 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.TRUE); 1146 return; 1147 } 1148 } 1149 1150 // not handled by default 1151 prepareExchangeAfterFailureNotHandled(exchange); 1152 } 1153 } 1154 1155 private void prepareExchangeAfterFailureNotHandled(Exchange exchange) { 1156 log.trace("This exchange is not handled or continued so its marked as failed: {}", exchange); 1157 // exception not handled, put exception back in the exchange 1158 exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, Boolean.FALSE); 1159 exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); 1160 // and put failure endpoint back as well 1161 exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); 1162 // and store the route id so we know in which route we failed 1163 UnitOfWork uow = exchange.getUnitOfWork(); 1164 if (uow != null && uow.getRouteContext() != null) { 1165 exchange.setProperty(Exchange.FAILURE_ROUTE_ID, uow.getRouteContext().getRoute().getId()); 1166 } 1167 } 1168 1169 private void logFailedDelivery(boolean shouldRedeliver, boolean newException, boolean handled, boolean continued, boolean isDeadLetterChannel, 1170 Exchange exchange, String message, RedeliveryData data, Throwable e) { 1171 if (logger == null) { 1172 return; 1173 } 1174 1175 if (!exchange.isRollbackOnly()) { 1176 if (newException && !data.currentRedeliveryPolicy.isLogNewException()) { 1177 // do not log new exception 1178 return; 1179 } 1180 1181 // if we should not rollback, then check whether logging is enabled 1182 1183 if (!newException && handled && !data.currentRedeliveryPolicy.isLogHandled()) { 1184 // do not log handled 1185 return; 1186 } 1187 1188 if (!newException && continued && !data.currentRedeliveryPolicy.isLogContinued()) { 1189 // do not log handled 1190 return; 1191 } 1192 1193 if (!newException && shouldRedeliver && !data.currentRedeliveryPolicy.isLogRetryAttempted()) { 1194 // do not log retry attempts 1195 return; 1196 } 1197 1198 if (!newException && !shouldRedeliver && !data.currentRedeliveryPolicy.isLogExhausted()) { 1199 // do not log exhausted 1200 return; 1201 } 1202 } 1203 1204 LoggingLevel newLogLevel; 1205 boolean logStackTrace; 1206 if (exchange.isRollbackOnly()) { 1207 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 1208 logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace(); 1209 } else if (shouldRedeliver) { 1210 newLogLevel = data.currentRedeliveryPolicy.getRetryAttemptedLogLevel(); 1211 logStackTrace = data.currentRedeliveryPolicy.isLogRetryStackTrace(); 1212 } else { 1213 newLogLevel = data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel(); 1214 logStackTrace = data.currentRedeliveryPolicy.isLogStackTrace(); 1215 } 1216 if (e == null) { 1217 e = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class); 1218 } 1219 1220 if (newException) { 1221 // log at most WARN level 1222 if (newLogLevel == LoggingLevel.ERROR) { 1223 newLogLevel = LoggingLevel.WARN; 1224 } 1225 String msg = message; 1226 if (msg == null) { 1227 msg = "New exception " + ExchangeHelper.logIds(exchange); 1228 // special for logging the new exception 1229 Throwable cause = e; 1230 if (cause != null) { 1231 msg = msg + " due: " + cause.getMessage(); 1232 } 1233 } 1234 1235 if (e != null && logStackTrace) { 1236 logger.log(msg, e, newLogLevel); 1237 } else { 1238 logger.log(msg, newLogLevel); 1239 } 1240 } else if (exchange.isRollbackOnly()) { 1241 String msg = "Rollback " + ExchangeHelper.logIds(exchange); 1242 Throwable cause = exchange.getException() != null ? exchange.getException() : exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class); 1243 if (cause != null) { 1244 msg = msg + " due: " + cause.getMessage(); 1245 } 1246 1247 // should we include message history 1248 if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) { 1249 // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it) 1250 ExchangeFormatter formatter = customExchangeFormatter 1251 ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null); 1252 String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, false); 1253 if (routeStackTrace != null) { 1254 msg = msg + "\n" + routeStackTrace; 1255 } 1256 } 1257 1258 if (newLogLevel == LoggingLevel.ERROR) { 1259 // log intended rollback on maximum WARN level (no ERROR) 1260 logger.log(msg, LoggingLevel.WARN); 1261 } else { 1262 // otherwise use the desired logging level 1263 logger.log(msg, newLogLevel); 1264 } 1265 } else { 1266 String msg = message; 1267 // should we include message history 1268 if (!shouldRedeliver && data.currentRedeliveryPolicy.isLogExhaustedMessageHistory()) { 1269 // only use the exchange formatter if we should log exhausted message body (and if using a custom formatter then always use it) 1270 ExchangeFormatter formatter = customExchangeFormatter 1271 ? exchangeFormatter : (data.currentRedeliveryPolicy.isLogExhaustedMessageBody() || camelContext.isLogExhaustedMessageBody() ? exchangeFormatter : null); 1272 String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, formatter, e != null && logStackTrace); 1273 if (routeStackTrace != null) { 1274 msg = msg + "\n" + routeStackTrace; 1275 } 1276 } 1277 1278 if (e != null && logStackTrace) { 1279 logger.log(msg, e, newLogLevel); 1280 } else { 1281 logger.log(msg, newLogLevel); 1282 } 1283 } 1284 } 1285 1286 /** 1287 * Determines whether the exchange is exhausted (or anyway marked to not continue such as rollback). 1288 * <p/> 1289 * If the exchange is exhausted, then we will not continue processing, but let the 1290 * failure processor deal with the exchange. 1291 * 1292 * @param exchange the current exchange 1293 * @param data the redelivery data 1294 * @return <tt>false</tt> to continue/redeliver, or <tt>true</tt> to exhaust. 1295 */ 1296 private boolean isExhausted(Exchange exchange, RedeliveryData data) { 1297 // if marked as rollback only then do not continue/redeliver 1298 boolean exhausted = exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class); 1299 if (exhausted) { 1300 log.trace("This exchange is marked as redelivery exhausted: {}", exchange); 1301 return true; 1302 } 1303 1304 // if marked as rollback only then do not continue/redeliver 1305 boolean rollbackOnly = exchange.getProperty(Exchange.ROLLBACK_ONLY, false, Boolean.class); 1306 if (rollbackOnly) { 1307 log.trace("This exchange is marked as rollback only, so forcing it to be exhausted: {}", exchange); 1308 return true; 1309 } 1310 // its the first original call so continue 1311 if (data.redeliveryCounter == 0) { 1312 return false; 1313 } 1314 // its a potential redelivery so determine if we should redeliver or not 1315 boolean redeliver = data.currentRedeliveryPolicy.shouldRedeliver(exchange, data.redeliveryCounter, data.retryWhilePredicate); 1316 return !redeliver; 1317 } 1318 1319 /** 1320 * Determines whether or not to continue if we are exhausted. 1321 * 1322 * @param exchange the current exchange 1323 * @param data the redelivery data 1324 * @return <tt>true</tt> to continue, or <tt>false</tt> to exhaust. 1325 */ 1326 private boolean shouldContinue(Exchange exchange, RedeliveryData data) { 1327 if (data.continuedPredicate != null) { 1328 return data.continuedPredicate.matches(exchange); 1329 } 1330 // do not continue by default 1331 return false; 1332 } 1333 1334 /** 1335 * Determines whether or not to handle if we are exhausted. 1336 * 1337 * @param exchange the current exchange 1338 * @param data the redelivery data 1339 * @return <tt>true</tt> to handle, or <tt>false</tt> to exhaust. 1340 */ 1341 private boolean shouldHandle(Exchange exchange, RedeliveryData data) { 1342 if (data.handledPredicate != null) { 1343 return data.handledPredicate.matches(exchange); 1344 } 1345 // do not handle by default 1346 return false; 1347 } 1348 1349 /** 1350 * Increments the redelivery counter and adds the redelivered flag if the 1351 * message has been redelivered 1352 */ 1353 private int incrementRedeliveryCounter(Exchange exchange, Throwable e, RedeliveryData data) { 1354 Message in = exchange.getIn(); 1355 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 1356 int next = 1; 1357 if (counter != null) { 1358 next = counter + 1; 1359 } 1360 in.setHeader(Exchange.REDELIVERY_COUNTER, next); 1361 in.setHeader(Exchange.REDELIVERED, Boolean.TRUE); 1362 // if maximum redeliveries is used, then provide that information as well 1363 if (data.currentRedeliveryPolicy.getMaximumRedeliveries() > 0) { 1364 in.setHeader(Exchange.REDELIVERY_MAX_COUNTER, data.currentRedeliveryPolicy.getMaximumRedeliveries()); 1365 } 1366 return next; 1367 } 1368 1369 /** 1370 * Prepares the redelivery counter and boolean flag for the failure handle processor 1371 */ 1372 private void decrementRedeliveryCounter(Exchange exchange) { 1373 Message in = exchange.getIn(); 1374 Integer counter = in.getHeader(Exchange.REDELIVERY_COUNTER, Integer.class); 1375 if (counter != null) { 1376 int prev = counter - 1; 1377 in.setHeader(Exchange.REDELIVERY_COUNTER, prev); 1378 // set boolean flag according to counter 1379 in.setHeader(Exchange.REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE); 1380 } else { 1381 // not redelivered 1382 in.setHeader(Exchange.REDELIVERY_COUNTER, 0); 1383 in.setHeader(Exchange.REDELIVERED, Boolean.FALSE); 1384 } 1385 } 1386 1387 /** 1388 * Determines if redelivery is enabled by checking if any of the redelivery policy 1389 * settings may allow redeliveries. 1390 * 1391 * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> otherwise 1392 * @throws Exception can be thrown 1393 */ 1394 private boolean determineIfRedeliveryIsEnabled() throws Exception { 1395 // determine if redeliver is enabled either on error handler 1396 if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) { 1397 // must check for != 0 as (-1 means redeliver forever) 1398 return true; 1399 } 1400 if (retryWhilePolicy != null) { 1401 return true; 1402 } 1403 1404 // or on the exception policies 1405 if (!exceptionPolicies.isEmpty()) { 1406 // walk them to see if any of them have a maximum redeliveries > 0 or retry until set 1407 for (OnExceptionDefinition def : exceptionPolicies.values()) { 1408 1409 String ref = def.getRedeliveryPolicyRef(); 1410 if (ref != null) { 1411 // lookup in registry if ref provided 1412 RedeliveryPolicy policy = CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class); 1413 if (policy.getMaximumRedeliveries() != 0) { 1414 // must check for != 0 as (-1 means redeliver forever) 1415 return true; 1416 } 1417 } else if (def.getRedeliveryPolicy() != null) { 1418 Integer max = CamelContextHelper.parseInteger(camelContext, def.getRedeliveryPolicy().getMaximumRedeliveries()); 1419 if (max != null && max != 0) { 1420 // must check for != 0 as (-1 means redeliver forever) 1421 return true; 1422 } 1423 } 1424 1425 if (def.getRetryWhilePolicy() != null || def.getRetryWhile() != null) { 1426 return true; 1427 } 1428 } 1429 } 1430 1431 return false; 1432 } 1433 1434 /** 1435 * Gets the number of exchanges that are pending for redelivery 1436 */ 1437 public int getPendingRedeliveryCount() { 1438 int answer = redeliverySleepCounter.get(); 1439 if (executorService != null && executorService instanceof ThreadPoolExecutor) { 1440 answer += ((ThreadPoolExecutor) executorService).getQueue().size(); 1441 } 1442 1443 return answer; 1444 } 1445 1446 @Override 1447 protected void doStart() throws Exception { 1448 ServiceHelper.startServices(output, outputAsync, deadLetter); 1449 1450 // determine if redeliver is enabled or not 1451 redeliveryEnabled = determineIfRedeliveryIsEnabled(); 1452 if (log.isTraceEnabled()) { 1453 log.trace("Redelivery enabled: {} on error handler: {}", redeliveryEnabled, this); 1454 } 1455 1456 // we only need thread pool if redelivery is enabled 1457 if (redeliveryEnabled) { 1458 if (executorService == null) { 1459 // use default shared executor service 1460 executorService = camelContext.getErrorHandlerExecutorService(); 1461 } 1462 if (log.isDebugEnabled()) { 1463 log.debug("Using ExecutorService: {} for redeliveries on error handler: {}", executorService, this); 1464 } 1465 } 1466 1467 // reset flag when starting 1468 preparingShutdown = false; 1469 redeliverySleepCounter.set(0); 1470 } 1471 1472 @Override 1473 protected void doStop() throws Exception { 1474 // noop, do not stop any services which we only do when shutting down 1475 // as the error handler can be context scoped, and should not stop in case 1476 // a route stops 1477 } 1478 1479 @Override 1480 protected void doShutdown() throws Exception { 1481 ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync); 1482 } 1483}