001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.LinkedHashSet;
023import java.util.List;
024import java.util.Map;
025import java.util.Set;
026import java.util.concurrent.atomic.AtomicBoolean;
027
028import org.apache.camel.CamelContext;
029import org.apache.camel.Channel;
030import org.apache.camel.Consumer;
031import org.apache.camel.Endpoint;
032import org.apache.camel.EndpointAware;
033import org.apache.camel.FailedToCreateRouteException;
034import org.apache.camel.Processor;
035import org.apache.camel.Route;
036import org.apache.camel.RouteAware;
037import org.apache.camel.Service;
038import org.apache.camel.model.OnCompletionDefinition;
039import org.apache.camel.model.OnExceptionDefinition;
040import org.apache.camel.model.ProcessorDefinition;
041import org.apache.camel.model.RouteDefinition;
042import org.apache.camel.processor.ErrorHandler;
043import org.apache.camel.spi.LifecycleStrategy;
044import org.apache.camel.spi.RouteContext;
045import org.apache.camel.spi.RoutePolicy;
046import org.apache.camel.support.ChildServiceSupport;
047import org.apache.camel.util.EventHelper;
048import org.apache.camel.util.ServiceHelper;
049import org.slf4j.Logger;
050import org.slf4j.LoggerFactory;
051
052/**
053 * Represents the runtime objects for a given {@link RouteDefinition} so that it can be stopped independently
054 * of other routes
055 *
056 * @version 
057 */
058public class RouteService extends ChildServiceSupport {
059
060    private static final Logger LOG = LoggerFactory.getLogger(RouteService.class);
061
062    private final DefaultCamelContext camelContext;
063    private final RouteDefinition routeDefinition;
064    private final List<RouteContext> routeContexts;
065    private final List<Route> routes;
066    private final String id;
067    private boolean removingRoutes;
068    private final Map<Route, Consumer> inputs = new HashMap<Route, Consumer>();
069    private final AtomicBoolean warmUpDone = new AtomicBoolean(false);
070    private final AtomicBoolean endpointDone = new AtomicBoolean(false);
071
072    public RouteService(DefaultCamelContext camelContext, RouteDefinition routeDefinition, List<RouteContext> routeContexts, List<Route> routes) {
073        this.camelContext = camelContext;
074        this.routeDefinition = routeDefinition;
075        this.routeContexts = routeContexts;
076        this.routes = routes;
077        this.id = routeDefinition.idOrCreate(camelContext.getNodeIdFactory());
078    }
079
080    public String getId() {
081        return id;
082    }
083
084    public CamelContext getCamelContext() {
085        return camelContext;
086    }
087
088    public List<RouteContext> getRouteContexts() {
089        return routeContexts;
090    }
091
092    public RouteDefinition getRouteDefinition() {
093        return routeDefinition;
094    }
095
096    public Collection<Route> getRoutes() {
097        return routes;
098    }
099
100    /**
101     * Gather all the endpoints this route service uses
102     * <p/>
103     * This implementation finds the endpoints by searching all the child services
104     * for {@link org.apache.camel.EndpointAware} processors which uses an endpoint.
105     */
106    public Set<Endpoint> gatherEndpoints() {
107        Set<Endpoint> answer = new LinkedHashSet<Endpoint>();
108        for (Route route : routes) {
109            Set<Service> services = gatherChildServices(route, true);
110            for (Service service : services) {
111                if (service instanceof EndpointAware) {
112                    Endpoint endpoint = ((EndpointAware) service).getEndpoint();
113                    if (endpoint != null) {
114                        answer.add(endpoint);
115                    }
116                }
117            }
118        }
119        return answer;
120    }
121
122    /**
123     * Gets the inputs to the routes.
124     *
125     * @return list of {@link Consumer} as inputs for the routes
126     */
127    public Map<Route, Consumer> getInputs() {
128        return inputs;
129    }
130
131    public boolean isRemovingRoutes() {
132        return removingRoutes;
133    }
134
135    public void setRemovingRoutes(boolean removingRoutes) {
136        this.removingRoutes = removingRoutes;
137    }
138
139    public void warmUp() throws Exception {
140        try {
141            doWarmUp();
142        } catch (Exception e) {
143            throw new FailedToCreateRouteException(routeDefinition.getId(), routeDefinition.toString(), e);
144        }
145    }
146
147    protected synchronized void doWarmUp() throws Exception {
148        if (endpointDone.compareAndSet(false, true)) {
149            // endpoints should only be started once as they can be reused on other routes
150            // and whatnot, thus their lifecycle is to start once, and only to stop when Camel shutdown
151            for (Route route : routes) {
152                // ensure endpoint is started first (before the route services, such as the consumer)
153                ServiceHelper.startService(route.getEndpoint());
154            }
155        }
156
157        if (warmUpDone.compareAndSet(false, true)) {
158
159            for (Route route : routes) {
160                // warm up the route first
161                route.warmUp();
162
163                LOG.debug("Starting services on route: {}", route.getId());
164                List<Service> services = route.getServices();
165
166                // callback that we are staring these services
167                route.onStartingServices(services);
168
169                // gather list of services to start as we need to start child services as well
170                Set<Service> list = new LinkedHashSet<Service>();
171                for (Service service : services) {
172                    list.addAll(ServiceHelper.getChildServices(service));
173                }
174
175                // split into consumers and child services as we need to start the consumers
176                // afterwards to avoid them being active while the others start
177                List<Service> childServices = new ArrayList<Service>();
178                for (Service service : list) {
179
180                    // inject the route
181                    if (service instanceof RouteAware) {
182                        ((RouteAware) service).setRoute(route);
183                    }
184
185                    if (service instanceof Consumer) {
186                        inputs.put(route, (Consumer) service);
187                    } else {
188                        childServices.add(service);
189                    }
190                }
191                startChildService(route, childServices);
192
193                // fire event
194                EventHelper.notifyRouteAdded(camelContext, route);
195            }
196
197            // ensure lifecycle strategy is invoked which among others enlist the route in JMX
198            for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
199                strategy.onRoutesAdd(routes);
200            }
201
202            // add routes to camel context
203            camelContext.addRouteCollection(routes);
204        }
205    }
206
207    protected void doStart() throws Exception {
208        warmUp();
209
210        for (Route route : routes) {
211            // start the route itself
212            ServiceHelper.startService(route);
213
214            // invoke callbacks on route policy
215            if (route.getRouteContext().getRoutePolicyList() != null) {
216                for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
217                    routePolicy.onStart(route);
218                }
219            }
220
221            // fire event
222            EventHelper.notifyRouteStarted(camelContext, route);
223        }
224    }
225
226    protected void doStop() throws Exception {
227
228        // if we are stopping CamelContext then we are shutting down
229        boolean isShutdownCamelContext = camelContext.isStopping();
230
231        if (isShutdownCamelContext || isRemovingRoutes()) {
232            // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown
233            for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
234                strategy.onRoutesRemove(routes);
235            }
236        }
237        
238        for (Route route : routes) {
239            LOG.debug("Stopping services on route: {}", route.getId());
240
241            // gather list of services to stop as we need to start child services as well
242            Set<Service> services = gatherChildServices(route, true);
243
244            // stop services
245            stopChildService(route, services, isShutdownCamelContext);
246
247            // stop the route itself
248            if (isShutdownCamelContext) {
249                ServiceHelper.stopAndShutdownServices(route);
250            } else {
251                ServiceHelper.stopServices(route);
252            }
253
254            // invoke callbacks on route policy
255            if (route.getRouteContext().getRoutePolicyList() != null) {
256                for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
257                    routePolicy.onStop(route);
258                }
259            }
260            // fire event
261            EventHelper.notifyRouteStopped(camelContext, route);
262        }
263        if (isRemovingRoutes()) {
264            camelContext.removeRouteCollection(routes);
265        }
266        // need to warm up again
267        warmUpDone.set(false);
268    }
269
270    @Override
271    protected void doShutdown() throws Exception {
272        for (Route route : routes) {
273            LOG.debug("Shutting down services on route: {}", route.getId());
274
275            // gather list of services to stop as we need to start child services as well
276            Set<Service> services = gatherChildServices(route, true);
277
278            // shutdown services
279            stopChildService(route, services, true);
280
281            // shutdown the route itself
282            ServiceHelper.stopAndShutdownServices(route);
283
284            // endpoints should only be stopped when Camel is shutting down
285            // see more details in the warmUp method
286            ServiceHelper.stopAndShutdownServices(route.getEndpoint());
287            // invoke callbacks on route policy
288            if (route.getRouteContext().getRoutePolicyList() != null) {
289                for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
290                    routePolicy.onRemove(route);
291                }
292            }
293            // fire event
294            EventHelper.notifyRouteRemoved(camelContext, route);
295        }
296
297        // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown
298        for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
299            strategy.onRoutesRemove(routes);
300        }
301        
302        // remove the routes from the inflight registry
303        for (Route route : routes) {
304            camelContext.getInflightRepository().removeRoute(route.getId());
305        }
306
307        // remove the routes from the collections
308        camelContext.removeRouteCollection(routes);
309        
310        // clear inputs on shutdown
311        inputs.clear();
312        warmUpDone.set(false);
313        endpointDone.set(false);
314    }
315
316    @Override
317    protected void doSuspend() throws Exception {
318        // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy
319        // to safely suspend and resume
320        for (Route route : routes) {
321            if (route.getRouteContext().getRoutePolicyList() != null) {
322                for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
323                    routePolicy.onSuspend(route);
324                }
325            }
326        }
327    }
328
329    @Override
330    protected void doResume() throws Exception {
331        // suspend and resume logic is provided by DefaultCamelContext which leverages ShutdownStrategy
332        // to safely suspend and resume
333        for (Route route : routes) {
334            if (route.getRouteContext().getRoutePolicyList() != null) {
335                for (RoutePolicy routePolicy : route.getRouteContext().getRoutePolicyList()) {
336                    routePolicy.onResume(route);
337                }
338            }
339        }
340    }
341
342    protected void startChildService(Route route, List<Service> services) throws Exception {
343        for (Service service : services) {
344            LOG.debug("Starting child service on route: {} -> {}", route.getId(), service);
345            for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
346                strategy.onServiceAdd(camelContext, service, route);
347            }
348            ServiceHelper.startService(service);
349            addChildService(service);
350        }
351    }
352
353    protected void stopChildService(Route route, Set<Service> services, boolean shutdown) throws Exception {
354        for (Service service : services) {
355            LOG.debug("{} child service on route: {} -> {}", new Object[]{shutdown ? "Shutting down" : "Stopping", route.getId(), service});
356            if (service instanceof ErrorHandler) {
357                // special for error handlers
358                for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
359                    strategy.onErrorHandlerRemove(route.getRouteContext(), (Processor) service, route.getRouteContext().getRoute().getErrorHandlerBuilder());
360                }
361            } else {
362                for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) {
363                    strategy.onServiceRemove(camelContext, service, route);
364                }
365            }
366            if (shutdown) {
367                ServiceHelper.stopAndShutdownService(service);
368            } else {
369                ServiceHelper.stopService(service);
370            }
371            removeChildService(service);
372        }
373    }
374
375    /**
376     * Gather all child services
377     */
378    private Set<Service> gatherChildServices(Route route, boolean includeErrorHandler) {
379        // gather list of services to stop as we need to start child services as well
380        List<Service> services = new ArrayList<Service>();
381        services.addAll(route.getServices());
382        // also get route scoped services
383        doGetRouteScopedServices(services, route);
384        Set<Service> list = new LinkedHashSet<Service>();
385        for (Service service : services) {
386            list.addAll(ServiceHelper.getChildServices(service));
387        }
388        if (includeErrorHandler) {
389            // also get route scoped error handler (which must be done last)
390            doGetRouteScopedErrorHandler(list, route);
391        }
392        Set<Service> answer = new LinkedHashSet<Service>();
393        answer.addAll(list);
394        return answer;
395    }
396
397    /**
398     * Gather the route scoped error handler from the given route
399     */
400    private void doGetRouteScopedErrorHandler(Set<Service> services, Route route) {
401        // only include error handlers if they are route scoped
402        boolean includeErrorHandler = !routeDefinition.isContextScopedErrorHandler(route.getRouteContext().getCamelContext());
403        List<Service> extra = new ArrayList<Service>();
404        if (includeErrorHandler) {
405            for (Service service : services) {
406                if (service instanceof Channel) {
407                    Processor eh = ((Channel) service).getErrorHandler();
408                    if (eh != null && eh instanceof Service) {
409                        extra.add((Service) eh);
410                    }
411                }
412            }
413        }
414        if (!extra.isEmpty()) {
415            services.addAll(extra);
416        }
417    }
418
419    /**
420     * Gather all other kind of route scoped services from the given route, except error handler
421     */
422    private void doGetRouteScopedServices(List<Service> services, Route route) {
423        for (ProcessorDefinition<?> output : route.getRouteContext().getRoute().getOutputs()) {
424            if (output instanceof OnExceptionDefinition) {
425                OnExceptionDefinition onExceptionDefinition = (OnExceptionDefinition) output;
426                if (onExceptionDefinition.isRouteScoped()) {
427                    Processor errorHandler = onExceptionDefinition.getErrorHandler(route.getId());
428                    if (errorHandler != null && errorHandler instanceof Service) {
429                        services.add((Service) errorHandler);
430                    }
431                }
432            } else if (output instanceof OnCompletionDefinition) {
433                OnCompletionDefinition onCompletionDefinition = (OnCompletionDefinition) output;
434                if (onCompletionDefinition.isRouteScoped()) {
435                    Processor onCompletionProcessor = onCompletionDefinition.getOnCompletion(route.getId());
436                    if (onCompletionProcessor != null && onCompletionProcessor instanceof Service) {
437                        services.add((Service) onCompletionProcessor);
438                    }
439                }
440            }
441        }
442    }
443
444}