001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.Date;
022import java.util.List;
023import java.util.concurrent.RejectedExecutionException;
024
025import org.apache.camel.AsyncCallback;
026import org.apache.camel.CamelContext;
027import org.apache.camel.Exchange;
028import org.apache.camel.MessageHistory;
029import org.apache.camel.Ordered;
030import org.apache.camel.Processor;
031import org.apache.camel.Route;
032import org.apache.camel.StatefulService;
033import org.apache.camel.StreamCache;
034import org.apache.camel.api.management.PerformanceCounter;
035import org.apache.camel.management.DelegatePerformanceCounter;
036import org.apache.camel.management.mbean.ManagedPerformanceCounter;
037import org.apache.camel.model.ProcessorDefinition;
038import org.apache.camel.model.ProcessorDefinitionHelper;
039import org.apache.camel.processor.interceptor.BacklogDebugger;
040import org.apache.camel.processor.interceptor.BacklogTracer;
041import org.apache.camel.processor.interceptor.DefaultBacklogTracerEventMessage;
042import org.apache.camel.spi.InflightRepository;
043import org.apache.camel.spi.MessageHistoryFactory;
044import org.apache.camel.spi.RouteContext;
045import org.apache.camel.spi.RoutePolicy;
046import org.apache.camel.spi.StreamCachingStrategy;
047import org.apache.camel.spi.UnitOfWork;
048import org.apache.camel.util.MessageHelper;
049import org.apache.camel.util.OrderedComparator;
050import org.apache.camel.util.StopWatch;
051import org.apache.camel.util.UnitOfWorkHelper;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
 * Internal {@link Processor} that the Camel routing engine uses during routing for cross cutting functionality such as:
057 * <ul>
058 *     <li>Execute {@link UnitOfWork}</li>
059 *     <li>Keeping track which route currently is being routed</li>
060 *     <li>Execute {@link RoutePolicy}</li>
 *     <li>Gather JMX performance statistics</li>
