001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.interceptor;
018    
019    import java.util.ArrayList;
020    import java.util.Collections;
021    import java.util.List;
022    import java.util.Map;
023    import java.util.concurrent.RejectedExecutionException;
024    
025    import org.apache.camel.AsyncCallback;
026    import org.apache.camel.AsyncProcessor;
027    import org.apache.camel.CamelContext;
028    import org.apache.camel.CamelContextAware;
029    import org.apache.camel.Channel;
030    import org.apache.camel.Exchange;
031    import org.apache.camel.Processor;
032    import org.apache.camel.Service;
033    import org.apache.camel.model.ModelChannel;
034    import org.apache.camel.model.ProcessorDefinition;
035    import org.apache.camel.processor.InterceptorToAsyncProcessorBridge;
036    import org.apache.camel.processor.RouteContextProcessor;
037    import org.apache.camel.processor.WrapProcessor;
038    import org.apache.camel.spi.InterceptStrategy;
039    import org.apache.camel.spi.LifecycleStrategy;
040    import org.apache.camel.spi.RouteContext;
041    import org.apache.camel.support.ServiceSupport;
042    import org.apache.camel.util.AsyncProcessorHelper;
043    import org.apache.camel.util.ObjectHelper;
044    import org.apache.camel.util.OrderedComparator;
045    import org.apache.camel.util.ServiceHelper;
046    import org.slf4j.Logger;
047    import org.slf4j.LoggerFactory;
048    
049    /**
050     * DefaultChannel is the default {@link Channel}.
051     * <p/>
052     * The current implementation is just a composite containing the interceptors and error handler
053     * that beforehand was added to the route graph directly.
054     * <br/>
055     * With this {@link Channel} we can in the future implement better strategies for routing the
056     * {@link Exchange} in the route graph, as we have a {@link Channel} between each and every node
057     * in the graph.
058     *
059     * @version 
060     */
061    public class DefaultChannel extends ServiceSupport implements ModelChannel {
062    
063        private static final transient Logger LOG = LoggerFactory.getLogger(DefaultChannel.class);
064    
065        private final List<InterceptStrategy> interceptors = new ArrayList<InterceptStrategy>();
066        private Processor errorHandler;
067        // the next processor (non wrapped)
068        private Processor nextProcessor;
069        // the real output to invoke that has been wrapped
070        private Processor output;
071        private ProcessorDefinition<?> definition;
072        private ProcessorDefinition<?> childDefinition;
073        private CamelContext camelContext;
074        private RouteContext routeContext;
075        private RouteContextProcessor routeContextProcessor;
076    
077        public List<Processor> next() {
078            List<Processor> answer = new ArrayList<Processor>(1);
079            answer.add(nextProcessor);
080            return answer;
081        }
082    
083        public boolean hasNext() {
084            return nextProcessor != null;
085        }
086    
087        public void setNextProcessor(Processor next) {
088            this.nextProcessor = next;
089        }
090    
091        public Processor getOutput() {
092            // the errorHandler is already decorated with interceptors
093            // so it contain the entire chain of processors, so we can safely use it directly as output
094            // if no error handler provided we use the output
095            // TODO: Camel 3.0 we should determine the output dynamically at runtime instead of having the
096            // the error handlers, interceptors, etc. woven in at design time
097            return errorHandler != null ? errorHandler : output;
098        }
099    
100        public void setOutput(Processor output) {
101            this.output = output;
102        }
103    
104        public Processor getNextProcessor() {
105            return nextProcessor;
106        }
107    
108        public boolean hasInterceptorStrategy(Class<?> type) {
109            for (InterceptStrategy strategy : interceptors) {
110                if (type.isInstance(strategy)) {
111                    return true;
112                }
113            }
114            return false;
115        }
116    
117        public void setErrorHandler(Processor errorHandler) {
118            this.errorHandler = errorHandler;
119        }
120    
121        public Processor getErrorHandler() {
122            return errorHandler;
123        }
124    
125        public void addInterceptStrategy(InterceptStrategy strategy) {
126            interceptors.add(strategy);
127        }
128    
129        public void addInterceptStrategies(List<InterceptStrategy> strategies) {
130            interceptors.addAll(strategies);
131        }
132    
133        public List<InterceptStrategy> getInterceptStrategies() {
134            return interceptors;
135        }
136    
137        public ProcessorDefinition<?> getProcessorDefinition() {
138            return definition;
139        }
140    
141        public void setChildDefinition(ProcessorDefinition<?> childDefinition) {
142            this.childDefinition = childDefinition;
143        }
144    
145        public RouteContext getRouteContext() {
146            return routeContext;
147        }
148    
149        @Override
150        protected void doStart() throws Exception {
151            // create route context processor to wrap output
152            routeContextProcessor = new RouteContextProcessor(routeContext, getOutput());
153            ServiceHelper.startServices(errorHandler, output, routeContextProcessor);
154        }
155    
156        @Override
157        protected void doStop() throws Exception {
158            ServiceHelper.stopServices(output, errorHandler, routeContextProcessor);
159        }
160    
161        public void initChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception {
162            this.routeContext = routeContext;
163            this.definition = outputDefinition;
164            this.camelContext = routeContext.getCamelContext();
165    
166            Processor target = nextProcessor;
167            Processor next;
168    
169            // init CamelContextAware as early as possible on target
170            if (target instanceof CamelContextAware) {
171                ((CamelContextAware) target).setCamelContext(camelContext);
172            }
173    
174            // the definition to wrap should be the fine grained,
175            // so if a child is set then use it, if not then its the original output used
176            ProcessorDefinition<?> targetOutputDef = childDefinition != null ? childDefinition : outputDefinition;
177            LOG.debug("Initialize channel for target: '{}'", targetOutputDef);
178    
179            // fix parent/child relationship. This will be the case of the routes has been
180            // defined using XML DSL or end user may have manually assembled a route from the model.
181            // Background note: parent/child relationship is assembled on-the-fly when using Java DSL (fluent builders)
182            // where as when using XML DSL (JAXB) then it fixed after, but if people are using custom interceptors
183            // then we need to fix the parent/child relationship beforehand, and thus we can do it here
184            // ideally we need the design time route -> runtime route to be a 2-phase pass (scheduled work for Camel 3.0)
185            if (childDefinition != null && outputDefinition != childDefinition) {
186                childDefinition.setParent(outputDefinition);
187            }
188    
189            // first wrap the output with the managed strategy if any
190            InterceptStrategy managed = routeContext.getManagedInterceptStrategy();
191            if (managed != null) {
192                next = target == nextProcessor ? null : nextProcessor;
193                target = managed.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next);
194            }
195    
196            // then wrap the output with the tracer
197            TraceInterceptor trace = (TraceInterceptor) getOrCreateTracer().wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, null);
198            // trace interceptor need to have a reference to route context so we at runtime can enable/disable tracing on-the-fly
199            trace.setRouteContext(routeContext);
200            target = trace;
201    
202            // sort interceptors according to ordered
203            Collections.sort(interceptors, new OrderedComparator());
204            // then reverse list so the first will be wrapped last, as it would then be first being invoked
205            Collections.reverse(interceptors);
206            // wrap the output with the configured interceptors
207            for (InterceptStrategy strategy : interceptors) {
208                next = target == nextProcessor ? null : nextProcessor;
209                // skip tracer as we did the specially beforehand and it could potentially be added as an interceptor strategy
210                if (strategy instanceof Tracer) {
211                    continue;
212                }
213                // skip stream caching as it must be wrapped as outer most, which we do later
214                if (strategy instanceof StreamCaching) {
215                    continue;
216                }
217                // use the fine grained definition (eg the child if available). Its always possible to get back to the parent
218                Processor wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next);
219                if (!(wrapped instanceof AsyncProcessor)) {
220                    LOG.warn("Interceptor: " + strategy + " at: " + outputDefinition + " does not return an AsyncProcessor instance."
221                            + " This causes the asynchronous routing engine to not work as optimal as possible."
222                            + " See more details at the InterceptStrategy javadoc."
223                            + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine,"
224                            + " but its not the most optimal solution. Please consider changing your interceptor to comply.");
225    
226                    // use a bridge and wrap again which allows us to adapt and leverage the asynchronous routing engine anyway
227                    // however its not the most optimal solution, but we can still run.
228                    InterceptorToAsyncProcessorBridge bridge = new InterceptorToAsyncProcessorBridge(target);
229                    wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, bridge, next);
230                    bridge.setTarget(wrapped);
231                    wrapped = bridge;
232                }
233                // ensure target gets wrapped so we can control its lifecycle
234                if (!(wrapped instanceof WrapProcessor)) {
235                    wrapped = new WrapProcessor(wrapped, target);
236                }
237                target = wrapped;
238            }
239    
240            // sets the delegate to our wrapped output
241            output = target;
242        }
243    
244        @Override
245        public void postInitChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception {
246            for (InterceptStrategy strategy : interceptors) {
247                // apply stream caching at the end as it should be outer most
248                if (strategy instanceof StreamCaching) {
249                    if (errorHandler != null) {
250                        errorHandler = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, errorHandler, null);
251                    } else {
252                        output = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, output, null);
253                    }
254                    break;
255                }
256            }
257        }
258    
259        private InterceptStrategy getOrCreateTracer() {
260            InterceptStrategy tracer = Tracer.getTracer(camelContext);
261            if (tracer == null) {
262                if (camelContext.getRegistry() != null) {
263                    // lookup in registry
264                    Map<String, Tracer> map = camelContext.getRegistry().lookupByType(Tracer.class);
265                    if (map.size() == 1) {
266                        tracer = map.values().iterator().next();
267                    }
268                }
269                if (tracer == null) {
270                    // fallback to use the default tracer
271                    tracer = camelContext.getDefaultTracer();
272    
273                    // configure and use any trace formatter if any exists
274                    Map<String, TraceFormatter> formatters = camelContext.getRegistry().lookupByType(TraceFormatter.class);
275                    if (formatters.size() == 1) {
276                        TraceFormatter formatter = formatters.values().iterator().next();
277                        if (tracer instanceof Tracer) {
278                            ((Tracer) tracer).setFormatter(formatter);
279                        }
280                    }
281                }
282            }
283    
284            // which we must manage as well
285            for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
286                if (tracer instanceof Service) {
287                    strategy.onServiceAdd(camelContext, (Service) tracer, null);
288                }
289            }
290    
291            return tracer;
292        }
293    
294        public void process(Exchange exchange) throws Exception {
295            AsyncProcessorHelper.process(this, exchange);
296        }
297    
298        public boolean process(final Exchange exchange, final AsyncCallback callback) {
299            Processor processor = getOutput();
300            if (processor == null || !continueProcessing(exchange)) {
301                // we should not continue routing so we are done
302                callback.done(true);
303                return true;
304            }
305    
306            // process the exchange using the route context processor
307            ObjectHelper.notNull(routeContextProcessor, "RouteContextProcessor", this);
308            return routeContextProcessor.process(exchange, callback);
309        }
310    
311        /**
312         * Strategy to determine if we should continue processing the {@link Exchange}.
313         */
314        protected boolean continueProcessing(Exchange exchange) {
315            Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
316            if (stop != null) {
317                boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
318                if (doStop) {
319                    LOG.debug("Exchange is marked to stop routing: {}", exchange);
320                    return false;
321                }
322            }
323    
324            // determine if we can still run, or the camel context is forcing a shutdown
325            boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
326            if (forceShutdown) {
327                LOG.debug("Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: {}", exchange);
328                if (exchange.getException() == null) {
329                    exchange.setException(new RejectedExecutionException());
330                }
331                return false;
332            }
333    
334            // yes we can continue
335            return true;
336        }
337    
338        @Override
339        public String toString() {
340            // just output the next processor as all the interceptors and error handler is just too verbose
341            return "Channel[" + nextProcessor + "]";
342        }
343    
344    }