/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor.interceptor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Channel;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.model.ModelChannel;
import org.apache.camel.model.OnCompletionDefinition;
import org.apache.camel.model.OnExceptionDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.ProcessorDefinitionHelper;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.RouteDefinitionHelper;
import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.processor.InterceptorToAsyncProcessorBridge;
import org.apache.camel.processor.WrapProcessor;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.MessageHistoryFactory;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.OrderedComparator;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * DefaultChannel is the default {@link Channel}.
 * <p/>
 * The current implementation is just a composite containing the interceptors and error handler
 * that beforehand were added to the route graph directly.
 * <br/>
 * With this {@link Channel} we can in the future implement better strategies for routing the
 * {@link Exchange} in the route graph, as we have a {@link Channel} between each and every node
 * in the graph.
 *
 * @version
 */
public class DefaultChannel extends CamelInternalProcessor implements ModelChannel {

    private static final Logger LOG = LoggerFactory.getLogger(DefaultChannel.class);

    private final List<InterceptStrategy> interceptors = new ArrayList<>();
    private Processor errorHandler;
    // the next processor (non wrapped)
    private Processor nextProcessor;
    // the real output to invoke that has been wrapped
    private Processor output;
    private ProcessorDefinition<?> definition;
    private ProcessorDefinition<?> childDefinition;
    private CamelContext camelContext;
    private RouteContext routeContext;

    /**
     * Sets the next (non wrapped) processor this channel leads to.
     *
     * @param next the next processor
     */
    public void setNextProcessor(Processor next) {
        this.nextProcessor = next;
    }

    /**
     * Gets the processor to invoke: the error handler if one has been set, otherwise the wrapped output.
     *
     * @return the error handler, or the output if no error handler has been set
     */
    public Processor getOutput() {
        // the errorHandler is already decorated with interceptors
        // so it contain the entire chain of processors, so we can safely use it directly as output
        // if no error handler provided we use the output
        // TODO: Camel 3.0 we should determine the output dynamically at runtime instead of having the
        // the error handlers, interceptors, etc. woven in at design time
        return errorHandler != null ? errorHandler : output;
    }

    /**
     * Whether a next processor has been set.
     */
    @Override
    public boolean hasNext() {
        return nextProcessor != null;
    }

    /**
     * Gets the next processor wrapped in a new single element list.
     *
     * @return a list holding the next processor, or <tt>null</tt> if there is no next processor
     */
    @Override
    public List<Processor> next() {
        if (!hasNext()) {
            return null;
        }
        List<Processor> answer = new ArrayList<Processor>(1);
        answer.add(nextProcessor);
        return answer;
    }

    /**
     * Sets the wrapped output to invoke.
     *
     * @param output the output
     */
    public void setOutput(Processor output) {
        this.output = output;
    }

    /**
     * Gets the next (non wrapped) processor.
     */
    public Processor getNextProcessor() {
        return nextProcessor;
    }

