001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.LinkedHashSet; 023import java.util.List; 024import java.util.Map; 025import java.util.Set; 026import java.util.concurrent.atomic.AtomicBoolean; 027 028import org.apache.camel.CamelContext; 029import org.apache.camel.Channel; 030import org.apache.camel.Consumer; 031import org.apache.camel.Endpoint; 032import org.apache.camel.EndpointAware; 033import org.apache.camel.FailedToCreateRouteException; 034import org.apache.camel.Processor; 035import org.apache.camel.Route; 036import org.apache.camel.RouteAware; 037import org.apache.camel.Service; 038import org.apache.camel.model.OnCompletionDefinition; 039import org.apache.camel.model.OnExceptionDefinition; 040import org.apache.camel.model.ProcessorDefinition; 041import org.apache.camel.model.RouteDefinition; 042import org.apache.camel.processor.ErrorHandler; 043import org.apache.camel.spi.LifecycleStrategy; 044import org.apache.camel.spi.RouteContext; 045import org.apache.camel.spi.RoutePolicy; 046import org.apache.camel.support.ChildServiceSupport; 047import org.apache.camel.util.EventHelper; 048import org.apache.camel.util.ServiceHelper; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052/** 053 * Represents the runtime objects for a given {@link RouteDefinition} so that it can be stopped independently 054 * of other routes 055 * 056 * @version 057 */ 058public class RouteService extends ChildServiceSupport { 059 060 private static final Logger LOG = LoggerFactory.getLogger(RouteService.class); 061 062 private final DefaultCamelContext camelContext; 063 private final RouteDefinition routeDefinition; 064 private final List<RouteContext> routeContexts; 065 private final List<Route> routes; 066 private final String id; 067 private boolean removingRoutes; 068 private final Map<Route, Consumer> inputs = new HashMap<Route, Consumer>(); 069 private final AtomicBoolean warmUpDone = new AtomicBoolean(false); 070 private final AtomicBoolean endpointDone = new AtomicBoolean(false); 071 072 public RouteService(DefaultCamelContext camelContext, RouteDefinition routeDefinition, List<RouteContext> routeContexts, List<Route> routes) { 073 this.camelContext = camelContext; 074 this.routeDefinition = routeDefinition; 075 this.routeContexts = routeContexts; 076 this.routes = routes; 077 this.id = routeDefinition.idOrCreate(camelContext.getNodeIdFactory()); 078 } 079 080 public String getId() { 081 return id; 082 } 083 084 public CamelContext getCamelContext() { 085 return camelContext; 086 } 087 088 public List<RouteContext> getRouteContexts() { 089 return routeContexts; 090 } 091 092 public RouteDefinition getRouteDefinition() { 093 return routeDefinition; 094 } 095 096 public Collection<Route> getRoutes() { 097 return routes; 098 } 099 100 /** 101 * Gather all the endpoints this route service uses 102 * <p/> 103 * This implementation finds the endpoints by searching all the child services 104 * for {@link org.apache.camel.EndpointAware} processors which uses an endpoint. 105 */ 106 public Set<Endpoint> gatherEndpoints() { 107 Set<Endpoint> answer = new LinkedHashSet<Endpoint>(); 108 for (Route route : routes) { 109 Set<Service> services = gatherChildServices(route, true); 110 for (Service service : services) { 111 if (service instanceof EndpointAware) { 112 Endpoint endpoint = ((EndpointAware) service).getEndpoint(); 113 if (endpoint != null) { 114 answer.add(endpoint); 115 } 116 } 117 } 118 } 119 return answer; 120 } 121 122 /** 123 * Gets the inputs to the routes. 124 * 125 * @return list of {@link Consumer} as inputs for the routes 126 */ 127 public Map<Route, Consumer> getInputs() { 128 return inputs; 129 } 130 131 public boolean isRemovingRoutes() { 132 return removingRoutes; 133 } 134 135 public void setRemovingRoutes(boolean removingRoutes) { 136 this.removingRoutes = removingRoutes; 137 } 138 139 public void warmUp() throws Exception { 140 try { 141 doWarmUp(); 142 } catch (Exception e) { 143 throw new FailedToCreateRouteException(routeDefinition.getId(), routeDefinition.toString(), e); 144 } 145 } 146 147 protected synchronized void doWarmUp() throws Exception { 148 if (endpointDone.compareAndSet(false, true)) { 149 // endpoints should only be started once as they can be reused on other routes 150 // and whatnot, thus their lifecycle is to start once, and only to stop when Camel shutdown 151 for (Route route : routes) { 152 // ensure endpoint is started first (before the route services, such as the consumer) 153 ServiceHelper.startService(route.getEndpoint()); 154 } 155 } 156 157 if (warmUpDone.compareAndSet(false, true)) { 158 159 for (Route route : routes) { 160 // warm up the route first 161 route.warmUp(); 162 163 LOG.debug("Starting services on route: {}", route.getId()); 164 List<Service> services = route.getServices(); 165 166 // callback that we are staring these services 167 route.onStartingServices(services); 168 169 // gather list of services to start as we need to start child services as well 170 Set<Service> list = new LinkedHashSet<Service>(); 171 for (Service service : services) { 172 list.addAll(ServiceHelper.getChildServices(service)); 173 } 174 175 // split into consumers and child services as we need to start the consumers 176 // afterwards to avoid them being active while the others start 177 List<Service> childServices = new ArrayList<Service>(); 178 for (Service service : list) { 179 180 // inject the route 181 if (service instanceof RouteAware) { 182 ((RouteAware) service).setRoute(route); 183 } 184 185 if (service instanceof Consumer) { 186 inputs.put(route, (Consumer) service); 187 } else { 188 childServices.add(service); 189 } 190 } 191 startChildService(route, childServices); 192 193 // fire event 194 EventHelper.notifyRouteAdded(camelContext, route); 195 } 196 197 // ensure lifecycle strategy is invoked which among others enlist the route in JMX 198 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 199 strategy.onRoutesAdd(routes); 200 } 201 202 // add routes to camel context 203 camelContext.addRouteCollection(routes); 204 } 205 } 206 207 protected void doStart() throws Exception { 208 warmUp(); 209 210 for (Route route : routes) { 211 // start the route itself 212 ServiceHelper.startService(route); 213 214 // invoke callbacks on route policy 215 if (route.getRouteContext().getRoutePolicyList() != null) { 216 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 217 routePolicy.onStart(route); 218 } 219 } 220 221 // fire event 222 EventHelper.notifyRouteStarted(camelContext, route); 223 } 224 } 225 226 protected void doStop() throws Exception { 227 228 // if we are stopping CamelContext then we are shutting down 229 boolean isShutdownCamelContext = camelContext.isStopping(); 230 231 if (isShutdownCamelContext || isRemovingRoutes()) { 232 // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown 233 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 234 strategy.onRoutesRemove(routes); 235 } 236 } 237 238 for (Route route : routes) { 239 LOG.debug("Stopping services on route: {}", route.getId()); 240 241 // gather list of services to stop as we need to start child services as well 242 Set<Service> services = gatherChildServices(route, true); 243 244 // stop services 245 stopChildService(route, services, isShutdownCamelContext); 246 247 // stop the route itself 248 if (isShutdownCamelContext) { 249 ServiceHelper.stopAndShutdownServices(route); 250 } else { 251 ServiceHelper.stopServices(route); 252 } 253 254 // invoke callbacks on route policy 255 if (route.getRouteContext().getRoutePolicyList() != null) { 256 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 257 routePolicy.onStop(route); 258 } 259 } 260 // fire event 261 EventHelper.notifyRouteStopped(camelContext, route); 262 } 263 if (isRemovingRoutes()) { 264 camelContext.removeRouteCollection(routes); 265 } 266 // need to warm up again 267 warmUpDone.set(false); 268 } 269 270 @Override 271 protected void doShutdown() throws Exception { 272 for (Route route : routes) { 273 LOG.debug("Shutting down services on route: {}", route.getId()); 274 275 // gather list of services to stop as we need to start child services as well 276 Set<Service> services = gatherChildServices(route, true); 277 278 // shutdown services 279 stopChildService(route, services, true); 280 281 // shutdown the route itself 282 ServiceHelper.stopAndShutdownServices(route); 283 284 // endpoints should only be stopped when Camel is shutting down 285 // see more details in the warmUp method 286 ServiceHelper.stopAndShutdownServices(route.getEndpoint()); 287 // invoke callbacks on route policy 288 if (route.getRouteContext().getRoutePolicyList() != null) { 289 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 290 routePolicy.onRemove(route); 291 } 292 } 293 // fire event 294 EventHelper.notifyRouteRemoved(camelContext, route); 295 } 296 297 // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown 298 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 299 strategy.onRoutesRemove(routes); 300 } 301 302 // remove the routes from the inflight registry 303 for (Route route : routes) { 304 camelContext.getInflightRepository().removeRoute(route.getId()); 305 } 306 307 // remove the routes from the collections 308 camelContext.removeRouteCollection(routes); 309 310 // clear inputs on shutdown 311 inputs.clear(); 312 warmUpDone.set(false); 313 endpointDone.set(false); 314 } 315 316 @Override 317 protected void doSuspend() throws Exception { 318 // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy 319 // to safely suspend and resume 320 for (Route route : routes) { 321 if (route.getRouteContext().getRoutePolicyList() != null) { 322 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 323 routePolicy.onSuspend(route); 324 } 325 } 326 } 327 } 328 329 @Override 330 protected void doResume() throws Exception { 331 // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy 332 // to safely suspend and resume 333 for (Route route : routes) { 334 if (route.getRouteContext().getRoutePolicyList() != null) { 335 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 336 routePolicy.onResume(route); 337 } 338 } 339 } 340 } 341 342 protected void startChildService(Route route, List<Service> services) throws Exception { 343 for (Service service : services) { 344 LOG.debug("Starting child service on route: {} -> {}", route.getId(), service); 345 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 346 strategy.onServiceAdd(camelContext, service, route); 347 } 348 ServiceHelper.startService(service); 349 addChildService(service); 350 } 351 } 352 353 protected void stopChildService(Route route, Set<Service> services, boolean shutdown) throws Exception { 354 for (Service service : services) { 355 LOG.debug("{} child service on route: {} -> {}", new Object[]{shutdown ? "Shutting down" : "Stopping", route.getId(), service}); 356 if (service instanceof ErrorHandler) { 357 // special for error handlers 358 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 359 strategy.onErrorHandlerRemove(route.getRouteContext(), (Processor) service, route.getRouteContext().getRoute().getErrorHandlerBuilder()); 360 } 361 } else { 362 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 363 strategy.onServiceRemove(camelContext, service, route); 364 } 365 } 366 if (shutdown) { 367 ServiceHelper.stopAndShutdownService(service); 368 } else { 369 ServiceHelper.stopService(service); 370 } 371 removeChildService(service); 372 } 373 } 374 375 /** 376 * Gather all child services 377 */ 378 private Set<Service> gatherChildServices(Route route, boolean includeErrorHandler) { 379 // gather list of services to stop as we need to start child services as well 380 List<Service> services = new ArrayList<Service>(); 381 services.addAll(route.getServices()); 382 // also get route scoped services 383 doGetRouteScopedServices(services, route); 384 Set<Service> list = new LinkedHashSet<Service>(); 385 for (Service service : services) { 386 list.addAll(ServiceHelper.getChildServices(service)); 387 } 388 if (includeErrorHandler) { 389 // also get route scoped error handler (which must be done last) 390 doGetRouteScopedErrorHandler(list, route); 391 } 392 Set<Service> answer = new LinkedHashSet<Service>(); 393 answer.addAll(list); 394 return answer; 395 } 396 397 /** 398 * Gather the route scoped error handler from the given route 399 */ 400 private void doGetRouteScopedErrorHandler(Set<Service> services, Route route) { 401 // only include error handlers if they are route scoped 402 boolean includeErrorHandler = !routeDefinition.isContextScopedErrorHandler(route.getRouteContext().getCamelContext()); 403 List<Service> extra = new ArrayList<Service>(); 404 if (includeErrorHandler) { 405 for (Service service : services) { 406 if (service instanceof Channel) { 407 Processor eh = ((Channel) service).getErrorHandler(); 408 if (eh != null && eh instanceof Service) { 409 extra.add((Service) eh); 410 } 411 } 412 } 413 } 414 if (!extra.isEmpty()) { 415 services.addAll(extra); 416 } 417 } 418 419 /** 420 * Gather all other kind of route scoped services from the given route, except error handler 421 */ 422 private void doGetRouteScopedServices(List<Service> services, Route route) { 423 for (ProcessorDefinition<?> output : route.getRouteContext().getRoute().getOutputs()) { 424 if (output instanceof OnExceptionDefinition) { 425 OnExceptionDefinition onExceptionDefinition = (OnExceptionDefinition) output; 426 if (onExceptionDefinition.isRouteScoped()) { 427 Processor errorHandler = onExceptionDefinition.getErrorHandler(route.getId()); 428 if (errorHandler != null && errorHandler instanceof Service) { 429 services.add((Service) errorHandler); 430 } 431 } 432 } else if (output instanceof OnCompletionDefinition) { 433 OnCompletionDefinition onCompletionDefinition = (OnCompletionDefinition) output; 434 if (onCompletionDefinition.isRouteScoped()) { 435 Processor onCompletionProcessor = onCompletionDefinition.getOnCompletion(route.getId()); 436 if (onCompletionProcessor != null && onCompletionProcessor instanceof Service) { 437 services.add((Service) onCompletionProcessor); 438 } 439 } 440 } 441 } 442 } 443 444}