/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;

import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.MessageHistory;
import org.apache.camel.Ordered;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.StatefulService;
import org.apache.camel.StreamCache;
import org.apache.camel.api.management.PerformanceCounter;
import org.apache.camel.management.DelegatePerformanceCounter;
import org.apache.camel.management.mbean.ManagedPerformanceCounter;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.ProcessorDefinitionHelper;
import org.apache.camel.processor.interceptor.BacklogDebugger;
import org.apache.camel.processor.interceptor.BacklogTracer;
import org.apache.camel.processor.interceptor.DefaultBacklogTracerEventMessage;
import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.MessageHistoryFactory;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.OrderedComparator;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.UnitOfWorkHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Internal {@link Processor} that the Camel routing engine uses during routing for cross cutting functionality such as:
 * <ul>
 * <li>Execute {@link UnitOfWork}</li>
 * <li>Keeping track of which route is currently being routed</li>
 * <li>Execute {@link RoutePolicy}</li>
 * <li>Gather JMX performance statistics</li>
 * <li>Tracing</li>
 * <li>Debugging</li>
 * <li>Message History</li>
 * <li>Stream Caching</li>
 * </ul>
 * ... and more.
 * <p/>
 * This implementation executes this cross cutting functionality as a {@link CamelInternalProcessorAdvice} advice (before and after advice)
 * by executing the {@link CamelInternalProcessorAdvice#before(org.apache.camel.Exchange)} and
 * {@link CamelInternalProcessorAdvice#after(org.apache.camel.Exchange, Object)} callbacks in correct order during routing.
 * This reduces the number of stack frames needed during routing, and reduces the number of lines in stacktraces, as well
 * as makes debugging the routing engine easier for end users.
 * <p/>
 * <b>Debugging tips:</b> Camel end users who want to debug their Camel applications with the Camel source code should
 * read the source code of this class about the debugging tips, which can be found in the
 * {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method.
 * <p/>
 * The added advices can implement {@link Ordered} to control in which order the advices are executed.
 */
public class CamelInternalProcessor extends DelegateAsyncProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class);
    private final List<CamelInternalProcessorAdvice> advices = new ArrayList<CamelInternalProcessorAdvice>();

    /**
     * Creates an internal processor without a delegate processor.
     */
    public CamelInternalProcessor() {
    }

    /**
     * Creates an internal processor that delegates to the given processor.
     *
     * @param processor the processor to delegate to
     */
    public CamelInternalProcessor(Processor processor) {
        super(processor);
    }

    /**
     * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor.
     * <p/>
     * The list is re-sorted with an {@link OrderedComparator} after every addition.
     *
     * @param advice the advice to add
     */
    public void addAdvice(CamelInternalProcessorAdvice advice) {
        advices.add(advice);
        // ensure advices are sorted so they are in the order we want
        Collections.sort(advices, new OrderedComparator());
    }

    /**
     * Gets the advice with the given type.
     *
     * @param type the type of the advice
     * @return the first advice that is an instance of the given type, or <tt>null</tt> if no advice has been added with the given type.
     */
    public <T> T getAdvice(Class<T> type) {
        for (CamelInternalProcessorAdvice task : advices) {
            if (type.isInstance(task)) {
                return type.cast(task);
            }
        }
        return null;
    }

    /**
     * Routes the exchange through the delegate processor, surrounded by the registered advices.
     * <p/>
     * The <tt>before</tt> callback of every advice is invoked in list order, and the <tt>after</tt> callbacks are invoked
     * in reverse order when the routing is done. If a <tt>before</tt> callback fails, the exception is set on the
     * exchange, the <tt>after</tt> callbacks of the advices that already ran are invoked, and the callback is signalled
     * as done synchronously.
     *
     * @param exchange the exchange to route
     * @param callback the callback to invoke when the routing is done
     * @return <tt>true</tt> if the exchange was completed synchronously, otherwise the value returned by the delegate processor
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        // ----------------------------------------------------------
        // CAMEL END USER - READ ME FOR DEBUGGING TIPS
        // ----------------------------------------------------------
        // If you want to debug the Camel routing engine, then there is a lot of internal functionality
        // the routing engine executes during routing messages. You can skip debugging this internal
        // functionality and instead debug where the routing engine continues routing to the next node
        // in the routes. The CamelInternalProcessor is a vital part of the routing engine, as its
        // being used in between the nodes. As an end user you can just debug the code in this class
        // in between the:
        //   CAMEL END USER - DEBUG ME HERE +++ START +++
        //   CAMEL END USER - DEBUG ME HERE +++ END +++
        // you can see in the code below.
        // ----------------------------------------------------------

        if (processor == null || !continueProcessing(exchange)) {
            // no processor or we should not continue then we are done
            callback.done(true);
            return true;
        }

        // holds the state returned by each before advice; index i belongs to the advice at index i
        final List<Object> states = new ArrayList<Object>(advices.size());

        // create internal callback which will execute the advices in reverse order when done
        // (it only runs the after advice for those advices which have a recorded state)
        callback = new InternalCallback(states, exchange, callback);

        for (CamelInternalProcessorAdvice task : advices) {
            try {
                Object state = task.before(exchange);
                states.add(state);
            } catch (Throwable e) {
                exchange.setException(e);
                // go through the internal callback so the advices which already executed their before
                // are unwound (eg done the created UnitOfWork, remove from inflight repository etc.)
                callback.done(true);
                return true;
            }
        }

        // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
        Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
        if (exchange.isTransacted() || synchronous != null) {
            // must be synchronized for transacted exchanges
            if (LOG.isTraceEnabled()) {
                if (exchange.isTransacted()) {
                    LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
                } else {
                    LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
                }
            }
            // ----------------------------------------------------------
            // CAMEL END USER - DEBUG ME HERE +++ START +++
            // ----------------------------------------------------------
            try {
                processor.process(exchange);
            } catch (Throwable e) {
                exchange.setException(e);
            }
            // ----------------------------------------------------------
            // CAMEL END USER - DEBUG ME HERE +++ END +++
            // ----------------------------------------------------------
            callback.done(true);
            return true;
        } else {
            final UnitOfWork uow = exchange.getUnitOfWork();

            // allow unit of work to wrap callback in case it need to do some special work
            // for example the MDCUnitOfWork
            AsyncCallback async = callback;
            if (uow != null) {
                async = uow.beforeProcess(processor, exchange, callback);
            }

            // ----------------------------------------------------------
            // CAMEL END USER - DEBUG ME HERE +++ START +++
            // ----------------------------------------------------------
            if (LOG.isTraceEnabled()) {
                LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
            }
            boolean sync = processor.process(exchange, async);
            // ----------------------------------------------------------
            // CAMEL END USER - DEBUG ME HERE +++ END +++
            // ----------------------------------------------------------

            // execute any after processor work (in current thread, not in the callback)
            if (uow != null) {
                uow.afterProcess(processor, exchange, callback, sync);
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
                        new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange});
            }
            return sync;
        }
    }

    /**
     * Returns the string representation of the delegate processor if there is one, otherwise the default representation.
     */
    @Override
    public String toString() {
        return processor != null ? processor.toString() : super.toString();
    }

    /**
     * Internal callback that executes the after advices.
     */
    private final class InternalCallback implements AsyncCallback {

        private final List<Object> states;
        private final Exchange exchange;
        private final AsyncCallback callback;

        private InternalCallback(List<Object> states, Exchange exchange, AsyncCallback callback) {
            this.states = states;
            this.exchange = exchange;
            this.callback = callback;
        }

        /**
         * Invokes the after advice (in reverse order) of every advice that has a recorded state, and then
         * always invokes the wrapped callback.
         *
         * @param doneSync passed on unchanged to the wrapped callback
         */
        @Override
        @SuppressWarnings("unchecked")
        public void done(boolean doneSync) {
            // NOTE: if you are debugging Camel routes, then all the code in the for loop below is internal only
            // so you can step straight to the finally block and invoke the callback

            // we should call after in reverse order
            try {
                // iterate by the number of recorded states (and not the number of advices), so we only call
                // after on the advices whose before was executed, and never index outside the states
                for (int i = states.size() - 1; i >= 0; i--) {
                    CamelInternalProcessorAdvice task = advices.get(i);
                    Object state = states.get(i);
                    try {
                        task.after(exchange, state);
                    } catch (Throwable e) {
                        exchange.setException(e);
                        // allow all advices to complete even if there was an exception
                    }
                }
            } finally {
                // ----------------------------------------------------------
                // CAMEL END USER - DEBUG ME HERE +++ START +++
                // ----------------------------------------------------------
                // callback must be called
                callback.done(doneSync);
                // ----------------------------------------------------------
                // CAMEL END USER - DEBUG ME HERE +++ END +++
                // ----------------------------------------------------------
            }
        }
    }

    /**
     * Strategy to determine if we should continue processing the {@link Exchange}.
     * <p/>
     * Processing is not continued if the exchange has the {@link Exchange#ROUTE_STOP} property set to a value
     * that converts to <tt>true</tt>, or if the shutdown strategy of the Camel context is forcing a shutdown.
     * In the latter case a {@link RejectedExecutionException} is set on the exchange, unless the exchange
     * already has an exception.
     *
     * @param exchange the exchange
     * @return <tt>true</tt> to continue processing, <tt>false</tt> to stop
     */
    protected boolean continueProcessing(Exchange exchange) {
        Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
        if (stop != null) {
            // use the wrapper type as the type converter may return null if the value cannot be converted,
            // and auto-unboxing null would cause a NullPointerException
            Boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
            if (doStop != null && doStop) {
                LOG.debug("Exchange is marked to stop routing: {}", exchange);
                return false;
            }
        }

        // determine if we can still run, or the camel context is forcing a shutdown
        boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown(this);
        if (forceShutdown) {
            String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
            LOG.debug(msg);
            if (exchange.getException() == null) {
                exchange.setException(new RejectedExecutionException(msg));
            }
            return false;
        }

        // yes we can continue
        return true;
    }

    /**
     * Advice to invoke callbacks for before and after routing.
     */
    public static class RouteLifecycleAdvice implements CamelInternalProcessorAdvice<Object> {

        private Route route;

        /**
         * Sets the route that is passed to the {@link UnitOfWork} callbacks.
         */
        public void setRoute(Route route) {
            this.route = route;
        }

        /**
         * Invokes {@link UnitOfWork#beforeRoute(Exchange, Route)} if the exchange has a unit of work.
         *
         * @return always <tt>null</tt> as no state is needed
         */
        @Override
        public Object before(Exchange exchange) throws Exception {
            UnitOfWork uow = exchange.getUnitOfWork();
            if (uow != null) {
                uow.beforeRoute(exchange, route);
            }
            return null;
        }

        /**
         * Invokes {@link UnitOfWork#afterRoute(Exchange, Route)} if the exchange has a unit of work.
         */
        @Override
        public void after(Exchange exchange, Object object) throws Exception {
            UnitOfWork uow = exchange.getUnitOfWork();
            if (uow != null) {
                uow.afterRoute(exchange, route);
            }
        }
    }

    /**
     * Advice for JMX instrumentation of the process being invoked.
     * <p/>
     * This advice keeps track of JMX metrics for performance statistics.
     * <p/>
     * The current implementation of this advice is only used for route level statistics. For processor levels
     * they are still wrapped in the route processor chains.
     */
    public static class InstrumentationAdvice implements CamelInternalProcessorAdvice<StopWatch> {

        private PerformanceCounter counter;
        private String type;

        /**
         * @param type a label which is used as prefix in the trace logging, may be <tt>null</tt>
         */
        public InstrumentationAdvice(String type) {
            this.type = type;
        }

        /**
         * Sets the performance counter to use.
         * <p/>
         * If the current counter is a {@link DelegatePerformanceCounter} then the given counter is set on the delegate
         * (as <tt>null</tt> if it is not a {@link ManagedPerformanceCounter}). Otherwise the given counter replaces the
         * current counter if it is a {@link PerformanceCounter}; any other object is ignored.
         *
         * @param counter the counter
         */
        public void setCounter(Object counter) {
            ManagedPerformanceCounter mpc = null;
            if (counter instanceof ManagedPerformanceCounter) {
                mpc = (ManagedPerformanceCounter) counter;
            }

            if (this.counter instanceof DelegatePerformanceCounter) {
                ((DelegatePerformanceCounter) this.counter).setCounter(mpc);
            } else if (mpc != null) {
                this.counter = mpc;
            } else if (counter instanceof PerformanceCounter) {
                this.counter = (PerformanceCounter) counter;
            }
        }

        /**
         * Notifies the counter that the exchange is about to be processed.
         */
        protected void beginTime(Exchange exchange) {
            counter.processExchange(exchange);
        }

        /**
         * Records the exchange on the counter as either completed (with the duration) or failed.
         *
         * @param exchange the exchange
         * @param duration the duration in millis
         */
        protected void recordTime(Exchange exchange, long duration) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange});
            }

            if (!exchange.isFailed() && exchange.getException() == null) {
                counter.completedExchange(exchange, duration);
            } else {
                counter.failedExchange(exchange);
            }
        }

        public String getType() {
            return type;
        }

        public void setType(String type) {
            this.type = type;
        }

        /**
         * Starts a {@link StopWatch} if there is a counter which has statistics enabled.
         *
         * @return the started watch, or <tt>null</tt> if statistics is not being recorded
         */
        @Override
        public StopWatch before(Exchange exchange) throws Exception {
            // only record time if stats is enabled
            StopWatch answer = counter != null && counter.isStatisticsEnabled() ? new StopWatch() : null;
            if (answer != null) {
                beginTime(exchange);
            }
            return answer;
        }

        /**
         * Stops the watch (if any) and records the time taken.
         */
        @Override
        public void after(Exchange exchange, StopWatch watch) throws Exception {
            // record end time
            if (watch != null) {
                recordTime(exchange, watch.stop());
            }
        }
    }

    /**
     * Advice to inject the current {@link RouteContext} into the {@link UnitOfWork} on the {@link Exchange}
     *
     * @deprecated this logic has been merged into {@link org.apache.camel.processor.CamelInternalProcessor.UnitOfWorkProcessorAdvice}
     */
    @Deprecated
    public static class RouteContextAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {

        private final RouteContext routeContext;

        public RouteContextAdvice(RouteContext routeContext) {
            this.routeContext = routeContext;
        }

        /**
         * Pushes the route context on the unit of work of the exchange (if any).
         *
         * @return the unit of work the route context was pushed on, or <tt>null</tt> if the exchange has none
         */
        @Override
        public UnitOfWork before(Exchange exchange) throws Exception {
            // push the current route context
            final UnitOfWork unitOfWork = exchange.getUnitOfWork();
            if (unitOfWork != null) {
                unitOfWork.pushRouteContext(routeContext);
            }
            return unitOfWork;
        }

        /**
         * Pops the route context from the unit of work it was pushed on in {@link #before(Exchange)}.
         */
        @Override
        public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
            if (unitOfWork != null) {
                unitOfWork.popRouteContext();
            }
        }
    }

    /**
     * Advice to keep the {@link InflightRepository} up to date.
     */
    public static class RouteInflightRepositoryAdvice implements CamelInternalProcessorAdvice<Object> {

        private final InflightRepository inflightRepository;
        private final String id;

        /**
         * @param inflightRepository the repository to keep up to date
         * @param id                 the id which is passed to the repository together with the exchange
         */
        public RouteInflightRepositoryAdvice(InflightRepository inflightRepository, String id) {
            this.inflightRepository = inflightRepository;
            this.id = id;
        }

        /**
         * Adds the exchange to the inflight repository.
         *
         * @return always <tt>null</tt> as no state is needed
         */
        @Override
        public Object before(Exchange exchange) throws Exception {
            inflightRepository.add(exchange, id);
            return null;
        }

        /**
         * Removes the exchange from the inflight repository.
         */
        @Override
        public void after(Exchange exchange, Object state) throws Exception {
            inflightRepository.remove(exchange, id);
        }
    }

    /**
     * Advice to execute any {@link RoutePolicy} a route may have been configured with.
     */
    public static class RoutePolicyAdvice implements CamelInternalProcessorAdvice<Object> {

        private final List<RoutePolicy> routePolicies;
        private Route route;

        public RoutePolicyAdvice(List<RoutePolicy> routePolicies) {
            this.routePolicies = routePolicies;
        }

        /**
         * Sets the route that is passed to the policies.
         */
        public void setRoute(Route route) {
            this.route = route;
        }

        /**
         * Strategy to determine if this policy is allowed to run
         *
         * @param policy the policy
         * @return <tt>true</tt> to run
         */
        protected boolean isRoutePolicyRunAllowed(RoutePolicy policy) {
            if (policy instanceof StatefulService) {
                StatefulService ss = (StatefulService) policy;
                return ss.isRunAllowed();
            }
            return true;
        }

        /**
         * Invokes {@link RoutePolicy#onExchangeBegin(Route, Exchange)} on every policy that is allowed to run.
         * An exception thrown by a policy is logged and ignored.
         *
         * @return always <tt>null</tt> as no state is needed
         */
        @Override
        public Object before(Exchange exchange) throws Exception {
            // invoke begin
            for (RoutePolicy policy : routePolicies) {
                try {
                    if (isRoutePolicyRunAllowed(policy)) {
                        policy.onExchangeBegin(route, exchange);
                    }
                } catch (Exception e) {
                    LOG.warn("Error occurred during onExchangeBegin on RoutePolicy: " + policy
                            + ". This exception will be ignored", e);
                }
            }
            return null;
        }

        /**
         * Invokes {@link RoutePolicy#onExchangeDone(Route, Exchange)} on every policy that is allowed to run,
         * unless Camel is stopping or stopped. An exception thrown by a policy is logged and ignored.
         */
        @Override
        public void after(Exchange exchange, Object data) throws Exception {
            // do not invoke it if Camel is stopping as we don't want
            // the policy to start a consumer during Camel is stopping
            if (isCamelStopping(exchange.getContext())) {
                return;
            }

            for (RoutePolicy policy : routePolicies) {
                try {
                    if (isRoutePolicyRunAllowed(policy)) {
                        policy.onExchangeDone(route, exchange);
                    }
                } catch (Exception e) {
                    LOG.warn("Error occurred during onExchangeDone on RoutePolicy: " + policy
                            + ". This exception will be ignored", e);
                }
            }
        }

        /**
         * Whether the context is a {@link StatefulService} which is stopping or stopped.
         */
        private static boolean isCamelStopping(CamelContext context) {
            if (context instanceof StatefulService) {
                StatefulService ss = (StatefulService) context;
                return ss.isStopping() || ss.isStopped();
            }
            return false;
        }
    }

    /**
     * Advice to execute the {@link BacklogTracer} if enabled.
     */
    public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice<Object>, Ordered {

        private final BacklogTracer backlogTracer;
        private final ProcessorDefinition<?> processorDefinition;
        private final ProcessorDefinition<?> routeDefinition;
        private final boolean first;

        /**
         * @param backlogTracer       the tracer
         * @param processorDefinition the definition of the node being traced
         * @param routeDefinition     the definition of the route, may be <tt>null</tt>
         * @param first               whether to also add a pseudo trace message for the start of the route
         */
        public BacklogTracerAdvice(BacklogTracer backlogTracer, ProcessorDefinition<?> processorDefinition,
                                   ProcessorDefinition<?> routeDefinition, boolean first) {
            this.backlogTracer = backlogTracer;
            this.processorDefinition = processorDefinition;
            this.routeDefinition = routeDefinition;
            this.first = first;
        }

        /**
         * Adds a trace event with the message dumped as XML, if the tracer should trace this node and exchange.
         *
         * @return always <tt>null</tt> as no state is needed
         */
        @Override
        public Object before(Exchange exchange) throws Exception {
            if (backlogTracer.shouldTrace(processorDefinition, exchange)) {
                Date timestamp = new Date();
                String toNode = processorDefinition.getId();
                String exchangeId = exchange.getExchangeId();
                String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 4,
                        backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), backlogTracer.getBodyMaxChars());

                // if first we should add a pseudo trace message as well, so we have a starting message (eg from the route)
                String routeId = routeDefinition != null ? routeDefinition.getId() : null;
                if (first) {
                    // falls back to the current timestamp if the exchange has no created timestamp
                    Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp, Date.class);
                    DefaultBacklogTracerEventMessage pseudo = new DefaultBacklogTracerEventMessage(
                            backlogTracer.incrementTraceCounter(), created, routeId, null, exchangeId, messageAsXml);
                    backlogTracer.traceEvent(pseudo);
                }
                DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage(
                        backlogTracer.incrementTraceCounter(), timestamp, routeId, toNode, exchangeId, messageAsXml);
                backlogTracer.traceEvent(event);
            }

            return null;
        }

        @Override
        public void after(Exchange exchange, Object data) throws Exception {
            // noop
        }

        @Override
        public int getOrder() {
            // we want tracer just before calling the processor
            return Ordered.LOWEST - 1;
        }

    }

    /**
     * Advice to execute the {@link org.apache.camel.processor.interceptor.BacklogDebugger} if enabled.
     */
    public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice<StopWatch>, Ordered {

        private final BacklogDebugger backlogDebugger;
        private final Processor target;
        private final ProcessorDefinition<?> definition;
        private final String nodeId;

        /**
         * @param backlogDebugger the debugger
         * @param target          the processor being debugged
         * @param definition      the definition of the node; its id is used for looking up breakpoints
         */
        public BacklogDebuggerAdvice(BacklogDebugger backlogDebugger, Processor target, ProcessorDefinition<?> definition) {
            this.backlogDebugger = backlogDebugger;
            this.target = target;
            this.definition = definition;
            this.nodeId = definition.getId();
        }

        /**
         * Invokes the debugger before processing, if the debugger is enabled and either has a breakpoint
         * for the node or is in single step mode.
         *
         * @return a started watch if the debugger was invoked, otherwise <tt>null</tt>
         */
        @Override
        public StopWatch before(Exchange exchange) throws Exception {
            if (backlogDebugger.isEnabled() && (backlogDebugger.hasBreakpoint(nodeId) || backlogDebugger.isSingleStepMode())) {
                StopWatch watch = new StopWatch();
                backlogDebugger.beforeProcess(exchange, target, definition);
                return watch;
            } else {
                return null;
            }
        }

        /**
         * Invokes the debugger after processing with the time taken, if it was invoked in {@link #before(Exchange)}.
         */
        @Override
        public void after(Exchange exchange, StopWatch stopWatch) throws Exception {
            if (stopWatch != null) {
                backlogDebugger.afterProcess(exchange, target, definition, stopWatch.stop());
            }
        }

        @Override
        public int getOrder() {
            // we want debugger just before calling the processor
            return Ordered.LOWEST;
        }
    }

    /**
     * Advice to inject new {@link UnitOfWork} to the {@link Exchange} if needed, and as well to ensure
     * the {@link UnitOfWork} is done and stopped.
     */
    public static class UnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {

        private final RouteContext routeContext;

        /**
         * @param routeContext the route context to push on the unit of work, may be <tt>null</tt>
         */
        public UnitOfWorkProcessorAdvice(RouteContext routeContext) {
            this.routeContext = routeContext;
        }

        /**
         * Creates and starts a new {@link UnitOfWork} if the exchange has none, and pushes the route context
         * (if any) on the unit of work of the exchange.
         *
         * @return the unit of work if it was created by this advice, otherwise <tt>null</tt>
         */
        @Override
        public UnitOfWork before(Exchange exchange) throws Exception {
            // if the exchange doesn't have from route id set, then set it if it originated
            // from this unit of work
            if (routeContext != null && exchange.getFromRouteId() == null) {
                String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
                exchange.setFromRouteId(routeId);
            }

            // only return UnitOfWork if we created a new as then its us that handle the lifecycle to done the created UoW
            UnitOfWork created = null;

            if (exchange.getUnitOfWork() == null) {
                // If there is no existing UoW, then we should start one and
                // terminate it once processing is completed for the exchange.
                created = createUnitOfWork(exchange);
                exchange.setUnitOfWork(created);
                created.start();
            }

            // for any exchange we should push/pop route context so we can keep track of which route we are routing
            if (routeContext != null) {
                UnitOfWork existing = exchange.getUnitOfWork();
                if (existing != null) {
                    existing.pushRouteContext(routeContext);
                }
            }

            return created;
        }

        /**
         * Dones the unit of work if it was created by this advice, and pops the route context (if any)
         * from the unit of work the exchange had when this method was invoked.
         *
         * @param exchange the exchange
         * @param uow      the unit of work created in {@link #before(Exchange)}, or <tt>null</tt> if none was created
         */
        @Override
        public void after(Exchange exchange, UnitOfWork uow) throws Exception {
            // grab the existing UoW up front, before the created UoW is being done
            UnitOfWork existing = exchange.getUnitOfWork();

            // execute done on uow if we created it, and the consumer is not doing it
            if (uow != null) {
                UnitOfWorkHelper.doneUow(uow, exchange);
            }

            // after UoW is done lets pop the route context which must be done on every existing UoW
            if (routeContext != null && existing != null) {
                existing.popRouteContext();
            }
        }

        /**
         * Creates the unit of work using the factory from the Camel context.
         *
         * @param exchange the exchange
         * @return the created unit of work
         */
        protected UnitOfWork createUnitOfWork(Exchange exchange) {
            return exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
        }

    }

    /**
     * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality.
     */
    public static class ChildUnitOfWorkProcessorAdvice extends UnitOfWorkProcessorAdvice {

        private final UnitOfWork parent;

        /**
         * @param routeContext the route context, may be <tt>null</tt>
         * @param parent       the parent unit of work which creates the child units of work
         */
        public ChildUnitOfWorkProcessorAdvice(RouteContext routeContext, UnitOfWork parent) {
            super(routeContext);
            this.parent = parent;
        }

        @Override
        protected UnitOfWork createUnitOfWork(Exchange exchange) {
            // let the parent create a child unit of work to be used
            return parent.createChildUnitOfWork(exchange);
        }

    }

    /**
     * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality.
     */
    public static class SubUnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {

        /**
         * Begins a sub unit of work on the unit of work of the exchange.
         *
         * @return the unit of work of the exchange
         */
        @Override
        public UnitOfWork before(Exchange exchange) throws Exception {
            // begin savepoint
            exchange.getUnitOfWork().beginSubUnitOfWork(exchange);
            return exchange.getUnitOfWork();
        }

        /**
         * Ends the sub unit of work on the unit of work returned from {@link #before(Exchange)}.
         */
        @Override
        public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
            // end sub unit of work
            unitOfWork.endSubUnitOfWork(exchange);
        }
    }

    /**
     * Advice when Message History has been enabled.
     */
    public static class MessageHistoryAdvice implements CamelInternalProcessorAdvice<MessageHistory> {

        private final MessageHistoryFactory factory;
        private final ProcessorDefinition<?> definition;
        private final String routeId;

        /**
         * @param factory    the factory to create the message history with
         * @param definition the definition of the node; the route id is looked up from the definition
         */
        public MessageHistoryAdvice(MessageHistoryFactory factory, ProcessorDefinition<?> definition) {
            this.factory = factory;
            this.definition = definition;
            this.routeId = ProcessorDefinitionHelper.getRouteId(definition);
        }

        /**
         * Adds a new {@link MessageHistory} to the history list stored as the {@link Exchange#MESSAGE_HISTORY}
         * property on the exchange. The list is created if the exchange does not have one.
         *
         * @return the added message history
         */
        @Override
        @SuppressWarnings("unchecked")
        public MessageHistory before(Exchange exchange) throws Exception {
            // unchecked: the property is looked up using the raw List type
            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
            if (list == null) {
                list = new ArrayList<MessageHistory>();
                exchange.setProperty(Exchange.MESSAGE_HISTORY, list);
            }
            MessageHistory history = factory.newMessageHistory(routeId, definition, new Date());
            list.add(history);
            return history;
        }

        /**
         * Marks the message history as done processing the node.
         */
        @Override
        public void after(Exchange exchange, MessageHistory history) throws Exception {
            if (history != null) {
                history.nodeProcessingDone();
            }
        }
    }

    /**
     * Advice for {@link org.apache.camel.spi.StreamCachingStrategy}
     */
    public static class StreamCachingAdvice implements CamelInternalProcessorAdvice<StreamCache>, Ordered {

        private final StreamCachingStrategy strategy;

        public StreamCachingAdvice(StreamCachingStrategy strategy) {
            this.strategy = strategy;
        }

        /**
         * Ensures the body of the IN message is a {@link StreamCache} if possible: an existing stream cache
         * is reset, otherwise the strategy is asked to cache the body.
         *
         * @return the stream cache, or <tt>null</tt> if there is no body or the strategy did not cache it
         */
        @Override
        public StreamCache before(Exchange exchange) throws Exception {
            // check if body is already cached
            Object body = exchange.getIn().getBody();
            if (body == null) {
                return null;
            } else if (body instanceof StreamCache) {
                StreamCache sc = (StreamCache) body;
                // reset so the cache is ready to be used before processing
                sc.reset();
                return sc;
            }
            // cache the body and if we could do that replace it as the new body
            StreamCache sc = strategy.cache(exchange);
            if (sc != null) {
                exchange.getIn().setBody(sc);
            }
            return sc;
        }

        /**
         * Resets the body of the resulting message (OUT if the exchange has one, otherwise IN)
         * if the body is a {@link StreamCache}.
         */
        @Override
        public void after(Exchange exchange, StreamCache sc) throws Exception {
            Object body;
            if (exchange.hasOut()) {
                body = exchange.getOut().getBody();
            } else {
                body = exchange.getIn().getBody();
            }
            // instanceof is false for null, so no separate null check is needed
            if (body instanceof StreamCache) {
                // reset so the cache is ready to be reused after processing
                ((StreamCache) body).reset();
            }
        }

        @Override
        public int getOrder() {
            // we want stream caching first
            return Ordered.HIGHEST;
        }
    }

    /**
     * Advice for delaying
     */
    public static class DelayerAdvice implements CamelInternalProcessorAdvice<Object> {

        private final long delay;

        /**
         * @param delay the time to sleep in millis
         */
        public DelayerAdvice(long delay) {
            this.delay = delay;
        }

        /**
         * Sleeps the current thread for the configured delay.
         *
         * @return always <tt>null</tt> as no state is needed
         * @throws Exception is thrown (as {@link InterruptedException}) if the sleep was interrupted,
         *                   in which case the interrupt flag of the thread is restored
         */
        @Override
        public Object before(Exchange exchange) throws Exception {
            try {
                LOG.trace("Sleeping for: {} millis", delay);
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                LOG.debug("Sleep interrupted");
                Thread.currentThread().interrupt();
                throw e;
            }
            return null;
        }

        @Override
        public void after(Exchange exchange, Object data) throws Exception {
            // noop
        }
    }

}