001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.HashMap; 022 import java.util.List; 023 import java.util.Map; 024 import java.util.concurrent.atomic.AtomicInteger; 025 026 import org.apache.camel.CamelContext; 027 import org.apache.camel.Endpoint; 028 import org.apache.camel.NoSuchEndpointException; 029 import org.apache.camel.Processor; 030 import org.apache.camel.Route; 031 import org.apache.camel.ShutdownRoute; 032 import org.apache.camel.ShutdownRunningTask; 033 import org.apache.camel.management.InstrumentationProcessor; 034 import org.apache.camel.model.FromDefinition; 035 import org.apache.camel.model.ProcessorDefinition; 036 import org.apache.camel.model.RouteDefinition; 037 import org.apache.camel.processor.Pipeline; 038 import org.apache.camel.processor.RouteInflightRepositoryProcessor; 039 import org.apache.camel.processor.RoutePolicyProcessor; 040 import org.apache.camel.processor.UnitOfWorkProcessor; 041 import org.apache.camel.spi.InterceptStrategy; 042 import org.apache.camel.spi.RouteContext; 043 import org.apache.camel.spi.RoutePolicy; 044 import org.apache.camel.util.CamelContextHelper; 045 import org.apache.camel.util.ObjectHelper; 046 047 /** 048 * The context used to activate new routing rules 049 * 050 * @version 051 */ 052 public class DefaultRouteContext implements RouteContext { 053 private final Map<ProcessorDefinition<?>, AtomicInteger> nodeIndex = new HashMap<ProcessorDefinition<?>, AtomicInteger>(); 054 private final RouteDefinition route; 055 private FromDefinition from; 056 private final Collection<Route> routes; 057 private Endpoint endpoint; 058 private final List<Processor> eventDrivenProcessors = new ArrayList<Processor>(); 059 private CamelContext camelContext; 060 private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>(); 061 private InterceptStrategy managedInterceptStrategy; 062 private boolean routeAdded; 063 private Boolean trace; 064 private Boolean streamCache; 065 private Boolean handleFault; 066 private Long delay; 067 private Boolean autoStartup = Boolean.TRUE; 068 private List<RoutePolicy> routePolicyList = new ArrayList<RoutePolicy>(); 069 private ShutdownRoute shutdownRoute; 070 private ShutdownRunningTask shutdownRunningTask; 071 072 public DefaultRouteContext(CamelContext camelContext, RouteDefinition route, FromDefinition from, Collection<Route> routes) { 073 this.camelContext = camelContext; 074 this.route = route; 075 this.from = from; 076 this.routes = routes; 077 } 078 079 /** 080 * Only used for lazy construction from inside ExpressionType 081 */ 082 public DefaultRouteContext(CamelContext camelContext) { 083 this.camelContext = camelContext; 084 this.routes = new ArrayList<Route>(); 085 this.route = new RouteDefinition("temporary"); 086 } 087 088 public Endpoint getEndpoint() { 089 if (endpoint == null) { 090 endpoint = from.resolveEndpoint(this); 091 } 092 return endpoint; 093 } 094 095 public FromDefinition getFrom() { 096 return from; 097 } 098 099 public RouteDefinition getRoute() { 100 return route; 101 } 102 103 public CamelContext getCamelContext() { 104 return camelContext; 105 } 106 107 public Endpoint resolveEndpoint(String uri) { 108 return route.resolveEndpoint(getCamelContext(), uri); 109 } 110 111 public Endpoint resolveEndpoint(String uri, String ref) { 112 Endpoint endpoint = null; 113 if (uri != null) { 114 endpoint = resolveEndpoint(uri); 115 if (endpoint == null) { 116 throw new NoSuchEndpointException(uri); 117 } 118 } 119 if (ref != null) { 120 endpoint = lookup(ref, Endpoint.class); 121 if (endpoint == null) { 122 throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref); 123 } 124 // Check the endpoint has the right CamelContext 125 if (!this.getCamelContext().equals(endpoint.getCamelContext())) { 126 throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does."); 127 } 128 } 129 if (endpoint == null) { 130 throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this); 131 } else { 132 return endpoint; 133 } 134 } 135 136 public <T> T lookup(String name, Class<T> type) { 137 return getCamelContext().getRegistry().lookup(name, type); 138 } 139 140 public <T> Map<String, T> lookupByType(Class<T> type) { 141 return getCamelContext().getRegistry().lookupByType(type); 142 } 143 144 @Override 145 public <T> T mandatoryLookup(String name, Class<T> type) { 146 return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type); 147 } 148 149 public void commit() { 150 // now lets turn all of the event driven consumer processors into a single route 151 if (!eventDrivenProcessors.isEmpty()) { 152 Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors); 153 154 // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW 155 UnitOfWorkProcessor unitOfWorkProcessor = new UnitOfWorkProcessor(this, target); 156 157 // and then optionally add route policy processor if a custom policy is set 158 RoutePolicyProcessor routePolicyProcessor = null; 159 List<RoutePolicy> routePolicyList = getRoutePolicyList(); 160 if (routePolicyList != null && !routePolicyList.isEmpty()) { 161 for (RoutePolicy policy : routePolicyList) { 162 // add policy as service if we have not already done that (eg possible if two routes have the same service) 163 // this ensures Camel can control the lifecycle of the policy 164 if (!camelContext.hasService(policy)) { 165 try { 166 camelContext.addService(policy); 167 } catch (Exception e) { 168 throw ObjectHelper.wrapRuntimeCamelException(e); 169 } 170 } 171 } 172 routePolicyProcessor = new RoutePolicyProcessor(unitOfWorkProcessor, routePolicyList); 173 target = routePolicyProcessor; 174 } else { 175 target = unitOfWorkProcessor; 176 } 177 178 // wrap in route inflight processor to track number of inflight exchanges for the route 179 RouteInflightRepositoryProcessor inflight = new RouteInflightRepositoryProcessor(camelContext.getInflightRepository(), target); 180 181 // and wrap it by a instrumentation processor that is to be used for performance stats 182 // for this particular route 183 InstrumentationProcessor instrument = new InstrumentationProcessor(); 184 instrument.setType("route"); 185 instrument.setProcessor(inflight); 186 187 // and create the route that wraps the UoW 188 Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), instrument); 189 // create the route id 190 String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory()); 191 edcr.getProperties().put(Route.ID_PROPERTY, routeId); 192 edcr.getProperties().put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode())); 193 if (route.getGroup() != null) { 194 edcr.getProperties().put(Route.GROUP_PROPERTY, route.getGroup()); 195 } 196 197 // after the route is created then set the route on the policy processor so we get hold of it 198 if (routePolicyProcessor != null) { 199 routePolicyProcessor.setRoute(edcr); 200 } 201 // after the route is created then set the route on the inflight processor so we get hold of it 202 inflight.setRoute(edcr); 203 204 // invoke init on route policy 205 if (routePolicyList != null && !routePolicyList.isEmpty()) { 206 for (RoutePolicy policy : routePolicyList) { 207 policy.onInit(edcr); 208 } 209 } 210 211 routes.add(edcr); 212 } 213 } 214 215 public void addEventDrivenProcessor(Processor processor) { 216 eventDrivenProcessors.add(processor); 217 } 218 219 public List<InterceptStrategy> getInterceptStrategies() { 220 return interceptStrategies; 221 } 222 223 public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) { 224 this.interceptStrategies = interceptStrategies; 225 } 226 227 public void addInterceptStrategy(InterceptStrategy interceptStrategy) { 228 getInterceptStrategies().add(interceptStrategy); 229 } 230 231 public void setManagedInterceptStrategy(InterceptStrategy interceptStrategy) { 232 this.managedInterceptStrategy = interceptStrategy; 233 } 234 235 public InterceptStrategy getManagedInterceptStrategy() { 236 return managedInterceptStrategy; 237 } 238 239 public boolean isRouteAdded() { 240 return routeAdded; 241 } 242 243 public void setIsRouteAdded(boolean routeAdded) { 244 this.routeAdded = routeAdded; 245 } 246 247 public void setTracing(Boolean tracing) { 248 this.trace = tracing; 249 } 250 251 public Boolean isTracing() { 252 if (trace != null) { 253 return trace; 254 } else { 255 // fallback to the option from camel context 256 return getCamelContext().isTracing(); 257 } 258 } 259 260 public void setStreamCaching(Boolean cache) { 261 this.streamCache = cache; 262 } 263 264 public Boolean isStreamCaching() { 265 if (streamCache != null) { 266 return streamCache; 267 } else { 268 // fallback to the option from camel context 269 return getCamelContext().isStreamCaching(); 270 } 271 } 272 273 public void setHandleFault(Boolean handleFault) { 274 this.handleFault = handleFault; 275 } 276 277 public Boolean isHandleFault() { 278 if (handleFault != null) { 279 return handleFault; 280 } else { 281 // fallback to the option from camel context 282 return getCamelContext().isHandleFault(); 283 } 284 } 285 286 public void setDelayer(Long delay) { 287 this.delay = delay; 288 } 289 290 public Long getDelayer() { 291 if (delay != null) { 292 return delay; 293 } else { 294 // fallback to the option from camel context 295 return getCamelContext().getDelayer(); 296 } 297 } 298 299 public void setAutoStartup(Boolean autoStartup) { 300 this.autoStartup = autoStartup; 301 } 302 303 public Boolean isAutoStartup() { 304 if (autoStartup != null) { 305 return autoStartup; 306 } 307 // default to true 308 return true; 309 } 310 311 public void setShutdownRoute(ShutdownRoute shutdownRoute) { 312 this.shutdownRoute = shutdownRoute; 313 } 314 315 public ShutdownRoute getShutdownRoute() { 316 if (shutdownRoute != null) { 317 return shutdownRoute; 318 } else { 319 // fallback to the option from camel context 320 return getCamelContext().getShutdownRoute(); 321 } 322 } 323 324 public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) { 325 this.shutdownRunningTask = shutdownRunningTask; 326 } 327 328 public ShutdownRunningTask getShutdownRunningTask() { 329 if (shutdownRunningTask != null) { 330 return shutdownRunningTask; 331 } else { 332 // fallback to the option from camel context 333 return getCamelContext().getShutdownRunningTask(); 334 } 335 } 336 337 public int getAndIncrement(ProcessorDefinition<?> node) { 338 AtomicInteger count = nodeIndex.get(node); 339 if (count == null) { 340 count = new AtomicInteger(); 341 nodeIndex.put(node, count); 342 } 343 return count.getAndIncrement(); 344 } 345 346 public void setRoutePolicyList(List<RoutePolicy> routePolicyList) { 347 this.routePolicyList = routePolicyList; 348 } 349 350 public List<RoutePolicy> getRoutePolicyList() { 351 return routePolicyList; 352 } 353 }