001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.HashMap; 022 import java.util.LinkedHashSet; 023 import java.util.List; 024 import java.util.Map; 025 import java.util.Set; 026 import java.util.concurrent.atomic.AtomicBoolean; 027 028 import org.apache.camel.CamelContext; 029 import org.apache.camel.Channel; 030 import org.apache.camel.Consumer; 031 import org.apache.camel.Processor; 032 import org.apache.camel.Route; 033 import org.apache.camel.Service; 034 import org.apache.camel.model.OnCompletionDefinition; 035 import org.apache.camel.model.OnExceptionDefinition; 036 import org.apache.camel.model.ProcessorDefinition; 037 import org.apache.camel.model.RouteDefinition; 038 import org.apache.camel.processor.ErrorHandler; 039 import org.apache.camel.spi.LifecycleStrategy; 040 import org.apache.camel.spi.RouteContext; 041 import org.apache.camel.spi.RoutePolicy; 042 import org.apache.camel.support.ChildServiceSupport; 043 import org.apache.camel.util.EventHelper; 044 import org.apache.camel.util.ServiceHelper; 045 import org.slf4j.Logger; 046 import org.slf4j.LoggerFactory; 047 048 /** 049 * Represents the runtime objects for a given {@link RouteDefinition} so that it can be stopped independently 050 * of other routes 051 * 052 * @version 053 */ 054 public class RouteService extends ChildServiceSupport { 055 056 private static final Logger LOG = LoggerFactory.getLogger(RouteService.class); 057 058 private final DefaultCamelContext camelContext; 059 private final RouteDefinition routeDefinition; 060 private final List<RouteContext> routeContexts; 061 private final List<Route> routes; 062 private final String id; 063 private boolean removingRoutes; 064 private final Map<Route, Consumer> inputs = new HashMap<Route, Consumer>(); 065 private final AtomicBoolean warmUpDone = new AtomicBoolean(false); 066 private final AtomicBoolean endpointDone = new AtomicBoolean(false); 067 068 public RouteService(DefaultCamelContext camelContext, RouteDefinition routeDefinition, List<RouteContext> routeContexts, List<Route> routes) { 069 this.camelContext = camelContext; 070 this.routeDefinition = routeDefinition; 071 this.routeContexts = routeContexts; 072 this.routes = routes; 073 this.id = routeDefinition.idOrCreate(camelContext.getNodeIdFactory()); 074 } 075 076 public String getId() { 077 return id; 078 } 079 080 public CamelContext getCamelContext() { 081 return camelContext; 082 } 083 084 public List<RouteContext> getRouteContexts() { 085 return routeContexts; 086 } 087 088 public RouteDefinition getRouteDefinition() { 089 return routeDefinition; 090 } 091 092 public Collection<Route> getRoutes() { 093 return routes; 094 } 095 096 /** 097 * Gets the inputs to the routes. 098 * 099 * @return list of {@link Consumer} as inputs for the routes 100 */ 101 public Map<Route, Consumer> getInputs() { 102 return inputs; 103 } 104 105 public boolean isRemovingRoutes() { 106 return removingRoutes; 107 } 108 109 public void setRemovingRoutes(boolean removingRoutes) { 110 this.removingRoutes = removingRoutes; 111 } 112 113 public synchronized void warmUp() throws Exception { 114 if (endpointDone.compareAndSet(false, true)) { 115 // endpoints should only be started once as they can be reused on other routes 116 // and whatnot, thus their lifecycle is to start once, and only to stop when Camel shutdown 117 for (Route route : routes) { 118 // ensure endpoint is started first (before the route services, such as the consumer) 119 ServiceHelper.startService(route.getEndpoint()); 120 } 121 } 122 123 if (warmUpDone.compareAndSet(false, true)) { 124 125 for (Route route : routes) { 126 // warm up the route first 127 route.warmUp(); 128 129 LOG.debug("Starting services on route: {}", route.getId()); 130 List<Service> services = route.getServices(); 131 132 // callback that we are staring these services 133 route.onStartingServices(services); 134 135 // gather list of services to start as we need to start child services as well 136 Set<Service> list = new LinkedHashSet<Service>(); 137 for (Service service : services) { 138 list.addAll(ServiceHelper.getChildServices(service)); 139 } 140 141 // split into consumers and child services as we need to start the consumers 142 // afterwards to avoid them being active while the others start 143 List<Service> childServices = new ArrayList<Service>(); 144 for (Service service : list) { 145 if (service instanceof Consumer) { 146 inputs.put(route, (Consumer) service); 147 } else { 148 childServices.add(service); 149 } 150 } 151 startChildService(route, childServices); 152 } 153 154 // ensure lifecycle strategy is invoked which among others enlist the route in JMX 155 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 156 strategy.onRoutesAdd(routes); 157 } 158 159 // add routes to camel context 160 camelContext.addRouteCollection(routes); 161 } 162 } 163 164 protected void doStart() throws Exception { 165 // ensure we are warmed up before starting the route 166 warmUp(); 167 168 for (Route route : routes) { 169 // start the route itself 170 ServiceHelper.startService(route); 171 172 // invoke callbacks on route policy 173 if (route.getRouteContext().getRoutePolicyList() != null) { 174 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 175 routePolicy.onStart(route); 176 } 177 } 178 179 // fire event 180 EventHelper.notifyRouteStarted(camelContext, route); 181 } 182 } 183 184 protected void doStop() throws Exception { 185 186 // if we are stopping CamelContext then we are shutting down 187 boolean isShutdownCamelContext = camelContext.isStopping(); 188 189 if (isShutdownCamelContext || isRemovingRoutes()) { 190 // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown 191 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 192 strategy.onRoutesRemove(routes); 193 } 194 } 195 196 for (Route route : routes) { 197 LOG.debug("Stopping services on route: {}", route.getId()); 198 199 // gather list of services to stop as we need to start child services as well 200 List<Service> services = new ArrayList<Service>(); 201 services.addAll(route.getServices()); 202 // also get route scoped services 203 doGetRouteScopedServices(services, route); 204 Set<Service> list = new LinkedHashSet<Service>(); 205 for (Service service : services) { 206 list.addAll(ServiceHelper.getChildServices(service)); 207 } 208 // also get route scoped error handler (which must be done last) 209 doGetRouteScopedErrorHandler(list, route); 210 211 // stop services 212 stopChildService(route, list, isShutdownCamelContext); 213 214 // stop the route itself 215 if (isShutdownCamelContext) { 216 ServiceHelper.stopAndShutdownServices(route); 217 } else { 218 ServiceHelper.stopServices(route); 219 } 220 221 // invoke callbacks on route policy 222 if (route.getRouteContext().getRoutePolicyList() != null) { 223 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 224 routePolicy.onStop(route); 225 } 226 } 227 // fire event 228 EventHelper.notifyRouteStopped(camelContext, route); 229 } 230 if (isRemovingRoutes()) { 231 camelContext.removeRouteCollection(routes); 232 } 233 // need to warm up again 234 warmUpDone.set(false); 235 } 236 237 @Override 238 protected void doShutdown() throws Exception { 239 for (Route route : routes) { 240 LOG.debug("Shutting down services on route: {}", route.getId()); 241 242 // gather list of services to stop as we need to start child services as well 243 List<Service> services = new ArrayList<Service>(); 244 services.addAll(route.getServices()); 245 // also get route scoped services 246 doGetRouteScopedServices(services, route); 247 Set<Service> list = new LinkedHashSet<Service>(); 248 for (Service service : services) { 249 list.addAll(ServiceHelper.getChildServices(service)); 250 } 251 // also get route scoped error handler (which must be done last) 252 doGetRouteScopedErrorHandler(list, route); 253 254 // shutdown services 255 stopChildService(route, list, true); 256 257 // shutdown the route itself 258 ServiceHelper.stopAndShutdownServices(route); 259 260 // endpoints should only be stopped when Camel is shutting down 261 // see more details in the warmUp method 262 ServiceHelper.stopAndShutdownServices(route.getEndpoint()); 263 // invoke callbacks on route policy 264 if (route.getRouteContext().getRoutePolicyList() != null) { 265 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 266 routePolicy.onRemove(route); 267 } 268 } 269 } 270 271 // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown 272 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 273 strategy.onRoutesRemove(routes); 274 } 275 276 // remove the routes from the inflight registry 277 for (Route route : routes) { 278 camelContext.getInflightRepository().removeRoute(route.getId()); 279 } 280 281 // remove the routes from the collections 282 camelContext.removeRouteCollection(routes); 283 284 // clear inputs on shutdown 285 inputs.clear(); 286 warmUpDone.set(false); 287 endpointDone.set(false); 288 } 289 290 @Override 291 protected void doSuspend() throws Exception { 292 // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy 293 // to safely suspend and resume 294 for (Route route : routes) { 295 if (route.getRouteContext().getRoutePolicyList() != null) { 296 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 297 routePolicy.onSuspend(route); 298 } 299 } 300 } 301 } 302 303 @Override 304 protected void doResume() throws Exception { 305 // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy 306 // to safely suspend and resume 307 for (Route route : routes) { 308 if (route.getRouteContext().getRoutePolicyList() != null) { 309 for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) { 310 routePolicy.onResume(route); 311 } 312 } 313 } 314 } 315 316 protected void startChildService(Route route, List<Service> services) throws Exception { 317 for (Service service : services) { 318 LOG.debug("Starting child service on route: {} -> {}", route.getId(), service); 319 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 320 strategy.onServiceAdd(camelContext, service, route); 321 } 322 ServiceHelper.startService(service); 323 addChildService(service); 324 } 325 } 326 327 protected void stopChildService(Route route, Set<Service> services, boolean shutdown) throws Exception { 328 for (Service service : services) { 329 LOG.debug("{} child service on route: {} -> {}", new Object[]{shutdown ? "Shutting down" : "Stopping", route.getId(), service}); 330 if (service instanceof ErrorHandler) { 331 // special for error handlers 332 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 333 strategy.onErrorHandlerRemove(route.getRouteContext(), (Processor) service, route.getRouteContext().getRoute().getErrorHandlerBuilder()); 334 } 335 } else { 336 for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { 337 strategy.onServiceRemove(camelContext, service, route); 338 } 339 } 340 if (shutdown) { 341 ServiceHelper.stopAndShutdownService(service); 342 } else { 343 ServiceHelper.stopService(service); 344 } 345 removeChildService(service); 346 } 347 } 348 349 /** 350 * Gather the route scoped error handler from the given route 351 */ 352 private void doGetRouteScopedErrorHandler(Set<Service> services, Route route) { 353 // only include error handlers if they are route scoped 354 boolean includeErrorHandler = !routeDefinition.isContextScopedErrorHandler(route.getRouteContext().getCamelContext()); 355 List<Service> extra = new ArrayList<Service>(); 356 if (includeErrorHandler) { 357 for (Service service : services) { 358 if (service instanceof Channel) { 359 Processor eh = ((Channel) service).getErrorHandler(); 360 if (eh != null && eh instanceof Service) { 361 extra.add((Service) eh); 362 } 363 } 364 } 365 } 366 if (!extra.isEmpty()) { 367 services.addAll(extra); 368 } 369 } 370 371 /** 372 * Gather all other kind of route scoped services from the given route, except error handler 373 */ 374 private void doGetRouteScopedServices(List<Service> services, Route route) { 375 for (ProcessorDefinition<?> output : route.getRouteContext().getRoute().getOutputs()) { 376 if (output instanceof OnExceptionDefinition) { 377 OnExceptionDefinition onExceptionDefinition = (OnExceptionDefinition) output; 378 if (onExceptionDefinition.isRouteScoped()) { 379 Processor errorHandler = onExceptionDefinition.getErrorHandler(route.getId()); 380 if (errorHandler != null && errorHandler instanceof Service) { 381 services.add((Service) errorHandler); 382 } 383 } 384 } else if (output instanceof OnCompletionDefinition) { 385 OnCompletionDefinition onCompletionDefinition = (OnCompletionDefinition) output; 386 if (onCompletionDefinition.isRouteScoped()) { 387 Processor onCompletionProcessor = onCompletionDefinition.getOnCompletion(route.getId()); 388 if (onCompletionProcessor != null && onCompletionProcessor instanceof Service) { 389 services.add((Service) onCompletionProcessor); 390 } 391 } 392 } 393 } 394 } 395 396 }