001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.Date;
022import java.util.LinkedList;
023import java.util.List;
024import java.util.concurrent.RejectedExecutionException;
025
026import org.apache.camel.AsyncCallback;
027import org.apache.camel.CamelContext;
028import org.apache.camel.Exchange;
029import org.apache.camel.MessageHistory;
030import org.apache.camel.Ordered;
031import org.apache.camel.Processor;
032import org.apache.camel.Route;
033import org.apache.camel.StatefulService;
034import org.apache.camel.StreamCache;
035import org.apache.camel.api.management.PerformanceCounter;
036import org.apache.camel.management.DelegatePerformanceCounter;
037import org.apache.camel.management.mbean.ManagedPerformanceCounter;
038import org.apache.camel.model.ProcessorDefinition;
039import org.apache.camel.model.ProcessorDefinitionHelper;
040import org.apache.camel.processor.interceptor.BacklogDebugger;
041import org.apache.camel.processor.interceptor.BacklogTracer;
042import org.apache.camel.processor.interceptor.DefaultBacklogTracerEventMessage;
043import org.apache.camel.spi.InflightRepository;
044import org.apache.camel.spi.MessageHistoryFactory;
045import org.apache.camel.spi.RouteContext;
046import org.apache.camel.spi.RoutePolicy;
047import org.apache.camel.spi.StreamCachingStrategy;
048import org.apache.camel.spi.UnitOfWork;
049import org.apache.camel.util.MessageHelper;
050import org.apache.camel.util.OrderedComparator;
051import org.apache.camel.util.StopWatch;
052import org.apache.camel.util.UnitOfWorkHelper;
053import org.slf4j.Logger;
054import org.slf4j.LoggerFactory;
055
056/**
057 * Internal {@link Processor} that Camel routing engine used during routing for cross cutting functionality such as:
058 * <ul>
059 *     <li>Execute {@link UnitOfWork}</li>
060 *     <li>Keeping track which route currently is being routed</li>
061 *     <li>Execute {@link RoutePolicy}</li>
062 *     <li>Gather JMX performance statics</li>
063 *     <li>Tracing</li>
064 *     <li>Debugging</li>
065 *     <li>Message History</li>
066 *     <li>Stream Caching</li>
067 * </ul>
068 * ... and more.
069 * <p/>
070 * This implementation executes this cross cutting functionality as a {@link CamelInternalProcessorAdvice} advice (before and after advice)
071 * by executing the {@link CamelInternalProcessorAdvice#before(org.apache.camel.Exchange)} and
072 * {@link CamelInternalProcessorAdvice#after(org.apache.camel.Exchange, Object)} callbacks in correct order during routing.
073 * This reduces number of stack frames needed during routing, and reduce the number of lines in stacktraces, as well
074 * makes debugging the routing engine easier for end users.
075 * <p/>
076 * <b>Debugging tips:</b> Camel end users whom want to debug their Camel applications with the Camel source code, then make sure to
077 * read the source code of this class about the debugging tips, which you can find in the
078 * {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method.
079 * <p/>
080 * The added advices can implement {@link Ordered} to control in which order the advices are executed.
081 */
082public class CamelInternalProcessor extends DelegateAsyncProcessor {
083
084    private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class);
085    private final List<CamelInternalProcessorAdvice> advices = new ArrayList<CamelInternalProcessorAdvice>();
086
087    public CamelInternalProcessor() {
088    }
089
090    public CamelInternalProcessor(Processor processor) {
091        super(processor);
092    }
093
094    /**
095     * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor.
096     *
097     * @param advice  the advice to add
098     */
099    public void addAdvice(CamelInternalProcessorAdvice advice) {
100        advices.add(advice);
101        // ensure advices are sorted so they are in the order we want
102        Collections.sort(advices, new OrderedComparator());
103    }
104
105    /**
106     * Gets the advice with the given type.
107     *
108     * @param type  the type of the advice
109     * @return the advice if exists, or <tt>null</tt> if no advices has been added with the given type.
110     */
111    public <T> T getAdvice(Class<T> type) {
112        for (CamelInternalProcessorAdvice task : advices) {
113            if (type.isInstance(task)) {
114                return type.cast(task);
115            }
116        }
117        return null;
118    }
119
120    @Override
121    public boolean process(Exchange exchange, AsyncCallback callback) {
122        // ----------------------------------------------------------
123        // CAMEL END USER - READ ME FOR DEBUGGING TIPS
124        // ----------------------------------------------------------
125        // If you want to debug the Camel routing engine, then there is a lot of internal functionality
126        // the routing engine executes during routing messages. You can skip debugging this internal
127        // functionality and instead debug where the routing engine continues routing to the next node
128        // in the routes. The CamelInternalProcessor is a vital part of the routing engine, as its
129        // being used in between the nodes. As an end user you can just debug the code in this class
130        // in between the:
131        //   CAMEL END USER - DEBUG ME HERE +++ START +++
132        //   CAMEL END USER - DEBUG ME HERE +++ END +++
133        // you can see in the code below.
134        // ----------------------------------------------------------
135
136        if (processor == null || !continueProcessing(exchange)) {
137            // no processor or we should not continue then we are done
138            callback.done(true);
139            return true;
140        }
141
142        final List<Object> states = new ArrayList<Object>(advices.size());
143        for (CamelInternalProcessorAdvice task : advices) {
144            try {
145                Object state = task.before(exchange);
146                states.add(state);
147            } catch (Throwable e) {
148                exchange.setException(e);
149                callback.done(true);
150                return true;
151            }
152        }
153
154        // create internal callback which will execute the advices in reverse order when done
155        callback = new InternalCallback(states, exchange, callback);
156
157        // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
158        Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
159        if (exchange.isTransacted() || synchronous != null) {
160            // must be synchronized for transacted exchanges
161            if (LOG.isTraceEnabled()) {
162                if (exchange.isTransacted()) {
163                    LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
164                } else {
165                    LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
166                }
167            }
168            // ----------------------------------------------------------
169            // CAMEL END USER - DEBUG ME HERE +++ START +++
170            // ----------------------------------------------------------
171            try {
172                processor.process(exchange);
173            } catch (Throwable e) {
174                exchange.setException(e);
175            }
176            // ----------------------------------------------------------
177            // CAMEL END USER - DEBUG ME HERE +++ END +++
178            // ----------------------------------------------------------
179            callback.done(true);
180            return true;
181        } else {
182            final UnitOfWork uow = exchange.getUnitOfWork();
183
184            // allow unit of work to wrap callback in case it need to do some special work
185            // for example the MDCUnitOfWork
186            AsyncCallback async = callback;
187            if (uow != null) {
188                async = uow.beforeProcess(processor, exchange, callback);
189            }
190
191            // ----------------------------------------------------------
192            // CAMEL END USER - DEBUG ME HERE +++ START +++
193            // ----------------------------------------------------------
194            if (LOG.isTraceEnabled()) {
195                LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
196            }
197            boolean sync = processor.process(exchange, async);
198            // ----------------------------------------------------------
199            // CAMEL END USER - DEBUG ME HERE +++ END +++
200            // ----------------------------------------------------------
201
202            // execute any after processor work (in current thread, not in the callback)
203            if (uow != null) {
204                uow.afterProcess(processor, exchange, callback, sync);
205            }
206
207            if (LOG.isTraceEnabled()) {
208                LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
209                        new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange});
210            }
211            return sync;
212        }
213    }
214
215    @Override
216    public String toString() {
217        return processor != null ? processor.toString() : super.toString();
218    }
219
220    /**
221     * Internal callback that executes the after advices.
222     */
223    private final class InternalCallback implements AsyncCallback {
224
225        private final List<Object> states;
226        private final Exchange exchange;
227        private final AsyncCallback callback;
228
229        private InternalCallback(List<Object> states, Exchange exchange, AsyncCallback callback) {
230            this.states = states;
231            this.exchange = exchange;
232            this.callback = callback;
233        }
234
235        @Override
236        public void done(boolean doneSync) {
237            // NOTE: if you are debugging Camel routes, then all the code in the for loop below is internal only
238            // so you can step straight to the finally block and invoke the callback
239
240            // we should call after in reverse order
241            try {
242                for (int i = advices.size() - 1; i >= 0; i--) {
243                    CamelInternalProcessorAdvice task = advices.get(i);
244                    Object state = states.get(i);
245                    try {
246                        task.after(exchange, state);
247                    } catch (Exception e) {
248                        exchange.setException(e);
249                        // allow all advices to complete even if there was an exception
250                    }
251                }
252            } finally {
253                // ----------------------------------------------------------
254                // CAMEL END USER - DEBUG ME HERE +++ START +++
255                // ----------------------------------------------------------
256                // callback must be called
257                callback.done(doneSync);
258                // ----------------------------------------------------------
259                // CAMEL END USER - DEBUG ME HERE +++ END +++
260                // ----------------------------------------------------------
261            }
262        }
263    }
264
265    /**
266     * Strategy to determine if we should continue processing the {@link Exchange}.
267     */
268    protected boolean continueProcessing(Exchange exchange) {
269        Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
270        if (stop != null) {
271            boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
272            if (doStop) {
273                LOG.debug("Exchange is marked to stop routing: {}", exchange);
274                return false;
275            }
276        }
277
278        // determine if we can still run, or the camel context is forcing a shutdown
279        boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown(this);
280        if (forceShutdown) {
281            String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
282            LOG.debug(msg);
283            if (exchange.getException() == null) {
284                exchange.setException(new RejectedExecutionException(msg));
285            }
286            return false;
287        }
288
289        // yes we can continue
290        return true;
291    }
292
293    /**
294     * Advice to invoke callbacks for before and after routing.
295     */
296    public static class RouteLifecycleAdvice implements CamelInternalProcessorAdvice<Object> {
297
298        private Route route;
299
300        public void setRoute(Route route) {
301            this.route = route;
302        }
303
304        @Override
305        public Object before(Exchange exchange) throws Exception {
306            UnitOfWork uow = exchange.getUnitOfWork();
307            if (uow != null) {
308                uow.beforeRoute(exchange, route);
309            }
310            return null;
311        }
312
313        @Override
314        public void after(Exchange exchange, Object object) throws Exception {
315            UnitOfWork uow = exchange.getUnitOfWork();
316            if (uow != null) {
317                uow.afterRoute(exchange, route);
318            }
319        }
320    }
321
322    /**
323     * Advice for JMX instrumentation of the process being invoked.
324     * <p/>
325     * This advice keeps track of JMX metrics for performance statistics.
326     * <p/>
327     * The current implementation of this advice is only used for route level statistics. For processor levels
328     * they are still wrapped in the route processor chains.
329     */
330    public static class InstrumentationAdvice implements CamelInternalProcessorAdvice<StopWatch> {
331
332        private PerformanceCounter counter;
333        private String type;
334
335        public InstrumentationAdvice(String type) {
336            this.type = type;
337        }
338
339        public void setCounter(Object counter) {
340            ManagedPerformanceCounter mpc = null;
341            if (counter instanceof ManagedPerformanceCounter) {
342                mpc = (ManagedPerformanceCounter) counter;
343            }
344
345            if (this.counter instanceof DelegatePerformanceCounter) {
346                ((DelegatePerformanceCounter) this.counter).setCounter(mpc);
347            } else if (mpc != null) {
348                this.counter = mpc;
349            } else if (counter instanceof PerformanceCounter) {
350                this.counter = (PerformanceCounter) counter;
351            }
352        }
353
354        protected void beginTime(Exchange exchange) {
355            counter.processExchange(exchange);
356        }
357
358        protected void recordTime(Exchange exchange, long duration) {
359            if (LOG.isTraceEnabled()) {
360                LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange});
361            }
362
363            if (!exchange.isFailed() && exchange.getException() == null) {
364                counter.completedExchange(exchange, duration);
365            } else {
366                counter.failedExchange(exchange);
367            }
368        }
369
370        public String getType() {
371            return type;
372        }
373
374        public void setType(String type) {
375            this.type = type;
376        }
377
378        @Override
379        public StopWatch before(Exchange exchange) throws Exception {
380            // only record time if stats is enabled
381            StopWatch answer = counter != null && counter.isStatisticsEnabled() ? new StopWatch() : null;
382            if (answer != null) {
383                beginTime(exchange);
384            }
385            return answer;
386        }
387
388        @Override
389        public void after(Exchange exchange, StopWatch watch) throws Exception {
390            // record end time
391            if (watch != null) {
392                recordTime(exchange, watch.stop());
393            }
394        }
395    }
396
397    /**
398     * Advice to inject the current {@link RouteContext} into the {@link UnitOfWork} on the {@link Exchange}
399     *
400     * @deprecated this logic has been merged into {@link org.apache.camel.processor.CamelInternalProcessor.UnitOfWorkProcessorAdvice}
401     */
402    @Deprecated
403    public static class RouteContextAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {
404
405        private final RouteContext routeContext;
406
407        public RouteContextAdvice(RouteContext routeContext) {
408            this.routeContext = routeContext;
409        }
410
411        @Override
412        public UnitOfWork before(Exchange exchange) throws Exception {
413            // push the current route context
414            final UnitOfWork unitOfWork = exchange.getUnitOfWork();
415            if (unitOfWork != null) {
416                unitOfWork.pushRouteContext(routeContext);
417            }
418            return unitOfWork;
419        }
420
421        @Override
422        public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
423            if (unitOfWork != null) {
424                unitOfWork.popRouteContext();
425            }
426        }
427    }
428
429    /**
430     * Advice to keep the {@link InflightRepository} up to date.
431     */
432    public static class RouteInflightRepositoryAdvice implements CamelInternalProcessorAdvice {
433
434        private final InflightRepository inflightRepository;
435        private final String id;
436
437        public RouteInflightRepositoryAdvice(InflightRepository inflightRepository, String id) {
438            this.inflightRepository = inflightRepository;
439            this.id = id;
440        }
441
442        @Override
443        public Object before(Exchange exchange) throws Exception {
444            inflightRepository.add(exchange, id);
445            return null;
446        }
447
448        @Override
449        public void after(Exchange exchange, Object state) throws Exception {
450            inflightRepository.remove(exchange, id);
451        }
452    }
453
454    /**
455     * Advice to execute any {@link RoutePolicy} a route may have been configured with.
456     */
457    public static class RoutePolicyAdvice implements CamelInternalProcessorAdvice {
458
459        private final List<RoutePolicy> routePolicies;
460        private Route route;
461
462        public RoutePolicyAdvice(List<RoutePolicy> routePolicies) {
463            this.routePolicies = routePolicies;
464        }
465
466        public void setRoute(Route route) {
467            this.route = route;
468        }
469
470        /**
471         * Strategy to determine if this policy is allowed to run
472         *
473         * @param policy the policy
474         * @return <tt>true</tt> to run
475         */
476        protected boolean isRoutePolicyRunAllowed(RoutePolicy policy) {
477            if (policy instanceof StatefulService) {
478                StatefulService ss = (StatefulService) policy;
479                return ss.isRunAllowed();
480            }
481            return true;
482        }
483
484        @Override
485        public Object before(Exchange exchange) throws Exception {
486            // invoke begin
487            for (RoutePolicy policy : routePolicies) {
488                try {
489                    if (isRoutePolicyRunAllowed(policy)) {
490                        policy.onExchangeBegin(route, exchange);
491                    }
492                } catch (Exception e) {
493                    LOG.warn("Error occurred during onExchangeBegin on RoutePolicy: " + policy
494                            + ". This exception will be ignored", e);
495                }
496            }
497            return null;
498        }
499
500        @Override
501        public void after(Exchange exchange, Object data) throws Exception {
502            // do not invoke it if Camel is stopping as we don't want
503            // the policy to start a consumer during Camel is stopping
504            if (isCamelStopping(exchange.getContext())) {
505                return;
506            }
507
508            for (RoutePolicy policy : routePolicies) {
509                try {
510                    if (isRoutePolicyRunAllowed(policy)) {
511                        policy.onExchangeDone(route, exchange);
512                    }
513                } catch (Exception e) {
514                    LOG.warn("Error occurred during onExchangeDone on RoutePolicy: " + policy
515                            + ". This exception will be ignored", e);
516                }
517            }
518        }
519
520        private static boolean isCamelStopping(CamelContext context) {
521            if (context instanceof StatefulService) {
522                StatefulService ss = (StatefulService) context;
523                return ss.isStopping() || ss.isStopped();
524            }
525            return false;
526        }
527    }
528
529    /**
530     * Advice to execute the {@link BacklogTracer} if enabled.
531     */
532    public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice, Ordered {
533
534        private final BacklogTracer backlogTracer;
535        private final ProcessorDefinition<?> processorDefinition;
536        private final ProcessorDefinition<?> routeDefinition;
537        private final boolean first;
538
539        public BacklogTracerAdvice(BacklogTracer backlogTracer, ProcessorDefinition<?> processorDefinition,
540                                   ProcessorDefinition<?> routeDefinition, boolean first) {
541            this.backlogTracer = backlogTracer;
542            this.processorDefinition = processorDefinition;
543            this.routeDefinition = routeDefinition;
544            this.first = first;
545        }
546
547        @Override
548        public Object before(Exchange exchange) throws Exception {
549            if (backlogTracer.shouldTrace(processorDefinition, exchange)) {
550                Date timestamp = new Date();
551                String toNode = processorDefinition.getId();
552                String exchangeId = exchange.getExchangeId();
553                String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 4,
554                        backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), backlogTracer.getBodyMaxChars());
555
556                // if first we should add a pseudo trace message as well, so we have a starting message (eg from the route)
557                String routeId = routeDefinition != null ? routeDefinition.getId() : null;
558                if (first) {
559                    Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp, Date.class);
560                    DefaultBacklogTracerEventMessage pseudo = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), created, routeId, null, exchangeId, messageAsXml);
561                    backlogTracer.traceEvent(pseudo);
562                }
563                DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), timestamp, routeId, toNode, exchangeId, messageAsXml);
564                backlogTracer.traceEvent(event);
565            }
566
567            return null;
568        }
569
570        @Override
571        public void after(Exchange exchange, Object data) throws Exception {
572            // noop
573        }
574
575        @Override
576        public int getOrder() {
577            // we want tracer just before calling the processor
578            return Ordered.LOWEST - 1;
579        }
580
581    }
582
583    /**
584     * Advice to execute the {@link org.apache.camel.processor.interceptor.BacklogDebugger} if enabled.
585     */
586    public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice<StopWatch>, Ordered {
587
588        private final BacklogDebugger backlogDebugger;
589        private final Processor target;
590        private final ProcessorDefinition<?> definition;
591        private final String nodeId;
592
593        public BacklogDebuggerAdvice(BacklogDebugger backlogDebugger, Processor target, ProcessorDefinition<?> definition) {
594            this.backlogDebugger = backlogDebugger;
595            this.target = target;
596            this.definition = definition;
597            this.nodeId = definition.getId();
598        }
599
600        @Override
601        public StopWatch before(Exchange exchange) throws Exception {
602            if (backlogDebugger.isEnabled() && (backlogDebugger.hasBreakpoint(nodeId) || backlogDebugger.isSingleStepMode())) {
603                StopWatch watch = new StopWatch();
604                backlogDebugger.beforeProcess(exchange, target, definition);
605                return watch;
606            } else {
607                return null;
608            }
609        }
610
611        @Override
612        public void after(Exchange exchange, StopWatch stopWatch) throws Exception {
613            if (stopWatch != null) {
614                backlogDebugger.afterProcess(exchange, target, definition, stopWatch.stop());
615            }
616        }
617
618        @Override
619        public int getOrder() {
620            // we want debugger just before calling the processor
621            return Ordered.LOWEST;
622        }
623    }
624
625    /**
626     * Advice to inject new {@link UnitOfWork} to the {@link Exchange} if needed, and as well to ensure
627     * the {@link UnitOfWork} is done and stopped.
628     */
629    public static class UnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {
630
631        private final RouteContext routeContext;
632
633        public UnitOfWorkProcessorAdvice(RouteContext routeContext) {
634            this.routeContext = routeContext;
635        }
636
637        @Override
638        public UnitOfWork before(Exchange exchange) throws Exception {
639            // if the exchange doesn't have from route id set, then set it if it originated
640            // from this unit of work
641            if (routeContext != null && exchange.getFromRouteId() == null) {
642                String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
643                exchange.setFromRouteId(routeId);
644            }
645
646            // only return UnitOfWork if we created a new as then its us that handle the lifecycle to done the created UoW
647            UnitOfWork created = null;
648
649            if (exchange.getUnitOfWork() == null) {
650                // If there is no existing UoW, then we should start one and
651                // terminate it once processing is completed for the exchange.
652                created = createUnitOfWork(exchange);
653                exchange.setUnitOfWork(created);
654                created.start();
655            }
656
657            // for any exchange we should push/pop route context so we can keep track of which route we are routing
658            if (routeContext != null) {
659                UnitOfWork existing = exchange.getUnitOfWork();
660                if (existing != null) {
661                    existing.pushRouteContext(routeContext);
662                }
663            }
664
665            return created;
666        }
667
668        @Override
669        public void after(Exchange exchange, UnitOfWork uow) throws Exception {
670            UnitOfWork existing = exchange.getUnitOfWork();
671
672            // execute done on uow if we created it, and the consumer is not doing it
673            if (uow != null) {
674                UnitOfWorkHelper.doneUow(uow, exchange);
675            }
676
677            // after UoW is done lets pop the route context which must be done on every existing UoW
678            if (routeContext != null && existing != null) {
679                existing.popRouteContext();
680            }
681        }
682
683        protected UnitOfWork createUnitOfWork(Exchange exchange) {
684            return exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
685        }
686
687    }
688
689    /**
690     * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality.
691     */
692    public static class ChildUnitOfWorkProcessorAdvice extends UnitOfWorkProcessorAdvice {
693
694        private final UnitOfWork parent;
695
696        public ChildUnitOfWorkProcessorAdvice(RouteContext routeContext, UnitOfWork parent) {
697            super(routeContext);
698            this.parent = parent;
699        }
700
701        @Override
702        protected UnitOfWork createUnitOfWork(Exchange exchange) {
703            // let the parent create a child unit of work to be used
704            return parent.createChildUnitOfWork(exchange);
705        }
706
707    }
708
709    /**
710     * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality.
711     */
712    public static class SubUnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {
713
714        @Override
715        public UnitOfWork before(Exchange exchange) throws Exception {
716            // begin savepoint
717            exchange.getUnitOfWork().beginSubUnitOfWork(exchange);
718            return exchange.getUnitOfWork();
719        }
720
721        @Override
722        public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
723            // end sub unit of work
724            unitOfWork.endSubUnitOfWork(exchange);
725        }
726    }
727
728    /**
729     * Advice when Message History has been enabled.
730     */
731    @SuppressWarnings("unchecked")
732    public static class MessageHistoryAdvice implements CamelInternalProcessorAdvice<MessageHistory> {
733
734        private final MessageHistoryFactory factory;
735        private final ProcessorDefinition<?> definition;
736        private final String routeId;
737
738        public MessageHistoryAdvice(MessageHistoryFactory factory, ProcessorDefinition<?> definition) {
739            this.factory = factory;
740            this.definition = definition;
741            this.routeId = ProcessorDefinitionHelper.getRouteId(definition);
742        }
743
744        @Override
745        public MessageHistory before(Exchange exchange) throws Exception {
746            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
747            if (list == null) {
748                list = new LinkedList<>();
749                exchange.setProperty(Exchange.MESSAGE_HISTORY, list);
750            }
751            MessageHistory history = factory.newMessageHistory(routeId, definition, new Date());
752            list.add(history);
753            return history;
754        }
755
756        @Override
757        public void after(Exchange exchange, MessageHistory history) throws Exception {
758            if (history != null) {
759                history.nodeProcessingDone();
760            }
761        }
762    }
763
764    /**
765     * Advice for {@link org.apache.camel.spi.StreamCachingStrategy}
766     */
767    public static class StreamCachingAdvice implements CamelInternalProcessorAdvice<StreamCache>, Ordered {
768
769        private final StreamCachingStrategy strategy;
770
771        public StreamCachingAdvice(StreamCachingStrategy strategy) {
772            this.strategy = strategy;
773        }
774
775        @Override
776        public StreamCache before(Exchange exchange) throws Exception {
777            // check if body is already cached
778            Object body = exchange.getIn().getBody();
779            if (body == null) {
780                return null;
781            } else if (body instanceof StreamCache) {
782                StreamCache sc = (StreamCache) body;
783                // reset so the cache is ready to be used before processing
784                sc.reset();
785                return sc;
786            }
787            // cache the body and if we could do that replace it as the new body
788            StreamCache sc = strategy.cache(exchange);
789            if (sc != null) {
790                exchange.getIn().setBody(sc);
791            }
792            return sc;
793        }
794
795        @Override
796        public void after(Exchange exchange, StreamCache sc) throws Exception {
797            Object body;
798            if (exchange.hasOut()) {
799                body = exchange.getOut().getBody();
800            } else {
801                body = exchange.getIn().getBody();
802            }
803            if (body != null && body instanceof StreamCache) {
804                // reset so the cache is ready to be reused after processing
805                ((StreamCache) body).reset();
806            }
807        }
808
809        @Override
810        public int getOrder() {
811            // we want stream caching first
812            return Ordered.HIGHEST;
813        }
814    }
815
816    /**
817     * Advice for delaying
818     */
819    public static class DelayerAdvice implements CamelInternalProcessorAdvice {
820
821        private final long delay;
822
823        public DelayerAdvice(long delay) {
824            this.delay = delay;
825        }
826
827        @Override
828        public Object before(Exchange exchange) throws Exception {
829            try {
830                LOG.trace("Sleeping for: {} millis", delay);
831                Thread.sleep(delay);
832            } catch (InterruptedException e) {
833                LOG.debug("Sleep interrupted");
834                Thread.currentThread().interrupt();
835                throw e;
836            }
837            return null;
838        }
839
840        @Override
841        public void after(Exchange exchange, Object data) throws Exception {
842            // noop
843        }
844    }
845
846}