001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.ArrayList;
020    import java.util.Collections;
021    import java.util.Comparator;
022    import java.util.LinkedHashSet;
023    import java.util.List;
024    import java.util.Locale;
025    import java.util.Set;
026    import java.util.concurrent.ExecutionException;
027    import java.util.concurrent.ExecutorService;
028    import java.util.concurrent.Future;
029    import java.util.concurrent.TimeUnit;
030    import java.util.concurrent.TimeoutException;
031    
032    import org.apache.camel.CamelContext;
033    import org.apache.camel.CamelContextAware;
034    import org.apache.camel.Consumer;
035    import org.apache.camel.Route;
036    import org.apache.camel.Service;
037    import org.apache.camel.ShutdownRoute;
038    import org.apache.camel.ShutdownRunningTask;
039    import org.apache.camel.SuspendableService;
040    import org.apache.camel.spi.RouteStartupOrder;
041    import org.apache.camel.spi.ShutdownAware;
042    import org.apache.camel.spi.ShutdownPrepared;
043    import org.apache.camel.spi.ShutdownStrategy;
044    import org.apache.camel.support.ServiceSupport;
045    import org.apache.camel.util.EventHelper;
046    import org.apache.camel.util.ObjectHelper;
047    import org.apache.camel.util.ServiceHelper;
048    import org.apache.camel.util.StopWatch;
049    import org.slf4j.Logger;
050    import org.slf4j.LoggerFactory;
051    
052    /**
053     * Default {@link org.apache.camel.spi.ShutdownStrategy} which uses graceful shutdown.
054     * <p/>
055     * Graceful shutdown ensures that any inflight and pending messages will be taken into account
056     * and it will wait until these exchanges has been completed.
057     * <p/>
058     * This strategy will perform graceful shutdown in two steps:
059     * <ul>
060     *     <li>Graceful - By suspending/stopping consumers, and let any in-flight exchanges complete</li>
061     *     <li>Forced - After a given period of time, a timeout occurred and if there are still pending
062     *     exchanges to complete, then a more aggressive forced strategy is performed.</li>
063     * </ul>
064     * The idea by the <tt>graceful</tt> shutdown strategy, is to stop taking in more new messages,
065     * and allow any existing inflight messages to complete. Then when there is no more inflight messages
066     * then the routes can be fully shutdown. This mean that if there is inflight messages then we will have
067     * to wait for these messages to complete. If they do not complete after a period of time, then a
068     * timeout triggers. And then a more aggressive strategy takes over.
069     * <p/>
070     * The idea by the <tt>forced</tt> shutdown strategy, is to stop continue processing messages.
071     * And force routes and its services to shutdown now. There is a risk when shutting down now,
072     * that some resources is not properly shutdown, which can cause side effects. The timeout value
073     * is by default 300 seconds, but can be customized. 
074     * <p/>
075     * As this strategy will politely wait until all exchanges has been completed it can potential wait
076     * for a long time, and hence why a timeout value can be set. When the timeout triggers you can also
077     * specify whether the remainder consumers should be shutdown now or ignore.
078     * <p/>
079     * Will by default use a timeout of 300 seconds (5 minutes) by which it will shutdown now the remaining consumers.
080     * This ensures that when shutting down Camel it at some point eventually will shutdown.
081     * This behavior can of course be configured using the {@link #setTimeout(long)} and
082     * {@link #setShutdownNowOnTimeout(boolean)} methods.
083     * <p/>
084     * Routes will by default be shutdown in the reverse order of which they where started.
085     * You can customize this using the {@link #setShutdownRoutesInReverseOrder(boolean)} method.
086     * <p/>
087     * After route consumers have been shutdown, then any {@link ShutdownPrepared} services on the routes
088     * is being prepared for shutdown, by invoking {@link ShutdownPrepared#prepareShutdown(boolean)} which
089     * <tt>force=false</tt>.
090     * <p/>
091     * Then if a timeout occurred and the strategy has been configured with shutdown-now on timeout, then
092     * the strategy performs a more aggressive forced shutdown, by forcing all consumers to shutdown
093     * and then invokes {@link ShutdownPrepared#prepareShutdown(boolean)} with <tt>force=true</tt>
094     * on the services. This allows the services to know they should force shutdown now.
095     *
096     * @version 
097     */
098    public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy, CamelContextAware {
099        private static final transient Logger LOG = LoggerFactory.getLogger(DefaultShutdownStrategy.class);
100    
101        private CamelContext camelContext;
102        private ExecutorService executor;
103        private long timeout = 5 * 60;
104        private TimeUnit timeUnit = TimeUnit.SECONDS;
105        private boolean shutdownNowOnTimeout = true;
106        private boolean shutdownRoutesInReverseOrder = true;
107        private volatile boolean forceShutdown;
108    
109        public DefaultShutdownStrategy() {
110        }
111    
112        public DefaultShutdownStrategy(CamelContext camelContext) {
113            this.camelContext = camelContext;
114        }
115    
116        public void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
117            shutdown(context, routes, getTimeout(), getTimeUnit());
118        }
119    
120        @Override
121        public void shutdownForced(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
122            doShutdown(context, routes, getTimeout(), getTimeUnit(), false, false, true);
123        }
124    
125        public void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
126            doShutdown(context, routes, getTimeout(), getTimeUnit(), true, false, false);
127        }
128    
129        public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
130            doShutdown(context, routes, timeout, timeUnit, false, false, false);
131        }
132    
133        public boolean shutdown(CamelContext context, RouteStartupOrder route, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception {
134            List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
135            routes.add(route);
136            return doShutdown(context, routes, timeout, timeUnit, false, abortAfterTimeout, false);
137        }
138    
139        public void suspend(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
140            doShutdown(context, routes, timeout, timeUnit, true, false, false);
141        }
142    
143        protected boolean doShutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit,
144                                     boolean suspendOnly, boolean abortAfterTimeout, boolean forceShutdown) throws Exception {
145    
146            // just return if no routes to shutdown
147            if (routes.isEmpty()) {
148                return true;
149            }
150    
151            StopWatch watch = new StopWatch();
152    
153            // at first sort according to route startup order
154            List<RouteStartupOrder> routesOrdered = new ArrayList<RouteStartupOrder>(routes);
155            Collections.sort(routesOrdered, new Comparator<RouteStartupOrder>() {
156                public int compare(RouteStartupOrder o1, RouteStartupOrder o2) {
157                    return o1.getStartupOrder() - o2.getStartupOrder();
158                }
159            });
160            if (shutdownRoutesInReverseOrder) {
161                Collections.reverse(routesOrdered);
162            }
163    
164            if (timeout > 0) {
165                LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
166            } else {
167                LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (no timeout)");
168            }
169    
170            // use another thread to perform the shutdowns so we can support timeout
171            Future<?> future = getExecutorService().submit(new ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly, abortAfterTimeout));
172            try {
173                if (timeout > 0) {
174                    future.get(timeout, timeUnit);
175                } else {
176                    future.get();
177                }
178            } catch (TimeoutException e) {
179                // timeout then cancel the task
180                future.cancel(true);
181    
182                // signal we are forcing shutdown now, since timeout occurred
183                this.forceShutdown = forceShutdown;
184    
185                // if set, stop processing and return false to indicate that the shutdown is aborting
186                if (!forceShutdown && abortAfterTimeout) {
187                    LOG.warn("Timeout occurred. Aborting the shutdown now.");
188                    return false;
189                } else {
190                    if (forceShutdown || shutdownNowOnTimeout) {
191                        LOG.warn("Timeout occurred. Now forcing the routes to be shutdown now.");
192                        // force the routes to shutdown now
193                        shutdownRoutesNow(routesOrdered);
194    
195                        // now the route consumers has been shutdown, then prepare route services for shutdown now (forced)
196                        for (RouteStartupOrder order : routes) {
197                            for (Service service : order.getServices()) {
198                                prepareShutdown(service, true, true);
199                            }
200                        }
201                    } else {
202                        LOG.warn("Timeout occurred. Will ignore shutting down the remainder routes.");
203                    }
204                }
205            } catch (ExecutionException e) {
206                // unwrap execution exception
207                throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
208            }
209    
210            // convert to seconds as its easier to read than a big milli seconds number
211            long seconds = TimeUnit.SECONDS.convert(watch.stop(), TimeUnit.MILLISECONDS);
212    
213            LOG.info("Graceful shutdown of " + routesOrdered.size() + " routes completed in " + seconds + " seconds");
214            return true;
215        }
216    
217        @Override
218        public boolean forceShutdown(Service service) {
219            return forceShutdown;
220        }
221    
222        public void setTimeout(long timeout) {
223            this.timeout = timeout;
224        }
225    
226        public long getTimeout() {
227            return timeout;
228        }
229    
230        public void setTimeUnit(TimeUnit timeUnit) {
231            this.timeUnit = timeUnit;
232        }
233    
234        public TimeUnit getTimeUnit() {
235            return timeUnit;
236        }
237    
238        public void setShutdownNowOnTimeout(boolean shutdownNowOnTimeout) {
239            this.shutdownNowOnTimeout = shutdownNowOnTimeout;
240        }
241    
242        public boolean isShutdownNowOnTimeout() {
243            return shutdownNowOnTimeout;
244        }
245    
246        public boolean isShutdownRoutesInReverseOrder() {
247            return shutdownRoutesInReverseOrder;
248        }
249    
250        public void setShutdownRoutesInReverseOrder(boolean shutdownRoutesInReverseOrder) {
251            this.shutdownRoutesInReverseOrder = shutdownRoutesInReverseOrder;
252        }
253    
254        public CamelContext getCamelContext() {
255            return camelContext;
256        }
257    
258        public void setCamelContext(CamelContext camelContext) {
259            this.camelContext = camelContext;
260        }
261    
262        /**
263         * Shutdown all the consumers immediately.
264         *
265         * @param routes the routes to shutdown
266         */
267        protected void shutdownRoutesNow(List<RouteStartupOrder> routes) {
268            for (RouteStartupOrder order : routes) {
269    
270                // set the route to shutdown as fast as possible by stopping after
271                // it has completed its current task
272                ShutdownRunningTask current = order.getRoute().getRouteContext().getShutdownRunningTask();
273                if (current != ShutdownRunningTask.CompleteCurrentTaskOnly) {
274                    LOG.debug("Changing shutdownRunningTask from {} to " +  ShutdownRunningTask.CompleteCurrentTaskOnly
275                        + " on route {} to shutdown faster", current, order.getRoute().getId());
276                    order.getRoute().getRouteContext().setShutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly);
277                }
278    
279                for (Consumer consumer : order.getInputs()) {
280                    shutdownNow(consumer);
281                }
282            }
283        }
284    
285        /**
286         * Shutdown all the consumers immediately.
287         *
288         * @param consumers the consumers to shutdown
289         */
290        protected void shutdownNow(List<Consumer> consumers) {
291            for (Consumer consumer : consumers) {
292                shutdownNow(consumer);
293            }
294        }
295    
296        /**
297         * Shutdown the consumer immediately.
298         *
299         * @param consumer the consumer to shutdown
300         */
301        protected static void shutdownNow(Consumer consumer) {
302            LOG.trace("Shutting down: {}", consumer);
303    
304            // allow us to do custom work before delegating to service helper
305            try {
306                ServiceHelper.stopService(consumer);
307            } catch (Throwable e) {
308                LOG.warn("Error occurred while shutting down route: " + consumer + ". This exception will be ignored.", e);
309                // fire event
310                EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
311            }
312    
313            LOG.trace("Shutdown complete for: {}", consumer);
314        }
315    
316        /**
317         * Suspends/stops the consumer immediately.
318         *
319         * @param consumer the consumer to suspend
320         */
321        protected static void suspendNow(Consumer consumer) {
322            LOG.trace("Suspending: {}", consumer);
323    
324            // allow us to do custom work before delegating to service helper
325            try {
326                ServiceHelper.suspendService(consumer);
327            } catch (Throwable e) {
328                LOG.warn("Error occurred while suspending route: " + consumer + ". This exception will be ignored.", e);
329                // fire event
330                EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
331            }
332    
333            LOG.trace("Suspend complete for: {}", consumer);
334        }
335    
336        private ExecutorService getExecutorService() {
337            if (executor == null) {
338                executor = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "ShutdownTask");
339            }
340            return executor;
341        }
342    
343        @Override
344        protected void doStart() throws Exception {
345            ObjectHelper.notNull(camelContext, "CamelContext");
346            // reset option
347            forceShutdown = false;
348        }
349    
350        @Override
351        protected void doStop() throws Exception {
352            // noop
353        }
354    
355        @Override
356        protected void doShutdown() throws Exception {
357            if (executor != null) {
358                camelContext.getExecutorServiceManager().shutdownNow(executor);
359                // should clear executor so we can restart by creating a new thread pool
360                executor = null;
361            }
362        }
363    
364        /**
365         * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean)} method
366         * on the service if it implement this interface.
367         * 
368         * @param service the service
369         * @param forced  whether to force shutdown
370         * @param includeChildren whether to prepare the child of the service as well
371         */
372        private static void prepareShutdown(Service service, boolean forced, boolean includeChildren) {
373            Set<Service> list;
374            if (includeChildren) {
375                list = ServiceHelper.getChildServices(service);
376            } else {
377                list = new LinkedHashSet<Service>(1);
378                list.add(service);
379            }
380    
381            for (Service child : list) {
382                if (child instanceof ShutdownPrepared) {
383                    try {
384                        LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child);
385                        ((ShutdownPrepared) child).prepareShutdown(forced);
386                    } catch (Exception e) {
387                        LOG.warn("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
388                    }
389                }
390            }
391        }
392    
393        /**
394         * Holder for deferred consumers
395         */
396        static class ShutdownDeferredConsumer {
397            private final Route route;
398            private final Consumer consumer;
399    
400            ShutdownDeferredConsumer(Route route, Consumer consumer) {
401                this.route = route;
402                this.consumer = consumer;
403            }
404    
405            Route getRoute() {
406                return route;
407            }
408    
409            Consumer getConsumer() {
410                return consumer;
411            }
412        }
413    
414        /**
415         * Shutdown task which shutdown all the routes in a graceful manner.
416         */
417        static class ShutdownTask implements Runnable {
418    
419            private final CamelContext context;
420            private final List<RouteStartupOrder> routes;
421            private final boolean suspendOnly;
422            private final boolean abortAfterTimeout;
423            private final long timeout;
424            private final TimeUnit timeUnit;
425    
426            public ShutdownTask(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit,
427                                boolean suspendOnly, boolean abortAfterTimeout) {
428                this.context = context;
429                this.routes = routes;
430                this.suspendOnly = suspendOnly;
431                this.abortAfterTimeout = abortAfterTimeout;
432                this.timeout = timeout;
433                this.timeUnit = timeUnit;
434            }
435    
436            public void run() {
437                // the strategy in this run method is to
438                // 1) go over the routes and shutdown those routes which can be shutdown asap
439                //    some routes will be deferred to shutdown at the end, as they are needed
440                //    by other routes so they can complete their tasks
441                // 2) wait until all inflight and pending exchanges has been completed
442                // 3) shutdown the deferred routes
443    
444                LOG.debug("There are {} routes to {}", routes.size(), suspendOnly ? "suspend" : "shutdown");
445    
446                // list of deferred consumers to shutdown when all exchanges has been completed routed
447                // and thus there are no more inflight exchanges so they can be safely shutdown at that time
448                List<ShutdownDeferredConsumer> deferredConsumers = new ArrayList<ShutdownDeferredConsumer>();
449                for (RouteStartupOrder order : routes) {
450    
451                    ShutdownRoute shutdownRoute = order.getRoute().getRouteContext().getShutdownRoute();
452                    ShutdownRunningTask shutdownRunningTask = order.getRoute().getRouteContext().getShutdownRunningTask();
453    
454                    if (LOG.isTraceEnabled()) {
455                        LOG.trace("{}{} with options [{},{}]",
456                                new Object[]{suspendOnly ? "Suspending route: " : "Shutting down route: ",
457                                    order.getRoute().getId(), shutdownRoute, shutdownRunningTask});
458                    }
459    
460                    for (Consumer consumer : order.getInputs()) {
461    
462                        boolean suspend = false;
463    
464                        // assume we should shutdown if we are not deferred
465                        boolean shutdown = shutdownRoute != ShutdownRoute.Defer;
466    
467                        if (shutdown) {
468                            // if we are to shutdown then check whether we can suspend instead as its a more
469                            // gentle way to graceful shutdown
470    
471                            // some consumers do not support shutting down so let them decide
472                            // if a consumer is suspendable then prefer to use that and then shutdown later
473                            if (consumer instanceof ShutdownAware) {
474                                shutdown = !((ShutdownAware) consumer).deferShutdown(shutdownRunningTask);
475                            }
476                            if (shutdown && consumer instanceof SuspendableService) {
477                                // we prefer to suspend over shutdown
478                                suspend = true;
479                            }
480                        }
481    
482                        // log at info level when a route has been shutdown (otherwise log at debug level to not be too noisy)
483                        if (suspend) {
484                            // only suspend it and then later shutdown it
485                            suspendNow(consumer);
486                            // add it to the deferred list so the route will be shutdown later
487                            deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
488                            LOG.debug("Route: {} suspended and shutdown deferred, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint());
489                        } else if (shutdown) {
490                            shutdownNow(consumer);
491                            LOG.info("Route: {} shutdown complete, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint());
492                        } else {
493                            // we will stop it later, but for now it must run to be able to help all inflight messages
494                            // be safely completed
495                            deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
496                            LOG.debug("Route: " + order.getRoute().getId() + (suspendOnly ? " shutdown deferred." : " suspension deferred."));
497                        }
498                    }
499                }
500    
501                // wait till there are no more pending and inflight messages
502                boolean done = false;
503                long loopDelaySeconds = 1;
504                long loopCount = 0;
505                while (!done) {
506                    int size = 0;
507                    for (RouteStartupOrder order : routes) {
508                        int inflight = context.getInflightRepository().size(order.getRoute().getId());
509                        for (Consumer consumer : order.getInputs()) {
510                            // include any additional pending exchanges on some consumers which may have internal
511                            // memory queues such as seda
512                            if (consumer instanceof ShutdownAware) {
513                                inflight += ((ShutdownAware) consumer).getPendingExchangesSize();
514                            }
515                        }
516                        if (inflight > 0) {
517                            size += inflight;
518                            LOG.trace("{} inflight and pending exchanges for route: {}", inflight, order.getRoute().getId());
519                        }
520                    }
521                    if (size > 0) {
522                        try {
523                            LOG.info("Waiting as there are still " + size + " inflight and pending exchanges to complete, timeout in "
524                                     + (TimeUnit.SECONDS.convert(timeout, timeUnit) - (loopCount++ * loopDelaySeconds)) + " seconds.");
525                            Thread.sleep(loopDelaySeconds * 1000);
526                        } catch (InterruptedException e) {
527                            if (abortAfterTimeout) {
528                                LOG.warn("Interrupted while waiting during graceful shutdown, will abort.");
529                                return;
530                            } else {
531                                LOG.warn("Interrupted while waiting during graceful shutdown, will force shutdown now.");
532                                break;
533                            }
534                        }
535                    } else {
536                        done = true;
537                    }
538                }
539    
540                // prepare for shutdown
541                for (ShutdownDeferredConsumer deferred : deferredConsumers) {
542                    Consumer consumer = deferred.getConsumer();
543                    if (consumer instanceof ShutdownAware) {
544                        LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId());
545                        boolean forced = context.getShutdownStrategy().forceShutdown(consumer);
546                        prepareShutdown(consumer, forced, false);
547                        LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId());
548                    }
549                }
550    
551                // now all messages has been completed then stop the deferred consumers
552                for (ShutdownDeferredConsumer deferred : deferredConsumers) {
553                    Consumer consumer = deferred.getConsumer();
554                    if (suspendOnly) {
555                        suspendNow(consumer);
556                        LOG.info("Route: {} suspend complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
557                    } else {
558                        shutdownNow(consumer);
559                        LOG.info("Route: {} shutdown complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
560                    }
561                }
562    
563                // now the route consumers has been shutdown, then prepare route services for shutdown
564                for (RouteStartupOrder order : routes) {
565                    for (Service service : order.getServices()) {
566                        boolean forced = context.getShutdownStrategy().forceShutdown(service);
567                        prepareShutdown(service, forced, true);
568                    }
569                }
570            }
571    
572        }
573    
574    }