001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor.interceptor; 018 019 import java.util.ArrayList; 020 import java.util.Collections; 021 import java.util.List; 022 import java.util.Map; 023 import java.util.concurrent.RejectedExecutionException; 024 025 import org.apache.camel.AsyncCallback; 026 import org.apache.camel.AsyncProcessor; 027 import org.apache.camel.CamelContext; 028 import org.apache.camel.CamelContextAware; 029 import org.apache.camel.Channel; 030 import org.apache.camel.Exchange; 031 import org.apache.camel.Processor; 032 import org.apache.camel.Service; 033 import org.apache.camel.model.ModelChannel; 034 import org.apache.camel.model.ProcessorDefinition; 035 import org.apache.camel.processor.InterceptorToAsyncProcessorBridge; 036 import org.apache.camel.processor.RouteContextProcessor; 037 import org.apache.camel.processor.WrapProcessor; 038 import org.apache.camel.spi.InterceptStrategy; 039 import org.apache.camel.spi.LifecycleStrategy; 040 import org.apache.camel.spi.RouteContext; 041 import org.apache.camel.support.ServiceSupport; 042 import org.apache.camel.util.AsyncProcessorHelper; 043 import org.apache.camel.util.ObjectHelper; 044 import org.apache.camel.util.OrderedComparator; 045 import org.apache.camel.util.ServiceHelper; 046 import org.slf4j.Logger; 047 import org.slf4j.LoggerFactory; 048 049 /** 050 * DefaultChannel is the default {@link Channel}. 051 * <p/> 052 * The current implementation is just a composite containing the interceptors and error handler 053 * that beforehand was added to the route graph directly. 054 * <br/> 055 * With this {@link Channel} we can in the future implement better strategies for routing the 056 * {@link Exchange} in the route graph, as we have a {@link Channel} between each and every node 057 * in the graph. 058 * 059 * @version 060 */ 061 public class DefaultChannel extends ServiceSupport implements ModelChannel { 062 063 private static final transient Logger LOG = LoggerFactory.getLogger(DefaultChannel.class); 064 065 private final List<InterceptStrategy> interceptors = new ArrayList<InterceptStrategy>(); 066 private Processor errorHandler; 067 // the next processor (non wrapped) 068 private Processor nextProcessor; 069 // the real output to invoke that has been wrapped 070 private Processor output; 071 private ProcessorDefinition<?> definition; 072 private ProcessorDefinition<?> childDefinition; 073 private CamelContext camelContext; 074 private RouteContext routeContext; 075 private RouteContextProcessor routeContextProcessor; 076 077 public List<Processor> next() { 078 List<Processor> answer = new ArrayList<Processor>(1); 079 answer.add(nextProcessor); 080 return answer; 081 } 082 083 public boolean hasNext() { 084 return nextProcessor != null; 085 } 086 087 public void setNextProcessor(Processor next) { 088 this.nextProcessor = next; 089 } 090 091 public Processor getOutput() { 092 // the errorHandler is already decorated with interceptors 093 // so it contain the entire chain of processors, so we can safely use it directly as output 094 // if no error handler provided we use the output 095 // TODO: Camel 3.0 we should determine the output dynamically at runtime instead of having the 096 // the error handlers, interceptors, etc. woven in at design time 097 return errorHandler != null ? errorHandler : output; 098 } 099 100 public void setOutput(Processor output) { 101 this.output = output; 102 } 103 104 public Processor getNextProcessor() { 105 return nextProcessor; 106 } 107 108 public boolean hasInterceptorStrategy(Class<?> type) { 109 for (InterceptStrategy strategy : interceptors) { 110 if (type.isInstance(strategy)) { 111 return true; 112 } 113 } 114 return false; 115 } 116 117 public void setErrorHandler(Processor errorHandler) { 118 this.errorHandler = errorHandler; 119 } 120 121 public Processor getErrorHandler() { 122 return errorHandler; 123 } 124 125 public void addInterceptStrategy(InterceptStrategy strategy) { 126 interceptors.add(strategy); 127 } 128 129 public void addInterceptStrategies(List<InterceptStrategy> strategies) { 130 interceptors.addAll(strategies); 131 } 132 133 public List<InterceptStrategy> getInterceptStrategies() { 134 return interceptors; 135 } 136 137 public ProcessorDefinition<?> getProcessorDefinition() { 138 return definition; 139 } 140 141 public void setChildDefinition(ProcessorDefinition<?> childDefinition) { 142 this.childDefinition = childDefinition; 143 } 144 145 public RouteContext getRouteContext() { 146 return routeContext; 147 } 148 149 @Override 150 protected void doStart() throws Exception { 151 // create route context processor to wrap output 152 routeContextProcessor = new RouteContextProcessor(routeContext, getOutput()); 153 ServiceHelper.startServices(errorHandler, output, routeContextProcessor); 154 } 155 156 @Override 157 protected void doStop() throws Exception { 158 ServiceHelper.stopServices(output, errorHandler, routeContextProcessor); 159 } 160 161 public void initChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception { 162 this.routeContext = routeContext; 163 this.definition = outputDefinition; 164 this.camelContext = routeContext.getCamelContext(); 165 166 Processor target = nextProcessor; 167 Processor next; 168 169 // init CamelContextAware as early as possible on target 170 if (target instanceof CamelContextAware) { 171 ((CamelContextAware) target).setCamelContext(camelContext); 172 } 173 174 // the definition to wrap should be the fine grained, 175 // so if a child is set then use it, if not then its the original output used 176 ProcessorDefinition<?> targetOutputDef = childDefinition != null ? childDefinition : outputDefinition; 177 LOG.debug("Initialize channel for target: '{}'", targetOutputDef); 178 179 // fix parent/child relationship. This will be the case of the routes has been 180 // defined using XML DSL or end user may have manually assembled a route from the model. 181 // Background note: parent/child relationship is assembled on-the-fly when using Java DSL (fluent builders) 182 // where as when using XML DSL (JAXB) then it fixed after, but if people are using custom interceptors 183 // then we need to fix the parent/child relationship beforehand, and thus we can do it here 184 // ideally we need the design time route -> runtime route to be a 2-phase pass (scheduled work for Camel 3.0) 185 if (childDefinition != null && outputDefinition != childDefinition) { 186 childDefinition.setParent(outputDefinition); 187 } 188 189 // first wrap the output with the managed strategy if any 190 InterceptStrategy managed = routeContext.getManagedInterceptStrategy(); 191 if (managed != null) { 192 next = target == nextProcessor ? null : nextProcessor; 193 target = managed.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next); 194 } 195 196 // then wrap the output with the tracer 197 TraceInterceptor trace = (TraceInterceptor) getOrCreateTracer().wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, null); 198 // trace interceptor need to have a reference to route context so we at runtime can enable/disable tracing on-the-fly 199 trace.setRouteContext(routeContext); 200 target = trace; 201 202 // sort interceptors according to ordered 203 Collections.sort(interceptors, new OrderedComparator()); 204 // then reverse list so the first will be wrapped last, as it would then be first being invoked 205 Collections.reverse(interceptors); 206 // wrap the output with the configured interceptors 207 for (InterceptStrategy strategy : interceptors) { 208 next = target == nextProcessor ? null : nextProcessor; 209 // skip tracer as we did the specially beforehand and it could potentially be added as an interceptor strategy 210 if (strategy instanceof Tracer) { 211 continue; 212 } 213 // skip stream caching as it must be wrapped as outer most, which we do later 214 if (strategy instanceof StreamCaching) { 215 continue; 216 } 217 // use the fine grained definition (eg the child if available). Its always possible to get back to the parent 218 Processor wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next); 219 if (!(wrapped instanceof AsyncProcessor)) { 220 LOG.warn("Interceptor: " + strategy + " at: " + outputDefinition + " does not return an AsyncProcessor instance." 221 + " This causes the asynchronous routing engine to not work as optimal as possible." 222 + " See more details at the InterceptStrategy javadoc." 223 + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine," 224 + " but its not the most optimal solution. Please consider changing your interceptor to comply."); 225 226 // use a bridge and wrap again which allows us to adapt and leverage the asynchronous routing engine anyway 227 // however its not the most optimal solution, but we can still run. 228 InterceptorToAsyncProcessorBridge bridge = new InterceptorToAsyncProcessorBridge(target); 229 wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, bridge, next); 230 bridge.setTarget(wrapped); 231 wrapped = bridge; 232 } 233 // ensure target gets wrapped so we can control its lifecycle 234 if (!(wrapped instanceof WrapProcessor)) { 235 wrapped = new WrapProcessor(wrapped, target); 236 } 237 target = wrapped; 238 } 239 240 // sets the delegate to our wrapped output 241 output = target; 242 } 243 244 @Override 245 public void postInitChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception { 246 for (InterceptStrategy strategy : interceptors) { 247 // apply stream caching at the end as it should be outer most 248 if (strategy instanceof StreamCaching) { 249 if (errorHandler != null) { 250 errorHandler = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, errorHandler, null); 251 } else { 252 output = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, output, null); 253 } 254 break; 255 } 256 } 257 } 258 259 private InterceptStrategy getOrCreateTracer() { 260 InterceptStrategy tracer = Tracer.getTracer(camelContext); 261 if (tracer == null) { 262 if (camelContext.getRegistry() != null) { 263 // lookup in registry 264 Map<String, Tracer> map = camelContext.getRegistry().lookupByType(Tracer.class); 265 if (map.size() == 1) { 266 tracer = map.values().iterator().next(); 267 } 268 } 269 if (tracer == null) { 270 // fallback to use the default tracer 271 tracer = camelContext.getDefaultTracer(); 272 273 // configure and use any trace formatter if any exists 274 Map<String, TraceFormatter> formatters = camelContext.getRegistry().lookupByType(TraceFormatter.class); 275 if (formatters.size() == 1) { 276 TraceFormatter formatter = formatters.values().iterator().next(); 277 if (tracer instanceof Tracer) { 278 ((Tracer) tracer).setFormatter(formatter); 279 } 280 } 281 } 282 } 283 284 // which we must manage as well 285 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 286 if (tracer instanceof Service) { 287 strategy.onServiceAdd(camelContext, (Service) tracer, null); 288 } 289 } 290 291 return tracer; 292 } 293 294 public void process(Exchange exchange) throws Exception { 295 AsyncProcessorHelper.process(this, exchange); 296 } 297 298 public boolean process(final Exchange exchange, final AsyncCallback callback) { 299 Processor processor = getOutput(); 300 if (processor == null || !continueProcessing(exchange)) { 301 // we should not continue routing so we are done 302 callback.done(true); 303 return true; 304 } 305 306 // process the exchange using the route context processor 307 ObjectHelper.notNull(routeContextProcessor, "RouteContextProcessor", this); 308 return routeContextProcessor.process(exchange, callback); 309 } 310 311 /** 312 * Strategy to determine if we should continue processing the {@link Exchange}. 313 */ 314 protected boolean continueProcessing(Exchange exchange) { 315 Object stop = exchange.getProperty(Exchange.ROUTE_STOP); 316 if (stop != null) { 317 boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); 318 if (doStop) { 319 LOG.debug("Exchange is marked to stop routing: {}", exchange); 320 return false; 321 } 322 } 323 324 // determine if we can still run, or the camel context is forcing a shutdown 325 boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this); 326 if (forceShutdown) { 327 LOG.debug("Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: {}", exchange); 328 if (exchange.getException() == null) { 329 exchange.setException(new RejectedExecutionException()); 330 } 331 return false; 332 } 333 334 // yes we can continue 335 return true; 336 } 337 338 @Override 339 public String toString() { 340 // just output the next processor as all the interceptors and error handler is just too verbose 341 return "Channel[" + nextProcessor + "]"; 342 } 343 344 }