    /**
     * Tests whether any of the added intercept strategies is an instance of the given type.
     *
     * @param type the type to look for
     * @return <tt>true</tt> if at least one added strategy is an instance of the type
     */
    public boolean hasInterceptorStrategy(Class<?> type) {
        for (InterceptStrategy strategy : interceptors) {
            if (type.isInstance(strategy)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Sets the error handler, which takes precedence over the output in {@link #getOutput()}.
     *
     * @param errorHandler the error handler
     */
    public void setErrorHandler(Processor errorHandler) {
        this.errorHandler = errorHandler;
    }

    /**
     * Gets the error handler, or <tt>null</tt> if none has been set.
     */
    public Processor getErrorHandler() {
        return errorHandler;
    }

    /**
     * Adds an intercept strategy to be applied when the channel is initialized.
     *
     * @param strategy the strategy to add
     */
    public void addInterceptStrategy(InterceptStrategy strategy) {
        interceptors.add(strategy);
    }

    /**
     * Adds all the given intercept strategies to be applied when the channel is initialized.
     *
     * @param strategies the strategies to add
     */
    public void addInterceptStrategies(List<InterceptStrategy> strategies) {
        interceptors.addAll(strategies);
    }

    /**
     * Gets the intercept strategies added to this channel.
     * <p/>
     * This is the live internal list, not a copy.
     */
    public List<InterceptStrategy> getInterceptStrategies() {
        return interceptors;
    }

    /**
     * Gets the definition this channel was initialized with, or <tt>null</tt> if not yet initialized.
     */
    public ProcessorDefinition<?> getProcessorDefinition() {
        return definition;
    }

    /**
     * Sets the fine grained child definition, which (if set) is the definition passed
     * to the interceptors instead of the output definition.
     *
     * @param childDefinition the child definition
     */
    public void setChildDefinition(ProcessorDefinition<?> childDefinition) {
        this.childDefinition = childDefinition;
    }

    /**
     * Gets the route context this channel was initialized with, or <tt>null</tt> if not yet initialized.
     */
    public RouteContext getRouteContext() {
        return routeContext;
    }

    @Override
    protected void doStart() throws Exception {
        // the output has now been created, so assign the output as the processor
        setProcessor(getOutput());
        ServiceHelper.startServices(errorHandler, output);
    }

    @Override
    protected void doStop() throws Exception {
        if (!isContextScoped()) {
            // only stop services if not context scoped (as context scoped is reused by others)
            ServiceHelper.stopServices(output, errorHandler);
        }
    }

    @Override
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownServices(output, errorHandler);
    }

    /**
     * Whether the definition is an onException or onCompletion definition that is not route scoped.
     */
    private boolean isContextScoped() {
        if (definition instanceof OnExceptionDefinition) {
            return !((OnExceptionDefinition) definition).isRouteScoped();
        } else if (definition instanceof OnCompletionDefinition) {
            return !((OnCompletionDefinition) definition).isRouteScoped();
        }

        return false;
    }

    /**
     * Initializes the channel by wrapping the next processor with the managed strategy, the regular
     * tracer (if any) and the added intercept strategies, and by adding the backlog tracer, backlog
     * debugger, message history, stream caching and delayer advices where applicable.
     * The resulting processor becomes the output of this channel.
     *
     * @param outputDefinition the definition of the output this channel is created for
     * @param routeContext     the route context
     * @throws Exception is thrown if a service could not be added or an interceptor failed to wrap
     */
    @SuppressWarnings("deprecation")
    public void initChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception {
        this.routeContext = routeContext;
        this.definition = outputDefinition;
        this.camelContext = routeContext.getCamelContext();

        Processor target = nextProcessor;
        Processor next;

        // init CamelContextAware as early as possible on target
        if (target instanceof CamelContextAware) {
            ((CamelContextAware) target).setCamelContext(camelContext);
        }

        // the definition to wrap should be the fine grained,
        // so if a child is set then use it, if not then its the original output used
        ProcessorDefinition<?> targetOutputDef = childDefinition != null ? childDefinition : outputDefinition;
        LOG.debug("Initialize channel for target: '{}'", targetOutputDef);

        // fix parent/child relationship. This will be the case of the routes has been
        // defined using XML DSL or end user may have manually assembled a route from the model.
        // Background note: parent/child relationship is assembled on-the-fly when using Java DSL (fluent builders)
        // where as when using XML DSL (JAXB) then it fixed after, but if people are using custom interceptors
        // then we need to fix the parent/child relationship beforehand, and thus we can do it here
        // ideally we need the design time route -> runtime route to be a 2-phase pass (scheduled work for Camel 3.0)
        if (childDefinition != null && outputDefinition != childDefinition) {
            childDefinition.setParent(outputDefinition);
        }

        // force the creation of an id
        RouteDefinitionHelper.forceAssignIds(routeContext.getCamelContext(), definition);

        // first wrap the output with the managed strategy if any
        InterceptStrategy managed = routeContext.getManagedInterceptStrategy();
        if (managed != null) {
            next = target == nextProcessor ? null : nextProcessor;
            target = managed.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next);
        }

        // then wrap the output with the backlog and tracer (backlog first, as we do not want regular tracer to tracer the backlog)
        InterceptStrategy tracer = getOrCreateBacklogTracer();
        camelContext.addService(tracer);
        if (tracer instanceof BacklogTracer) {
            BacklogTracer backlogTracer = (BacklogTracer) tracer;

            RouteDefinition route = ProcessorDefinitionHelper.getRoute(definition);
            boolean first = false;
            if (route != null && !route.getOutputs().isEmpty()) {
                first = route.getOutputs().get(0) == definition;
            }

            addAdvice(new BacklogTracerAdvice(backlogTracer, targetOutputDef, route, first));

            // add debugger as well so we have both tracing and debugging out of the box
            InterceptStrategy debugger = getOrCreateBacklogDebugger();
            camelContext.addService(debugger);
            if (debugger instanceof BacklogDebugger) {
                BacklogDebugger backlogDebugger = (BacklogDebugger) debugger;
                addAdvice(new BacklogDebuggerAdvice(backlogDebugger, target, targetOutputDef));
            }
        }

        if (routeContext.isMessageHistory()) {
            // add message history advice
            MessageHistoryFactory factory = camelContext.getMessageHistoryFactory();
            addAdvice(new MessageHistoryAdvice(factory, targetOutputDef));
        }

        // the regular tracer is not a task on internalProcessor as this is not really needed
        // end users have to explicit enable the tracer to use it, and then its okay if we wrap
        // the processors (but by default tracer is disabled, and therefore we do not wrap processors)
        tracer = getOrCreateTracer();
        if (tracer != null) {
            // only register the tracer as a service when there is one (it is null when tracing is disabled)
            camelContext.addService(tracer);
            TraceInterceptor trace = (TraceInterceptor) tracer.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, null);
            // trace interceptor need to have a reference to route context so we at runtime can enable/disable tracing on-the-fly
            trace.setRouteContext(routeContext);
            target = trace;
        }

        // sort interceptors according to ordered
        interceptors.sort(new OrderedComparator());
        // then reverse list so the first will be wrapped last, as it would then be first being invoked
        Collections.reverse(interceptors);
        // wrap the output with the configured interceptors
        for (InterceptStrategy strategy : interceptors) {
            next = target == nextProcessor ? null : nextProcessor;
            // skip tracer as we did the specially beforehand and it could potentially be added as an interceptor strategy
            if (strategy instanceof Tracer) {
                continue;
            }
            // skip stream caching as it must be wrapped as outer most, which we do later
            if (strategy instanceof StreamCaching) {
                continue;
            }
            // use the fine grained definition (eg the child if available). Its always possible to get back to the parent
            Processor wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next);
            if (!(wrapped instanceof AsyncProcessor)) {
                LOG.warn("Interceptor: {} at: {} does not return an AsyncProcessor instance."
                        + " This causes the asynchronous routing engine to not work as optimal as possible."
                        + " See more details at the InterceptStrategy javadoc."
                        + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine,"
                        + " but its not the most optimal solution. Please consider changing your interceptor to comply.",
                        strategy, outputDefinition);

                // use a bridge and wrap again which allows us to adapt and leverage the asynchronous routing engine anyway
                // however its not the most optimal solution, but we can still run.
                InterceptorToAsyncProcessorBridge bridge = new InterceptorToAsyncProcessorBridge(target);
                wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, bridge, next);
                // Avoid the stack overflow
                if (!wrapped.equals(bridge)) {
                    bridge.setTarget(wrapped);
                } else {
                    // Just skip the wrapped processor
                    bridge.setTarget(null);
                }
                wrapped = bridge;
            }
            if (!(wrapped instanceof WrapProcessor)) {
                // wrap the target so it becomes a service and we can manage its lifecycle
                wrapped = new WrapProcessor(wrapped, target);
            }
            target = wrapped;
        }

