/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ShutdownRoute;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.model.FromDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ObjectHelper;

/**
 * The context used to activate new routing rules.
 * <p/>
 * Most per-route options ({@link #isTracing()}, {@link #isStreamCaching()},
 * {@link #getDelayer()}, ...) are stored as nullable values; when no value has
 * been set on this route context the getter falls back to the corresponding
 * option on the {@link CamelContext}.
 *
 * @version
 */
public class DefaultRouteContext implements RouteContext {
    /** Per-node counters handed out by {@link #getAndIncrement(ProcessorDefinition)}. */
    private final Map<ProcessorDefinition<?>, AtomicInteger> nodeIndex = new HashMap<ProcessorDefinition<?>, AtomicInteger>();
    private final RouteDefinition route;
    /** The input of the route; {@code null} when created through {@link #DefaultRouteContext(CamelContext)}. */
    private final FromDefinition from;
    /** Collection that {@link #commit()} adds the created route to. */
    private final Collection<Route> routes;
    /** Lazily resolved by {@link #getEndpoint()}. */
    private Endpoint endpoint;
    private final List<Processor> eventDrivenProcessors = new ArrayList<Processor>();
    private final CamelContext camelContext;
    private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>();
    private InterceptStrategy managedInterceptStrategy;
    private boolean routeAdded;
    private Boolean trace;
    private Boolean messageHistory;
    private Boolean logExhaustedMessageBody;
    private Boolean streamCache;
    private Boolean handleFault;
    private Long delay;
    private Boolean autoStartup = Boolean.TRUE;
    private List<RoutePolicy> routePolicyList = new ArrayList<RoutePolicy>();
    private ShutdownRoute shutdownRoute;
    private ShutdownRunningTask shutdownRunningTask;

    /**
     * Creates a route context for the given route definition.
     *
     * @param camelContext the camel context
     * @param route        the route definition
     * @param from         the input of the route, used to resolve the consumer endpoint
     * @param routes       the collection which {@link #commit()} adds the created route to
     */
    public DefaultRouteContext(CamelContext camelContext, RouteDefinition route, FromDefinition from, Collection<Route> routes) {
        this.camelContext = camelContext;
        this.route = route;
        this.from = from;
        this.routes = routes;
    }

    /**
     * Only used for lazy construction from inside ExpressionType
     * <p/>
     * Uses a route definition named <tt>temporary</tt>, an empty route collection
     * and no {@link FromDefinition}.
     *
     * @param camelContext the camel context
     */
    public DefaultRouteContext(CamelContext camelContext) {
        this(camelContext, new RouteDefinition("temporary"), null, new ArrayList<Route>());
    }

    /**
     * Gets the endpoint of the route input, resolving it from the
     * {@link FromDefinition} on first access and caching it afterwards.
     * <p/>
     * NOTE(review): a context created through
     * {@link #DefaultRouteContext(CamelContext)} has no {@link FromDefinition},
     * so calling this method on it fails with a {@link NullPointerException}.
     *
     * @return the resolved endpoint
     */
    public Endpoint getEndpoint() {
        if (endpoint == null) {
            endpoint = from.resolveEndpoint(this);
        }
        return endpoint;
    }

    /**
     * @return the input definition of the route, may be <tt>null</tt>
     */
    public FromDefinition getFrom() {
        return from;
    }

    /**
     * @return the route definition this context was created for
     */
    public RouteDefinition getRoute() {
        return route;
    }

    /**
     * @return the camel context
     */
    public CamelContext getCamelContext() {
        return camelContext;
    }

    /**
     * Resolves an endpoint from the uri by delegating to the route definition.
     *
     * @param uri the endpoint uri
     * @return whatever the route definition resolved the uri to
     */
    public Endpoint resolveEndpoint(String uri) {
        return route.resolveEndpoint(getCamelContext(), uri);
    }

