001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.HashMap;
022    import java.util.LinkedHashSet;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.Set;
026    import java.util.concurrent.atomic.AtomicBoolean;
027    
028    import org.apache.camel.CamelContext;
029    import org.apache.camel.Channel;
030    import org.apache.camel.Consumer;
031    import org.apache.camel.Processor;
032    import org.apache.camel.Route;
033    import org.apache.camel.Service;
034    import org.apache.camel.model.OnCompletionDefinition;
035    import org.apache.camel.model.OnExceptionDefinition;
036    import org.apache.camel.model.ProcessorDefinition;
037    import org.apache.camel.model.RouteDefinition;
038    import org.apache.camel.processor.ErrorHandler;
039    import org.apache.camel.spi.LifecycleStrategy;
040    import org.apache.camel.spi.RouteContext;
041    import org.apache.camel.spi.RoutePolicy;
042    import org.apache.camel.support.ChildServiceSupport;
043    import org.apache.camel.util.EventHelper;
044    import org.apache.camel.util.ServiceHelper;
045    import org.slf4j.Logger;
046    import org.slf4j.LoggerFactory;
047    
048    /**
049     * Represents the runtime objects for a given {@link RouteDefinition} so that it can be stopped independently
050     * of other routes
051     *
052     * @version 
053     */
054    public class RouteService extends ChildServiceSupport {
055    
056        private static final Logger LOG = LoggerFactory.getLogger(RouteService.class);
057    
058        private final DefaultCamelContext camelContext;
059        private final RouteDefinition routeDefinition;
060        private final List<RouteContext> routeContexts;
061        private final List<Route> routes;
062        private final String id;
063        private boolean removingRoutes;
064        private final Map<Route, Consumer> inputs = new HashMap<Route, Consumer>();
065        private final AtomicBoolean warmUpDone = new AtomicBoolean(false);
066        private final AtomicBoolean endpointDone = new AtomicBoolean(false);
067    
068        public RouteService(DefaultCamelContext camelContext, RouteDefinition routeDefinition, List<RouteContext> routeContexts, List<Route> routes) {
069            this.camelContext = camelContext;
070            this.routeDefinition = routeDefinition;
071            this.routeContexts = routeContexts;
072            this.routes = routes;
073            this.id = routeDefinition.idOrCreate(camelContext.getNodeIdFactory());
074        }
075    
076        public String getId() {
077            return id;
078        }
079    
080        public CamelContext getCamelContext() {
081            return camelContext;
082        }
083    
084        public List<RouteContext> getRouteContexts() {
085            return routeContexts;
086        }
087    
088        public RouteDefinition getRouteDefinition() {
089            return routeDefinition;
090        }
091    
092        public Collection<Route> getRoutes() {
093            return routes;
094        }
095    
096        /**
097         * Gets the inputs to the routes.
098         *
099         * @return list of {@link Consumer} as inputs for the routes
100         */
101        public Map<Route, Consumer> getInputs() {
102            return inputs;
103        }
104    
105        public boolean isRemovingRoutes() {
106            return removingRoutes;
107        }
108    
109        public void setRemovingRoutes(boolean removingRoutes) {
110            this.removingRoutes = removingRoutes;
111        }
112    
113        public synchronized void warmUp() throws Exception {
114            if (endpointDone.compareAndSet(false, true)) {
115                // endpoints should only be started once as they can be reused on other routes
116                // and whatnot, thus their lifecycle is to start once, and only to stop when Camel shutdown
117                for (Route route : routes) {
118                    // ensure endpoint is started first (before the route services, such as the consumer)
119                    ServiceHelper.startService(route.getEndpoint());
120                }
121            }
122    
123            if (warmUpDone.compareAndSet(false, true)) {
124    
125                for (Route route : routes) {
126                    // warm up the route first
127                    route.warmUp();
128    
129                    LOG.debug("Starting services on route: {}", route.getId());
130                    List<Service> services = route.getServices();
131    
132                    // callback that we are staring these services
133                    route.onStartingServices(services);
134    
135                    // gather list of services to start as we need to start child services as well
136                    Set<Service> list = new LinkedHashSet<Service>();
137                    for (Service service : services) {
138                        list.addAll(ServiceHelper.getChildServices(service));
139                    }
140    
141                    // split into consumers and child services as we need to start the consumers
142                    // afterwards to avoid them being active while the others start
143                    List<Service> childServices = new ArrayList<Service>();
144                    for (Service service : list) {
145                        if (service instanceof Consumer) {
146                            inputs.put(route, (Consumer) service);
147                        } else {
148                            childServices.add(service);
149                        }
150                    }
151                    startChildService(route, childServices);
152                }
153    
154                // ensure lifecycle strategy is invoked which among others enlist the route in JMX
155                for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
156                    strategy.onRoutesAdd(routes);
157                }
158    
159                // add routes to camel context
160                camelContext.addRouteCollection(routes);
161            }
162        }
163    
164        protected void doStart() throws Exception {
165            // ensure we are warmed up before starting the route
166            warmUp();
167    
168            for (Route route : routes) {
169                // start the route itself
170                ServiceHelper.startService(route);
171    
172                // invoke callbacks on route policy
173                if (route.getRouteContext().getRoutePolicyList() != null) {
174                    for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
175                        routePolicy.onStart(route);
176                    }
177                }
178    
179                // fire event
180                EventHelper.notifyRouteStarted(camelContext, route);
181            }
182        }
183    
184        protected void doStop() throws Exception {
185    
186            // if we are stopping CamelContext then we are shutting down
187            boolean isShutdownCamelContext = camelContext.isStopping();
188    
189            if (isShutdownCamelContext || isRemovingRoutes()) {
190                // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown
191                for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
192                    strategy.onRoutesRemove(routes);
193                }
194            }
195            
196            for (Route route : routes) {
197                LOG.debug("Stopping services on route: {}", route.getId());
198    
199                // gather list of services to stop as we need to start child services as well
200                List<Service> services = new ArrayList<Service>();
201                services.addAll(route.getServices());
202                // also get route scoped services
203                doGetRouteScopedServices(services, route);
204                Set<Service> list = new LinkedHashSet<Service>();
205                for (Service service : services) {
206                    list.addAll(ServiceHelper.getChildServices(service));
207                }
208                // also get route scoped error handler (which must be done last)
209                doGetRouteScopedErrorHandler(list, route);
210    
211                // stop services
212                stopChildService(route, list, isShutdownCamelContext);
213    
214                // stop the route itself
215                if (isShutdownCamelContext) {
216                    ServiceHelper.stopAndShutdownServices(route);
217                } else {
218                    ServiceHelper.stopServices(route);
219                }
220    
221                // invoke callbacks on route policy
222                if (route.getRouteContext().getRoutePolicyList() != null) {
223                    for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
224                        routePolicy.onStop(route);
225                    }
226                }
227                // fire event
228                EventHelper.notifyRouteStopped(camelContext, route);
229            }
230            if (isRemovingRoutes()) {
231                camelContext.removeRouteCollection(routes);
232            }
233            // need to warm up again
234            warmUpDone.set(false);
235        }
236    
237        @Override
238        protected void doShutdown() throws Exception {
239            for (Route route : routes) {
240                LOG.debug("Shutting down services on route: {}", route.getId());
241    
242                // gather list of services to stop as we need to start child services as well
243                List<Service> services = new ArrayList<Service>();
244                services.addAll(route.getServices());
245                // also get route scoped services
246                doGetRouteScopedServices(services, route);
247                Set<Service> list = new LinkedHashSet<Service>();
248                for (Service service : services) {
249                    list.addAll(ServiceHelper.getChildServices(service));
250                }
251                // also get route scoped error handler (which must be done last)
252                doGetRouteScopedErrorHandler(list, route);
253    
254                // shutdown services
255                stopChildService(route, list, true);
256    
257                // shutdown the route itself
258                ServiceHelper.stopAndShutdownServices(route);
259    
260                // endpoints should only be stopped when Camel is shutting down
261                // see more details in the warmUp method
262                ServiceHelper.stopAndShutdownServices(route.getEndpoint());
263                // invoke callbacks on route policy
264                if (route.getRouteContext().getRoutePolicyList() != null) {
265                    for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
266                        routePolicy.onRemove(route);
267                    }
268                }
269            }
270    
271            // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown
272            for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
273                strategy.onRoutesRemove(routes);
274            }
275            
276            // remove the routes from the inflight registry
277            for (Route route : routes) {
278                camelContext.getInflightRepository().removeRoute(route.getId());
279            }
280    
281            // remove the routes from the collections
282            camelContext.removeRouteCollection(routes);
283            
284            // clear inputs on shutdown
285            inputs.clear();
286            warmUpDone.set(false);
287            endpointDone.set(false);
288        }
289    
290        @Override
291        protected void doSuspend() throws Exception {
292            // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy
293            // to safely suspend and resume
294            for (Route route : routes) {
295                if (route.getRouteContext().getRoutePolicyList() != null) {
296                    for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
297                        routePolicy.onSuspend(route);
298                    }
299                }
300            }
301        }
302    
303        @Override
304        protected void doResume() throws Exception {
305            // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy
306            // to safely suspend and resume
307            for (Route route : routes) {
308                if (route.getRouteContext().getRoutePolicyList() != null) {
309                    for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
310                        routePolicy.onResume(route);
311                    }
312                }
313            }
314        }
315    
316        protected void startChildService(Route route, List<Service> services) throws Exception {
317            for (Service service : services) {
318                LOG.debug("Starting child service on route: {} -> {}", route.getId(), service);
319                for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
320                    strategy.onServiceAdd(camelContext, service, route);
321                }
322                ServiceHelper.startService(service);
323                addChildService(service);
324            }
325        }
326    
327        protected void stopChildService(Route route, Set<Service> services, boolean shutdown) throws Exception {
328            for (Service service : services) {
329                LOG.debug("{} child service on route: {} -> {}", new Object[]{shutdown ? "Shutting down" : "Stopping", route.getId(), service});
330                if (service instanceof ErrorHandler) {
331                    // special for error handlers
332                    for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
333                        strategy.onErrorHandlerRemove(route.getRouteContext(), (Processor) service, route.getRouteContext().getRoute().getErrorHandlerBuilder());
334                    }
335                } else {
336                    for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
337                        strategy.onServiceRemove(camelContext, service, route);
338                    }
339                }
340                if (shutdown) {
341                    ServiceHelper.stopAndShutdownService(service);
342                } else {
343                    ServiceHelper.stopService(service);
344                }
345                removeChildService(service);
346            }
347        }
348    
349        /**
350         * Gather the route scoped error handler from the given route
351         */
352        private void doGetRouteScopedErrorHandler(Set<Service> services, Route route) {
353            // only include error handlers if they are route scoped
354            boolean includeErrorHandler = !routeDefinition.isContextScopedErrorHandler(route.getRouteContext().getCamelContext());
355            List<Service> extra = new ArrayList<Service>();
356            if (includeErrorHandler) {
357                for (Service service : services) {
358                    if (service instanceof Channel) {
359                        Processor eh = ((Channel) service).getErrorHandler();
360                        if (eh != null && eh instanceof Service) {
361                            extra.add((Service) eh);
362                        }
363                    }
364                }
365            }
366            if (!extra.isEmpty()) {
367                services.addAll(extra);
368            }
369        }
370    
371        /**
372         * Gather all other kind of route scoped services from the given route, except error handler
373         */
374        private void doGetRouteScopedServices(List<Service> services, Route route) {
375            for (ProcessorDefinition<?> output : route.getRouteContext().getRoute().getOutputs()) {
376                if (output instanceof OnExceptionDefinition) {
377                    OnExceptionDefinition onExceptionDefinition = (OnExceptionDefinition) output;
378                    if (onExceptionDefinition.isRouteScoped()) {
379                        Processor errorHandler = onExceptionDefinition.getErrorHandler(route.getId());
380                        if (errorHandler != null && errorHandler instanceof Service) {
381                            services.add((Service) errorHandler);
382                        }
383                    }
384                } else if (output instanceof OnCompletionDefinition) {
385                    OnCompletionDefinition onCompletionDefinition = (OnCompletionDefinition) output;
386                    if (onCompletionDefinition.isRouteScoped()) {
387                        Processor onCompletionProcessor = onCompletionDefinition.getOnCompletion(route.getId());
388                        if (onCompletionProcessor != null && onCompletionProcessor instanceof Service) {
389                            services.add((Service) onCompletionProcessor);
390                        }
391                    }
392                }
393            }
394        }
395    
396    }