        if (routeContext.isStreamCaching()) {
            addAdvice(new StreamCachingAdvice(camelContext.getStreamCachingStrategy()));
        }

        if (routeContext.getDelayer() != null && routeContext.getDelayer() > 0) {
            addAdvice(new DelayerAdvice(routeContext.getDelayer()));
        }

        // sets the delegate to our wrapped output
        output = target;
    }

    @Override
    public void postInitChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception {
        // noop
    }

    /**
     * Looks up the one and only instance of the given type in the registry.
     *
     * @param type the type to look up
     * @return the instance if the registry holds exactly one of the type,
     *         otherwise <tt>null</tt> (also when there is no registry)
     */
    private <T> T lookupSingleInRegistry(Class<T> type) {
        if (camelContext.getRegistry() == null) {
            return null;
        }
        Map<String, T> map = camelContext.getRegistry().findByTypeWithName(type);
        return map.size() == 1 ? map.values().iterator().next() : null;
    }

    /**
     * Gets the regular tracer: the one returned by {@link Tracer#getTracer(CamelContext)},
     * else a single {@link Tracer} from the registry, else the default tracer of the CamelContext
     * (configured with a single {@link TraceFormatter} from the registry, if one exists).
     *
     * @return the tracer, or <tt>null</tt> if tracing has been explicitly disabled on the CamelContext
     */
    private InterceptStrategy getOrCreateTracer() {
        // do not use the tracer if tracing has been explicit disabled
        if (camelContext.isTracing() != null && !camelContext.isTracing()) {
            return null;
        }

        InterceptStrategy tracer = Tracer.getTracer(camelContext);
        if (tracer == null) {
            // lookup in registry
            tracer = lookupSingleInRegistry(Tracer.class);
            if (tracer == null) {
                // fallback to use the default tracer
                tracer = camelContext.getDefaultTracer();

                // configure and use any trace formatter if any exists
                // (the lookup is null-safe in case there is no registry)
                TraceFormatter formatter = lookupSingleInRegistry(TraceFormatter.class);
                if (formatter != null && tracer instanceof Tracer) {
                    ((Tracer) tracer).setFormatter(formatter);
                }
            }
        }

        return tracer;
    }

    /**
     * Gets the backlog tracer: the one returned by {@link BacklogTracer#getBacklogTracer(CamelContext)},
     * else a single {@link BacklogTracer} from the registry, else the default backlog tracer of the CamelContext.
     */
    private InterceptStrategy getOrCreateBacklogTracer() {
        InterceptStrategy tracer = BacklogTracer.getBacklogTracer(camelContext);
        if (tracer == null) {
            // lookup in registry
            tracer = lookupSingleInRegistry(BacklogTracer.class);
            if (tracer == null) {
                // fallback to use the default tracer
                tracer = camelContext.getDefaultBacklogTracer();
            }
        }

        return tracer;
    }

    /**
     * Gets the backlog debugger: the one returned by {@link BacklogDebugger#getBacklogDebugger(CamelContext)},
     * else a single {@link BacklogDebugger} from the registry, else the default backlog debugger of the CamelContext.
     */
    private InterceptStrategy getOrCreateBacklogDebugger() {
        InterceptStrategy debugger = BacklogDebugger.getBacklogDebugger(camelContext);
        if (debugger == null) {
            // lookup in registry
            debugger = lookupSingleInRegistry(BacklogDebugger.class);
            if (debugger == null) {
                // fallback to use the default debugger
                debugger = camelContext.getDefaultBacklogDebugger();
            }
        }

        return debugger;
    }

    @Override
    public String toString() {
        // just output the next processor as all the interceptors and error handler is just too verbose
        return "Channel[" + nextProcessor + "]";
    }

}