001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.List; 023import java.util.Map; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.apache.camel.CamelContext; 027import org.apache.camel.Endpoint; 028import org.apache.camel.NoSuchEndpointException; 029import org.apache.camel.Processor; 030import org.apache.camel.Route; 031import org.apache.camel.RuntimeCamelException; 032import org.apache.camel.ShutdownRoute; 033import org.apache.camel.ShutdownRunningTask; 034import org.apache.camel.model.FromDefinition; 035import org.apache.camel.model.ProcessorDefinition; 036import org.apache.camel.model.RouteDefinition; 037import org.apache.camel.processor.CamelInternalProcessor; 038import org.apache.camel.processor.ContractAdvice; 039import org.apache.camel.processor.Pipeline; 040import org.apache.camel.spi.Contract; 041import org.apache.camel.spi.InterceptStrategy; 042import org.apache.camel.spi.RouteContext; 043import org.apache.camel.spi.RouteController; 044import org.apache.camel.spi.RouteError; 045import org.apache.camel.spi.RoutePolicy; 046import org.apache.camel.util.CamelContextHelper; 047import org.apache.camel.util.ObjectHelper; 048 049/** 050 * The context used to activate new routing rules 051 * 052 * @version 053 */ 054public class DefaultRouteContext implements RouteContext { 055 private final Map<ProcessorDefinition<?>, AtomicInteger> nodeIndex = new HashMap<ProcessorDefinition<?>, AtomicInteger>(); 056 private final RouteDefinition route; 057 private FromDefinition from; 058 private final Collection<Route> routes; 059 private Endpoint endpoint; 060 private final List<Processor> eventDrivenProcessors = new ArrayList<Processor>(); 061 private CamelContext camelContext; 062 private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>(); 063 private InterceptStrategy managedInterceptStrategy; 064 private boolean routeAdded; 065 private Boolean trace; 066 private Boolean messageHistory; 067 private Boolean logMask; 068 private Boolean logExhaustedMessageBody; 069 private Boolean streamCache; 070 private Boolean handleFault; 071 private Long delay; 072 private Boolean autoStartup = Boolean.TRUE; 073 private List<RoutePolicy> routePolicyList = new ArrayList<RoutePolicy>(); 074 private ShutdownRoute shutdownRoute; 075 private ShutdownRunningTask shutdownRunningTask; 076 private RouteError routeError; 077 private RouteController routeController; 078 079 public DefaultRouteContext(CamelContext camelContext, RouteDefinition route, FromDefinition from, Collection<Route> routes) { 080 this.camelContext = camelContext; 081 this.route = route; 082 this.from = from; 083 this.routes = routes; 084 } 085 086 /** 087 * Only used for lazy construction from inside ExpressionType 088 */ 089 public DefaultRouteContext(CamelContext camelContext) { 090 this.camelContext = camelContext; 091 this.routes = new ArrayList<Route>(); 092 this.route = new RouteDefinition("temporary"); 093 } 094 095 public Endpoint getEndpoint() { 096 if (endpoint == null) { 097 endpoint = from.resolveEndpoint(this); 098 } 099 return endpoint; 100 } 101 102 public FromDefinition getFrom() { 103 return from; 104 } 105 106 public RouteDefinition getRoute() { 107 return route; 108 } 109 110 public CamelContext getCamelContext() { 111 return camelContext; 112 } 113 114 public Endpoint resolveEndpoint(String uri) { 115 return route.resolveEndpoint(getCamelContext(), uri); 116 } 117 118 public Endpoint resolveEndpoint(String uri, String ref) { 119 Endpoint endpoint = null; 120 if (uri != null) { 121 endpoint = resolveEndpoint(uri); 122 if (endpoint == null) { 123 throw new NoSuchEndpointException(uri); 124 } 125 } 126 if (ref != null) { 127 endpoint = lookup(ref, Endpoint.class); 128 if (endpoint == null) { 129 throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref); 130 } 131 // Check the endpoint has the right CamelContext 132 if (!this.getCamelContext().equals(endpoint.getCamelContext())) { 133 throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does."); 134 } 135 try { 136 // need add the endpoint into service 137 getCamelContext().addService(endpoint); 138 } catch (Exception ex) { 139 throw new RuntimeCamelException(ex); 140 } 141 } 142 if (endpoint == null) { 143 throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this); 144 } else { 145 return endpoint; 146 } 147 } 148 149 public <T> T lookup(String name, Class<T> type) { 150 return getCamelContext().getRegistry().lookupByNameAndType(name, type); 151 } 152 153 public <T> Map<String, T> lookupByType(Class<T> type) { 154 return getCamelContext().getRegistry().findByTypeWithName(type); 155 } 156 157 @Override 158 public <T> T mandatoryLookup(String name, Class<T> type) { 159 return CamelContextHelper.mandatoryLookup(getCamelContext(), name, type); 160 } 161 162 public void commit() { 163 // now lets turn all of the event driven consumer processors into a single route 164 if (!eventDrivenProcessors.isEmpty()) { 165 Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors); 166 167 // force creating the route id so its known ahead of the route is started 168 String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory()); 169 170 // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW 171 CamelInternalProcessor internal = new CamelInternalProcessor(target); 172 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(this)); 173 174 // and then optionally add route policy processor if a custom policy is set 175 List<RoutePolicy> routePolicyList = getRoutePolicyList(); 176 if (routePolicyList != null && !routePolicyList.isEmpty()) { 177 for (RoutePolicy policy : routePolicyList) { 178 // add policy as service if we have not already done that (eg possible if two routes have the same service) 179 // this ensures Camel can control the lifecycle of the policy 180 if (!camelContext.hasService(policy)) { 181 try { 182 camelContext.addService(policy); 183 } catch (Exception e) { 184 throw ObjectHelper.wrapRuntimeCamelException(e); 185 } 186 } 187 } 188 189 internal.addAdvice(new CamelInternalProcessor.RoutePolicyAdvice(routePolicyList)); 190 } 191 192 // wrap in route inflight processor to track number of inflight exchanges for the route 193 internal.addAdvice(new CamelInternalProcessor.RouteInflightRepositoryAdvice(camelContext.getInflightRepository(), routeId)); 194 195 // wrap in JMX instrumentation processor that is used for performance stats 196 internal.addAdvice(new CamelInternalProcessor.InstrumentationAdvice("route")); 197 198 // wrap in route lifecycle 199 internal.addAdvice(new CamelInternalProcessor.RouteLifecycleAdvice()); 200 201 // wrap in REST binding 202 if (route.getRestBindingDefinition() != null) { 203 try { 204 internal.addAdvice(route.getRestBindingDefinition().createRestBindingAdvice(this)); 205 } catch (Exception e) { 206 throw ObjectHelper.wrapRuntimeCamelException(e); 207 } 208 } 209 210 // wrap in contract 211 if (route.getInputType() != null || route.getOutputType() != null) { 212 Contract contract = new Contract(); 213 if (route.getInputType() != null) { 214 contract.setInputType(route.getInputType().getUrn()); 215 contract.setValidateInput(route.getInputType().isValidate()); 216 } 217 if (route.getOutputType() != null) { 218 contract.setOutputType(route.getOutputType().getUrn()); 219 contract.setValidateOutput(route.getOutputType().isValidate()); 220 } 221 internal.addAdvice(new ContractAdvice(contract)); 222 // make sure to enable data type as its in use when using input/output types on routes 223 camelContext.setUseDataType(true); 224 } 225 226 // and create the route that wraps the UoW 227 Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), internal); 228 edcr.getProperties().put(Route.ID_PROPERTY, routeId); 229 edcr.getProperties().put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode())); 230 edcr.getProperties().put(Route.DESCRIPTION_PROPERTY, route.getDescriptionText()); 231 if (route.getGroup() != null) { 232 edcr.getProperties().put(Route.GROUP_PROPERTY, route.getGroup()); 233 } 234 String rest = "false"; 235 if (route.isRest() != null && route.isRest()) { 236 rest = "true"; 237 } 238 edcr.getProperties().put(Route.REST_PROPERTY, rest); 239 240 // after the route is created then set the route on the policy processor so we get hold of it 241 CamelInternalProcessor.RoutePolicyAdvice task = internal.getAdvice(CamelInternalProcessor.RoutePolicyAdvice.class); 242 if (task != null) { 243 task.setRoute(edcr); 244 } 245 CamelInternalProcessor.RouteLifecycleAdvice task2 = internal.getAdvice(CamelInternalProcessor.RouteLifecycleAdvice.class); 246 if (task2 != null) { 247 task2.setRoute(edcr); 248 } 249 250 // invoke init on route policy 251 if (routePolicyList != null && !routePolicyList.isEmpty()) { 252 for (RoutePolicy policy : routePolicyList) { 253 policy.onInit(edcr); 254 } 255 } 256 257 routes.add(edcr); 258 } 259 } 260 261 public void addEventDrivenProcessor(Processor processor) { 262 eventDrivenProcessors.add(processor); 263 } 264 265 public List<InterceptStrategy> getInterceptStrategies() { 266 return interceptStrategies; 267 } 268 269 public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) { 270 this.interceptStrategies = interceptStrategies; 271 } 272 273 public void addInterceptStrategy(InterceptStrategy interceptStrategy) { 274 getInterceptStrategies().add(interceptStrategy); 275 } 276 277 public void setManagedInterceptStrategy(InterceptStrategy interceptStrategy) { 278 this.managedInterceptStrategy = interceptStrategy; 279 } 280 281 public InterceptStrategy getManagedInterceptStrategy() { 282 return managedInterceptStrategy; 283 } 284 285 public boolean isRouteAdded() { 286 return routeAdded; 287 } 288 289 public void setIsRouteAdded(boolean routeAdded) { 290 this.routeAdded = routeAdded; 291 } 292 293 public void setTracing(Boolean tracing) { 294 this.trace = tracing; 295 } 296 297 public Boolean isTracing() { 298 if (trace != null) { 299 return trace; 300 } else { 301 // fallback to the option from camel context 302 return getCamelContext().isTracing(); 303 } 304 } 305 306 public void setMessageHistory(Boolean messageHistory) { 307 this.messageHistory = messageHistory; 308 } 309 310 public Boolean isMessageHistory() { 311 if (messageHistory != null) { 312 return messageHistory; 313 } else { 314 // fallback to the option from camel context 315 return getCamelContext().isMessageHistory(); 316 } 317 } 318 319 public void setLogMask(Boolean logMask) { 320 this.logMask = logMask; 321 } 322 323 public Boolean isLogMask() { 324 if (logMask != null) { 325 return logMask; 326 } else { 327 // fallback to the option from camel context 328 return getCamelContext().isLogMask(); 329 } 330 } 331 332 public void setLogExhaustedMessageBody(Boolean logExhaustedMessageBody) { 333 this.logExhaustedMessageBody = logExhaustedMessageBody; 334 } 335 336 public Boolean isLogExhaustedMessageBody() { 337 if (logExhaustedMessageBody != null) { 338 return logExhaustedMessageBody; 339 } else { 340 // fallback to the option from camel context 341 return getCamelContext().isLogExhaustedMessageBody(); 342 } 343 } 344 345 public void setStreamCaching(Boolean cache) { 346 this.streamCache = cache; 347 } 348 349 public Boolean isStreamCaching() { 350 if (streamCache != null) { 351 return streamCache; 352 } else { 353 // fallback to the option from camel context 354 return getCamelContext().isStreamCaching(); 355 } 356 } 357 358 public void setHandleFault(Boolean handleFault) { 359 this.handleFault = handleFault; 360 } 361 362 public Boolean isHandleFault() { 363 if (handleFault != null) { 364 return handleFault; 365 } else { 366 // fallback to the option from camel context 367 return getCamelContext().isHandleFault(); 368 } 369 } 370 371 public void setDelayer(Long delay) { 372 this.delay = delay; 373 } 374 375 public Long getDelayer() { 376 if (delay != null) { 377 return delay; 378 } else { 379 // fallback to the option from camel context 380 return getCamelContext().getDelayer(); 381 } 382 } 383 384 public void setAutoStartup(Boolean autoStartup) { 385 this.autoStartup = autoStartup; 386 } 387 388 public Boolean isAutoStartup() { 389 if (autoStartup != null) { 390 return autoStartup; 391 } 392 // default to true 393 return true; 394 } 395 396 public void setShutdownRoute(ShutdownRoute shutdownRoute) { 397 this.shutdownRoute = shutdownRoute; 398 } 399 400 public void setAllowUseOriginalMessage(Boolean allowUseOriginalMessage) { 401 // can only be configured on CamelContext 402 getCamelContext().setAllowUseOriginalMessage(allowUseOriginalMessage); 403 } 404 405 public Boolean isAllowUseOriginalMessage() { 406 // can only be configured on CamelContext 407 return getCamelContext().isAllowUseOriginalMessage(); 408 } 409 410 public ShutdownRoute getShutdownRoute() { 411 if (shutdownRoute != null) { 412 return shutdownRoute; 413 } else { 414 // fallback to the option from camel context 415 return getCamelContext().getShutdownRoute(); 416 } 417 } 418 419 public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) { 420 this.shutdownRunningTask = shutdownRunningTask; 421 } 422 423 public ShutdownRunningTask getShutdownRunningTask() { 424 if (shutdownRunningTask != null) { 425 return shutdownRunningTask; 426 } else { 427 // fallback to the option from camel context 428 return getCamelContext().getShutdownRunningTask(); 429 } 430 } 431 432 public int getAndIncrement(ProcessorDefinition<?> node) { 433 AtomicInteger count = nodeIndex.get(node); 434 if (count == null) { 435 count = new AtomicInteger(); 436 nodeIndex.put(node, count); 437 } 438 return count.getAndIncrement(); 439 } 440 441 public void setRoutePolicyList(List<RoutePolicy> routePolicyList) { 442 this.routePolicyList = routePolicyList; 443 } 444 445 public List<RoutePolicy> getRoutePolicyList() { 446 return routePolicyList; 447 } 448 449 @Override 450 public RouteError getLastError() { 451 return routeError; 452 } 453 454 @Override 455 public void setLastError(RouteError routeError) { 456 this.routeError = routeError; 457 } 458 459 @Override 460 public RouteController getRouteController() { 461 return routeController; 462 } 463 464 @Override 465 public void setRouteController(RouteController routeController) { 466 this.routeController = routeController; 467 } 468}