    /**
     * Resolves an endpoint from either a uri or a registry reference.
     * <p/>
     * When both are given the uri is resolved (and validated) first, but the
     * endpoint looked up by <tt>ref</tt> is the one returned. An endpoint
     * found by <tt>ref</tt> is also added as a service to the camel context.
     *
     * @param uri the endpoint uri, may be <tt>null</tt>
     * @param ref the name of the endpoint in the registry, may be <tt>null</tt>
     * @return the resolved endpoint, never <tt>null</tt>
     * @throws NoSuchEndpointException  if the uri or ref does not resolve to an endpoint,
     *                                  or the referenced endpoint belongs to another camel context
     * @throws RuntimeCamelException    if the referenced endpoint cannot be added as a service
     * @throws IllegalArgumentException if both <tt>uri</tt> and <tt>ref</tt> are <tt>null</tt>
     */
    public Endpoint resolveEndpoint(String uri, String ref) {
        Endpoint answer = null;

        if (uri != null) {
            answer = resolveEndpoint(uri);
            if (answer == null) {
                throw new NoSuchEndpointException(uri);
            }
        }

        if (ref != null) {
            answer = lookup(ref, Endpoint.class);
            if (answer == null) {
                throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref);
            }
            // check the endpoint has the right CamelContext
            if (!this.getCamelContext().equals(answer.getCamelContext())) {
                throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does.");
            }
            try {
                // the endpoint needs to be added as a service to the camel context
                getCamelContext().addService(answer);
            } catch (Exception ex) {
                throw new RuntimeCamelException(ex);
            }
        }

