001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.io.IOException; 020 import java.io.InputStream; 021 import java.util.ArrayList; 022 import java.util.Arrays; 023 import java.util.Collection; 024 import java.util.Collections; 025 import java.util.Date; 026 import java.util.HashMap; 027 import java.util.Iterator; 028 import java.util.LinkedHashMap; 029 import java.util.LinkedHashSet; 030 import java.util.List; 031 import java.util.Map; 032 import java.util.Set; 033 import java.util.TreeMap; 034 import java.util.concurrent.ScheduledExecutorService; 035 import java.util.concurrent.TimeUnit; 036 import java.util.concurrent.atomic.AtomicBoolean; 037 import java.util.concurrent.atomic.AtomicInteger; 038 import javax.naming.Context; 039 import javax.xml.bind.JAXBContext; 040 import javax.xml.bind.Unmarshaller; 041 042 import org.apache.camel.CamelContext; 043 import org.apache.camel.CamelContextAware; 044 import org.apache.camel.Component; 045 import org.apache.camel.Consumer; 046 import org.apache.camel.ConsumerTemplate; 047 import org.apache.camel.Endpoint; 048 import org.apache.camel.ErrorHandlerFactory; 049 import org.apache.camel.FailedToStartRouteException; 050 import org.apache.camel.IsSingleton; 051 import org.apache.camel.MultipleConsumersSupport; 052 import org.apache.camel.NoFactoryAvailableException; 053 import org.apache.camel.NoSuchEndpointException; 054 import org.apache.camel.Processor; 055 import org.apache.camel.Producer; 056 import org.apache.camel.ProducerTemplate; 057 import org.apache.camel.ResolveEndpointFailedException; 058 import org.apache.camel.Route; 059 import org.apache.camel.RoutesBuilder; 060 import org.apache.camel.RuntimeCamelException; 061 import org.apache.camel.Service; 062 import org.apache.camel.ServiceStatus; 063 import org.apache.camel.ShutdownRoute; 064 import org.apache.camel.ShutdownRunningTask; 065 import org.apache.camel.StartupListener; 066 import org.apache.camel.StatefulService; 067 import org.apache.camel.SuspendableService; 068 import org.apache.camel.TypeConverter; 069 import org.apache.camel.VetoCamelContextStartException; 070 import org.apache.camel.builder.ErrorHandlerBuilder; 071 import org.apache.camel.component.properties.PropertiesComponent; 072 import org.apache.camel.fabric.FabricTracer; 073 import org.apache.camel.impl.converter.BaseTypeConverterRegistry; 074 import org.apache.camel.impl.converter.DefaultTypeConverter; 075 import org.apache.camel.impl.converter.LazyLoadingTypeConverter; 076 import org.apache.camel.management.DefaultManagementMBeanAssembler; 077 import org.apache.camel.management.JmxSystemPropertyKeys; 078 import org.apache.camel.management.ManagementStrategyFactory; 079 import org.apache.camel.model.Constants; 080 import org.apache.camel.model.DataFormatDefinition; 081 import org.apache.camel.model.ModelCamelContext; 082 import org.apache.camel.model.RouteDefinition; 083 import org.apache.camel.model.RoutesDefinition; 084 import org.apache.camel.processor.interceptor.Debug; 085 import org.apache.camel.processor.interceptor.Delayer; 086 import org.apache.camel.processor.interceptor.HandleFault; 087 import org.apache.camel.processor.interceptor.StreamCaching; 088 import org.apache.camel.processor.interceptor.Tracer; 089 import org.apache.camel.spi.CamelContextNameStrategy; 090 import org.apache.camel.spi.ClassResolver; 091 import org.apache.camel.spi.ComponentResolver; 092 import org.apache.camel.spi.DataFormat; 093 import org.apache.camel.spi.DataFormatResolver; 094 import org.apache.camel.spi.Debugger; 095 import org.apache.camel.spi.EndpointStrategy; 096 import org.apache.camel.spi.EventNotifier; 097 import org.apache.camel.spi.ExecutorServiceManager; 098 import org.apache.camel.spi.FactoryFinder; 099 import org.apache.camel.spi.FactoryFinderResolver; 100 import org.apache.camel.spi.InflightRepository; 101 import org.apache.camel.spi.Injector; 102 import org.apache.camel.spi.InterceptStrategy; 103 import org.apache.camel.spi.Language; 104 import org.apache.camel.spi.LanguageResolver; 105 import org.apache.camel.spi.LifecycleStrategy; 106 import org.apache.camel.spi.ManagementMBeanAssembler; 107 import org.apache.camel.spi.ManagementNameStrategy; 108 import org.apache.camel.spi.ManagementStrategy; 109 import org.apache.camel.spi.NodeIdFactory; 110 import org.apache.camel.spi.PackageScanClassResolver; 111 import org.apache.camel.spi.ProcessorFactory; 112 import org.apache.camel.spi.Registry; 113 import org.apache.camel.spi.RouteContext; 114 import org.apache.camel.spi.RouteStartupOrder; 115 import org.apache.camel.spi.ServicePool; 116 import org.apache.camel.spi.ShutdownStrategy; 117 import org.apache.camel.spi.TypeConverterRegistry; 118 import org.apache.camel.spi.UuidGenerator; 119 import org.apache.camel.support.ServiceSupport; 120 import org.apache.camel.util.CamelContextHelper; 121 import org.apache.camel.util.CastUtils; 122 import org.apache.camel.util.EndpointHelper; 123 import org.apache.camel.util.EventHelper; 124 import org.apache.camel.util.ObjectHelper; 125 import org.apache.camel.util.ServiceHelper; 126 import org.apache.camel.util.StopWatch; 127 import org.apache.camel.util.TimeUtils; 128 import org.apache.camel.util.URISupport; 129 import org.slf4j.Logger; 130 import org.slf4j.LoggerFactory; 131 132 /** 133 * Represents the context used to configure routes and the policies to use. 134 * 135 * @version 136 */ 137 @SuppressWarnings("deprecation") 138 public class DefaultCamelContext extends ServiceSupport implements ModelCamelContext, SuspendableService { 139 private final transient Logger log = LoggerFactory.getLogger(getClass()); 140 private JAXBContext jaxbContext; 141 private CamelContextNameStrategy nameStrategy = new DefaultCamelContextNameStrategy(); 142 private ManagementNameStrategy managementNameStrategy = new DefaultManagementNameStrategy(this); 143 private String managementName; 144 private ClassLoader applicationContextClassLoader; 145 private Map<EndpointKey, Endpoint> endpoints; 146 private final AtomicInteger endpointKeyCounter = new AtomicInteger(); 147 private final List<EndpointStrategy> endpointStrategies = new ArrayList<EndpointStrategy>(); 148 private final Map<String, Component> components = new HashMap<String, Component>(); 149 private final Set<Route> routes = new LinkedHashSet<Route>(); 150 private final List<Service> servicesToClose = new ArrayList<Service>(); 151 private final Set<StartupListener> startupListeners = new LinkedHashSet<StartupListener>(); 152 private TypeConverter typeConverter; 153 private TypeConverterRegistry typeConverterRegistry; 154 private Injector injector; 155 private ComponentResolver componentResolver; 156 private boolean autoCreateComponents = true; 157 private LanguageResolver languageResolver = new DefaultLanguageResolver(); 158 private final Map<String, Language> languages = new HashMap<String, Language>(); 159 private Registry registry; 160 private List<LifecycleStrategy> lifecycleStrategies = new ArrayList<LifecycleStrategy>(); 161 private ManagementStrategy managementStrategy; 162 private ManagementMBeanAssembler managementMBeanAssembler; 163 private AtomicBoolean managementStrategyInitialized = new AtomicBoolean(false); 164 private final List<RouteDefinition> routeDefinitions = new ArrayList<RouteDefinition>(); 165 private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>(); 166 167 // special flags to control the first startup which can are special 168 private volatile boolean firstStartDone; 169 private volatile boolean doNotStartRoutesOnFirstStart; 170 private final ThreadLocal<Boolean> isStartingRoutes = new ThreadLocal<Boolean>(); 171 private Boolean autoStartup = Boolean.TRUE; 172 private Boolean trace = Boolean.FALSE; 173 private Boolean streamCache = Boolean.FALSE; 174 private Boolean handleFault = Boolean.FALSE; 175 private Boolean disableJMX = Boolean.FALSE; 176 private Boolean lazyLoadTypeConverters = Boolean.FALSE; 177 private Boolean useMDCLogging = Boolean.FALSE; 178 private Boolean useBreadcrumb = Boolean.TRUE; 179 private Long delay; 180 private ErrorHandlerFactory errorHandlerBuilder; 181 private ScheduledExecutorService errorHandlerExecutorService; 182 private Map<String, DataFormatDefinition> dataFormats = new HashMap<String, DataFormatDefinition>(); 183 private DataFormatResolver dataFormatResolver = new DefaultDataFormatResolver(); 184 private Map<String, String> properties = new HashMap<String, String>(); 185 private FactoryFinderResolver factoryFinderResolver = new DefaultFactoryFinderResolver(); 186 private FactoryFinder defaultFactoryFinder; 187 private PropertiesComponent propertiesComponent; 188 private final Map<String, FactoryFinder> factories = new HashMap<String, FactoryFinder>(); 189 private final Map<String, RouteService> routeServices = new LinkedHashMap<String, RouteService>(); 190 private final Map<String, RouteService> suspendedRouteServices = new LinkedHashMap<String, RouteService>(); 191 private ClassResolver classResolver = new DefaultClassResolver(); 192 private PackageScanClassResolver packageScanClassResolver; 193 // we use a capacity of 100 per endpoint, so for the same endpoint we have at most 100 producers in the pool 194 // so if we have 6 endpoints in the pool, we can have 6 x 100 producers in total 195 private ServicePool<Endpoint, Producer> producerServicePool = new SharedProducerServicePool(100); 196 private NodeIdFactory nodeIdFactory = new DefaultNodeIdFactory(); 197 private ProcessorFactory processorFactory; 198 private InterceptStrategy defaultTracer; 199 private InflightRepository inflightRepository = new DefaultInflightRepository(); 200 private final List<RouteStartupOrder> routeStartupOrder = new ArrayList<RouteStartupOrder>(); 201 // start auto assigning route ids using numbering 1000 and upwards 202 private int defaultRouteStartupOrder = 1000; 203 private ShutdownStrategy shutdownStrategy = new DefaultShutdownStrategy(this); 204 private ShutdownRoute shutdownRoute = ShutdownRoute.Default; 205 private ShutdownRunningTask shutdownRunningTask = ShutdownRunningTask.CompleteCurrentTaskOnly; 206 private ExecutorServiceManager executorServiceManager; 207 private Debugger debugger; 208 private UuidGenerator uuidGenerator = createDefaultUuidGenerator(); 209 private final StopWatch stopWatch = new StopWatch(false); 210 private Date startDate; 211 212 public DefaultCamelContext() { 213 this.executorServiceManager = new DefaultExecutorServiceManager(this); 214 215 // create endpoint registry at first since end users may access endpoints before CamelContext is started 216 this.endpoints = new EndpointRegistry(this); 217 218 // use WebSphere specific resolver if running on WebSphere 219 if (WebSpherePackageScanClassResolver.isWebSphereClassLoader(this.getClass().getClassLoader())) { 220 log.info("Using WebSphere specific PackageScanClassResolver"); 221 packageScanClassResolver = new WebSpherePackageScanClassResolver("META-INF/services/org/apache/camel/TypeConverter"); 222 } else { 223 packageScanClassResolver = new DefaultPackageScanClassResolver(); 224 } 225 } 226 227 /** 228 * Creates the {@link CamelContext} using the given JNDI context as the registry 229 * 230 * @param jndiContext the JNDI context 231 */ 232 public DefaultCamelContext(Context jndiContext) { 233 this(); 234 setJndiContext(jndiContext); 235 } 236 237 /** 238 * Creates the {@link CamelContext} using the given registry 239 * 240 * @param registry the registry 241 */ 242 public DefaultCamelContext(Registry registry) { 243 this(); 244 setRegistry(registry); 245 } 246 247 public String getName() { 248 return getNameStrategy().getName(); 249 } 250 251 /** 252 * Sets the name of the this context. 253 * 254 * @param name the name 255 */ 256 public void setName(String name) { 257 // use an explicit name strategy since an explicit name was provided to be used 258 this.nameStrategy = new ExplicitCamelContextNameStrategy(name); 259 } 260 261 public CamelContextNameStrategy getNameStrategy() { 262 return nameStrategy; 263 } 264 265 public void setNameStrategy(CamelContextNameStrategy nameStrategy) { 266 this.nameStrategy = nameStrategy; 267 } 268 269 public ManagementNameStrategy getManagementNameStrategy() { 270 return managementNameStrategy; 271 } 272 273 public void setManagementNameStrategy(ManagementNameStrategy managementNameStrategy) { 274 this.managementNameStrategy = managementNameStrategy; 275 } 276 277 public String getManagementName() { 278 return managementName; 279 } 280 281 public void setManagementName(String managementName) { 282 this.managementName = managementName; 283 } 284 285 public Component hasComponent(String componentName) { 286 return components.get(componentName); 287 } 288 289 public void addComponent(String componentName, final Component component) { 290 ObjectHelper.notNull(component, "component"); 291 synchronized (components) { 292 if (components.containsKey(componentName)) { 293 throw new IllegalArgumentException("Cannot add component as its already previously added: " + componentName); 294 } 295 component.setCamelContext(this); 296 components.put(componentName, component); 297 for (LifecycleStrategy strategy : lifecycleStrategies) { 298 strategy.onComponentAdd(componentName, component); 299 } 300 301 // keep reference to properties component up to date 302 if (component instanceof PropertiesComponent && "properties".equals(componentName)) { 303 propertiesComponent = (PropertiesComponent) component; 304 } 305 } 306 } 307 308 public Component getComponent(String name) { 309 // synchronize the look up and auto create so that 2 threads can't 310 // concurrently auto create the same component. 311 synchronized (components) { 312 Component component = components.get(name); 313 if (component == null && autoCreateComponents) { 314 try { 315 if (log.isDebugEnabled()) { 316 log.debug("Using ComponentResolver: {} to resolve component with name: {}", getComponentResolver(), name); 317 } 318 component = getComponentResolver().resolveComponent(name, this); 319 if (component != null) { 320 addComponent(name, component); 321 if (isStarted() || isStarting()) { 322 // If the component is looked up after the context is started, lets start it up. 323 if (component instanceof Service) { 324 startService((Service)component); 325 } 326 } 327 } 328 } catch (Exception e) { 329 throw new RuntimeCamelException("Cannot auto create component: " + name, e); 330 } 331 } 332 log.trace("getComponent({}) -> {}", name, component); 333 return component; 334 } 335 } 336 337 public <T extends Component> T getComponent(String name, Class<T> componentType) { 338 Component component = getComponent(name); 339 if (componentType.isInstance(component)) { 340 return componentType.cast(component); 341 } else { 342 String message; 343 if (component == null) { 344 message = "Did not find component given by the name: " + name; 345 } else { 346 message = "Found component of type: " + component.getClass() + " instead of expected: " + componentType; 347 } 348 throw new IllegalArgumentException(message); 349 } 350 } 351 352 public Component removeComponent(String componentName) { 353 synchronized (components) { 354 Component answer = components.remove(componentName); 355 if (answer != null) { 356 for (LifecycleStrategy strategy : lifecycleStrategies) { 357 strategy.onComponentRemove(componentName, answer); 358 } 359 } 360 // keep reference to properties component up to date 361 if (answer != null && "properties".equals(componentName)) { 362 propertiesComponent = null; 363 } 364 return answer; 365 } 366 } 367 368 // Endpoint Management Methods 369 // ----------------------------------------------------------------------- 370 371 public Collection<Endpoint> getEndpoints() { 372 return new ArrayList<Endpoint>(endpoints.values()); 373 } 374 375 public Map<String, Endpoint> getEndpointMap() { 376 TreeMap<String, Endpoint> answer = new TreeMap<String, Endpoint>(); 377 for (Map.Entry<EndpointKey, Endpoint> entry : endpoints.entrySet()) { 378 answer.put(entry.getKey().get(), entry.getValue()); 379 } 380 return answer; 381 } 382 383 public Endpoint hasEndpoint(String uri) { 384 return endpoints.get(getEndpointKey(uri)); 385 } 386 387 public Endpoint addEndpoint(String uri, Endpoint endpoint) throws Exception { 388 Endpoint oldEndpoint; 389 390 startService(endpoint); 391 oldEndpoint = endpoints.remove(getEndpointKey(uri)); 392 for (LifecycleStrategy strategy : lifecycleStrategies) { 393 strategy.onEndpointAdd(endpoint); 394 } 395 addEndpointToRegistry(uri, endpoint); 396 if (oldEndpoint != null) { 397 stopServices(oldEndpoint); 398 } 399 400 return oldEndpoint; 401 } 402 403 public Collection<Endpoint> removeEndpoints(String uri) throws Exception { 404 Collection<Endpoint> answer = new ArrayList<Endpoint>(); 405 Endpoint oldEndpoint = endpoints.remove(getEndpointKey(uri)); 406 if (oldEndpoint != null) { 407 answer.add(oldEndpoint); 408 stopServices(oldEndpoint); 409 } else { 410 for (Map.Entry<EndpointKey, Endpoint> entry : endpoints.entrySet()) { 411 oldEndpoint = entry.getValue(); 412 if (EndpointHelper.matchEndpoint(this, oldEndpoint.getEndpointUri(), uri)) { 413 try { 414 stopServices(oldEndpoint); 415 answer.add(oldEndpoint); 416 endpoints.remove(entry.getKey()); 417 } catch (Exception e) { 418 log.warn("Error stopping endpoint {}. This exception will be ignored.", oldEndpoint); 419 } 420 } 421 } 422 } 423 424 // notify lifecycle its being removed 425 for (Endpoint endpoint : answer) { 426 for (LifecycleStrategy strategy : lifecycleStrategies) { 427 strategy.onEndpointRemove(endpoint); 428 } 429 } 430 431 return answer; 432 } 433 434 public Endpoint getEndpoint(String uri) { 435 ObjectHelper.notEmpty(uri, "uri"); 436 437 log.trace("Getting endpoint with uri: {}", uri); 438 439 // in case path has property placeholders then try to let property component resolve those 440 try { 441 uri = resolvePropertyPlaceholders(uri); 442 } catch (Exception e) { 443 throw new ResolveEndpointFailedException(uri, e); 444 } 445 446 // normalize uri so we can do endpoint hits with minor mistakes and parameters is not in the same order 447 uri = normalizeEndpointUri(uri); 448 449 log.trace("Getting endpoint with normalized uri: {}", uri); 450 451 Endpoint answer; 452 String scheme = null; 453 EndpointKey key = getEndpointKey(uri); 454 answer = endpoints.get(key); 455 if (answer == null) { 456 try { 457 // Use the URI prefix to find the component. 458 String splitURI[] = ObjectHelper.splitOnCharacter(uri, ":", 2); 459 if (splitURI[1] != null) { 460 scheme = splitURI[0]; 461 log.trace("Endpoint uri: {} is from component with name: {}", uri, scheme); 462 Component component = getComponent(scheme); 463 464 // Ask the component to resolve the endpoint. 465 if (component != null) { 466 log.trace("Creating endpoint from uri: {} using component: {}", uri, component); 467 468 // Have the component create the endpoint if it can. 469 answer = component.createEndpoint(uri); 470 471 if (answer != null && log.isDebugEnabled()) { 472 log.debug("{} converted to endpoint: {} by component: {}", new Object[]{URISupport.sanitizeUri(uri), answer, component}); 473 } 474 } 475 } 476 477 if (answer == null) { 478 // no component then try in registry and elsewhere 479 answer = createEndpoint(uri); 480 log.trace("No component to create endpoint from uri: {} fallback lookup in registry -> {}", uri, answer); 481 } 482 483 if (answer != null) { 484 addService(answer); 485 answer = addEndpointToRegistry(uri, answer); 486 } 487 } catch (Exception e) { 488 throw new ResolveEndpointFailedException(uri, e); 489 } 490 } 491 492 // unknown scheme 493 if (answer == null && scheme != null) { 494 throw new ResolveEndpointFailedException(uri, "No component found with scheme: " + scheme); 495 } 496 497 return answer; 498 } 499 500 public <T extends Endpoint> T getEndpoint(String name, Class<T> endpointType) { 501 Endpoint endpoint = getEndpoint(name); 502 if (endpoint == null) { 503 throw new NoSuchEndpointException(name); 504 } 505 if (endpoint instanceof InterceptSendToEndpoint) { 506 endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate(); 507 } 508 if (endpointType.isInstance(endpoint)) { 509 return endpointType.cast(endpoint); 510 } else { 511 throw new IllegalArgumentException("The endpoint is not of type: " + endpointType 512 + " but is: " + endpoint.getClass().getCanonicalName()); 513 } 514 } 515 516 public void addRegisterEndpointCallback(EndpointStrategy strategy) { 517 if (!endpointStrategies.contains(strategy)) { 518 // let it be invoked for already registered endpoints so it can catch-up. 519 endpointStrategies.add(strategy); 520 for (Endpoint endpoint : getEndpoints()) { 521 Endpoint newEndpoint = strategy.registerEndpoint(endpoint.getEndpointUri(), endpoint); 522 if (newEndpoint != null) { 523 // put will replace existing endpoint with the new endpoint 524 endpoints.put(getEndpointKey(endpoint.getEndpointUri()), newEndpoint); 525 } 526 } 527 } 528 } 529 530 /** 531 * Strategy to add the given endpoint to the internal endpoint registry 532 * 533 * @param uri uri of the endpoint 534 * @param endpoint the endpoint to add 535 * @return the added endpoint 536 */ 537 protected Endpoint addEndpointToRegistry(String uri, Endpoint endpoint) { 538 ObjectHelper.notEmpty(uri, "uri"); 539 ObjectHelper.notNull(endpoint, "endpoint"); 540 541 // if there is endpoint strategies, then use the endpoints they return 542 // as this allows to intercept endpoints etc. 543 for (EndpointStrategy strategy : endpointStrategies) { 544 endpoint = strategy.registerEndpoint(uri, endpoint); 545 } 546 endpoints.put(getEndpointKey(uri, endpoint), endpoint); 547 return endpoint; 548 } 549 550 /** 551 * Normalize uri so we can do endpoint hits with minor mistakes and parameters is not in the same order. 552 * 553 * @param uri the uri 554 * @return normalized uri 555 * @throws ResolveEndpointFailedException if uri cannot be normalized 556 */ 557 protected static String normalizeEndpointUri(String uri) { 558 try { 559 uri = URISupport.normalizeUri(uri); 560 } catch (Exception e) { 561 throw new ResolveEndpointFailedException(uri, e); 562 } 563 return uri; 564 } 565 566 /** 567 * Gets the endpoint key to use for lookup or whe adding endpoints to the {@link EndpointRegistry} 568 * 569 * @param uri the endpoint uri 570 * @return the key 571 */ 572 protected EndpointKey getEndpointKey(String uri) { 573 return new EndpointKey(uri); 574 } 575 576 /** 577 * Gets the endpoint key to use for lookup or whe adding endpoints to the {@link EndpointRegistry} 578 * 579 * @param uri the endpoint uri 580 * @param endpoint the endpoint 581 * @return the key 582 */ 583 protected EndpointKey getEndpointKey(String uri, Endpoint endpoint) { 584 if (endpoint != null && !endpoint.isSingleton()) { 585 int counter = endpointKeyCounter.incrementAndGet(); 586 return new EndpointKey(uri + ":" + counter); 587 } else { 588 return new EndpointKey(uri); 589 } 590 } 591 592 // Route Management Methods 593 // ----------------------------------------------------------------------- 594 595 /** 596 * Returns the order in which the route inputs was started. 597 * <p/> 598 * The order may not be according to the startupOrder defined on the route. 599 * For example a route could be started manually later, or new routes added at runtime. 600 * 601 * @return a list in the order how routes was started 602 */ 603 public List<RouteStartupOrder> getRouteStartupOrder() { 604 return routeStartupOrder; 605 } 606 607 public List<Route> getRoutes() { 608 // lets return a copy of the collection as objects are removed later when services are stopped 609 return new ArrayList<Route>(routes); 610 } 611 612 public Route getRoute(String id) { 613 for (Route route : getRoutes()) { 614 if (route.getId().equals(id)) { 615 return route; 616 } 617 } 618 return null; 619 } 620 621 @Deprecated 622 public void setRoutes(List<Route> routes) { 623 throw new UnsupportedOperationException("Overriding existing routes is not supported yet, use addRouteCollection instead"); 624 } 625 626 synchronized void removeRouteCollection(Collection<Route> routes) { 627 this.routes.removeAll(routes); 628 } 629 630 synchronized void addRouteCollection(Collection<Route> routes) throws Exception { 631 this.routes.addAll(routes); 632 } 633 634 public void addRoutes(RoutesBuilder builder) throws Exception { 635 log.debug("Adding routes from builder: {}", builder); 636 // lets now add the routes from the builder 637 builder.addRoutesToCamelContext(this); 638 } 639 640 public synchronized RoutesDefinition loadRoutesDefinition(InputStream is) throws Exception { 641 // load routes using JAXB 642 if (jaxbContext == null) { 643 // must use classloader from CamelContext to have JAXB working 644 jaxbContext = JAXBContext.newInstance(Constants.JAXB_CONTEXT_PACKAGES, CamelContext.class.getClassLoader()); 645 } 646 647 Unmarshaller unmarshaller = jaxbContext.createUnmarshaller(); 648 Object result = unmarshaller.unmarshal(is); 649 650 if (result == null) { 651 throw new IOException("Cannot unmarshal to routes using JAXB from input stream: " + is); 652 } 653 654 // can either be routes or a single route 655 RoutesDefinition answer = null; 656 if (result instanceof RouteDefinition) { 657 RouteDefinition route = (RouteDefinition) result; 658 answer = new RoutesDefinition(); 659 answer.getRoutes().add(route); 660 } else if (result instanceof RoutesDefinition) { 661 answer = (RoutesDefinition) result; 662 } else { 663 throw new IllegalArgumentException("Unmarshalled object is an unsupported type: " + ObjectHelper.className(result) + " -> " + result); 664 } 665 666 return answer; 667 } 668 669 public synchronized void addRouteDefinitions(Collection<RouteDefinition> routeDefinitions) throws Exception { 670 for (RouteDefinition routeDefinition : routeDefinitions) { 671 removeRouteDefinition(routeDefinition); 672 } 673 this.routeDefinitions.addAll(routeDefinitions); 674 if (shouldStartRoutes()) { 675 startRouteDefinitions(routeDefinitions); 676 } 677 } 678 679 public void addRouteDefinition(RouteDefinition routeDefinition) throws Exception { 680 addRouteDefinitions(Arrays.asList(routeDefinition)); 681 } 682 683 /** 684 * Removes the route definition with the given key. 685 * 686 * @return true if one or more routes was removed 687 */ 688 protected boolean removeRouteDefinition(String key) { 689 boolean answer = false; 690 Iterator<RouteDefinition> iter = routeDefinitions.iterator(); 691 while (iter.hasNext()) { 692 RouteDefinition route = iter.next(); 693 if (route.idOrCreate(nodeIdFactory).equals(key)) { 694 iter.remove(); 695 answer = true; 696 } 697 } 698 return answer; 699 } 700 701 public synchronized void removeRouteDefinitions(Collection<RouteDefinition> routeDefinitions) throws Exception { 702 for (RouteDefinition routeDefinition : routeDefinitions) { 703 removeRouteDefinition(routeDefinition); 704 } 705 } 706 707 public synchronized void removeRouteDefinition(RouteDefinition routeDefinition) throws Exception { 708 String id = routeDefinition.idOrCreate(nodeIdFactory); 709 stopRoute(id); 710 removeRoute(id); 711 this.routeDefinitions.remove(routeDefinition); 712 } 713 714 public ServiceStatus getRouteStatus(String key) { 715 RouteService routeService = routeServices.get(key); 716 if (routeService != null) { 717 return routeService.getStatus(); 718 } 719 return null; 720 } 721 722 public void startRoute(RouteDefinition route) throws Exception { 723 // indicate we are staring the route using this thread so 724 // we are able to query this if needed 725 isStartingRoutes.set(true); 726 try { 727 // must ensure route is prepared, before we can start it 728 route.prepare(this); 729 730 List<Route> routes = new ArrayList<Route>(); 731 List<RouteContext> routeContexts = route.addRoutes(this, routes); 732 RouteService routeService = new RouteService(this, route, routeContexts, routes); 733 startRouteService(routeService, true); 734 } finally { 735 // we are done staring routes 736 isStartingRoutes.remove(); 737 } 738 } 739 740 public boolean isStartingRoutes() { 741 Boolean answer = isStartingRoutes.get(); 742 return answer != null && answer; 743 } 744 745 public void stopRoute(RouteDefinition route) throws Exception { 746 stopRoute(route.idOrCreate(nodeIdFactory)); 747 } 748 749 public synchronized void startRoute(String routeId) throws Exception { 750 RouteService routeService = routeServices.get(routeId); 751 if (routeService != null) { 752 startRouteService(routeService, false); 753 } 754 } 755 756 public synchronized void resumeRoute(String routeId) throws Exception { 757 if (!routeSupportsSuspension(routeId)) { 758 // start route if suspension is not supported 759 startRoute(routeId); 760 return; 761 } 762 763 RouteService routeService = routeServices.get(routeId); 764 if (routeService != null) { 765 resumeRouteService(routeService); 766 } 767 } 768 769 public synchronized boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception { 770 RouteService routeService = routeServices.get(routeId); 771 if (routeService != null) { 772 RouteStartupOrder route = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); 773 774 boolean completed = getShutdownStrategy().shutdown(this, route, timeout, timeUnit, abortAfterTimeout); 775 if (completed) { 776 // must stop route service as well 777 stopRouteService(routeService, false); 778 } else { 779 // shutdown was aborted, make sure route is re-started properly 780 startRouteService(routeService, false); 781 } 782 return completed; 783 } 784 return false; 785 } 786 787 public synchronized void stopRoute(String routeId) throws Exception { 788 RouteService routeService = routeServices.get(routeId); 789 if (routeService != null) { 790 List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); 791 RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); 792 routes.add(order); 793 794 getShutdownStrategy().shutdown(this, routes); 795 // must stop route service as well 796 stopRouteService(routeService, false); 797 } 798 } 799 800 public synchronized void stopRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { 801 RouteService routeService = routeServices.get(routeId); 802 if (routeService != null) { 803 List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); 804 RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); 805 routes.add(order); 806 807 getShutdownStrategy().shutdown(this, routes, timeout, timeUnit); 808 // must stop route service as well 809 stopRouteService(routeService, false); 810 } 811 } 812 813 public synchronized void shutdownRoute(String routeId) throws Exception { 814 RouteService routeService = routeServices.get(routeId); 815 if (routeService != null) { 816 List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); 817 RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); 818 routes.add(order); 819 820 getShutdownStrategy().shutdown(this, routes); 821 // must stop route service as well (and remove the routes from management) 822 stopRouteService(routeService, true); 823 } 824 } 825 826 public synchronized void shutdownRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { 827 RouteService routeService = routeServices.get(routeId); 828 if (routeService != null) { 829 List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); 830 RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); 831 routes.add(order); 832 833 getShutdownStrategy().shutdown(this, routes, timeout, timeUnit); 834 // must stop route service as well (and remove the routes from management) 835 stopRouteService(routeService, true); 836 } 837 } 838 839 public synchronized boolean removeRoute(String routeId) throws Exception { 840 RouteService routeService = routeServices.get(routeId); 841 if (routeService != null) { 842 if (getRouteStatus(routeId).isStopped()) { 843 routeService.setRemovingRoutes(true); 844 shutdownRouteService(routeService); 845 removeRouteDefinition(routeId); 846 routeServices.remove(routeId); 847 // remove route from startup order as well, as it was removed 848 Iterator<RouteStartupOrder> it = routeStartupOrder.iterator(); 849 while (it.hasNext()) { 850 RouteStartupOrder order = it.next(); 851 if (order.getRoute().getId().equals(routeId)) { 852 it.remove(); 853 } 854 } 855 return true; 856 } else { 857 return false; 858 } 859 } 860 return false; 861 } 862 863 public synchronized void suspendRoute(String routeId) throws Exception { 864 if (!routeSupportsSuspension(routeId)) { 865 // stop if we suspend is not supported 866 stopRoute(routeId); 867 return; 868 } 869 870 RouteService routeService = routeServices.get(routeId); 871 if (routeService != null) { 872 List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); 873 RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); 874 routes.add(order); 875 876 getShutdownStrategy().suspend(this, routes); 877 // must suspend route service as well 878 suspendRouteService(routeService); 879 } 880 } 881 882 public synchronized void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { 883 if (!routeSupportsSuspension(routeId)) { 884 stopRoute(routeId, timeout, timeUnit); 885 return; 886 } 887 888 RouteService routeService = routeServices.get(routeId); 889 if (routeService != null) { 890 List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1); 891 RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoutes().iterator().next(), routeService); 892 routes.add(order); 893 894 getShutdownStrategy().suspend(this, routes, timeout, timeUnit); 895 // must suspend route service as well 896 suspendRouteService(routeService); 897 } 898 } 899 900 public void addService(Object object) throws Exception { 901 doAddService(object, true); 902 } 903 904 private void doAddService(Object object, boolean closeOnShutdown) throws Exception { 905 // inject CamelContext 906 if (object instanceof CamelContextAware) { 907 CamelContextAware aware = (CamelContextAware) object; 908 aware.setCamelContext(this); 909 } 910 911 if (object instanceof Service) { 912 Service service = (Service) object; 913 914 for (LifecycleStrategy strategy : lifecycleStrategies) { 915 if (service instanceof Endpoint) { 916 // use specialized endpoint add 917 strategy.onEndpointAdd((Endpoint) service); 918 } else { 919 strategy.onServiceAdd(this, service, null); 920 } 921 } 922 923 // only add to services to close if its a singleton 924 // otherwise we could for example end up with a lot of prototype scope endpoints 925 boolean singleton = true; // assume singleton by default 926 if (service instanceof IsSingleton) { 927 singleton = ((IsSingleton) service).isSingleton(); 928 } 929 // do not add endpoints as they have their own list 930 if (singleton && !(service instanceof Endpoint)) { 931 // only add to list of services to close if its not already there 932 if (closeOnShutdown && !hasService(service)) { 933 servicesToClose.add(service); 934 } 935 } 936 } 937 938 // and then ensure service is started (as stated in the javadoc) 939 if (object instanceof Service) { 940 startService((Service)object); 941 } else if (object instanceof Collection<?>) { 942 startServices((Collection<?>)object); 943 } 944 } 945 946 public boolean removeService(Object object) throws Exception { 947 if (object instanceof Service) { 948 Service service = (Service) object; 949 950 for (LifecycleStrategy strategy : lifecycleStrategies) { 951 if (service instanceof Endpoint) { 952 // use specialized endpoint remove 953 strategy.onEndpointRemove((Endpoint) service); 954 } else { 955 strategy.onServiceRemove(this, service, null); 956 } 957 } 958 return servicesToClose.remove(service); 959 } 960 return false; 961 } 962 963 public boolean hasService(Object object) { 964 if (object instanceof Service) { 965 Service service = (Service) object; 966 return servicesToClose.contains(service); 967 } 968 return false; 969 } 970 971 public void addStartupListener(StartupListener listener) throws Exception { 972 // either add to listener so we can invoke then later when CamelContext has been started 973 // or invoke the callback right now 974 if (isStarted()) { 975 listener.onCamelContextStarted(this, true); 976 } else { 977 startupListeners.add(listener); 978 } 979 } 980 981 // Helper methods 982 // ----------------------------------------------------------------------- 983 984 public Language resolveLanguage(String language) { 985 Language answer; 986 synchronized (languages) { 987 answer = languages.get(language); 988 989 // check if the language is singleton, if so return the shared instance 990 if (answer instanceof IsSingleton) { 991 boolean singleton = ((IsSingleton) answer).isSingleton(); 992 if (singleton) { 993 return answer; 994 } 995 } 996 997 // language not known or not singleton, then use resolver 998 answer = getLanguageResolver().resolveLanguage(language, this); 999 if (answer != null) { 1000 languages.put(language, answer); 1001 } 1002 } 1003 1004 // no language resolved 1005 return answer; 1006 } 1007 1008 public String getPropertyPrefixToken() { 1009 PropertiesComponent pc = getPropertiesComponent(); 1010 1011 if (pc != null) { 1012 return pc.getPrefixToken(); 1013 } else { 1014 return null; 1015 } 1016 } 1017 1018 public String getPropertySuffixToken() { 1019 PropertiesComponent pc = getPropertiesComponent(); 1020 1021 if (pc != null) { 1022 return pc.getSuffixToken(); 1023 } else { 1024 return null; 1025 } 1026 } 1027 1028 public String resolvePropertyPlaceholders(String text) throws Exception { 1029 // While it is more efficient to only do the lookup if we are sure we need the component, 1030 // with custom tokens, we cannot know if the URI contains a property or not without having 1031 // the component. We also lose fail-fast behavior for the missing component with this change. 1032 PropertiesComponent pc = getPropertiesComponent(); 1033 1034 // Do not parse uris that are designated for the properties component as it will handle that itself 1035 if (text != null && !text.startsWith("properties:")) { 1036 // No component, assume default tokens. 1037 if (pc == null && text.contains(PropertiesComponent.DEFAULT_PREFIX_TOKEN)) { 1038 throw new IllegalArgumentException("PropertiesComponent with name properties must be defined" 1039 + " in CamelContext to support property placeholders."); 1040 1041 // Component available, use actual tokens 1042 } else if (pc != null && text.contains(pc.getPrefixToken())) { 1043 // the parser will throw exception if property key was not found 1044 String answer = pc.parseUri(text); 1045 log.debug("Resolved text: {} -> {}", text, answer); 1046 return answer; 1047 } 1048 } 1049 1050 // return original text as is 1051 return text; 1052 } 1053 1054 // Properties 1055 // ----------------------------------------------------------------------- 1056 1057 public TypeConverter getTypeConverter() { 1058 if (typeConverter == null) { 1059 synchronized (this) { 1060 // we can synchronize on this as there is only one instance 1061 // of the camel context (its the container) 1062 typeConverter = createTypeConverter(); 1063 try { 1064 addService(typeConverter); 1065 } catch (Exception e) { 1066 throw ObjectHelper.wrapRuntimeCamelException(e); 1067 } 1068 } 1069 } 1070 return typeConverter; 1071 } 1072 1073 public void setTypeConverter(TypeConverter typeConverter) { 1074 this.typeConverter = typeConverter; 1075 } 1076 1077 public TypeConverterRegistry getTypeConverterRegistry() { 1078 if (typeConverterRegistry == null) { 1079 // init type converter as its lazy 1080 if (typeConverter == null) { 1081 getTypeConverter(); 1082 } 1083 if (typeConverter instanceof TypeConverterRegistry) { 1084 typeConverterRegistry = (TypeConverterRegistry) typeConverter; 1085 } 1086 } 1087 return typeConverterRegistry; 1088 } 1089 1090 public void setTypeConverterRegistry(TypeConverterRegistry typeConverterRegistry) { 1091 this.typeConverterRegistry = typeConverterRegistry; 1092 } 1093 1094 public Injector getInjector() { 1095 if (injector == null) { 1096 injector = createInjector(); 1097 } 1098 return injector; 1099 } 1100 1101 public void setInjector(Injector injector) { 1102 this.injector = injector; 1103 } 1104 1105 public ManagementMBeanAssembler getManagementMBeanAssembler() { 1106 if (managementMBeanAssembler == null) { 1107 managementMBeanAssembler = createManagementMBeanAssembler(); 1108 } 1109 return managementMBeanAssembler; 1110 } 1111 1112 public void setManagementMBeanAssembler(ManagementMBeanAssembler managementMBeanAssembler) { 1113 this.managementMBeanAssembler = managementMBeanAssembler; 1114 } 1115 1116 public ComponentResolver getComponentResolver() { 1117 if (componentResolver == null) { 1118 componentResolver = createComponentResolver(); 1119 } 1120 return componentResolver; 1121 } 1122 1123 public void setComponentResolver(ComponentResolver componentResolver) { 1124 this.componentResolver = componentResolver; 1125 } 1126 1127 public LanguageResolver getLanguageResolver() { 1128 if (languageResolver == null) { 1129 languageResolver = new DefaultLanguageResolver(); 1130 } 1131 return languageResolver; 1132 } 1133 1134 public void setLanguageResolver(LanguageResolver languageResolver) { 1135 this.languageResolver = languageResolver; 1136 } 1137 1138 public boolean isAutoCreateComponents() { 1139 return autoCreateComponents; 1140 } 1141 1142 public void setAutoCreateComponents(boolean autoCreateComponents) { 1143 this.autoCreateComponents = autoCreateComponents; 1144 } 1145 1146 public Registry getRegistry() { 1147 if (registry == null) { 1148 registry = createRegistry(); 1149 setRegistry(registry); 1150 } 1151 return registry; 1152 } 1153 1154 /** 1155 * Sets the registry to the given JNDI context 1156 * 1157 * @param jndiContext is the JNDI context to use as the registry 1158 * @see #setRegistry(org.apache.camel.spi.Registry) 1159 */ 1160 public void setJndiContext(Context jndiContext) { 1161 setRegistry(new JndiRegistry(jndiContext)); 1162 } 1163 1164 public void setRegistry(Registry registry) { 1165 // wrap the registry so we always do propery placeholder lookups 1166 if (!(registry instanceof PropertyPlaceholderDelegateRegistry)) { 1167 registry = new PropertyPlaceholderDelegateRegistry(this, registry); 1168 } 1169 this.registry = registry; 1170 } 1171 1172 public List<LifecycleStrategy> getLifecycleStrategies() { 1173 return lifecycleStrategies; 1174 } 1175 1176 public void setLifecycleStrategies(List<LifecycleStrategy> lifecycleStrategies) { 1177 this.lifecycleStrategies = lifecycleStrategies; 1178 } 1179 1180 public void addLifecycleStrategy(LifecycleStrategy lifecycleStrategy) { 1181 this.lifecycleStrategies.add(lifecycleStrategy); 1182 } 1183 1184 public synchronized List<RouteDefinition> getRouteDefinitions() { 1185 return routeDefinitions; 1186 } 1187 1188 public synchronized RouteDefinition getRouteDefinition(String id) { 1189 for (RouteDefinition route : routeDefinitions) { 1190 if (route.getId().equals(id)) { 1191 return route; 1192 } 1193 } 1194 return null; 1195 } 1196 1197 public List<InterceptStrategy> getInterceptStrategies() { 1198 return interceptStrategies; 1199 } 1200 1201 public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) { 1202 this.interceptStrategies = interceptStrategies; 1203 } 1204 1205 public void addInterceptStrategy(InterceptStrategy interceptStrategy) { 1206 getInterceptStrategies().add(interceptStrategy); 1207 1208 // for backwards compatible or if user add them here instead of the setXXX methods 1209 1210 if (interceptStrategy instanceof Tracer) { 1211 setTracing(true); 1212 } else if (interceptStrategy instanceof HandleFault) { 1213 setHandleFault(true); 1214 } else if (interceptStrategy instanceof StreamCaching) { 1215 setStreamCaching(true); 1216 } else if (interceptStrategy instanceof Delayer) { 1217 setDelayer(((Delayer)interceptStrategy).getDelay()); 1218 } 1219 } 1220 1221 public void setStreamCaching(Boolean cache) { 1222 this.streamCache = cache; 1223 } 1224 1225 public Boolean isStreamCaching() { 1226 return streamCache; 1227 } 1228 1229 public void setTracing(Boolean tracing) { 1230 this.trace = tracing; 1231 } 1232 1233 public Boolean isTracing() { 1234 return trace; 1235 } 1236 1237 public Boolean isHandleFault() { 1238 return handleFault; 1239 } 1240 1241 public void setHandleFault(Boolean handleFault) { 1242 this.handleFault = handleFault; 1243 } 1244 1245 public Long getDelayer() { 1246 return delay; 1247 } 1248 1249 public void setDelayer(Long delay) { 1250 this.delay = delay; 1251 } 1252 1253 public ProducerTemplate createProducerTemplate() { 1254 int size = CamelContextHelper.getMaximumCachePoolSize(this); 1255 return createProducerTemplate(size); 1256 } 1257 1258 public ProducerTemplate createProducerTemplate(int maximumCacheSize) { 1259 DefaultProducerTemplate answer = new DefaultProducerTemplate(this); 1260 answer.setMaximumCacheSize(maximumCacheSize); 1261 // start it so its ready to use 1262 try { 1263 startService(answer); 1264 } catch (Exception e) { 1265 throw ObjectHelper.wrapRuntimeCamelException(e); 1266 } 1267 return answer; 1268 } 1269 1270 public ConsumerTemplate createConsumerTemplate() { 1271 int size = CamelContextHelper.getMaximumCachePoolSize(this); 1272 return createConsumerTemplate(size); 1273 } 1274 1275 public ConsumerTemplate createConsumerTemplate(int maximumCacheSize) { 1276 DefaultConsumerTemplate answer = new DefaultConsumerTemplate(this); 1277 answer.setMaximumCacheSize(maximumCacheSize); 1278 // start it so its ready to use 1279 try { 1280 startService(answer); 1281 } catch (Exception e) { 1282 throw ObjectHelper.wrapRuntimeCamelException(e); 1283 } 1284 return answer; 1285 } 1286 1287 public ErrorHandlerBuilder getErrorHandlerBuilder() { 1288 return (ErrorHandlerBuilder)errorHandlerBuilder; 1289 } 1290 1291 public void setErrorHandlerBuilder(ErrorHandlerFactory errorHandlerBuilder) { 1292 this.errorHandlerBuilder = errorHandlerBuilder; 1293 } 1294 1295 public synchronized ScheduledExecutorService getErrorHandlerExecutorService() { 1296 if (errorHandlerExecutorService == null) { 1297 // setup default thread pool for error handler 1298 errorHandlerExecutorService = getExecutorServiceManager().newDefaultScheduledThreadPool("ErrorHandlerRedeliveryThreadPool", "ErrorHandlerRedeliveryTask"); 1299 } 1300 return errorHandlerExecutorService; 1301 } 1302 1303 public void setProducerServicePool(ServicePool<Endpoint, Producer> producerServicePool) { 1304 this.producerServicePool = producerServicePool; 1305 } 1306 1307 public ServicePool<Endpoint, Producer> getProducerServicePool() { 1308 return producerServicePool; 1309 } 1310 1311 public String getUptime() { 1312 // compute and log uptime 1313 if (startDate == null) { 1314 return "not started"; 1315 } 1316 long delta = new Date().getTime() - startDate.getTime(); 1317 return TimeUtils.printDuration(delta); 1318 } 1319 1320 @Override 1321 protected void doSuspend() throws Exception { 1322 EventHelper.notifyCamelContextSuspending(this); 1323 1324 log.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is suspending"); 1325 StopWatch watch = new StopWatch(); 1326 1327 // update list of started routes to be suspended 1328 // because we only want to suspend started routes 1329 // (so when we resume we only resume the routes which actually was suspended) 1330 for (Map.Entry<String, RouteService> entry : getRouteServices().entrySet()) { 1331 if (entry.getValue().getStatus().isStarted()) { 1332 suspendedRouteServices.put(entry.getKey(), entry.getValue()); 1333 } 1334 } 1335 1336 // assemble list of startup ordering so routes can be shutdown accordingly 1337 List<RouteStartupOrder> orders = new ArrayList<RouteStartupOrder>(); 1338 for (Map.Entry<String, RouteService> entry : suspendedRouteServices.entrySet()) { 1339 Route route = entry.getValue().getRoutes().iterator().next(); 1340 Integer order = entry.getValue().getRouteDefinition().getStartupOrder(); 1341 if (order == null) { 1342 order = defaultRouteStartupOrder++; 1343 } 1344 orders.add(new DefaultRouteStartupOrder(order, route, entry.getValue())); 1345 } 1346 1347 // suspend routes using the shutdown strategy so it can shutdown in correct order 1348 // routes which doesn't support suspension will be stopped instead 1349 getShutdownStrategy().suspend(this, orders); 1350 1351 // mark the route services as suspended or stopped 1352 for (RouteService service : suspendedRouteServices.values()) { 1353 if (routeSupportsSuspension(service.getId())) { 1354 service.suspend(); 1355 } else { 1356 service.stop(); 1357 } 1358 } 1359 1360 watch.stop(); 1361 if (log.isInfoEnabled()) { 1362 log.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is suspended in " + TimeUtils.printDuration(watch.taken())); 1363 } 1364 1365 EventHelper.notifyCamelContextSuspended(this); 1366 } 1367 1368 @Override 1369 protected void doResume() throws Exception { 1370 try { 1371 EventHelper.notifyCamelContextResuming(this); 1372 1373 log.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is resuming"); 1374 StopWatch watch = new StopWatch(); 1375 1376 // start the suspended routes (do not check for route clashes, and indicate) 1377 doStartOrResumeRoutes(suspendedRouteServices, false, true, true, false); 1378 1379 // mark the route services as resumed (will be marked as started) as well 1380 for (RouteService service : suspendedRouteServices.values()) { 1381 if (routeSupportsSuspension(service.getId())) { 1382 service.resume(); 1383 } else { 1384 service.start(); 1385 } 1386 } 1387 1388 watch.stop(); 1389 if (log.isInfoEnabled()) { 1390 log.info("Resumed " + suspendedRouteServices.size() + " routes"); 1391 log.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") resumed in " + TimeUtils.printDuration(watch.taken())); 1392 } 1393 1394 // and clear the list as they have been resumed 1395 suspendedRouteServices.clear(); 1396 1397 EventHelper.notifyCamelContextResumed(this); 1398 } catch (Exception e) { 1399 EventHelper.notifyCamelContextResumeFailed(this, e); 1400 throw e; 1401 } 1402 } 1403 1404 public void start() throws Exception { 1405 startDate = new Date(); 1406 stopWatch.restart(); 1407 log.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is starting"); 1408 1409 doNotStartRoutesOnFirstStart = !firstStartDone && !isAutoStartup(); 1410 1411 // if the context was configured with auto startup = false, and we are already started, 1412 // then we may need to start the routes on the 2nd start call 1413 if (firstStartDone && !isAutoStartup() && isStarted()) { 1414 // invoke this logic to warmup the routes and if possible also start the routes 1415 doStartOrResumeRoutes(routeServices, true, true, false, true); 1416 } 1417 1418 // super will invoke doStart which will prepare internal services and start routes etc. 1419 try { 1420 firstStartDone = true; 1421 super.start(); 1422 } catch (VetoCamelContextStartException e) { 1423 if (e.isRethrowException()) { 1424 throw e; 1425 } else { 1426 log.info("CamelContext ({}) vetoed to not start due {}", getName(), e.getMessage()); 1427 // swallow exception and change state of this camel context to stopped 1428 stop(); 1429 return; 1430 } 1431 } 1432 1433 stopWatch.stop(); 1434 if (log.isInfoEnabled()) { 1435 // count how many routes are actually started 1436 int started = 0; 1437 for (Route route : getRoutes()) { 1438 if (getRouteStatus(route.getId()).isStarted()) { 1439 started++; 1440 } 1441 } 1442 log.info("Total " + getRoutes().size() + " routes, of which " + started + " is started."); 1443 log.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") started in " + TimeUtils.printDuration(stopWatch.taken())); 1444 } 1445 EventHelper.notifyCamelContextStarted(this); 1446 } 1447 1448 // Implementation methods 1449 // ----------------------------------------------------------------------- 1450 1451 protected synchronized void doStart() throws Exception { 1452 try { 1453 doStartCamel(); 1454 } catch (Exception e) { 1455 // fire event that we failed to start 1456 EventHelper.notifyCamelContextStartupFailed(this, e); 1457 // rethrow cause 1458 throw e; 1459 } 1460 } 1461 1462 private void doStartCamel() throws Exception { 1463 if (isStreamCaching()) { 1464 // only add a new stream cache if not already configured 1465 if (StreamCaching.getStreamCaching(this) == null) { 1466 log.info("StreamCaching is enabled on CamelContext: " + getName()); 1467 addInterceptStrategy(new StreamCaching()); 1468 } 1469 } 1470 1471 if (isTracing()) { 1472 // tracing is added in the DefaultChannel so we can enable it on the fly 1473 log.info("Tracing is enabled on CamelContext: " + getName()); 1474 } 1475 1476 if (isUseMDCLogging()) { 1477 // log if MDC has been enabled 1478 log.info("MDC logging is enabled on CamelContext: " + getName()); 1479 } 1480 1481 if (isHandleFault()) { 1482 // only add a new handle fault if not already configured 1483 if (HandleFault.getHandleFault(this) == null) { 1484 log.info("HandleFault is enabled on CamelContext: " + getName()); 1485 addInterceptStrategy(new HandleFault()); 1486 } 1487 } 1488 1489 if (getDelayer() != null && getDelayer() > 0) { 1490 // only add a new delayer if not already configured 1491 if (Delayer.getDelayer(this) == null) { 1492 long millis = getDelayer(); 1493 log.info("Delayer is enabled with: " + millis + " ms. on CamelContext: " + getName()); 1494 addInterceptStrategy(new Delayer(millis)); 1495 } 1496 } 1497 1498 // register debugger 1499 if (getDebugger() != null) { 1500 log.info("Debugger: " + getDebugger() + " is enabled on CamelContext: " + getName()); 1501 // register this camel context on the debugger 1502 getDebugger().setCamelContext(this); 1503 startService(getDebugger()); 1504 addInterceptStrategy(new Debug(getDebugger())); 1505 } 1506 1507 // start management strategy before lifecycles are started 1508 ManagementStrategy managementStrategy = getManagementStrategy(); 1509 // inject CamelContext if aware 1510 if (managementStrategy instanceof CamelContextAware) { 1511 ((CamelContextAware) managementStrategy).setCamelContext(this); 1512 } 1513 ServiceHelper.startService(managementStrategy); 1514 1515 // start lifecycle strategies 1516 ServiceHelper.startServices(lifecycleStrategies); 1517 Iterator<LifecycleStrategy> it = lifecycleStrategies.iterator(); 1518 while (it.hasNext()) { 1519 LifecycleStrategy strategy = it.next(); 1520 try { 1521 strategy.onContextStart(this); 1522 } catch (VetoCamelContextStartException e) { 1523 // okay we should not start Camel since it was vetoed 1524 log.warn("Lifecycle strategy vetoed starting CamelContext ({}) due {}", getName(), e.getMessage()); 1525 throw e; 1526 } catch (Exception e) { 1527 log.warn("Lifecycle strategy " + strategy + " failed starting CamelContext ({}) due {}", getName(), e.getMessage()); 1528 throw e; 1529 } 1530 } 1531 1532 // start notifiers as services 1533 for (EventNotifier notifier : getManagementStrategy().getEventNotifiers()) { 1534 if (notifier instanceof Service) { 1535 Service service = (Service) notifier; 1536 for (LifecycleStrategy strategy : lifecycleStrategies) { 1537 strategy.onServiceAdd(this, service, null); 1538 } 1539 } 1540 if (notifier instanceof Service) { 1541 startService((Service)notifier); 1542 } 1543 } 1544 1545 // must let some bootstrap service be started before we can notify the starting event 1546 EventHelper.notifyCamelContextStarting(this); 1547 1548 forceLazyInitialization(); 1549 1550 // re-create endpoint registry as the cache size limit may be set after the constructor of this instance was called. 1551 // and we needed to create endpoints up-front as it may be accessed before this context is started 1552 endpoints = new EndpointRegistry(this, endpoints); 1553 addService(endpoints); 1554 // special for executorServiceManager as want to stop it manually 1555 doAddService(executorServiceManager, false); 1556 addService(producerServicePool); 1557 addService(inflightRepository); 1558 addService(shutdownStrategy); 1559 addService(packageScanClassResolver); 1560 1561 // eager lookup any configured properties component to avoid subsequent lookup attempts which may impact performance 1562 // due we use properties component for property placeholder resolution at runtime 1563 Component existing = hasComponent("properties"); 1564 if (existing == null) { 1565 // no existing properties component so lookup and add as component if possible 1566 propertiesComponent = getRegistry().lookup("properties", PropertiesComponent.class); 1567 if (propertiesComponent != null) { 1568 addComponent("properties", propertiesComponent); 1569 } 1570 } else { 1571 // store reference to the existing properties component 1572 if (existing instanceof PropertiesComponent) { 1573 propertiesComponent = (PropertiesComponent) existing; 1574 } else { 1575 // properties component must be expected type 1576 throw new IllegalArgumentException("Found properties component of type: " + existing.getClass() + " instead of expected: " + PropertiesComponent.class); 1577 } 1578 } 1579 1580 // start components 1581 startServices(components.values()); 1582 1583 // fuse specific 1584 FabricTracer tracer = new FabricTracer(this); 1585 addService(tracer); 1586 addInterceptStrategy(tracer); 1587 1588 // start the route definitions before the routes is started 1589 startRouteDefinitions(routeDefinitions); 1590 1591 // start routes 1592 if (doNotStartRoutesOnFirstStart) { 1593 log.debug("Skip starting of routes as CamelContext has been configured with autoStartup=false"); 1594 } 1595 1596 // invoke this logic to warmup the routes and if possible also start the routes 1597 doStartOrResumeRoutes(routeServices, true, !doNotStartRoutesOnFirstStart, false, true); 1598 1599 // starting will continue in the start method 1600 } 1601 1602 protected synchronized void doStop() throws Exception { 1603 stopWatch.restart(); 1604 log.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is shutting down"); 1605 EventHelper.notifyCamelContextStopping(this); 1606 1607 // stop route inputs in the same order as they was started so we stop the very first inputs first 1608 try { 1609 // force shutting down routes as they may otherwise cause shutdown to hang 1610 shutdownStrategy.shutdownForced(this, getRouteStartupOrder()); 1611 } catch (Throwable e) { 1612 log.warn("Error occurred while shutting down routes. This exception will be ignored.", e); 1613 } 1614 getRouteStartupOrder().clear(); 1615 1616 shutdownServices(routeServices.values()); 1617 // do not clear route services or startup listeners as we can start Camel again and get the route back as before 1618 1619 // but clear any suspend routes 1620 suspendedRouteServices.clear(); 1621 1622 // the stop order is important 1623 1624 // shutdown default error handler thread pool 1625 if (errorHandlerExecutorService != null) { 1626 getExecutorServiceManager().shutdown(errorHandlerExecutorService); 1627 errorHandlerExecutorService = null; 1628 } 1629 1630 // shutdown debugger 1631 ServiceHelper.stopAndShutdownService(getDebugger()); 1632 1633 shutdownServices(endpoints.values()); 1634 endpoints.clear(); 1635 1636 shutdownServices(components.values()); 1637 components.clear(); 1638 1639 try { 1640 for (LifecycleStrategy strategy : lifecycleStrategies) { 1641 strategy.onContextStop(this); 1642 } 1643 } catch (Throwable e) { 1644 log.warn("Error occurred while stopping lifecycle strategies. This exception will be ignored.", e); 1645 } 1646 1647 // shutdown services as late as possible 1648 shutdownServices(servicesToClose); 1649 servicesToClose.clear(); 1650 1651 // must notify that we are stopped before stopping the management strategy 1652 EventHelper.notifyCamelContextStopped(this); 1653 1654 // stop the notifier service 1655 for (EventNotifier notifier : getManagementStrategy().getEventNotifiers()) { 1656 shutdownServices(notifier); 1657 } 1658 1659 // shutdown executor service and management as the last one 1660 shutdownServices(executorServiceManager); 1661 shutdownServices(managementStrategy); 1662 shutdownServices(lifecycleStrategies); 1663 // do not clear lifecycleStrategies as we can start Camel again and get the route back as before 1664 1665 // stop the lazy created so they can be re-created on restart 1666 forceStopLazyInitialization(); 1667 1668 stopWatch.stop(); 1669 if (log.isInfoEnabled()) { 1670 log.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") is shutdown in " + TimeUtils.printDuration(stopWatch.taken()) + ". Uptime " + getUptime() + "."); 1671 } 1672 1673 // and clear start date 1674 startDate = null; 1675 } 1676 1677 /** 1678 * Starts or resumes the routes 1679 * 1680 * @param routeServices the routes to start (will only start a route if its not already started) 1681 * @param checkClash whether to check for startup ordering clash 1682 * @param startConsumer whether the route consumer should be started. Can be used to warmup the route without starting the consumer. 1683 * @param resumeConsumer whether the route consumer should be resumed. 1684 * @param addingRoutes whether we are adding new routes 1685 * @throws Exception is thrown if error starting routes 1686 */ 1687 protected void doStartOrResumeRoutes(Map<String, RouteService> routeServices, boolean checkClash, 1688 boolean startConsumer, boolean resumeConsumer, boolean addingRoutes) throws Exception { 1689 // filter out already started routes 1690 Map<String, RouteService> filtered = new LinkedHashMap<String, RouteService>(); 1691 for (Map.Entry<String, RouteService> entry : routeServices.entrySet()) { 1692 boolean startable = false; 1693 1694 Consumer consumer = entry.getValue().getRoutes().iterator().next().getConsumer(); 1695 if (consumer instanceof SuspendableService) { 1696 // consumer could be suspended, which is not reflected in the RouteService status 1697 startable = ((SuspendableService) consumer).isSuspended(); 1698 } 1699 1700 if (!startable && consumer instanceof StatefulService) { 1701 // consumer could be stopped, which is not reflected in the RouteService status 1702 startable = ((StatefulService) consumer).getStatus().isStartable(); 1703 } else if (!startable) { 1704 // no consumer so use state from route service 1705 startable = entry.getValue().getStatus().isStartable(); 1706 } 1707 1708 if (startable) { 1709 filtered.put(entry.getKey(), entry.getValue()); 1710 } 1711 } 1712 1713 if (!filtered.isEmpty()) { 1714 // the context is now considered started (i.e. isStarted() == true)) 1715 // starting routes is done after, not during context startup 1716 safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values()); 1717 } 1718 1719 // now notify any startup aware listeners as all the routes etc has been started, 1720 // allowing the listeners to do custom work after routes has been started 1721 for (StartupListener startup : startupListeners) { 1722 startup.onCamelContextStarted(this, isStarted()); 1723 } 1724 } 1725 1726 protected boolean routeSupportsSuspension(String routeId) { 1727 RouteService routeService = routeServices.get(routeId); 1728 if (routeService != null) { 1729 return routeService.getRoutes().iterator().next().supportsSuspension(); 1730 } 1731 return false; 1732 } 1733 1734 private void shutdownServices(Object service) { 1735 // do not rethrow exception as we want to keep shutting down in case of problems 1736 1737 // allow us to do custom work before delegating to service helper 1738 try { 1739 if (service instanceof Service) { 1740 ServiceHelper.stopAndShutdownService(service); 1741 } else if (service instanceof Collection) { 1742 ServiceHelper.stopAndShutdownServices((Collection<?>)service); 1743 } 1744 } catch (Throwable e) { 1745 log.warn("Error occurred while shutting down service: " + service + ". This exception will be ignored.", e); 1746 // fire event 1747 EventHelper.notifyServiceStopFailure(this, service, e); 1748 } 1749 } 1750 1751 private void shutdownServices(Collection<?> services) { 1752 // reverse stopping by default 1753 shutdownServices(services, true); 1754 } 1755 1756 private void shutdownServices(Collection<?> services, boolean reverse) { 1757 Collection<Object> list = CastUtils.cast(services); 1758 if (reverse) { 1759 List<Object> reverseList = new ArrayList<Object>(services); 1760 Collections.reverse(reverseList); 1761 list = reverseList; 1762 } 1763 1764 for (Object service : list) { 1765 shutdownServices(service); 1766 } 1767 } 1768 1769 private void startService(Service service) throws Exception { 1770 // and register startup aware so they can be notified when 1771 // camel context has been started 1772 if (service instanceof StartupListener) { 1773 StartupListener listener = (StartupListener) service; 1774 addStartupListener(listener); 1775 } 1776 1777 service.start(); 1778 } 1779 1780 private void startServices(Collection<?> services) throws Exception { 1781 for (Object element : services) { 1782 if (element instanceof Service) { 1783 startService((Service)element); 1784 } 1785 } 1786 } 1787 1788 private void stopServices(Object service) throws Exception { 1789 // allow us to do custom work before delegating to service helper 1790 try { 1791 ServiceHelper.stopService(service); 1792 } catch (Exception e) { 1793 // fire event 1794 EventHelper.notifyServiceStopFailure(this, service, e); 1795 // rethrow to signal error with stopping 1796 throw e; 1797 } 1798 } 1799 1800 protected void startRouteDefinitions(Collection<RouteDefinition> list) throws Exception { 1801 if (list != null) { 1802 for (RouteDefinition route : list) { 1803 startRoute(route); 1804 } 1805 } 1806 } 1807 1808 /** 1809 * Starts the given route service 1810 */ 1811 protected synchronized void startRouteService(RouteService routeService, boolean addingRoutes) throws Exception { 1812 // we may already be starting routes so remember this, so we can unset accordingly in finally block 1813 boolean alreadyStartingRoutes = isStartingRoutes(); 1814 if (!alreadyStartingRoutes) { 1815 isStartingRoutes.set(true); 1816 } 1817 1818 try { 1819 // the route service could have been suspended, and if so then resume it instead 1820 if (routeService.getStatus().isSuspended()) { 1821 resumeRouteService(routeService); 1822 } else { 1823 // start the route service 1824 routeServices.put(routeService.getId(), routeService); 1825 if (shouldStartRoutes()) { 1826 // this method will log the routes being started 1827 safelyStartRouteServices(true, true, true, false, addingRoutes, routeService); 1828 // start route services if it was configured to auto startup and we are not adding routes 1829 boolean autoStartup = routeService.getRouteDefinition().isAutoStartup(this); 1830 if (!addingRoutes || autoStartup) { 1831 // start the route since auto start is enabled or we are starting a route (not adding new routes) 1832 routeService.start(); 1833 } 1834 } 1835 } 1836 } finally { 1837 if (!alreadyStartingRoutes) { 1838 isStartingRoutes.remove(); 1839 } 1840 } 1841 } 1842 1843 /** 1844 * Resumes the given route service 1845 */ 1846 protected synchronized void resumeRouteService(RouteService routeService) throws Exception { 1847 // the route service could have been stopped, and if so then start it instead 1848 if (!routeService.getStatus().isSuspended()) { 1849 startRouteService(routeService, false); 1850 } else { 1851 // resume the route service 1852 if (shouldStartRoutes()) { 1853 // this method will log the routes being started 1854 safelyStartRouteServices(true, false, true, true, false, routeService); 1855 // must resume route service as well 1856 routeService.resume(); 1857 } 1858 } 1859 } 1860 1861 protected synchronized void stopRouteService(RouteService routeService, boolean removingRoutes) throws Exception { 1862 routeService.setRemovingRoutes(removingRoutes); 1863 stopRouteService(routeService); 1864 } 1865 1866 protected void logRouteState(Route route, String state) { 1867 if (log.isInfoEnabled()) { 1868 if (route.getConsumer() != null) { 1869 log.info("Route: {} is {}, was consuming from: {}", new Object[]{route.getId(), state, route.getConsumer().getEndpoint()}); 1870 } else { 1871 log.info("Route: {} is {}.", route.getId(), state); 1872 } 1873 } 1874 } 1875 1876 protected synchronized void stopRouteService(RouteService routeService) throws Exception { 1877 routeService.stop(); 1878 for (Route route : routeService.getRoutes()) { 1879 logRouteState(route, "stopped"); 1880 } 1881 } 1882 1883 protected synchronized void shutdownRouteService(RouteService routeService) throws Exception { 1884 routeService.shutdown(); 1885 for (Route route : routeService.getRoutes()) { 1886 logRouteState(route, "shutdown and removed"); 1887 } 1888 } 1889 1890 protected synchronized void suspendRouteService(RouteService routeService) throws Exception { 1891 routeService.setRemovingRoutes(false); 1892 routeService.suspend(); 1893 for (Route route : routeService.getRoutes()) { 1894 logRouteState(route, "suspended"); 1895 } 1896 } 1897 1898 /** 1899 * Starts the routes services in a proper manner which ensures the routes will be started in correct order, 1900 * check for clash and that the routes will also be shutdown in correct order as well. 1901 * <p/> 1902 * This method <b>must</b> be used to start routes in a safe manner. 1903 * 1904 * @param checkClash whether to check for startup order clash 1905 * @param startConsumer whether the route consumer should be started. Can be used to warmup the route without starting the consumer. 1906 * @param resumeConsumer whether the route consumer should be resumed. 1907 * @param addingRoutes whether we are adding new routes 1908 * @param routeServices the routes 1909 * @throws Exception is thrown if error starting the routes 1910 */ 1911 protected synchronized void safelyStartRouteServices(boolean checkClash, boolean startConsumer, boolean resumeConsumer, 1912 boolean addingRoutes, Collection<RouteService> routeServices) throws Exception { 1913 // list of inputs to start when all the routes have been prepared for starting 1914 // we use a tree map so the routes will be ordered according to startup order defined on the route 1915 Map<Integer, DefaultRouteStartupOrder> inputs = new TreeMap<Integer, DefaultRouteStartupOrder>(); 1916 1917 // figure out the order in which the routes should be started 1918 for (RouteService routeService : routeServices) { 1919 DefaultRouteStartupOrder order = doPrepareRouteToBeStarted(routeService); 1920 // check for clash before we add it as input 1921 if (checkClash) { 1922 doCheckStartupOrderClash(order, inputs); 1923 } 1924 inputs.put(order.getStartupOrder(), order); 1925 } 1926 1927 // warm up routes before we start them 1928 doWarmUpRoutes(inputs, startConsumer); 1929 1930 if (startConsumer) { 1931 if (resumeConsumer) { 1932 // and now resume the routes 1933 doResumeRouteConsumers(inputs, addingRoutes); 1934 } else { 1935 // and now start the routes 1936 // and check for clash with multiple consumers of the same endpoints which is not allowed 1937 doStartRouteConsumers(inputs, addingRoutes); 1938 } 1939 } 1940 1941 // inputs no longer needed 1942 inputs.clear(); 1943 } 1944 1945 /** 1946 * @see #safelyStartRouteServices(boolean,boolean,boolean,boolean,java.util.Collection) 1947 */ 1948 protected synchronized void safelyStartRouteServices(boolean forceAutoStart, boolean checkClash, boolean startConsumer, 1949 boolean resumeConsumer, boolean addingRoutes, RouteService... routeServices) throws Exception { 1950 safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, Arrays.asList(routeServices)); 1951 } 1952 1953 private DefaultRouteStartupOrder doPrepareRouteToBeStarted(RouteService routeService) { 1954 // add the inputs from this route service to the list to start afterwards 1955 // should be ordered according to the startup number 1956 Integer startupOrder = routeService.getRouteDefinition().getStartupOrder(); 1957 if (startupOrder == null) { 1958 // auto assign a default startup order 1959 startupOrder = defaultRouteStartupOrder++; 1960 } 1961 1962 // create holder object that contains information about this route to be started 1963 Route route = routeService.getRoutes().iterator().next(); 1964 return new DefaultRouteStartupOrder(startupOrder, route, routeService); 1965 } 1966 1967 private boolean doCheckStartupOrderClash(DefaultRouteStartupOrder answer, Map<Integer, DefaultRouteStartupOrder> inputs) throws FailedToStartRouteException { 1968 // TODO: There could potential be routeId clash as well, so we should check for that as well 1969 1970 // check for clash by startupOrder id 1971 DefaultRouteStartupOrder other = inputs.get(answer.getStartupOrder()); 1972 if (other != null && answer != other) { 1973 String otherId = other.getRoute().getId(); 1974 throw new FailedToStartRouteException(answer.getRoute().getId(), "startupOrder clash. Route " + otherId + " already has startupOrder " 1975 + answer.getStartupOrder() + " configured which this route have as well. Please correct startupOrder to be unique among all your routes."); 1976 } 1977 // check in existing already started as well 1978 for (RouteStartupOrder order : routeStartupOrder) { 1979 String otherId = order.getRoute().getId(); 1980 if (answer.getRoute().getId().equals(otherId)) { 1981 // its the same route id so skip clash check as its the same route (can happen when using suspend/resume) 1982 } else if (answer.getStartupOrder() == order.getStartupOrder()) { 1983 throw new FailedToStartRouteException(answer.getRoute().getId(), "startupOrder clash. Route " + otherId + " already has startupOrder " 1984 + answer.getStartupOrder() + " configured which this route have as well. Please correct startupOrder to be unique among all your routes."); 1985 } 1986 } 1987 return true; 1988 } 1989 1990 private void doWarmUpRoutes(Map<Integer, DefaultRouteStartupOrder> inputs, boolean autoStartup) throws Exception { 1991 // now prepare the routes by starting its services before we start the input 1992 for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) { 1993 // defer starting inputs till later as we want to prepare the routes by starting 1994 // all their processors and child services etc. 1995 // then later we open the floods to Camel by starting the inputs 1996 // what this does is to ensure Camel is more robust on starting routes as all routes 1997 // will then be prepared in time before we start inputs which will consume messages to be routed 1998 RouteService routeService = entry.getValue().getRouteService(); 1999 log.debug("Warming up route id: {} having autoStartup={}", routeService.getId(), autoStartup); 2000 routeService.warmUp(); 2001 } 2002 } 2003 2004 private void doResumeRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean addingRoutes) throws Exception { 2005 doStartOrResumeRouteConsumers(inputs, true, addingRoutes); 2006 } 2007 2008 private void doStartRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean addingRoutes) throws Exception { 2009 doStartOrResumeRouteConsumers(inputs, false, addingRoutes); 2010 } 2011 2012 private void doStartOrResumeRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean resumeOnly, boolean addingRoute) throws Exception { 2013 List<Endpoint> routeInputs = new ArrayList<Endpoint>(); 2014 2015 for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) { 2016 Integer order = entry.getKey(); 2017 Route route = entry.getValue().getRoute(); 2018 RouteService routeService = entry.getValue().getRouteService(); 2019 2020 // if we are starting camel, then skip routes which are configured to not be auto started 2021 boolean autoStartup = routeService.getRouteDefinition().isAutoStartup(this); 2022 if (addingRoute && !autoStartup) { 2023 log.info("Skipping starting of route " + routeService.getId() + " as its configured with autoStartup=false"); 2024 continue; 2025 } 2026 2027 // start the service 2028 for (Consumer consumer : routeService.getInputs().values()) { 2029 Endpoint endpoint = consumer.getEndpoint(); 2030 2031 // check multiple consumer violation, with the other routes to be started 2032 if (!doCheckMultipleConsumerSupportClash(endpoint, routeInputs)) { 2033 throw new FailedToStartRouteException(routeService.getId(), 2034 "Multiple consumers for the same endpoint is not allowed: " + endpoint); 2035 } 2036 2037 // check for multiple consumer violations with existing routes which 2038 // have already been started, or is currently starting 2039 List<Endpoint> existingEndpoints = new ArrayList<Endpoint>(); 2040 for (Route existingRoute : getRoutes()) { 2041 if (route.getId().equals(existingRoute.getId())) { 2042 // skip ourselves 2043 continue; 2044 } 2045 Endpoint existing = existingRoute.getEndpoint(); 2046 ServiceStatus status = getRouteStatus(existingRoute.getId()); 2047 if (status != null && (status.isStarted() || status.isStarting())) { 2048 existingEndpoints.add(existing); 2049 } 2050 } 2051 if (!doCheckMultipleConsumerSupportClash(endpoint, existingEndpoints)) { 2052 throw new FailedToStartRouteException(routeService.getId(), 2053 "Multiple consumers for the same endpoint is not allowed: " + endpoint); 2054 } 2055 2056 // start the consumer on the route 2057 log.debug("Route: {} >>> {}", route.getId(), route); 2058 if (resumeOnly) { 2059 log.debug("Resuming consumer (order: {}) on route: {}", order, route.getId()); 2060 } else { 2061 log.debug("Starting consumer (order: {}) on route: {}", order, route.getId()); 2062 } 2063 2064 if (resumeOnly && route.supportsSuspension()) { 2065 // if we are resuming and the route can be resumed 2066 ServiceHelper.resumeService(consumer); 2067 log.info("Route: " + route.getId() + " resumed and consuming from: " + endpoint); 2068 } else { 2069 // when starting we should invoke the lifecycle strategies 2070 for (LifecycleStrategy strategy : lifecycleStrategies) { 2071 strategy.onServiceAdd(this, consumer, route); 2072 } 2073 startService(consumer); 2074 log.info("Route: " + route.getId() + " started and consuming from: " + endpoint); 2075 } 2076 2077 routeInputs.add(endpoint); 2078 2079 // add to the order which they was started, so we know how to stop them in reverse order 2080 // but only add if we haven't already registered it before (we dont want to double add when restarting) 2081 boolean found = false; 2082 for (RouteStartupOrder other : routeStartupOrder) { 2083 if (other.getRoute().getId() == route.getId()) { 2084 found = true; 2085 break; 2086 } 2087 } 2088 if (!found) { 2089 routeStartupOrder.add(entry.getValue()); 2090 } 2091 } 2092 2093 if (resumeOnly) { 2094 routeService.resume(); 2095 } else { 2096 // and start the route service (no need to start children as they are already warmed up) 2097 routeService.start(false); 2098 } 2099 } 2100 } 2101 2102 private boolean doCheckMultipleConsumerSupportClash(Endpoint endpoint, List<Endpoint> routeInputs) { 2103 // is multiple consumers supported 2104 boolean multipleConsumersSupported = false; 2105 if (endpoint instanceof MultipleConsumersSupport) { 2106 multipleConsumersSupported = ((MultipleConsumersSupport) endpoint).isMultipleConsumersSupported(); 2107 } 2108 2109 if (multipleConsumersSupported) { 2110 // multiple consumer allowed, so return true 2111 return true; 2112 } 2113 2114 // check in progress list 2115 if (routeInputs.contains(endpoint)) { 2116 return false; 2117 } 2118 2119 return true; 2120 } 2121 2122 /** 2123 * Force some lazy initialization to occur upfront before we start any 2124 * components and create routes 2125 */ 2126 protected void forceLazyInitialization() { 2127 getInjector(); 2128 getLanguageResolver(); 2129 getTypeConverterRegistry(); 2130 getTypeConverter(); 2131 } 2132 2133 /** 2134 * Force clear lazy initialization so they can be re-created on restart 2135 */ 2136 protected void forceStopLazyInitialization() { 2137 injector = null; 2138 languageResolver = null; 2139 typeConverterRegistry = null; 2140 typeConverter = null; 2141 } 2142 2143 /** 2144 * Lazily create a default implementation 2145 */ 2146 protected TypeConverter createTypeConverter() { 2147 BaseTypeConverterRegistry answer; 2148 if (isLazyLoadTypeConverters()) { 2149 answer = new LazyLoadingTypeConverter(packageScanClassResolver, getInjector(), getDefaultFactoryFinder()); 2150 } else { 2151 answer = new DefaultTypeConverter(packageScanClassResolver, getInjector(), getDefaultFactoryFinder()); 2152 } 2153 setTypeConverterRegistry(answer); 2154 return answer; 2155 } 2156 2157 /** 2158 * Lazily create a default implementation 2159 */ 2160 protected Injector createInjector() { 2161 FactoryFinder finder = getDefaultFactoryFinder(); 2162 try { 2163 return (Injector) finder.newInstance("Injector"); 2164 } catch (NoFactoryAvailableException e) { 2165 // lets use the default injector 2166 return new DefaultInjector(this); 2167 } 2168 } 2169 2170 /** 2171 * Lazily create a default implementation 2172 */ 2173 protected ManagementMBeanAssembler createManagementMBeanAssembler() { 2174 return new DefaultManagementMBeanAssembler(); 2175 } 2176 2177 /** 2178 * Lazily create a default implementation 2179 */ 2180 protected ComponentResolver createComponentResolver() { 2181 return new DefaultComponentResolver(); 2182 } 2183 2184 /** 2185 * Lazily create a default implementation 2186 */ 2187 protected Registry createRegistry() { 2188 return new JndiRegistry(); 2189 } 2190 2191 /** 2192 * A pluggable strategy to allow an endpoint to be created without requiring 2193 * a component to be its factory, such as for looking up the URI inside some 2194 * {@link Registry} 2195 * 2196 * @param uri the uri for the endpoint to be created 2197 * @return the newly created endpoint or null if it could not be resolved 2198 */ 2199 protected Endpoint createEndpoint(String uri) { 2200 Object value = getRegistry().lookup(uri); 2201 if (value instanceof Endpoint) { 2202 return (Endpoint) value; 2203 } else if (value instanceof Processor) { 2204 return new ProcessorEndpoint(uri, this, (Processor) value); 2205 } else if (value != null) { 2206 return convertBeanToEndpoint(uri, value); 2207 } 2208 return null; 2209 } 2210 2211 /** 2212 * Strategy method for attempting to convert the bean from a {@link Registry} to an endpoint using 2213 * some kind of transformation or wrapper 2214 * 2215 * @param uri the uri for the endpoint (and name in the registry) 2216 * @param bean the bean to be converted to an endpoint, which will be not null 2217 * @return a new endpoint 2218 */ 2219 protected Endpoint convertBeanToEndpoint(String uri, Object bean) { 2220 throw new IllegalArgumentException("uri: " + uri + " bean: " + bean 2221 + " could not be converted to an Endpoint"); 2222 } 2223 2224 /** 2225 * Should we start newly added routes? 2226 */ 2227 protected boolean shouldStartRoutes() { 2228 return isStarted() && !isStarting(); 2229 } 2230 2231 /** 2232 * Gets the properties component in use. 2233 * Returns {@code null} if no properties component is in use. 2234 */ 2235 protected PropertiesComponent getPropertiesComponent() { 2236 return propertiesComponent; 2237 } 2238 2239 public void setDataFormats(Map<String, DataFormatDefinition> dataFormats) { 2240 this.dataFormats = dataFormats; 2241 } 2242 2243 public Map<String, DataFormatDefinition> getDataFormats() { 2244 return dataFormats; 2245 } 2246 2247 public Map<String, String> getProperties() { 2248 return properties; 2249 } 2250 2251 public void setProperties(Map<String, String> properties) { 2252 this.properties = properties; 2253 } 2254 2255 public FactoryFinder getDefaultFactoryFinder() { 2256 if (defaultFactoryFinder == null) { 2257 defaultFactoryFinder = factoryFinderResolver.resolveDefaultFactoryFinder(getClassResolver()); 2258 } 2259 return defaultFactoryFinder; 2260 } 2261 2262 public void setFactoryFinderResolver(FactoryFinderResolver resolver) { 2263 this.factoryFinderResolver = resolver; 2264 } 2265 2266 public FactoryFinder getFactoryFinder(String path) throws NoFactoryAvailableException { 2267 synchronized (factories) { 2268 FactoryFinder answer = factories.get(path); 2269 if (answer == null) { 2270 answer = factoryFinderResolver.resolveFactoryFinder(getClassResolver(), path); 2271 factories.put(path, answer); 2272 } 2273 return answer; 2274 } 2275 } 2276 2277 public ClassResolver getClassResolver() { 2278 return classResolver; 2279 } 2280 2281 public void setClassResolver(ClassResolver classResolver) { 2282 this.classResolver = classResolver; 2283 } 2284 2285 public PackageScanClassResolver getPackageScanClassResolver() { 2286 return packageScanClassResolver; 2287 } 2288 2289 public void setPackageScanClassResolver(PackageScanClassResolver packageScanClassResolver) { 2290 this.packageScanClassResolver = packageScanClassResolver; 2291 } 2292 2293 public List<String> getComponentNames() { 2294 synchronized (components) { 2295 List<String> answer = new ArrayList<String>(); 2296 for (String name : components.keySet()) { 2297 answer.add(name); 2298 } 2299 return answer; 2300 } 2301 } 2302 2303 public List<String> getLanguageNames() { 2304 synchronized (languages) { 2305 List<String> answer = new ArrayList<String>(); 2306 for (String name : languages.keySet()) { 2307 answer.add(name); 2308 } 2309 return answer; 2310 } 2311 } 2312 2313 public NodeIdFactory getNodeIdFactory() { 2314 return nodeIdFactory; 2315 } 2316 2317 public void setNodeIdFactory(NodeIdFactory idFactory) { 2318 this.nodeIdFactory = idFactory; 2319 } 2320 2321 public ManagementStrategy getManagementStrategy() { 2322 synchronized (managementStrategyInitialized) { 2323 if (managementStrategyInitialized.compareAndSet(false, true)) { 2324 managementStrategy = createManagementStrategy(); 2325 } 2326 return managementStrategy; 2327 } 2328 } 2329 2330 public void setManagementStrategy(ManagementStrategy managementStrategy) { 2331 synchronized (managementStrategyInitialized) { 2332 if (managementStrategyInitialized.get()) { 2333 log.warn("Resetting ManagementStrategy for context " + getName()); 2334 } 2335 2336 this.managementStrategy = managementStrategy; 2337 managementStrategyInitialized.set(true); 2338 } 2339 } 2340 2341 public InterceptStrategy getDefaultTracer() { 2342 if (defaultTracer == null) { 2343 defaultTracer = new Tracer(); 2344 } 2345 return defaultTracer; 2346 } 2347 2348 public void setDefaultTracer(InterceptStrategy defaultTracer) { 2349 this.defaultTracer = defaultTracer; 2350 } 2351 2352 public void disableJMX() { 2353 disableJMX = true; 2354 } 2355 2356 public InflightRepository getInflightRepository() { 2357 return inflightRepository; 2358 } 2359 2360 public void setInflightRepository(InflightRepository repository) { 2361 this.inflightRepository = repository; 2362 } 2363 2364 public void setAutoStartup(Boolean autoStartup) { 2365 this.autoStartup = autoStartup; 2366 } 2367 2368 public Boolean isAutoStartup() { 2369 return autoStartup != null && autoStartup; 2370 } 2371 2372 @Deprecated 2373 public Boolean isLazyLoadTypeConverters() { 2374 return lazyLoadTypeConverters != null && lazyLoadTypeConverters; 2375 } 2376 2377 @Deprecated 2378 public void setLazyLoadTypeConverters(Boolean lazyLoadTypeConverters) { 2379 this.lazyLoadTypeConverters = lazyLoadTypeConverters; 2380 } 2381 2382 public Boolean isUseMDCLogging() { 2383 return useMDCLogging != null && useMDCLogging; 2384 } 2385 2386 public void setUseMDCLogging(Boolean useMDCLogging) { 2387 this.useMDCLogging = useMDCLogging; 2388 } 2389 2390 public Boolean isUseBreadcrumb() { 2391 return useBreadcrumb != null && useBreadcrumb; 2392 } 2393 2394 public void setUseBreadcrumb(Boolean useBreadcrumb) { 2395 this.useBreadcrumb = useBreadcrumb; 2396 } 2397 2398 public ClassLoader getApplicationContextClassLoader() { 2399 return applicationContextClassLoader; 2400 } 2401 2402 public void setApplicationContextClassLoader(ClassLoader classLoader) { 2403 applicationContextClassLoader = classLoader; 2404 } 2405 2406 public DataFormatResolver getDataFormatResolver() { 2407 return dataFormatResolver; 2408 } 2409 2410 public void setDataFormatResolver(DataFormatResolver dataFormatResolver) { 2411 this.dataFormatResolver = dataFormatResolver; 2412 } 2413 2414 public DataFormat resolveDataFormat(String name) { 2415 return dataFormatResolver.resolveDataFormat(name, this); 2416 } 2417 2418 public DataFormatDefinition resolveDataFormatDefinition(String name) { 2419 // lookup type and create the data format from it 2420 DataFormatDefinition type = lookup(this, name, DataFormatDefinition.class); 2421 if (type == null && getDataFormats() != null) { 2422 type = getDataFormats().get(name); 2423 } 2424 return type; 2425 } 2426 2427 private static <T> T lookup(CamelContext context, String ref, Class<T> type) { 2428 try { 2429 return context.getRegistry().lookup(ref, type); 2430 } catch (Exception e) { 2431 // need to ignore not same type and return it as null 2432 return null; 2433 } 2434 } 2435 2436 public ShutdownStrategy getShutdownStrategy() { 2437 return shutdownStrategy; 2438 } 2439 2440 public void setShutdownStrategy(ShutdownStrategy shutdownStrategy) { 2441 this.shutdownStrategy = shutdownStrategy; 2442 } 2443 2444 public ShutdownRoute getShutdownRoute() { 2445 return shutdownRoute; 2446 } 2447 2448 public void setShutdownRoute(ShutdownRoute shutdownRoute) { 2449 this.shutdownRoute = shutdownRoute; 2450 } 2451 2452 public ShutdownRunningTask getShutdownRunningTask() { 2453 return shutdownRunningTask; 2454 } 2455 2456 public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) { 2457 this.shutdownRunningTask = shutdownRunningTask; 2458 } 2459 2460 public ExecutorServiceManager getExecutorServiceManager() { 2461 return this.executorServiceManager; 2462 } 2463 2464 @Deprecated 2465 public org.apache.camel.spi.ExecutorServiceStrategy getExecutorServiceStrategy() { 2466 // its okay to create a new instance as its stateless, and just delegate 2467 // ExecutorServiceManager which is the new API 2468 return new DefaultExecutorServiceStrategy(this); 2469 } 2470 2471 public void setExecutorServiceManager(ExecutorServiceManager executorServiceManager) { 2472 this.executorServiceManager = executorServiceManager; 2473 } 2474 2475 public ProcessorFactory getProcessorFactory() { 2476 return processorFactory; 2477 } 2478 2479 public void setProcessorFactory(ProcessorFactory processorFactory) { 2480 this.processorFactory = processorFactory; 2481 } 2482 2483 public Debugger getDebugger() { 2484 return debugger; 2485 } 2486 2487 public void setDebugger(Debugger debugger) { 2488 this.debugger = debugger; 2489 } 2490 2491 public UuidGenerator getUuidGenerator() { 2492 return uuidGenerator; 2493 } 2494 2495 public void setUuidGenerator(UuidGenerator uuidGenerator) { 2496 this.uuidGenerator = uuidGenerator; 2497 } 2498 2499 protected Map<String, RouteService> getRouteServices() { 2500 return routeServices; 2501 } 2502 2503 protected ManagementStrategy createManagementStrategy() { 2504 return new ManagementStrategyFactory().create(this, disableJMX || Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)); 2505 } 2506 2507 @Override 2508 public String toString() { 2509 return "CamelContext(" + getName() + ")"; 2510 } 2511 2512 /** 2513 * Reset context counter to a preset value. Mostly used for tests to ensure a predictable getName() 2514 * 2515 * @param value new value for the context counter 2516 */ 2517 public static void setContextCounter(int value) { 2518 DefaultCamelContextNameStrategy.setCounter(value); 2519 DefaultManagementNameStrategy.setCounter(value); 2520 } 2521 2522 private static UuidGenerator createDefaultUuidGenerator() { 2523 if (System.getProperty("com.google.appengine.runtime.environment") != null) { 2524 // either "Production" or "Development" 2525 return new JavaUuidGenerator(); 2526 } else { 2527 return new ActiveMQUuidGenerator(); 2528 } 2529 } 2530 }