001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor.interceptor;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.List;
022import java.util.Map;
023
024import org.apache.camel.AsyncProcessor;
025import org.apache.camel.CamelContext;
026import org.apache.camel.CamelContextAware;
027import org.apache.camel.Channel;
028import org.apache.camel.Exchange;
029import org.apache.camel.Processor;
030import org.apache.camel.model.ModelChannel;
031import org.apache.camel.model.OnCompletionDefinition;
032import org.apache.camel.model.OnExceptionDefinition;
033import org.apache.camel.model.ProcessorDefinition;
034import org.apache.camel.model.ProcessorDefinitionHelper;
035import org.apache.camel.model.RouteDefinition;
036import org.apache.camel.model.RouteDefinitionHelper;
037import org.apache.camel.processor.CamelInternalProcessor;
038import org.apache.camel.processor.InterceptorToAsyncProcessorBridge;
039import org.apache.camel.processor.WrapProcessor;
040import org.apache.camel.spi.InterceptStrategy;
041import org.apache.camel.spi.MessageHistoryFactory;
042import org.apache.camel.spi.RouteContext;
043import org.apache.camel.util.OrderedComparator;
044import org.apache.camel.util.ServiceHelper;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048/**
049 * DefaultChannel is the default {@link Channel}.
050 * <p/>
051 * The current implementation is just a composite containing the interceptors and error handler
052 * that beforehand was added to the route graph directly.
053 * <br/>
054 * With this {@link Channel} we can in the future implement better strategies for routing the
055 * {@link Exchange} in the route graph, as we have a {@link Channel} between each and every node
056 * in the graph.
057 *
058 * @version 
059 */
060public class DefaultChannel extends CamelInternalProcessor implements ModelChannel {
061
062    private static final Logger LOG = LoggerFactory.getLogger(DefaultChannel.class);
063
064    private final List<InterceptStrategy> interceptors = new ArrayList<InterceptStrategy>();
065    private Processor errorHandler;
066    // the next processor (non wrapped)
067    private Processor nextProcessor;
068    // the real output to invoke that has been wrapped
069    private Processor output;
070    private ProcessorDefinition<?> definition;
071    private ProcessorDefinition<?> childDefinition;
072    private CamelContext camelContext;
073    private RouteContext routeContext;
074
075    public void setNextProcessor(Processor next) {
076        this.nextProcessor = next;
077    }
078
079    public Processor getOutput() {
080        // the errorHandler is already decorated with interceptors
081        // so it contain the entire chain of processors, so we can safely use it directly as output
082        // if no error handler provided we use the output
083        // TODO: Camel 3.0 we should determine the output dynamically at runtime instead of having the
084        // the error handlers, interceptors, etc. woven in at design time
085        return errorHandler != null ? errorHandler : output;
086    }
087
088    @Override
089    public boolean hasNext() {
090        return nextProcessor != null;
091    }
092
093    @Override
094    public List<Processor> next() {
095        if (!hasNext()) {
096            return null;
097        }
098        List<Processor> answer = new ArrayList<Processor>(1);
099        answer.add(nextProcessor);
100        return answer;
101    }
102
103    public void setOutput(Processor output) {
104        this.output = output;
105    }
106
107    public Processor getNextProcessor() {
108        return nextProcessor;
109    }
110
111    public boolean hasInterceptorStrategy(Class<?> type) {
112        for (InterceptStrategy strategy : interceptors) {
113            if (type.isInstance(strategy)) {
114                return true;
115            }
116        }
117        return false;
118    }
119
120    public void setErrorHandler(Processor errorHandler) {
121        this.errorHandler = errorHandler;
122    }
123
124    public Processor getErrorHandler() {
125        return errorHandler;
126    }
127
128    public void addInterceptStrategy(InterceptStrategy strategy) {
129        interceptors.add(strategy);
130    }
131
132    public void addInterceptStrategies(List<InterceptStrategy> strategies) {
133        interceptors.addAll(strategies);
134    }
135
136    public List<InterceptStrategy> getInterceptStrategies() {
137        return interceptors;
138    }
139
140    public ProcessorDefinition<?> getProcessorDefinition() {
141        return definition;
142    }
143
144    public void setChildDefinition(ProcessorDefinition<?> childDefinition) {
145        this.childDefinition = childDefinition;
146    }
147
148    public RouteContext getRouteContext() {
149        return routeContext;
150    }
151
152    @Override
153    protected void doStart() throws Exception {
154        // the output has now been created, so assign the output as the processor
155        setProcessor(getOutput());
156        ServiceHelper.startServices(errorHandler, output);
157    }
158
159    @Override
160    protected void doStop() throws Exception {
161        if (!isContextScoped()) {
162            // only stop services if not context scoped (as context scoped is reused by others)
163            ServiceHelper.stopServices(output, errorHandler);
164        }
165    }
166
167    @Override
168    protected void doShutdown() throws Exception {
169        ServiceHelper.stopAndShutdownServices(output, errorHandler);
170    }
171
172    private boolean isContextScoped() {
173        if (definition instanceof OnExceptionDefinition) {
174            return !((OnExceptionDefinition) definition).isRouteScoped();
175        } else if (definition instanceof OnCompletionDefinition) {
176            return !((OnCompletionDefinition) definition).isRouteScoped();
177        }
178
179        return false;
180    }
181
182    @SuppressWarnings("deprecation")
183    public void initChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception {
184        this.routeContext = routeContext;
185        this.definition = outputDefinition;
186        this.camelContext = routeContext.getCamelContext();
187
188        Processor target = nextProcessor;
189        Processor next;
190
191        // init CamelContextAware as early as possible on target
192        if (target instanceof CamelContextAware) {
193            ((CamelContextAware) target).setCamelContext(camelContext);
194        }
195
196        // the definition to wrap should be the fine grained,
197        // so if a child is set then use it, if not then its the original output used
198        ProcessorDefinition<?> targetOutputDef = childDefinition != null ? childDefinition : outputDefinition;
199        LOG.debug("Initialize channel for target: '{}'", targetOutputDef);
200
201        // fix parent/child relationship. This will be the case of the routes has been
202        // defined using XML DSL or end user may have manually assembled a route from the model.
203        // Background note: parent/child relationship is assembled on-the-fly when using Java DSL (fluent builders)
204        // where as when using XML DSL (JAXB) then it fixed after, but if people are using custom interceptors
205        // then we need to fix the parent/child relationship beforehand, and thus we can do it here
206        // ideally we need the design time route -> runtime route to be a 2-phase pass (scheduled work for Camel 3.0)
207        if (childDefinition != null && outputDefinition != childDefinition) {
208            childDefinition.setParent(outputDefinition);
209        }
210
211        // force the creation of an id
212        RouteDefinitionHelper.forceAssignIds(routeContext.getCamelContext(), definition);
213
214        // first wrap the output with the managed strategy if any
215        InterceptStrategy managed = routeContext.getManagedInterceptStrategy();
216        if (managed != null) {
217            next = target == nextProcessor ? null : nextProcessor;
218            target = managed.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next);
219        }
220
221        // then wrap the output with the backlog and tracer (backlog first, as we do not want regular tracer to tracer the backlog)
222        InterceptStrategy tracer = getOrCreateBacklogTracer();
223        camelContext.addService(tracer);
224        if (tracer instanceof BacklogTracer) {
225            BacklogTracer backlogTracer = (BacklogTracer) tracer;
226
227            RouteDefinition route = ProcessorDefinitionHelper.getRoute(definition);
228            boolean first = false;
229            if (route != null && !route.getOutputs().isEmpty()) {
230                first = route.getOutputs().get(0) == definition;
231            }
232
233            addAdvice(new BacklogTracerAdvice(backlogTracer, targetOutputDef, route, first));
234
235            // add debugger as well so we have both tracing and debugging out of the box
236            InterceptStrategy debugger = getOrCreateBacklogDebugger();
237            camelContext.addService(debugger);
238            if (debugger instanceof BacklogDebugger) {
239                BacklogDebugger backlogDebugger = (BacklogDebugger) debugger;
240                addAdvice(new BacklogDebuggerAdvice(backlogDebugger, target, targetOutputDef));
241            }
242        }
243
244        if (routeContext.isMessageHistory()) {
245            // add message history advice
246            MessageHistoryFactory factory = camelContext.getMessageHistoryFactory();
247            addAdvice(new MessageHistoryAdvice(factory, targetOutputDef));
248        }
249
250        // the regular tracer is not a task on internalProcessor as this is not really needed
251        // end users have to explicit enable the tracer to use it, and then its okay if we wrap
252        // the processors (but by default tracer is disabled, and therefore we do not wrap processors)
253        tracer = getOrCreateTracer();
254        camelContext.addService(tracer);
255        if (tracer != null) {
256            TraceInterceptor trace = (TraceInterceptor) tracer.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, null);
257            // trace interceptor need to have a reference to route context so we at runtime can enable/disable tracing on-the-fly
258            trace.setRouteContext(routeContext);
259            target = trace;
260        }
261
262        // sort interceptors according to ordered
263        interceptors.sort(new OrderedComparator());
264        // then reverse list so the first will be wrapped last, as it would then be first being invoked
265        Collections.reverse(interceptors);
266        // wrap the output with the configured interceptors
267        for (InterceptStrategy strategy : interceptors) {
268            next = target == nextProcessor ? null : nextProcessor;
269            // skip tracer as we did the specially beforehand and it could potentially be added as an interceptor strategy
270            if (strategy instanceof Tracer) {
271                continue;
272            }
273            // skip stream caching as it must be wrapped as outer most, which we do later
274            if (strategy instanceof StreamCaching) {
275                continue;
276            }
277            // use the fine grained definition (eg the child if available). Its always possible to get back to the parent
278            Processor wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next);
279            if (!(wrapped instanceof AsyncProcessor)) {
280                LOG.warn("Interceptor: " + strategy + " at: " + outputDefinition + " does not return an AsyncProcessor instance."
281                        + " This causes the asynchronous routing engine to not work as optimal as possible."
282                        + " See more details at the InterceptStrategy javadoc."
283                        + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine,"
284                        + " but its not the most optimal solution. Please consider changing your interceptor to comply.");
285
286                // use a bridge and wrap again which allows us to adapt and leverage the asynchronous routing engine anyway
287                // however its not the most optimal solution, but we can still run.
288                InterceptorToAsyncProcessorBridge bridge = new InterceptorToAsyncProcessorBridge(target);
289                wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, bridge, next);
290                // Avoid the stack overflow
291                if (!wrapped.equals(bridge)) {
292                    bridge.setTarget(wrapped);
293                } else {
294                    // Just skip the wrapped processor
295                    bridge.setTarget(null);
296                }
297                wrapped = bridge;
298            }
299            if (!(wrapped instanceof WrapProcessor)) {
300                // wrap the target so it becomes a service and we can manage its lifecycle
301                wrapped = new WrapProcessor(wrapped, target);
302            }
303            target = wrapped;
304        }
305
306        if (routeContext.isStreamCaching()) {
307            addAdvice(new StreamCachingAdvice(camelContext.getStreamCachingStrategy()));
308        }
309
310        if (routeContext.getDelayer() != null && routeContext.getDelayer() > 0) {
311            addAdvice(new DelayerAdvice(routeContext.getDelayer()));
312        }
313
314        // sets the delegate to our wrapped output
315        output = target;
316    }
317
318    @Override
319    public void postInitChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception {
320        // noop
321    }
322
323    private InterceptStrategy getOrCreateTracer() {
324        // only use tracer if explicit enabled
325        if (camelContext.isTracing() != null && !camelContext.isTracing()) {
326            return null;
327        }
328
329        InterceptStrategy tracer = Tracer.getTracer(camelContext);
330        if (tracer == null) {
331            if (camelContext.getRegistry() != null) {
332                // lookup in registry
333                Map<String, Tracer> map = camelContext.getRegistry().findByTypeWithName(Tracer.class);
334                if (map.size() == 1) {
335                    tracer = map.values().iterator().next();
336                }
337            }
338            if (tracer == null) {
339                // fallback to use the default tracer
340                tracer = camelContext.getDefaultTracer();
341
342                // configure and use any trace formatter if any exists
343                Map<String, TraceFormatter> formatters = camelContext.getRegistry().findByTypeWithName(TraceFormatter.class);
344                if (formatters.size() == 1) {
345                    TraceFormatter formatter = formatters.values().iterator().next();
346                    if (tracer instanceof Tracer) {
347                        ((Tracer) tracer).setFormatter(formatter);
348                    }
349                }
350            }
351        }
352
353        return tracer;
354    }
355
356    private InterceptStrategy getOrCreateBacklogTracer() {
357        InterceptStrategy tracer = BacklogTracer.getBacklogTracer(camelContext);
358        if (tracer == null) {
359            if (camelContext.getRegistry() != null) {
360                // lookup in registry
361                Map<String, BacklogTracer> map = camelContext.getRegistry().findByTypeWithName(BacklogTracer.class);
362                if (map.size() == 1) {
363                    tracer = map.values().iterator().next();
364                }
365            }
366            if (tracer == null) {
367                // fallback to use the default tracer
368                tracer = camelContext.getDefaultBacklogTracer();
369            }
370        }
371
372        return tracer;
373    }
374
375    private InterceptStrategy getOrCreateBacklogDebugger() {
376        InterceptStrategy debugger = BacklogDebugger.getBacklogDebugger(camelContext);
377        if (debugger == null) {
378            if (camelContext.getRegistry() != null) {
379                // lookup in registry
380                Map<String, BacklogDebugger> map = camelContext.getRegistry().findByTypeWithName(BacklogDebugger.class);
381                if (map.size() == 1) {
382                    debugger = map.values().iterator().next();
383                }
384            }
385            if (debugger == null) {
386                // fallback to use the default debugger
387                debugger = camelContext.getDefaultBacklogDebugger();
388            }
389        }
390
391        return debugger;
392    }
393
394    @Override
395    public String toString() {
396        // just output the next processor as all the interceptors and error handler is just too verbose
397        return "Channel[" + nextProcessor + "]";
398    }
399
400}