        if (answer == null) {
            throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this);
        }
        return answer;
    }

    /**
     * Looks up a bean by name and type in the registry of the camel context.
     *
     * @param name the name of the bean
     * @param type the expected type
     * @return the result of the registry lookup
     */
    public <T> T lookup(String name, Class<T> type) {
        return getCamelContext().getRegistry().lookupByNameAndType(name, type);
    }

    /**
     * Looks up all beans of the given type in the registry of the camel context.
     *
     * @param type the type to look for
     * @return the beans found, keyed by their name
     */
    public <T> Map<String, T> lookupByType(Class<T> type) {
        return getCamelContext().getRegistry().findByTypeWithName(type);
    }

    /**
     * Looks up a bean by name and type, delegating to
     * {@link CamelContextHelper#mandatoryLookup(CamelContext, String, Class)}.
     *
     * @param name the name of the bean
     * @param type the expected type
     * @return the result of the mandatory lookup
     */
    @Override
    public <T> T mandatoryLookup(String name, Class<T> type) {
        return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type);
    }

    /**
     * Turns all the event driven processors added so far into a single
     * {@link EventDrivenConsumerRoute} and adds it to the route collection.
     * <p/>
     * Does nothing if no event driven processor has been added.
     */
    public void commit() {
        if (eventDrivenProcessors.isEmpty()) {
            return;
        }

        // now lets turn all of the event driven consumer processors into a single route
        Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors);

        // force creating the route id so its known ahead of the route is started
        String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory());

        // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
        CamelInternalProcessor internal = new CamelInternalProcessor(target);
        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(this));

        // and then optionally add route policy processor if a custom policy is set
        List<RoutePolicy> policies = getRoutePolicyList();
        if (policies != null && !policies.isEmpty()) {
            for (RoutePolicy policy : policies) {
                // add policy as service if we have not already done that (eg possible if two routes have the same service)
                // this ensures Camel can control the lifecycle of the policy
                if (!camelContext.hasService(policy)) {
                    try {
                        camelContext.addService(policy);
                    } catch (Exception e) {
                        throw ObjectHelper.wrapRuntimeCamelException(e);
                    }
                }
            }

            internal.addAdvice(new CamelInternalProcessor.RoutePolicyAdvice(policies));
        }

        // wrap in route inflight processor to track number of inflight exchanges for the route
        internal.addAdvice(new CamelInternalProcessor.RouteInflightRepositoryAdvice(camelContext.getInflightRepository(), routeId));

        // wrap in JMX instrumentation processor that is used for performance stats
        internal.addAdvice(new CamelInternalProcessor.InstrumentationAdvice("route"));

        // wrap in route lifecycle
        internal.addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice());

        // and create the route that wraps the UoW
        Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal);
        Map<String, Object> properties = edcr.getProperties();
        properties.put(Route.ID_PROPERTY, routeId);
        properties.put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode()));
        properties.put(Route.DESCRIPTION_PROPERTY, route.getDescriptionText());
        if (route.getGroup() != null) {
            properties.put(Route.GROUP_PROPERTY, route.getGroup());
        }
        // isRest() may be null which counts as not being a rest route
        String rest = Boolean.TRUE.equals(route.isRest()) ? "true" : "false";
        properties.put(Route.REST_PROPERTY, rest);

        // after the route is created then set the route on the policy processor so we get hold of it
        CamelInternalProcessor.RoutePolicyAdvice policyAdvice = internal.getAdvice(CamelInternalProcessor.RoutePolicyAdvice.class);
        if (policyAdvice != null) {
            policyAdvice.setRoute(edcr);
        }
        CamelInternalProcessor.RouteLifecycleAdvice lifecycleAdvice = internal.getAdvice(CamelInternalProcessor.RouteLifecycleAdvice.class);
        if (lifecycleAdvice != null) {
            lifecycleAdvice.setRoute(edcr);
        }

        // invoke init on route policy
        if (policies != null) {
            for (RoutePolicy policy : policies) {
                policy.onInit(edcr);
            }
        }

        routes.add(edcr);
    }

    /**
     * Adds a processor which {@link #commit()} will include in the route.
     *
     * @param processor the processor to add
     */
    public void addEventDrivenProcessor(Processor processor) {
        eventDrivenProcessors.add(processor);
    }

    /**
     * @return the intercept strategies, as the live list held by this context
     */
    public List<InterceptStrategy> getInterceptStrategies() {
        return interceptStrategies;
    }

    /**
     * Replaces the list of intercept strategies.
     *
     * @param interceptStrategies the new list, stored as-is (not copied)
     */
    public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) {
        this.interceptStrategies = interceptStrategies;
    }

    /**
     * Adds an intercept strategy to the list returned by {@link #getInterceptStrategies()}.
     *
     * @param interceptStrategy the strategy to add
     */
    public void addInterceptStrategy(InterceptStrategy interceptStrategy) {
        getInterceptStrategies().add(interceptStrategy);
    }

    /**
     * @param interceptStrategy the managed intercept strategy
     */
    public void setManagedInterceptStrategy(InterceptStrategy interceptStrategy) {
        this.managedInterceptStrategy = interceptStrategy;
    }

    /**
     * @return the managed intercept strategy, or <tt>null</tt> if none has been set
     */
    public InterceptStrategy getManagedInterceptStrategy() {
        return managedInterceptStrategy;
    }

    /**
     * @return the flag last set by {@link #setIsRouteAdded(boolean)}, <tt>false</tt> by default
     */
    public boolean isRouteAdded() {
        return routeAdded;
    }

    /**
     * @param routeAdded the flag returned by {@link #isRouteAdded()}
     */
    public void setIsRouteAdded(boolean routeAdded) {
        this.routeAdded = routeAdded;
    }

    /**
     * @param tracing whether tracing is enabled for this route, or <tt>null</tt> to use the camel context option
     */
    public void setTracing(Boolean tracing) {
        this.trace = tracing;
    }

    /**
     * @return the tracing option of this route, or the one from the camel context if not set
     */
    public Boolean isTracing() {
        if (trace == null) {
            // fallback to the option from camel context
            return getCamelContext().isTracing();
        }
        return trace;
    }

    /**
     * @param messageHistory whether message history is enabled for this route, or <tt>null</tt> to use the camel context option
     */
    public void setMessageHistory(Boolean messageHistory) {
        this.messageHistory = messageHistory;
    }

    /**
     * @return the message history option of this route, or the one from the camel context if not set
     */
    public Boolean isMessageHistory() {
        if (messageHistory == null) {
            // fallback to the option from camel context
            return getCamelContext().isMessageHistory();
        }
        return messageHistory;
    }

    /**
     * @param logExhaustedMessageBody the option for this route, or <tt>null</tt> to use the camel context option
     */
    public void setLogExhaustedMessageBody(Boolean logExhaustedMessageBody) {
        this.logExhaustedMessageBody = logExhaustedMessageBody;
    }

    /**
     * @return the log exhausted message body option of this route, or the one from the camel context if not set
     */
    public Boolean isLogExhaustedMessageBody() {
        if (logExhaustedMessageBody == null) {
            // fallback to the option from camel context
            return getCamelContext().isLogExhaustedMessageBody();
        }
        return logExhaustedMessageBody;
    }

    /**
     * @param cache whether stream caching is enabled for this route, or <tt>null</tt> to use the camel context option
     */
    public void setStreamCaching(Boolean cache) {
        this.streamCache = cache;
    }

    /**
     * @return the stream caching option of this route, or the one from the camel context if not set
     */
    public Boolean isStreamCaching() {
        if (streamCache == null) {
            // fallback to the option from camel context
            return getCamelContext().isStreamCaching();
        }
        return streamCache;
    }

    /**
     * @param handleFault whether fault handling is enabled for this route, or <tt>null</tt> to use the camel context option
     */
    public void setHandleFault(Boolean handleFault) {
        this.handleFault = handleFault;
    }

    /**
     * @return the handle fault option of this route, or the one from the camel context if not set
     */
    public Boolean isHandleFault() {
        if (handleFault == null) {
            // fallback to the option from camel context
            return getCamelContext().isHandleFault();
        }
        return handleFault;
    }

    /**
     * @param delay the delayer value for this route, or <tt>null</tt> to use the camel context option
     */
    public void setDelayer(Long delay) {
        this.delay = delay;
    }

    /**
     * @return the delayer value of this route, or the one from the camel context if not set
     */
    public Long getDelayer() {
        if (delay == null) {
            // fallback to the option from camel context
            return getCamelContext().getDelayer();
        }
        return delay;
    }

    /**
     * @param autoStartup whether the route should be auto started; <tt>null</tt> is treated as <tt>true</tt>
     */
    public void setAutoStartup(Boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    /**
     * @return the auto startup option, which is <tt>true</tt> unless explicitly set otherwise
     */
    public Boolean isAutoStartup() {
        if (autoStartup != null) {
            return autoStartup;
        }
        // default to true
        return true;
    }

    /**
     * @param shutdownRoute the shutdown route option for this route, or <tt>null</tt> to use the camel context option
     */
    public void setShutdownRoute(ShutdownRoute shutdownRoute) {
        this.shutdownRoute = shutdownRoute;
    }

    /**
     * Sets the option on the camel context, as it is not stored per route.
     *
     * @param allowUseOriginalMessage the option passed on to the camel context
     */
    public void setAllowUseOriginalMessage(Boolean allowUseOriginalMessage) {
        // can only be configured on CamelContext
        getCamelContext().setAllowUseOriginalMessage(allowUseOriginalMessage);
    }

    /**
     * @return the option as configured on the camel context
     */
    public Boolean isAllowUseOriginalMessage() {
        // can only be configured on CamelContext
        return getCamelContext().isAllowUseOriginalMessage();
    }

    /**
     * @return the shutdown route option of this route, or the one from the camel context if not set
     */
    public ShutdownRoute getShutdownRoute() {
        if (shutdownRoute == null) {
            // fallback to the option from camel context
            return getCamelContext().getShutdownRoute();
        }
        return shutdownRoute;
    }

    /**
     * @param shutdownRunningTask the shutdown running task option for this route,
     *                            or <tt>null</tt> to use the camel context option
     */
    public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
        this.shutdownRunningTask = shutdownRunningTask;
    }

    /**
     * @return the shutdown running task option of this route, or the one from the camel context if not set
     */
    public ShutdownRunningTask getShutdownRunningTask() {
        if (shutdownRunningTask == null) {
            // fallback to the option from camel context
            return getCamelContext().getShutdownRunningTask();
        }
        return shutdownRunningTask;
    }

    /**
     * Gets the current counter value for the given node and then increments it.
     * <p/>
     * The counter of a node starts at <tt>0</tt>, so the first call for a node
     * returns <tt>0</tt>, the next <tt>1</tt> and so forth.
     *
     * @param node the node
     * @return the counter value before it was incremented
     */
    public int getAndIncrement(ProcessorDefinition<?> node) {
        AtomicInteger count = nodeIndex.get(node);
        if (count == null) {
            count = new AtomicInteger();
            nodeIndex.put(node, count);
        }
        return count.getAndIncrement();
    }

    /**
     * Replaces the list of route policies.
     *
     * @param routePolicyList the new list, stored as-is (not copied)
     */
    public void setRoutePolicyList(List<RoutePolicy> routePolicyList) {
        this.routePolicyList = routePolicyList;
    }

    /**
     * @return the route policies, as the live list held by this context
     */
    public List<RoutePolicy> getRoutePolicyList() {
        return routePolicyList;
    }
}