/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;

import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.MessageHistory;
import org.apache.camel.Ordered;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.StatefulService;
import org.apache.camel.StreamCache;
import org.apache.camel.api.management.PerformanceCounter;
import org.apache.camel.management.DelegatePerformanceCounter;
import org.apache.camel.management.mbean.ManagedPerformanceCounter;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.ProcessorDefinitionHelper;
import org.apache.camel.processor.interceptor.BacklogDebugger;
import org.apache.camel.processor.interceptor.BacklogTracer;
import org.apache.camel.processor.interceptor.DefaultBacklogTracerEventMessage;
import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.MessageHistoryFactory;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.util.MessageHelper;
import org.apache.camel.util.OrderedComparator;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.UnitOfWorkHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as:
 * <ul>
 * <li>Execute {@link UnitOfWork}</li>
 * <li>Keeping track which route currently is being routed</li>
 * <li>Execute {@link RoutePolicy}</li>
 * <li>Gather JMX performance statistics</li>
 * <li>Tracing</li>
 * <li>Debugging</li>
 * <li>Message History</li>
 * <li>Stream Caching</li>
 * </ul>
 * ... and more.
 * <p/>
 * This implementation executes this cross cutting functionality as a {@link CamelInternalProcessorAdvice} advice (before and after advice)
 * by executing the {@link CamelInternalProcessorAdvice#before(org.apache.camel.Exchange)} and
 * {@link CamelInternalProcessorAdvice#after(org.apache.camel.Exchange, Object)} callbacks in correct order during routing.
 * This reduces number of stack frames needed during routing, and reduce the number of lines in stacktraces, as well
 * makes debugging the routing engine easier for end users.
 * <p/>
 * <b>Debugging tips:</b> Camel end users who want to debug their Camel applications with the Camel source code, then make sure to
 * read the source code of this class about the debugging tips, which you can find in the
 * {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method.
 * <p/>
 * The added advices can implement {@link Ordered} to control in which order the advices are executed.
 */
public class CamelInternalProcessor extends DelegateAsyncProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class);
    private final List<CamelInternalProcessorAdvice> advices = new ArrayList<CamelInternalProcessorAdvice>();

    public CamelInternalProcessor() {
    }

    public CamelInternalProcessor(Processor processor) {
        super(processor);
    }

    /**
     * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor.
     *
     * @param advice the advice to add
     */
    public void addAdvice(CamelInternalProcessorAdvice advice) {
        advices.add(advice);
        // ensure advices are sorted so they are in the order we want
        Collections.sort(advices, new OrderedComparator());
    }

    /**
     * Gets the advice with the given type.
     *
     * @param type the type of the advice
     * @return the first advice that is an instance of the given type,
     *         or <tt>null</tt> if no advice has been added with the given type.
     */
    public <T> T getAdvice(Class<T> type) {
        for (CamelInternalProcessorAdvice task : advices) {
            if (type.isInstance(task)) {
                return type.cast(task);
            }
        }
        return null;
    }

    /**
     * Routes the exchange through the wrapped processor, surrounded by the registered advices.
     * <p/>
     * The <tt>before</tt> callbacks are invoked in list order and the <tt>after</tt> callbacks in reverse order
     * once the processor signals completion. If a <tt>before</tt> callback fails, the failure is stored on the
     * exchange, the wrapped processor is not invoked, and the advices whose <tt>before</tt> already completed
     * get their <tt>after</tt> invoked before the callback is notified.
     *
     * @param exchange the exchange to route
     * @param callback the callback to notify when routing is done
     * @return <tt>true</tt> if the exchange was completed synchronously, <tt>false</tt> if it continues asynchronously
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        // ----------------------------------------------------------
        // CAMEL END USER - READ ME FOR DEBUGGING TIPS
        // ----------------------------------------------------------
        // If you want to debug the Camel routing engine, then there is a lot of internal functionality
        // the routing engine executes during routing messages. You can skip debugging this internal
        // functionality and instead debug where the routing engine continues routing to the next node
        // in the routes. The CamelInternalProcessor is a vital part of the routing engine, as its
        // being used in between the nodes. As an end user you can just debug the code in this class
        // in between the:
        //   CAMEL END USER - DEBUG ME HERE +++ START +++
        //   CAMEL END USER - DEBUG ME HERE +++ END +++
        // you can see in the code below.
        // ----------------------------------------------------------

        if (processor == null || !continueProcessing(exchange)) {
            // no processor or we should not continue then we are done
            callback.done(true);
            return true;
        }

        // states holds one entry per advice whose before() completed, at the same index as the advice
        final List<Object> states = new ArrayList<Object>(advices.size());
        for (CamelInternalProcessorAdvice task : advices) {
            try {
                Object state = task.before(exchange);
                states.add(state);
            } catch (Throwable e) {
                exchange.setException(e);
                // the advices that already ran before() may have acquired state (inflight registration,
                // created unit of work, pushed route context), so unwind them in reverse order before
                // notifying the callback; the failing advice itself is not unwound as it has no state entry
                new InternalCallback(states, exchange, callback).done(true);
                return true;
            }
        }

        // create internal callback which will execute the advices in reverse order when done
        callback = new InternalCallback(states, exchange, callback);

        // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
        Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
        if (exchange.isTransacted() || synchronous != null) {
            // must be synchronized for transacted exchanges
            if (LOG.isTraceEnabled()) {
                if (exchange.isTransacted()) {
                    LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
                } else {
                    LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
                }
            }
            // ----------------------------------------------------------
            // CAMEL END USER - DEBUG ME HERE +++ START +++
            // ----------------------------------------------------------
            try {
                processor.process(exchange);
            } catch (Throwable e) {
                exchange.setException(e);
            }
            // ----------------------------------------------------------
            // CAMEL END USER - DEBUG ME HERE +++ END +++
            // ----------------------------------------------------------
            callback.done(true);
            return true;
        } else {
            final UnitOfWork uow = exchange.getUnitOfWork();

            // allow unit of work to wrap callback in case it need to do some special work
            // for example the MDCUnitOfWork
            AsyncCallback async = callback;
            if (uow != null) {
                async = uow.beforeProcess(processor, exchange, callback);
            }

            // ----------------------------------------------------------
            // CAMEL END USER - DEBUG ME HERE +++ START +++
            // ----------------------------------------------------------
            if (LOG.isTraceEnabled()) {
                LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
            }
            boolean sync = processor.process(exchange, async);
            // ----------------------------------------------------------
            // CAMEL END USER - DEBUG ME HERE +++ END +++
            // ----------------------------------------------------------

            // execute any after processor work (in current thread, not in the callback)
            if (uow != null) {
                uow.afterProcess(processor, exchange, callback, sync);
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
                        new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange});
            }
            return sync;
        }
    }

    @Override
    public String toString() {
        return processor != null ? processor.toString() : super.toString();
    }

    /**
     * Internal callback that executes the after advices.
     * <p/>
     * Only the advices that have an entry in <tt>states</tt> are executed, so the same callback can be used
     * both for the normal completion (all advices) and for unwinding after a failed <tt>before</tt> (a prefix
     * of the advices).
     */
    private final class InternalCallback implements AsyncCallback {

        private final List<Object> states;
        private final Exchange exchange;
        private final AsyncCallback callback;

        private InternalCallback(List<Object> states, Exchange exchange, AsyncCallback callback) {
            this.states = states;
            this.exchange = exchange;
            this.callback = callback;
        }

        @Override
        public void done(boolean doneSync) {
            // NOTE: if you are debugging Camel routes, then all the code in the for loop below is internal only
            // so you can step straight to the finally block and invoke the callback

            // we should call after in reverse order
            try {
                // bounded by states (not advices) so every advice invoked here has a matching state entry
                for (int i = states.size() - 1; i >= 0; i--) {
                    CamelInternalProcessorAdvice task = advices.get(i);
                    Object state = states.get(i);
                    try {
                        task.after(exchange, state);
                    } catch (Exception e) {
                        exchange.setException(e);
                        // allow all advices to complete even if there was an exception
                    }
                }
            } finally {
                // ----------------------------------------------------------
                // CAMEL END USER - DEBUG ME HERE +++ START +++
                // ----------------------------------------------------------
                // callback must be called
                callback.done(doneSync);
                // ----------------------------------------------------------
                // CAMEL END USER - DEBUG ME HERE +++ END +++
                // ----------------------------------------------------------
            }
        }
    }

    /**
     * Strategy to determine if we should continue processing the {@link Exchange}.
     *
     * @param exchange the exchange about to be processed
     * @return <tt>false</tt> if the exchange is marked with {@link Exchange#ROUTE_STOP} or the shutdown strategy
     *         is forcing a shutdown (in which case a {@link RejectedExecutionException} is set on the exchange
     *         unless it already has an exception), <tt>true</tt> otherwise
     */
    protected boolean continueProcessing(Exchange exchange) {
        Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
        if (stop != null) {
            // keep the result boxed: a null conversion result must not cause an unboxing NullPointerException,
            // and a value that cannot be converted is treated as "do not stop"
            Boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
            if (doStop != null && doStop) {
                LOG.debug("Exchange is marked to stop routing: {}", exchange);
                return false;
            }
        }

        // determine if we can still run, or the camel context is forcing a shutdown
        boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown(this);
        if (forceShutdown) {
            String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
            LOG.debug(msg);
            if (exchange.getException() == null) {
                exchange.setException(new RejectedExecutionException(msg));
            }
            return false;
        }

        // yes we can continue
        return true;
    }

    /**
     * Advice to invoke callbacks for before and after routing.
     */
    public static class RouteLifecycleAdvice implements CamelInternalProcessorAdvice<Object> {

        private Route route;

        public void setRoute(Route route) {
            this.route = route;
        }

        @Override
        public Object before(Exchange exchange) throws Exception {
            UnitOfWork uow = exchange.getUnitOfWork();
            if (uow != null) {
                uow.beforeRoute(exchange, route);
            }
            return null;
        }

        @Override
        public void after(Exchange exchange, Object object) throws Exception {
            UnitOfWork uow = exchange.getUnitOfWork();
            if (uow != null) {
                uow.afterRoute(exchange, route);
            }
        }
    }

    /**
     * Advice for JMX instrumentation of the process being invoked.
     * <p/>
     * This advice keeps track of JMX metrics for performance statistics.
     * <p/>
     * The current implementation of this advice is only used for route level statistics. For processor levels
     * they are still wrapped in the route processor chains.
     */
    public static class InstrumentationAdvice implements CamelInternalProcessorAdvice<StopWatch> {

        private PerformanceCounter counter;
        private String type;

        public InstrumentationAdvice(String type) {
            this.type = type;
        }

        /**
         * Sets the counter to report to.
         * <p/>
         * If the current counter is a {@link DelegatePerformanceCounter} the given counter is set on the delegate
         * (<tt>null</tt> when it is not a {@link ManagedPerformanceCounter}); otherwise the given counter replaces
         * the current one when it is a {@link PerformanceCounter}. Any other value is ignored.
         *
         * @param counter the counter
         */
        public void setCounter(Object counter) {
            ManagedPerformanceCounter mpc = null;
            if (counter instanceof ManagedPerformanceCounter) {
                mpc = (ManagedPerformanceCounter) counter;
            }

            if (this.counter instanceof DelegatePerformanceCounter) {
                ((DelegatePerformanceCounter) this.counter).setCounter(mpc);
            } else if (mpc != null) {
                this.counter = mpc;
            } else if (counter instanceof PerformanceCounter) {
                this.counter = (PerformanceCounter) counter;
            }
        }

        protected void beginTime(Exchange exchange) {
            counter.processExchange(exchange);
        }

        protected void recordTime(Exchange exchange, long duration) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange});
            }

            if (!exchange.isFailed() && exchange.getException() == null) {
                counter.completedExchange(exchange, duration);
            } else {
                counter.failedExchange(exchange);
            }
        }

        public String getType() {
            return type;
        }

        public void setType(String type) {
            this.type = type;
        }

        @Override
        public StopWatch before(Exchange exchange) throws Exception {
            // only record time if stats is enabled
            StopWatch answer = counter != null && counter.isStatisticsEnabled() ? new StopWatch() : null;
            if (answer != null) {
                beginTime(exchange);
            }
            return answer;
        }

        @Override
        public void after(Exchange exchange, StopWatch watch) throws Exception {
            // record end time
            if (watch != null) {
                recordTime(exchange, watch.stop());
            }
        }
    }

    /**
     * Advice to inject the current {@link RouteContext} into the {@link UnitOfWork} on the {@link Exchange}
     *
     * @deprecated this logic has been merged into {@link org.apache.camel.processor.CamelInternalProcessor.UnitOfWorkProcessorAdvice}
     */
    @Deprecated
    public static class RouteContextAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {

        private final RouteContext routeContext;

        public RouteContextAdvice(RouteContext routeContext) {
            this.routeContext = routeContext;
        }

        @Override
        public UnitOfWork before(Exchange exchange) throws Exception {
            // push the current route context
            final UnitOfWork unitOfWork = exchange.getUnitOfWork();
            if (unitOfWork != null) {
                unitOfWork.pushRouteContext(routeContext);
            }
            return unitOfWork;
        }

        @Override
        public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
            if (unitOfWork != null) {
                unitOfWork.popRouteContext();
            }
        }
    }

    /**
     * Advice to keep the {@link InflightRepository} up to date.
     */
    public static class RouteInflightRepositoryAdvice implements CamelInternalProcessorAdvice {

        private final InflightRepository inflightRepository;
        private final String id;

        public RouteInflightRepositoryAdvice(InflightRepository inflightRepository, String id) {
            this.inflightRepository = inflightRepository;
            this.id = id;
        }

        @Override
        public Object before(Exchange exchange) throws Exception {
            inflightRepository.add(exchange, id);
            return null;
        }

        @Override
        public void after(Exchange exchange, Object state) throws Exception {
            inflightRepository.remove(exchange, id);
        }
    }

    /**
     * Advice to execute any {@link RoutePolicy} a route may have been configured with.
     */
    public static class RoutePolicyAdvice implements CamelInternalProcessorAdvice {

        private final List<RoutePolicy> routePolicies;
        private Route route;

        public RoutePolicyAdvice(List<RoutePolicy> routePolicies) {
            this.routePolicies = routePolicies;
        }

        public void setRoute(Route route) {
            this.route = route;
        }

        /**
         * Strategy to determine if this policy is allowed to run
         *
         * @param policy the policy
         * @return <tt>true</tt> to run
         */
        protected boolean isRoutePolicyRunAllowed(RoutePolicy policy) {
            if (policy instanceof StatefulService) {
                StatefulService ss = (StatefulService) policy;
                return ss.isRunAllowed();
            }
            return true;
        }

        @Override
        public Object before(Exchange exchange) throws Exception {
            // invoke begin
            for (RoutePolicy policy : routePolicies) {
                try {
                    if (isRoutePolicyRunAllowed(policy)) {
                        policy.onExchangeBegin(route, exchange);
                    }
                } catch (Exception e) {
                    LOG.warn("Error occurred during onExchangeBegin on RoutePolicy: " + policy
                            + ". This exception will be ignored", e);
                }
            }
            return null;
        }

        @Override
        public void after(Exchange exchange, Object data) throws Exception {
            // do not invoke it if Camel is stopping as we don't want
            // the policy to start a consumer during Camel is stopping
            if (isCamelStopping(exchange.getContext())) {
                return;
            }

            for (RoutePolicy policy : routePolicies) {
                try {
                    if (isRoutePolicyRunAllowed(policy)) {
                        policy.onExchangeDone(route, exchange);
                    }
                } catch (Exception e) {
                    LOG.warn("Error occurred during onExchangeDone on RoutePolicy: " + policy
                            + ". This exception will be ignored", e);
                }
            }
        }

        private static boolean isCamelStopping(CamelContext context) {
            if (context instanceof StatefulService) {
                StatefulService ss = (StatefulService) context;
                return ss.isStopping() || ss.isStopped();
            }
            return false;
        }
    }

    /**
     * Advice to execute the {@link BacklogTracer} if enabled.
     */
    public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice, Ordered {

        private final BacklogTracer backlogTracer;
        private final ProcessorDefinition<?> processorDefinition;
        private final ProcessorDefinition<?> routeDefinition;
        private final boolean first;

        public BacklogTracerAdvice(BacklogTracer backlogTracer, ProcessorDefinition<?> processorDefinition,
                                   ProcessorDefinition<?> routeDefinition, boolean first) {
            this.backlogTracer = backlogTracer;
            this.processorDefinition = processorDefinition;
            this.routeDefinition = routeDefinition;
            this.first = first;
        }

        @Override
        public Object before(Exchange exchange) throws Exception {
            if (backlogTracer.shouldTrace(processorDefinition, exchange)) {
                Date timestamp = new Date();
                String toNode = processorDefinition.getId();
                String exchangeId = exchange.getExchangeId();
                String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 4,
                        backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), backlogTracer.getBodyMaxChars());

                // if first we should add a pseudo trace message as well, so we have a starting message (eg from the route)
                String routeId = routeDefinition != null ? routeDefinition.getId() : null;
                if (first) {
                    Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp, Date.class);
                    DefaultBacklogTracerEventMessage pseudo = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), created, routeId, null, exchangeId, messageAsXml);
                    backlogTracer.traceEvent(pseudo);
                }
                DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), timestamp, routeId, toNode, exchangeId, messageAsXml);
                backlogTracer.traceEvent(event);
            }

            return null;
        }

        @Override
        public void after(Exchange exchange, Object data) throws Exception {
            // noop
        }

        @Override
        public int getOrder() {
            // we want tracer just before calling the processor
            return Ordered.LOWEST - 1;
        }

    }

    /**
     * Advice to execute the {@link org.apache.camel.processor.interceptor.BacklogDebugger} if enabled.
     */
    public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice<StopWatch>, Ordered {

        private final BacklogDebugger backlogDebugger;
        private final Processor target;
        private final ProcessorDefinition<?> definition;
        private final String nodeId;

        public BacklogDebuggerAdvice(BacklogDebugger backlogDebugger, Processor target, ProcessorDefinition<?> definition) {
            this.backlogDebugger = backlogDebugger;
            this.target = target;
            this.definition = definition;
            this.nodeId = definition.getId();
        }

        @Override
        public StopWatch before(Exchange exchange) throws Exception {
            if (backlogDebugger.isEnabled() && (backlogDebugger.hasBreakpoint(nodeId) || backlogDebugger.isSingleStepMode())) {
                StopWatch watch = new StopWatch();
                backlogDebugger.beforeProcess(exchange, target, definition);
                return watch;
            } else {
                return null;
            }
        }

        @Override
        public void after(Exchange exchange, StopWatch stopWatch) throws Exception {
            if (stopWatch != null) {
                backlogDebugger.afterProcess(exchange, target, definition, stopWatch.stop());
            }
        }

        @Override
        public int getOrder() {
            // we want debugger just before calling the processor
            return Ordered.LOWEST;
        }
    }

    /**
     * Advice to inject new {@link UnitOfWork} to the {@link Exchange} if needed, and as well to ensure
     * the {@link UnitOfWork} is done and stopped.
     */
    public static class UnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {

        private final RouteContext routeContext;

        public UnitOfWorkProcessorAdvice(RouteContext routeContext) {
            this.routeContext = routeContext;
        }

        @Override
        public UnitOfWork before(Exchange exchange) throws Exception {
            // if the exchange doesn't have from route id set, then set it if it originated
            // from this unit of work
            if (routeContext != null && exchange.getFromRouteId() == null) {
                String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
                exchange.setFromRouteId(routeId);
            }

            // only return UnitOfWork if we created a new as then its us that handle the lifecycle to done the created UoW
            UnitOfWork created = null;

            if (exchange.getUnitOfWork() == null) {
                // If there is no existing UoW, then we should start one and
                // terminate it once processing is completed for the exchange.
                created = createUnitOfWork(exchange);
                exchange.setUnitOfWork(created);
                created.start();
            }

            // for any exchange we should push/pop route context so we can keep track of which route we are routing
            if (routeContext != null) {
                UnitOfWork existing = exchange.getUnitOfWork();
                if (existing != null) {
                    existing.pushRouteContext(routeContext);
                }
            }

            return created;
        }

        @Override
        public void after(Exchange exchange, UnitOfWork uow) throws Exception {
            UnitOfWork existing = exchange.getUnitOfWork();

            // execute done on uow if we created it, and the consumer is not doing it
            if (uow != null) {
                UnitOfWorkHelper.doneUow(uow, exchange);
            }

            // after UoW is done lets pop the route context which must be done on every existing UoW
            if (routeContext != null && existing != null) {
                existing.popRouteContext();
            }
        }

        protected UnitOfWork createUnitOfWork(Exchange exchange) {
            return exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
        }

    }

    /**
     * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality.
     */
    public static class ChildUnitOfWorkProcessorAdvice extends UnitOfWorkProcessorAdvice {

        private final UnitOfWork parent;

        public ChildUnitOfWorkProcessorAdvice(RouteContext routeContext, UnitOfWork parent) {
            super(routeContext);
            this.parent = parent;
        }

        @Override
        protected UnitOfWork createUnitOfWork(Exchange exchange) {
            // let the parent create a child unit of work to be used
            return parent.createChildUnitOfWork(exchange);
        }

    }

    /**
     * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality.
     */
    public static class SubUnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {

        @Override
        public UnitOfWork before(Exchange exchange) throws Exception {
            // begin savepoint
            exchange.getUnitOfWork().beginSubUnitOfWork(exchange);
            return exchange.getUnitOfWork();
        }

        @Override
        public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
            // end sub unit of work
            unitOfWork.endSubUnitOfWork(exchange);
        }
    }

    /**
     * Advice when Message History has been enabled.
     */
    public static class MessageHistoryAdvice implements CamelInternalProcessorAdvice<MessageHistory> {

        private final MessageHistoryFactory factory;
        private final ProcessorDefinition<?> definition;
        private final String routeId;

        public MessageHistoryAdvice(MessageHistoryFactory factory, ProcessorDefinition<?> definition) {
            this.factory = factory;
            this.definition = definition;
            this.routeId = ProcessorDefinitionHelper.getRouteId(definition);
        }

        // the exchange property is read as a raw List, hence the unchecked assignment below
        @SuppressWarnings("unchecked")
        @Override
        public MessageHistory before(Exchange exchange) throws Exception {
            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
            if (list == null) {
                list = new LinkedList<>();
                exchange.setProperty(Exchange.MESSAGE_HISTORY, list);
            }
            MessageHistory history = factory.newMessageHistory(routeId, definition, new Date());
            list.add(history);
            return history;
        }

        @Override
        public void after(Exchange exchange, MessageHistory history) throws Exception {
            if (history != null) {
                history.nodeProcessingDone();
            }
        }
    }

    /**
     * Advice for {@link org.apache.camel.spi.StreamCachingStrategy}
     */
    public static class StreamCachingAdvice implements CamelInternalProcessorAdvice<StreamCache>, Ordered {

        private final StreamCachingStrategy strategy;

        public StreamCachingAdvice(StreamCachingStrategy strategy) {
            this.strategy = strategy;
        }

        @Override
        public StreamCache before(Exchange exchange) throws Exception {
            // check if body is already cached
            Object body = exchange.getIn().getBody();
            if (body == null) {
                return null;
            } else if (body instanceof StreamCache) {
                StreamCache sc = (StreamCache) body;
                // reset so the cache is ready to be used before processing
                sc.reset();
                return sc;
            }
            // cache the body and if we could do that replace it as the new body
            StreamCache sc = strategy.cache(exchange);
            if (sc != null) {
                exchange.getIn().setBody(sc);
            }
            return sc;
        }

        @Override
        public void after(Exchange exchange, StreamCache sc) throws Exception {
            Object body;
            if (exchange.hasOut()) {
                body = exchange.getOut().getBody();
            } else {
                body = exchange.getIn().getBody();
            }
            // instanceof is false for null, so no separate null check is needed
            if (body instanceof StreamCache) {
                // reset so the cache is ready to be reused after processing
                ((StreamCache) body).reset();
            }
        }

        @Override
        public int getOrder() {
            // we want stream caching first
            return Ordered.HIGHEST;
        }
    }

    /**
     * Advice for delaying
     */
    public static class DelayerAdvice implements CamelInternalProcessorAdvice {

        private final long delay;

        public DelayerAdvice(long delay) {
            this.delay = delay;
        }

        @Override
        public Object before(Exchange exchange) throws Exception {
            try {
                LOG.trace("Sleeping for: {} millis", delay);
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                LOG.debug("Sleep interrupted");
                // restore the interrupt flag before propagating so callers can observe the interruption
                Thread.currentThread().interrupt();
                throw e;
            }
            return null;
        }

        @Override
        public void after(Exchange exchange, Object data) throws Exception {
            // noop
        }
    }

}