062 *     <li>Tracing</li>
063 *     <li>Debugging</li>
064 *     <li>Message History</li>
065 *     <li>Stream Caching</li>
066 * </ul>
067 * ... and more.
068 * <p/>
069 * This implementation executes this cross cutting functionality as a {@link CamelInternalProcessorAdvice} advice (before and after advice)
070 * by executing the {@link CamelInternalProcessorAdvice#before(org.apache.camel.Exchange)} and
071 * {@link CamelInternalProcessorAdvice#after(org.apache.camel.Exchange, Object)} callbacks in correct order during routing.
 * This reduces the number of stack frames needed during routing, reduces the number of lines in stacktraces, and
 * makes debugging the routing engine easier for end users.
074 * <p/>
 * <b>Debugging tips:</b> Camel end users who want to debug their Camel applications with the Camel source code should
 * read the debugging tips in the source code of this class, which you can find in the
 * {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method.
078 * <p/>
079 * The added advices can implement {@link Ordered} to control in which order the advices are executed.
080 */
081public class CamelInternalProcessor extends DelegateAsyncProcessor {
082
083    private static final Logger LOG = LoggerFactory.getLogger(CamelInternalProcessor.class);
084    private final List<CamelInternalProcessorAdvice> advices = new ArrayList<CamelInternalProcessorAdvice>();
085
086    public CamelInternalProcessor() {
087    }
088
089    public CamelInternalProcessor(Processor processor) {
090        super(processor);
091    }
092
093    /**
094     * Adds an {@link CamelInternalProcessorAdvice} advice to the list of advices to execute by this internal processor.
095     *
096     * @param advice  the advice to add
097     */
098    public void addAdvice(CamelInternalProcessorAdvice advice) {
099        advices.add(advice);
100        // ensure advices are sorted so they are in the order we want
101        Collections.sort(advices, new OrderedComparator());
102    }
103
104    /**
105     * Gets the advice with the given type.
106     *
107     * @param type  the type of the advice
108     * @return the advice if exists, or <tt>null</tt> if no advices has been added with the given type.
109     */
110    public <T> T getAdvice(Class<T> type) {
111        for (CamelInternalProcessorAdvice task : advices) {
112            if (type.isInstance(task)) {
113                return type.cast(task);
114            }
115        }
116        return null;
117    }
118
119    @Override
120    public boolean process(Exchange exchange, AsyncCallback callback) {
121        // ----------------------------------------------------------
122        // CAMEL END USER - READ ME FOR DEBUGGING TIPS
123        // ----------------------------------------------------------
124        // If you want to debug the Camel routing engine, then there is a lot of internal functionality
125        // the routing engine executes during routing messages. You can skip debugging this internal
126        // functionality and instead debug where the routing engine continues routing to the next node
127        // in the routes. The CamelInternalProcessor is a vital part of the routing engine, as its
128        // being used in between the nodes. As an end user you can just debug the code in this class
129        // in between the:
130        //   CAMEL END USER - DEBUG ME HERE +++ START +++
131        //   CAMEL END USER - DEBUG ME HERE +++ END +++
132        // you can see in the code below.
133        // ----------------------------------------------------------
134
135        if (processor == null || !continueProcessing(exchange)) {
136            // no processor or we should not continue then we are done
137            callback.done(true);
138            return true;
139        }
140
141        final List<Object> states = new ArrayList<Object>(advices.size());
142        for (CamelInternalProcessorAdvice task : advices) {
143            try {
144                Object state = task.before(exchange);
145                states.add(state);
146            } catch (Throwable e) {
147                exchange.setException(e);
148                callback.done(true);
149                return true;
150            }
151        }
152
153        // create internal callback which will execute the advices in reverse order when done
154        callback = new InternalCallback(states, exchange, callback);
155
156        // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0
157        Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
158        if (exchange.isTransacted() || synchronous != null) {
159            // must be synchronized for transacted exchanges
160            if (LOG.isTraceEnabled()) {
161                if (exchange.isTransacted()) {
162                    LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
163                } else {
164                    LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
165                }
166            }
167            // ----------------------------------------------------------
168            // CAMEL END USER - DEBUG ME HERE +++ START +++
169            // ----------------------------------------------------------
170            try {
171                processor.process(exchange);
172            } catch (Throwable e) {
173                exchange.setException(e);
174            }
175            // ----------------------------------------------------------
176            // CAMEL END USER - DEBUG ME HERE +++ END +++
177            // ----------------------------------------------------------
178            callback.done(true);
179            return true;
180        } else {
181            final UnitOfWork uow = exchange.getUnitOfWork();
182
183            // allow unit of work to wrap callback in case it need to do some special work
184            // for example the MDCUnitOfWork
185            AsyncCallback async = callback;
186            if (uow != null) {
187                async = uow.beforeProcess(processor, exchange, callback);
188            }
189
190            // ----------------------------------------------------------
191            // CAMEL END USER - DEBUG ME HERE +++ START +++
192            // ----------------------------------------------------------
193            if (LOG.isTraceEnabled()) {
194                LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
195            }
196            boolean sync = processor.process(exchange, async);
197            // ----------------------------------------------------------
198            // CAMEL END USER - DEBUG ME HERE +++ END +++
199            // ----------------------------------------------------------
200
201            // execute any after processor work (in current thread, not in the callback)
202            if (uow != null) {
203                uow.afterProcess(processor, exchange, callback, sync);
204            }
205
206            if (LOG.isTraceEnabled()) {
207                LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
208                        new Object[]{sync ? "synchronously" : "asynchronously", exchange.getExchangeId(), exchange});
209            }
210            return sync;
211        }
212    }
213
214    @Override
215    public String toString() {
216        return processor != null ? processor.toString() : super.toString();
217    }
218
219    /**
220     * Internal callback that executes the after advices.
221     */
222    private final class InternalCallback implements AsyncCallback {
223
224        private final List<Object> states;
225        private final Exchange exchange;
226        private final AsyncCallback callback;
227
228        private InternalCallback(List<Object> states, Exchange exchange, AsyncCallback callback) {
229            this.states = states;
230            this.exchange = exchange;
231            this.callback = callback;
232        }
233
234        @Override
235        public void done(boolean doneSync) {
236            // NOTE: if you are debugging Camel routes, then all the code in the for loop below is internal only
237            // so you can step straight to the finally block and invoke the callback
238
239            // we should call after in reverse order
240            try {
241                for (int i = advices.size() - 1; i >= 0; i--) {
242                    CamelInternalProcessorAdvice task = advices.get(i);
243                    Object state = states.get(i);
244                    try {
245                        task.after(exchange, state);
246                    } catch (Exception e) {
247                        exchange.setException(e);
248                        // allow all advices to complete even if there was an exception
249                    }
250                }
251            } finally {
252                // ----------------------------------------------------------
253                // CAMEL END USER - DEBUG ME HERE +++ START +++
254                // ----------------------------------------------------------
255                // callback must be called
256                callback.done(doneSync);
257                // ----------------------------------------------------------
258                // CAMEL END USER - DEBUG ME HERE +++ END +++
259                // ----------------------------------------------------------
260            }
261        }
262    }
263
264    /**
265     * Strategy to determine if we should continue processing the {@link Exchange}.
266     */
267    protected boolean continueProcessing(Exchange exchange) {
268        Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
269        if (stop != null) {
270            boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
271            if (doStop) {
272                LOG.debug("Exchange is marked to stop routing: {}", exchange);
273                return false;
274            }
275        }
276
277        // determine if we can still run, or the camel context is forcing a shutdown
278        boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown(this);
279        if (forceShutdown) {
280            String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
281            LOG.debug(msg);
282            if (exchange.getException() == null) {
283                exchange.setException(new RejectedExecutionException(msg));
284            }
285            return false;
286        }
287
288        // yes we can continue
289        return true;
290    }
291
292    /**
293     * Advice to invoke callbacks for before and after routing.
294     */
295    public static class RouteLifecycleAdvice implements CamelInternalProcessorAdvice<Object> {
296
297        private Route route;
298
299        public void setRoute(Route route) {
300            this.route = route;
301        }
302
303        @Override
304        public Object before(Exchange exchange) throws Exception {
305            UnitOfWork uow = exchange.getUnitOfWork();
306            if (uow != null) {
307                uow.beforeRoute(exchange, route);
308            }
309            return null;
310        }
311
312        @Override
313        public void after(Exchange exchange, Object object) throws Exception {
314            UnitOfWork uow = exchange.getUnitOfWork();
315            if (uow != null) {
316                uow.afterRoute(exchange, route);
317            }
318        }
319    }
320
321    /**
322     * Advice for JMX instrumentation of the process being invoked.
323     * <p/>
324     * This advice keeps track of JMX metrics for performance statistics.
325     * <p/>
326     * The current implementation of this advice is only used for route level statistics. For processor levels
327     * they are still wrapped in the route processor chains.
328     */
329    public static class InstrumentationAdvice implements CamelInternalProcessorAdvice<StopWatch> {
330
331        private PerformanceCounter counter;
332        private String type;
333
334        public InstrumentationAdvice(String type) {
335            this.type = type;
336        }
337
338        public void setCounter(Object counter) {
339            ManagedPerformanceCounter mpc = null;
340            if (counter instanceof ManagedPerformanceCounter) {
341                mpc = (ManagedPerformanceCounter) counter;
342            }
343
344            if (this.counter instanceof DelegatePerformanceCounter) {
345                ((DelegatePerformanceCounter) this.counter).setCounter(mpc);
346            } else if (mpc != null) {
347                this.counter = mpc;
348            } else if (counter instanceof PerformanceCounter) {
349                this.counter = (PerformanceCounter) counter;
350            }
351        }
352
353        protected void beginTime(Exchange exchange) {
354            counter.processExchange(exchange);
355        }
356
357        protected void recordTime(Exchange exchange, long duration) {
358            if (LOG.isTraceEnabled()) {
359                LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange});
360            }
361
362            if (!exchange.isFailed() && exchange.getException() == null) {
363                counter.completedExchange(exchange, duration);
364            } else {
365                counter.failedExchange(exchange);
366            }
367        }
368
369        public String getType() {
370            return type;
371        }
372
373        public void setType(String type) {
374            this.type = type;
375        }
376
377        @Override
378        public StopWatch before(Exchange exchange) throws Exception {
379            // only record time if stats is enabled
380            StopWatch answer = counter != null && counter.isStatisticsEnabled() ? new StopWatch() : null;
381            if (answer != null) {
382                beginTime(exchange);
383            }
384            return answer;
385        }
386
387        @Override
388        public void after(Exchange exchange, StopWatch watch) throws Exception {
389            // record end time
390            if (watch != null) {
391                recordTime(exchange, watch.stop());
392            }
393        }
394    }
395
396    /**
397     * Advice to inject the current {@link RouteContext} into the {@link UnitOfWork} on the {@link Exchange}
398     *
399     * @deprecated this logic has been merged into {@link org.apache.camel.processor.CamelInternalProcessor.UnitOfWorkProcessorAdvice}
400     */
401    @Deprecated
402    public static class RouteContextAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {
403
404        private final RouteContext routeContext;
405
406        public RouteContextAdvice(RouteContext routeContext) {
407            this.routeContext = routeContext;
408        }
409
410        @Override
411        public UnitOfWork before(Exchange exchange) throws Exception {
412            // push the current route context
413            final UnitOfWork unitOfWork = exchange.getUnitOfWork();
414            if (unitOfWork != null) {
415                unitOfWork.pushRouteContext(routeContext);
416            }
417            return unitOfWork;
418        }
419
420        @Override
421        public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
422            if (unitOfWork != null) {
423                unitOfWork.popRouteContext();
424            }
425        }
426    }
427
428    /**
429     * Advice to keep the {@link InflightRepository} up to date.
430     */
431    public static class RouteInflightRepositoryAdvice implements CamelInternalProcessorAdvice {
432
433        private final InflightRepository inflightRepository;
434        private final String id;
435
436        public RouteInflightRepositoryAdvice(InflightRepository inflightRepository, String id) {
437            this.inflightRepository = inflightRepository;
438            this.id = id;
439        }
440
441        @Override
442        public Object before(Exchange exchange) throws Exception {
443            inflightRepository.add(exchange, id);
444            return null;
445        }
446
447        @Override
448        public void after(Exchange exchange, Object state) throws Exception {
449            inflightRepository.remove(exchange, id);
450        }
451    }
452
453    /**
454     * Advice to execute any {@link RoutePolicy} a route may have been configured with.
455     */
456    public static class RoutePolicyAdvice implements CamelInternalProcessorAdvice {
457
458        private final List<RoutePolicy> routePolicies;
459        private Route route;
460
461        public RoutePolicyAdvice(List<RoutePolicy> routePolicies) {
462            this.routePolicies = routePolicies;
463        }
464
465        public void setRoute(Route route) {
466            this.route = route;
467        }
468
469        /**
470         * Strategy to determine if this policy is allowed to run
471         *
472         * @param policy the policy
473         * @return <tt>true</tt> to run
474         */
475        protected boolean isRoutePolicyRunAllowed(RoutePolicy policy) {
476            if (policy instanceof StatefulService) {
477                StatefulService ss = (StatefulService) policy;
478                return ss.isRunAllowed();
479            }
480            return true;
481        }
482
483        @Override
484        public Object before(Exchange exchange) throws Exception {
485            // invoke begin
486            for (RoutePolicy policy : routePolicies) {
487                try {
488                    if (isRoutePolicyRunAllowed(policy)) {
489                        policy.onExchangeBegin(route, exchange);
490                    }
491                } catch (Exception e) {
492                    LOG.warn("Error occurred during onExchangeBegin on RoutePolicy: " + policy
493                            + ". This exception will be ignored", e);
494                }
495            }
496            return null;
497        }
498
499        @Override
500        public void after(Exchange exchange, Object data) throws Exception {
501            // do not invoke it if Camel is stopping as we don't want
502            // the policy to start a consumer during Camel is stopping
503            if (isCamelStopping(exchange.getContext())) {
504                return;
505            }
506
507            for (RoutePolicy policy : routePolicies) {
508                try {
509                    if (isRoutePolicyRunAllowed(policy)) {
510                        policy.onExchangeDone(route, exchange);
511                    }
512                } catch (Exception e) {
513                    LOG.warn("Error occurred during onExchangeDone on RoutePolicy: " + policy
514                            + ". This exception will be ignored", e);
515                }
516            }
517        }
518
519        private static boolean isCamelStopping(CamelContext context) {
520            if (context instanceof StatefulService) {
521                StatefulService ss = (StatefulService) context;
522                return ss.isStopping() || ss.isStopped();
523            }
524            return false;
525        }
526    }
527
528    /**
529     * Advice to execute the {@link BacklogTracer} if enabled.
530     */
531    public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice, Ordered {
532
533        private final BacklogTracer backlogTracer;
534        private final ProcessorDefinition<?> processorDefinition;
535        private final ProcessorDefinition<?> routeDefinition;
536        private final boolean first;
537
538        public BacklogTracerAdvice(BacklogTracer backlogTracer, ProcessorDefinition<?> processorDefinition,
539                                   ProcessorDefinition<?> routeDefinition, boolean first) {
540            this.backlogTracer = backlogTracer;
541            this.processorDefinition = processorDefinition;
542            this.routeDefinition = routeDefinition;
543            this.first = first;
544        }
545
546        @Override
547        public Object before(Exchange exchange) throws Exception {
548            if (backlogTracer.shouldTrace(processorDefinition, exchange)) {
549                Date timestamp = new Date();
550                String toNode = processorDefinition.getId();
551                String exchangeId = exchange.getExchangeId();
552                String messageAsXml = MessageHelper.dumpAsXml(exchange.getIn(), true, 4,
553                        backlogTracer.isBodyIncludeStreams(), backlogTracer.isBodyIncludeFiles(), backlogTracer.getBodyMaxChars());
554
555                // if first we should add a pseudo trace message as well, so we have a starting message (eg from the route)
556                String routeId = routeDefinition != null ? routeDefinition.getId() : null;
557                if (first) {
558                    Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp, Date.class);
559                    DefaultBacklogTracerEventMessage pseudo = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), created, routeId, null, exchangeId, messageAsXml);
560                    backlogTracer.traceEvent(pseudo);
561                }
562                DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), timestamp, routeId, toNode, exchangeId, messageAsXml);
563                backlogTracer.traceEvent(event);
564            }
565
566            return null;
567        }
568
569        @Override
570        public void after(Exchange exchange, Object data) throws Exception {
571            // noop
572        }
573
574        @Override
575        public int getOrder() {
576            // we want tracer just before calling the processor
577            return Ordered.LOWEST - 1;
578        }
579
580    }
581
582    /**
583     * Advice to execute the {@link org.apache.camel.processor.interceptor.BacklogDebugger} if enabled.
584     */
585    public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice<StopWatch>, Ordered {
586
587        private final BacklogDebugger backlogDebugger;
588        private final Processor target;
589        private final ProcessorDefinition<?> definition;
590        private final String nodeId;
591
592        public BacklogDebuggerAdvice(BacklogDebugger backlogDebugger, Processor target, ProcessorDefinition<?> definition) {
593            this.backlogDebugger = backlogDebugger;
594            this.target = target;
595            this.definition = definition;
596            this.nodeId = definition.getId();
597        }
598
599        @Override
600        public StopWatch before(Exchange exchange) throws Exception {
601            if (backlogDebugger.isEnabled() && (backlogDebugger.hasBreakpoint(nodeId) || backlogDebugger.isSingleStepMode())) {
602                StopWatch watch = new StopWatch();
603                backlogDebugger.beforeProcess(exchange, target, definition);
604                return watch;
605            } else {
606                return null;
607            }
608        }
609
610        @Override
611        public void after(Exchange exchange, StopWatch stopWatch) throws Exception {
612            if (stopWatch != null) {
613                backlogDebugger.afterProcess(exchange, target, definition, stopWatch.stop());
614            }
615        }
616
617        @Override
618        public int getOrder() {
619            // we want debugger just before calling the processor
620            return Ordered.LOWEST;
621        }
622    }
623
624    /**
625     * Advice to inject new {@link UnitOfWork} to the {@link Exchange} if needed, and as well to ensure
626     * the {@link UnitOfWork} is done and stopped.
627     */
628    public static class UnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {
629
630        private final RouteContext routeContext;
631
632        public UnitOfWorkProcessorAdvice(RouteContext routeContext) {
633            this.routeContext = routeContext;
634        }
635
636        @Override
637        public UnitOfWork before(Exchange exchange) throws Exception {
638            // if the exchange doesn't have from route id set, then set it if it originated
639            // from this unit of work
640            if (routeContext != null && exchange.getFromRouteId() == null) {
641                String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
642                exchange.setFromRouteId(routeId);
643            }
644
645            // only return UnitOfWork if we created a new as then its us that handle the lifecycle to done the created UoW
646            UnitOfWork created = null;
647
648            if (exchange.getUnitOfWork() == null) {
649                // If there is no existing UoW, then we should start one and
650                // terminate it once processing is completed for the exchange.
651                created = createUnitOfWork(exchange);
652                exchange.setUnitOfWork(created);
653                created.start();
654            }
655
656            // for any exchange we should push/pop route context so we can keep track of which route we are routing
657            if (routeContext != null) {
658                UnitOfWork existing = exchange.getUnitOfWork();
659                if (existing != null) {
660                    existing.pushRouteContext(routeContext);
661                }
662            }
663
664            return created;
665        }
666
667        @Override
668        public void after(Exchange exchange, UnitOfWork uow) throws Exception {
669            UnitOfWork existing = exchange.getUnitOfWork();
670
671            // execute done on uow if we created it, and the consumer is not doing it
672            if (uow != null) {
673                UnitOfWorkHelper.doneUow(uow, exchange);
674            }
675
676            // after UoW is done lets pop the route context which must be done on every existing UoW
677            if (routeContext != null && existing != null) {
678                existing.popRouteContext();
679            }
680        }
681
682        protected UnitOfWork createUnitOfWork(Exchange exchange) {
683            return exchange.getContext().getUnitOfWorkFactory().createUnitOfWork(exchange);
684        }
685
686    }
687
688    /**
689     * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality.
690     */
691    public static class ChildUnitOfWorkProcessorAdvice extends UnitOfWorkProcessorAdvice {
692
693        private final UnitOfWork parent;
694
695        public ChildUnitOfWorkProcessorAdvice(RouteContext routeContext, UnitOfWork parent) {
696            super(routeContext);
697            this.parent = parent;
698        }
699
700        @Override
701        protected UnitOfWork createUnitOfWork(Exchange exchange) {
702            // let the parent create a child unit of work to be used
703            return parent.createChildUnitOfWork(exchange);
704        }
705
706    }
707
708    /**
709     * Advice when an EIP uses the <tt>shareUnitOfWork</tt> functionality.
710     */
711    public static class SubUnitOfWorkProcessorAdvice implements CamelInternalProcessorAdvice<UnitOfWork> {
712
713        @Override
714        public UnitOfWork before(Exchange exchange) throws Exception {
715            // begin savepoint
716            exchange.getUnitOfWork().beginSubUnitOfWork(exchange);
717            return exchange.getUnitOfWork();
718        }
719
720        @Override
721        public void after(Exchange exchange, UnitOfWork unitOfWork) throws Exception {
722            // end sub unit of work
723            unitOfWork.endSubUnitOfWork(exchange);
724        }
725    }
726
727    /**
728     * Advice when Message History has been enabled.
729     */
730    @SuppressWarnings("unchecked")
731    public static class MessageHistoryAdvice implements CamelInternalProcessorAdvice<MessageHistory> {
732
733        private final MessageHistoryFactory factory;
734        private final ProcessorDefinition<?> definition;
735        private final String routeId;
736
737        public MessageHistoryAdvice(MessageHistoryFactory factory, ProcessorDefinition<?> definition) {
738            this.factory = factory;
739            this.definition = definition;
740            this.routeId = ProcessorDefinitionHelper.getRouteId(definition);
741        }
742
743        @Override
744        public MessageHistory before(Exchange exchange) throws Exception {
745            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
746            if (list == null) {
747                list = new ArrayList<MessageHistory>();
748                exchange.setProperty(Exchange.MESSAGE_HISTORY, list);
749            }
750            MessageHistory history = factory.newMessageHistory(routeId, definition, new Date());
751            list.add(history);
752            return history;
753        }
754
755        @Override
756        public void after(Exchange exchange, MessageHistory history) throws Exception {
757            if (history != null) {
758                history.nodeProcessingDone();
759            }
760        }
761    }
762
763    /**
764     * Advice for {@link org.apache.camel.spi.StreamCachingStrategy}
765     */
766    public static class StreamCachingAdvice implements CamelInternalProcessorAdvice<StreamCache>, Ordered {
767
768        private final StreamCachingStrategy strategy;
769
770        public StreamCachingAdvice(StreamCachingStrategy strategy) {
771            this.strategy = strategy;
772        }
773
774        @Override
775        public StreamCache before(Exchange exchange) throws Exception {
776            // check if body is already cached
777            Object body = exchange.getIn().getBody();
778            if (body == null) {
779                return null;
780            } else if (body instanceof StreamCache) {
781                StreamCache sc = (StreamCache) body;
782                // reset so the cache is ready to be used before processing
783                sc.reset();
784                return sc;
785            }
786            // cache the body and if we could do that replace it as the new body
787            StreamCache sc = strategy.cache(exchange);
788            if (sc != null) {
789                exchange.getIn().setBody(sc);
790            }
791            return sc;
792        }
793
794        @Override
795        public void after(Exchange exchange, StreamCache sc) throws Exception {
796            Object body = null;
797            if (exchange.hasOut()) {
798                body = exchange.getOut().getBody();
799            } else {
800                body = exchange.getIn().getBody();
801            }
802            if (body != null && body instanceof StreamCache) {
803                // reset so the cache is ready to be reused after processing
804                ((StreamCache) body).reset();
805            }
806        }
807
808        @Override
809        public int getOrder() {
810            // we want stream caching first
811            return Ordered.HIGHEST;
812        }
813    }
814
815    /**
816     * Advice for delaying
817     */
818    public static class DelayerAdvice implements CamelInternalProcessorAdvice {
819
820        private final long delay;
821
822        public DelayerAdvice(long delay) {
823            this.delay = delay;
824        }
825
826        @Override
827        public Object before(Exchange exchange) throws Exception {
828            try {
829                LOG.trace("Sleeping for: {} millis", delay);
830                Thread.sleep(delay);
831            } catch (InterruptedException e) {
832                LOG.debug("Sleep interrupted");
833                Thread.currentThread().interrupt();
834                throw e;
835            }
836            return null;
837        }
838
839        @Override
840        public void after(Exchange exchange, Object data) throws Exception {
841            // noop
842        }
843    }
844
845}