001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.Collections;
022import java.util.Comparator;
023import java.util.HashSet;
024import java.util.LinkedHashMap;
025import java.util.LinkedHashSet;
026import java.util.List;
027import java.util.Locale;
028import java.util.Map;
029import java.util.Set;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.ExecutorService;
032import java.util.concurrent.Future;
033import java.util.concurrent.TimeUnit;
034import java.util.concurrent.atomic.AtomicBoolean;
035
036import org.apache.camel.CamelContext;
037import org.apache.camel.CamelContextAware;
038import org.apache.camel.Consumer;
039import org.apache.camel.Route;
040import org.apache.camel.Service;
041import org.apache.camel.ShutdownRoute;
042import org.apache.camel.ShutdownRunningTask;
043import org.apache.camel.Suspendable;
044import org.apache.camel.SuspendableService;
045import org.apache.camel.spi.InflightRepository;
046import org.apache.camel.spi.RouteStartupOrder;
047import org.apache.camel.spi.ShutdownAware;
048import org.apache.camel.spi.ShutdownPrepared;
049import org.apache.camel.spi.ShutdownStrategy;
050import org.apache.camel.support.ServiceSupport;
051import org.apache.camel.util.CollectionStringBuffer;
052import org.apache.camel.util.EventHelper;
053import org.apache.camel.util.ObjectHelper;
054import org.apache.camel.util.ServiceHelper;
055import org.apache.camel.util.StopWatch;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058
059/**
060 * Default {@link org.apache.camel.spi.ShutdownStrategy} which uses graceful shutdown.
061 * <p/>
062 * Graceful shutdown ensures that any inflight and pending messages will be taken into account
063 * and it will wait until these exchanges has been completed.
064 * <p/>
065 * This strategy will perform graceful shutdown in two steps:
066 * <ul>
067 *     <li>Graceful - By suspending/stopping consumers, and let any in-flight exchanges complete</li>
068 *     <li>Forced - After a given period of time, a timeout occurred and if there are still pending
069 *     exchanges to complete, then a more aggressive forced strategy is performed.</li>
070 * </ul>
071 * The idea by the <tt>graceful</tt> shutdown strategy, is to stop taking in more new messages,
072 * and allow any existing inflight messages to complete. Then when there is no more inflight messages
073 * then the routes can be fully shutdown. This mean that if there is inflight messages then we will have
074 * to wait for these messages to complete. If they do not complete after a period of time, then a
075 * timeout triggers. And then a more aggressive strategy takes over.
076 * <p/>
077 * The idea by the <tt>forced</tt> shutdown strategy, is to stop continue processing messages.
078 * And force routes and its services to shutdown now. There is a risk when shutting down now,
079 * that some resources is not properly shutdown, which can cause side effects. The timeout value
080 * is by default 300 seconds, but can be customized.
081 * <p/>
082 * As this strategy will politely wait until all exchanges has been completed it can potential wait
083 * for a long time, and hence why a timeout value can be set. When the timeout triggers you can also
084 * specify whether the remainder consumers should be shutdown now or ignore.
085 * <p/>
086 * Will by default use a timeout of 300 seconds (5 minutes) by which it will shutdown now the remaining consumers.
087 * This ensures that when shutting down Camel it at some point eventually will shutdown.
088 * This behavior can of course be configured using the {@link #setTimeout(long)} and
089 * {@link #setShutdownNowOnTimeout(boolean)} methods.
090 * <p/>
091 * Routes will by default be shutdown in the reverse order of which they where started.
092 * You can customize this using the {@link #setShutdownRoutesInReverseOrder(boolean)} method.
093 * <p/>
094 * After route consumers have been shutdown, then any {@link ShutdownPrepared} services on the routes
095 * is being prepared for shutdown, by invoking {@link ShutdownPrepared#prepareShutdown(boolean)} which
096 * <tt>force=false</tt>.
097 * <p/>
098 * Then if a timeout occurred and the strategy has been configured with shutdown-now on timeout, then
099 * the strategy performs a more aggressive forced shutdown, by forcing all consumers to shutdown
100 * and then invokes {@link ShutdownPrepared#prepareShutdown(boolean)} with <tt>force=true</tt>
101 * on the services. This allows the services to know they should force shutdown now.
102 * <p/>
103 * When timeout occurred and a forced shutdown is happening, then there may be threads/tasks which are
104 * still inflight which may be rejected continued being routed. By default this can cause WARN and ERRORs
105 * to be logged. The option {@link #setSuppressLoggingOnTimeout(boolean)} can be used to suppress these
106 * logs, so they are logged at TRACE level instead.
107 * <p/>
108 * Also when a timeout occurred then information about the inflight exchanges is logged, if {@link #isLogInflightExchangesOnTimeout()}
109 * is enabled (is by default). This allows end users to known where these inflight exchanges currently are in the route(s),
110 * and how long time they have been inflight.
111 * <p/>
112 * This information can also be obtained from the {@link org.apache.camel.spi.InflightRepository}
113 * at all time during runtime.
114 *
115 * @version
116 */
117public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy, CamelContextAware {
118    private static final Logger LOG = LoggerFactory.getLogger(DefaultShutdownStrategy.class);
119
120    private CamelContext camelContext;
121    private ExecutorService executor;
122    private long timeout = 5 * 60;
123    private TimeUnit timeUnit = TimeUnit.SECONDS;
124    private boolean shutdownNowOnTimeout = true;
125    private boolean shutdownRoutesInReverseOrder = true;
126    private boolean suppressLoggingOnTimeout;
127    private boolean logInflightExchangesOnTimeout = true;
128
129    private volatile boolean forceShutdown;
130    private final AtomicBoolean timeoutOccurred = new AtomicBoolean();
131    private volatile Future<?> currentShutdownTaskFuture;
132
133    public DefaultShutdownStrategy() {
134    }
135
136    public DefaultShutdownStrategy(CamelContext camelContext) {
137        this.camelContext = camelContext;
138    }
139
140    public void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
141        shutdown(context, routes, getTimeout(), getTimeUnit());
142    }
143
144    @Override
145    public void shutdownForced(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
146        doShutdown(context, routes, getTimeout(), getTimeUnit(), false, false, true);
147    }
148
149    public void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
150        doShutdown(context, routes, getTimeout(), getTimeUnit(), true, false, false);
151    }
152
153    public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
154        doShutdown(context, routes, timeout, timeUnit, false, false, false);
155    }
156
157    public boolean shutdown(CamelContext context, RouteStartupOrder route, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception {
158        List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
159        routes.add(route);
160        return doShutdown(context, routes, timeout, timeUnit, false, abortAfterTimeout, false);
161    }
162
163    public void suspend(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
164        doShutdown(context, routes, timeout, timeUnit, true, false, false);
165    }
166
167    protected boolean doShutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit,
168                                 boolean suspendOnly, boolean abortAfterTimeout, boolean forceShutdown) throws Exception {
169
170        // timeout must be a positive value
171        if (timeout <= 0) {
172            throw new IllegalArgumentException("Timeout must be a positive value");
173        }
174
175        // just return if no routes to shutdown
176        if (routes.isEmpty()) {
177            return true;
178        }
179
180        StopWatch watch = new StopWatch();
181
182        // at first sort according to route startup order
183        List<RouteStartupOrder> routesOrdered = new ArrayList<RouteStartupOrder>(routes);
184        routesOrdered.sort(new Comparator<RouteStartupOrder>() {
185            public int compare(RouteStartupOrder o1, RouteStartupOrder o2) {
186                return o1.getStartupOrder() - o2.getStartupOrder();
187            }
188        });
189        if (shutdownRoutesInReverseOrder) {
190            Collections.reverse(routesOrdered);
191        }
192
193        if (suspendOnly) {
194            LOG.info("Starting to graceful suspend " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
195        } else {
196            LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
197        }
198
199        // use another thread to perform the shutdowns so we can support timeout
200        timeoutOccurred.set(false);
201        currentShutdownTaskFuture = getExecutorService().submit(new ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly, abortAfterTimeout, timeoutOccurred));
202        try {
203            currentShutdownTaskFuture.get(timeout, timeUnit);
204        } catch (ExecutionException e) {
205            // unwrap execution exception
206            throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
207        } catch (Exception e) {
208            // either timeout or interrupted exception was thrown so this is okay
209            // as interrupted would mean cancel was called on the currentShutdownTaskFuture to signal a forced timeout
210
211            // we hit a timeout, so set the flag
212            timeoutOccurred.set(true);
213
214            // timeout then cancel the task
215            currentShutdownTaskFuture.cancel(true);
216
217            // signal we are forcing shutdown now, since timeout occurred
218            this.forceShutdown = forceShutdown;
219
220            // if set, stop processing and return false to indicate that the shutdown is aborting
221            if (!forceShutdown && abortAfterTimeout) {
222                LOG.warn("Timeout occurred during graceful shutdown. Aborting the shutdown now."
223                        + " Notice: some resources may still be running as graceful shutdown did not complete successfully.");
224
225                // we attempt to force shutdown so lets log the current inflight exchanges which are affected
226                logInflightExchanges(context, routes, isLogInflightExchangesOnTimeout());
227
228                return false;
229            } else {
230                if (forceShutdown || shutdownNowOnTimeout) {
231                    LOG.warn("Timeout occurred during graceful shutdown. Forcing the routes to be shutdown now."
232                            + " Notice: some resources may still be running as graceful shutdown did not complete successfully.");
233
234                    // we attempt to force shutdown so lets log the current inflight exchanges which are affected
235                    logInflightExchanges(context, routes, isLogInflightExchangesOnTimeout());
236
237                    // force the routes to shutdown now
238                    shutdownRoutesNow(routesOrdered);
239
240                    // now the route consumers has been shutdown, then prepare route services for shutdown now (forced)
241                    for (RouteStartupOrder order : routes) {
242                        for (Service service : order.getServices()) {
243                            prepareShutdown(service, false, true, true, isSuppressLoggingOnTimeout());
244                        }
245                    }
246                } else {
247                    LOG.warn("Timeout occurred during graceful shutdown. Will ignore shutting down the remainder routes."
248                            + " Notice: some resources may still be running as graceful shutdown did not complete successfully.");
249
250                    logInflightExchanges(context, routes, isLogInflightExchangesOnTimeout());
251                }
252            }
253        } finally {
254            currentShutdownTaskFuture = null;
255        }
256
257        // convert to seconds as its easier to read than a big milli seconds number
258        long seconds = TimeUnit.SECONDS.convert(watch.stop(), TimeUnit.MILLISECONDS);
259
260        LOG.info("Graceful shutdown of " + routesOrdered.size() + " routes completed in " + seconds + " seconds");
261        return true;
262    }
263
264    @Override
265    public boolean forceShutdown(Service service) {
266        return forceShutdown;
267    }
268
269    @Override
270    public boolean hasTimeoutOccurred() {
271        return timeoutOccurred.get();
272    }
273
274    public void setTimeout(long timeout) {
275        if (timeout <= 0) {
276            throw new IllegalArgumentException("Timeout must be a positive value");
277        }
278        this.timeout = timeout;
279    }
280
281    public long getTimeout() {
282        return timeout;
283    }
284
285    public void setTimeUnit(TimeUnit timeUnit) {
286        this.timeUnit = timeUnit;
287    }
288
289    public TimeUnit getTimeUnit() {
290        return timeUnit;
291    }
292
293    public void setShutdownNowOnTimeout(boolean shutdownNowOnTimeout) {
294        this.shutdownNowOnTimeout = shutdownNowOnTimeout;
295    }
296
297    public boolean isShutdownNowOnTimeout() {
298        return shutdownNowOnTimeout;
299    }
300
301    public boolean isShutdownRoutesInReverseOrder() {
302        return shutdownRoutesInReverseOrder;
303    }
304
305    public void setShutdownRoutesInReverseOrder(boolean shutdownRoutesInReverseOrder) {
306        this.shutdownRoutesInReverseOrder = shutdownRoutesInReverseOrder;
307    }
308
309    public boolean isSuppressLoggingOnTimeout() {
310        return suppressLoggingOnTimeout;
311    }
312
313    public void setSuppressLoggingOnTimeout(boolean suppressLoggingOnTimeout) {
314        this.suppressLoggingOnTimeout = suppressLoggingOnTimeout;
315    }
316
317    public boolean isLogInflightExchangesOnTimeout() {
318        return logInflightExchangesOnTimeout;
319    }
320
321    public void setLogInflightExchangesOnTimeout(boolean logInflightExchangesOnTimeout) {
322        this.logInflightExchangesOnTimeout = logInflightExchangesOnTimeout;
323    }
324
325    public CamelContext getCamelContext() {
326        return camelContext;
327    }
328
329    public void setCamelContext(CamelContext camelContext) {
330        this.camelContext = camelContext;
331    }
332
333    public Future<?> getCurrentShutdownTaskFuture() {
334        return currentShutdownTaskFuture;
335    }
336
337    /**
338     * Shutdown all the consumers immediately.
339     *
340     * @param routes the routes to shutdown
341     */
342    protected void shutdownRoutesNow(List<RouteStartupOrder> routes) {
343        for (RouteStartupOrder order : routes) {
344
345            // set the route to shutdown as fast as possible by stopping after
346            // it has completed its current task
347            ShutdownRunningTask current = order.getRoute().getRouteContext().getShutdownRunningTask();
348            if (current != ShutdownRunningTask.CompleteCurrentTaskOnly) {
349                LOG.debug("Changing shutdownRunningTask from {} to " +  ShutdownRunningTask.CompleteCurrentTaskOnly
350                    + " on route {} to shutdown faster", current, order.getRoute().getId());
351                order.getRoute().getRouteContext().setShutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly);
352            }
353
354            for (Consumer consumer : order.getInputs()) {
355                shutdownNow(consumer);
356            }
357        }
358    }
359
360    /**
361     * Shutdown all the consumers immediately.
362     *
363     * @param consumers the consumers to shutdown
364     */
365    protected void shutdownNow(List<Consumer> consumers) {
366        for (Consumer consumer : consumers) {
367            shutdownNow(consumer);
368        }
369    }
370
371    /**
372     * Shutdown the consumer immediately.
373     *
374     * @param consumer the consumer to shutdown
375     */
376    protected static void shutdownNow(Consumer consumer) {
377        LOG.trace("Shutting down: {}", consumer);
378
379        // allow us to do custom work before delegating to service helper
380        try {
381            ServiceHelper.stopService(consumer);
382        } catch (Throwable e) {
383            LOG.warn("Error occurred while shutting down route: " + consumer + ". This exception will be ignored.", e);
384            // fire event
385            EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
386        }
387
388        LOG.trace("Shutdown complete for: {}", consumer);
389    }
390
391    /**
392     * Suspends/stops the consumer immediately.
393     *
394     * @param consumer the consumer to suspend
395     */
396    protected static void suspendNow(Consumer consumer) {
397        LOG.trace("Suspending: {}", consumer);
398
399        // allow us to do custom work before delegating to service helper
400        try {
401            ServiceHelper.suspendService(consumer);
402        } catch (Throwable e) {
403            LOG.warn("Error occurred while suspending route: " + consumer + ". This exception will be ignored.", e);
404            // fire event
405            EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
406        }
407
408        LOG.trace("Suspend complete for: {}", consumer);
409    }
410
411    private ExecutorService getExecutorService() {
412        if (executor == null) {
413            // use a thread pool that allow to terminate idle threads so they do not hang around forever
414            executor = camelContext.getExecutorServiceManager().newThreadPool(this, "ShutdownTask", 0, 1);
415        }
416        return executor;
417    }
418
419    @Override
420    protected void doStart() throws Exception {
421        ObjectHelper.notNull(camelContext, "CamelContext");
422        // reset option
423        forceShutdown = false;
424        timeoutOccurred.set(false);
425    }
426
427    @Override
428    protected void doStop() throws Exception {
429        // noop
430    }
431
432    @Override
433    protected void doShutdown() throws Exception {
434        if (executor != null) {
435            // force shutting down as we are shutting down Camel
436            camelContext.getExecutorServiceManager().shutdownNow(executor);
437            // should clear executor so we can restart by creating a new thread pool
438            executor = null;
439        }
440    }
441
442    /**
443     * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean, boolean)} method
444     * on the service if it implement this interface.
445     *
446     * @param service the service
447     * @param forced  whether to force shutdown
448     * @param includeChildren whether to prepare the child of the service as well
449     */
450    private static void prepareShutdown(Service service, boolean suspendOnly, boolean forced, boolean includeChildren, boolean suppressLogging) {
451        Set<Service> list;
452        if (includeChildren) {
453            // include error handlers as we want to prepare them for shutdown as well
454            list = ServiceHelper.getChildServices(service, true);
455        } else {
456            list = new LinkedHashSet<Service>(1);
457            list.add(service);
458        }
459
460        for (Service child : list) {
461            if (child instanceof ShutdownPrepared) {
462                try {
463                    LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child);
464                    ((ShutdownPrepared) child).prepareShutdown(suspendOnly, forced);
465                } catch (Exception e) {
466                    if (suppressLogging) {
467                        LOG.trace("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
468                    } else {
469                        LOG.warn("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
470                    }
471                }
472            }
473        }
474    }
475
476    /**
477     * Holder for deferred consumers
478     */
479    static class ShutdownDeferredConsumer {
480        private final Route route;
481        private final Consumer consumer;
482
483        ShutdownDeferredConsumer(Route route, Consumer consumer) {
484            this.route = route;
485            this.consumer = consumer;
486        }
487
488        Route getRoute() {
489            return route;
490        }
491
492        Consumer getConsumer() {
493            return consumer;
494        }
495    }
496
497    /**
498     * Shutdown task which shutdown all the routes in a graceful manner.
499     */
500    static class ShutdownTask implements Runnable {
501
502        private final CamelContext context;
503        private final List<RouteStartupOrder> routes;
504        private final boolean suspendOnly;
505        private final boolean abortAfterTimeout;
506        private final long timeout;
507        private final TimeUnit timeUnit;
508        private final AtomicBoolean timeoutOccurred;
509
510        ShutdownTask(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit,
511                            boolean suspendOnly, boolean abortAfterTimeout, AtomicBoolean timeoutOccurred) {
512            this.context = context;
513            this.routes = routes;
514            this.suspendOnly = suspendOnly;
515            this.abortAfterTimeout = abortAfterTimeout;
516            this.timeout = timeout;
517            this.timeUnit = timeUnit;
518            this.timeoutOccurred = timeoutOccurred;
519        }
520
521        public void run() {
522            // the strategy in this run method is to
523            // 1) go over the routes and shutdown those routes which can be shutdown asap
524            //    some routes will be deferred to shutdown at the end, as they are needed
525            //    by other routes so they can complete their tasks
526            // 2) wait until all inflight and pending exchanges has been completed
527            // 3) shutdown the deferred routes
528
529            LOG.debug("There are {} routes to {}", routes.size(), suspendOnly ? "suspend" : "shutdown");
530
531            // list of deferred consumers to shutdown when all exchanges has been completed routed
532            // and thus there are no more inflight exchanges so they can be safely shutdown at that time
533            List<ShutdownDeferredConsumer> deferredConsumers = new ArrayList<ShutdownDeferredConsumer>();
534            for (RouteStartupOrder order : routes) {
535
536                ShutdownRoute shutdownRoute = order.getRoute().getRouteContext().getShutdownRoute();
537                ShutdownRunningTask shutdownRunningTask = order.getRoute().getRouteContext().getShutdownRunningTask();
538
539                if (LOG.isTraceEnabled()) {
540                    LOG.trace("{}{} with options [{},{}]",
541                            new Object[]{suspendOnly ? "Suspending route: " : "Shutting down route: ",
542                                order.getRoute().getId(), shutdownRoute, shutdownRunningTask});
543                }
544
545                for (Consumer consumer : order.getInputs()) {
546
547                    boolean suspend = false;
548
549                    // assume we should shutdown if we are not deferred
550                    boolean shutdown = shutdownRoute != ShutdownRoute.Defer;
551
552                    if (shutdown) {
553                        // if we are to shutdown then check whether we can suspend instead as its a more
554                        // gentle way to graceful shutdown
555
556                        // some consumers do not support shutting down so let them decide
557                        // if a consumer is suspendable then prefer to use that and then shutdown later
558                        if (consumer instanceof ShutdownAware) {
559                            shutdown = !((ShutdownAware) consumer).deferShutdown(shutdownRunningTask);
560                        }
561                        if (shutdown && consumer instanceof Suspendable) {
562                            // we prefer to suspend over shutdown
563                            suspend = true;
564                        }
565                    }
566
567                    // log at info level when a route has been shutdown (otherwise log at debug level to not be too noisy)
568                    if (suspend) {
569                        // only suspend it and then later shutdown it
570                        suspendNow(consumer);
571                        // add it to the deferred list so the route will be shutdown later
572                        deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
573                        LOG.debug("Route: {} suspended and shutdown deferred, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint());
574                    } else if (shutdown) {
575                        shutdownNow(consumer);
576                        LOG.info("Route: {} shutdown complete, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint());
577                    } else {
578                        // we will stop it later, but for now it must run to be able to help all inflight messages
579                        // be safely completed
580                        deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
581                        LOG.debug("Route: " + order.getRoute().getId() + (suspendOnly ? " shutdown deferred." : " suspension deferred."));
582                    }
583                }
584            }
585
586            // notify the services we intend to shutdown
587            for (RouteStartupOrder order : routes) {
588                for (Service service : order.getServices()) {
589                    // skip the consumer as we handle that specially
590                    if (service instanceof Consumer) {
591                        continue;
592                    }
593                    prepareShutdown(service, suspendOnly, false, true, false);
594                }
595            }
596
597            // wait till there are no more pending and inflight messages
598            boolean done = false;
599            long loopDelaySeconds = 1;
600            long loopCount = 0;
601            while (!done && !timeoutOccurred.get()) {
602                int size = 0;
603                // number of inflights per route
604                final Map<String, Integer> routeInflight = new LinkedHashMap<String, Integer>();
605
606                for (RouteStartupOrder order : routes) {
607                    int inflight = context.getInflightRepository().size(order.getRoute().getId());
608                    inflight += getPendingInflightExchanges(order);
609                    if (inflight > 0) {
610                        String routeId = order.getRoute().getId();
611                        routeInflight.put(routeId, inflight);
612                        size += inflight;
613                        LOG.trace("{} inflight and pending exchanges for route: {}", inflight, routeId);
614                    }
615                }
616                if (size > 0) {
617                    try {
618                        // build a message with inflight per route
619                        CollectionStringBuffer csb = new CollectionStringBuffer();
620                        for (Map.Entry<String, Integer> entry : routeInflight.entrySet()) {
621                            String row = String.format("%s = %s", entry.getKey(), entry.getValue());
622                            csb.append(row);
623                        }
624
625                        String msg = "Waiting as there are still " + size + " inflight and pending exchanges to complete, timeout in "
626                                + (TimeUnit.SECONDS.convert(timeout, timeUnit) - (loopCount++ * loopDelaySeconds)) + " seconds.";
627                        msg += " Inflights per route: [" + csb.toString() + "]";
628
629                        LOG.info(msg);
630
631                        // log verbose if DEBUG logging is enabled
632                        logInflightExchanges(context, routes, false);
633
634                        Thread.sleep(loopDelaySeconds * 1000);
635                    } catch (InterruptedException e) {
636                        if (abortAfterTimeout) {
637                            LOG.warn("Interrupted while waiting during graceful shutdown, will abort.");
638                            return;
639                        } else {
640                            LOG.warn("Interrupted while waiting during graceful shutdown, will force shutdown now.");
641                            break;
642                        }
643                    }
644                } else {
645                    done = true;
646                }
647            }
648
649            // prepare for shutdown
650            for (ShutdownDeferredConsumer deferred : deferredConsumers) {
651                Consumer consumer = deferred.getConsumer();
652                if (consumer instanceof ShutdownAware) {
653                    LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId());
654                    boolean forced = context.getShutdownStrategy().forceShutdown(consumer);
655                    boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
656                    prepareShutdown(consumer, suspendOnly, forced, false, suppress);
657                    LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId());
658                }
659            }
660
661            // now all messages has been completed then stop the deferred consumers
662            for (ShutdownDeferredConsumer deferred : deferredConsumers) {
663                Consumer consumer = deferred.getConsumer();
664                if (suspendOnly) {
665                    suspendNow(consumer);
666                    LOG.info("Route: {} suspend complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
667                } else {
668                    shutdownNow(consumer);
669                    LOG.info("Route: {} shutdown complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
670                }
671            }
672
673            // now the route consumers has been shutdown, then prepare route services for shutdown
674            for (RouteStartupOrder order : routes) {
675                for (Service service : order.getServices()) {
676                    boolean forced = context.getShutdownStrategy().forceShutdown(service);
677                    boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
678                    prepareShutdown(service, suspendOnly, forced, true, suppress);
679                }
680            }
681        }
682
683    }
684
685    /**
686     * Calculates the total number of inflight exchanges for the given route
687     *
688     * @param order the route
689     * @return number of inflight exchanges
690     */
691    protected static int getPendingInflightExchanges(RouteStartupOrder order) {
692        int inflight = 0;
693
694        // the consumer is the 1st service so we always get the consumer
695        // the child services are EIPs in the routes which may also have pending
696        // inflight exchanges (such as the aggregator)
697        for (Service service : order.getServices()) {
698            Set<Service> children = ServiceHelper.getChildServices(service);
699            for (Service child : children) {
700                if (child instanceof ShutdownAware) {
701                    inflight += ((ShutdownAware) child).getPendingExchangesSize();
702                }
703            }
704        }
705
706        return inflight;
707    }
708
709    /**
710     * Logs information about the inflight exchanges
711     *
712     * @param infoLevel <tt>true</tt> to log at INFO level, <tt>false</tt> to log at DEBUG level
713     */
714    protected static void logInflightExchanges(CamelContext camelContext, List<RouteStartupOrder> routes, boolean infoLevel) {
715        // check if we need to log
716        if (!infoLevel && !LOG.isDebugEnabled()) {
717            return;
718        }
719
720        Collection<InflightRepository.InflightExchange> inflights = camelContext.getInflightRepository().browse();
721        int size = inflights.size();
722        if (size == 0) {
723            return;
724        }
725
726        // filter so inflight must start from any of the routes
727        Set<String> routeIds = new HashSet<String>();
728        for (RouteStartupOrder route : routes) {
729            routeIds.add(route.getRoute().getId());
730        }
731        Collection<InflightRepository.InflightExchange> filtered = new ArrayList<InflightRepository.InflightExchange>();
732        for (InflightRepository.InflightExchange inflight : inflights) {
733            String routeId = inflight.getExchange().getFromRouteId();
734            if (routeIds.contains(routeId)) {
735                filtered.add(inflight);
736            }
737        }
738
739        size = filtered.size();
740        if (size == 0) {
741            return;
742        }
743
744        StringBuilder sb = new StringBuilder("There are " + size + " inflight exchanges:");
745        for (InflightRepository.InflightExchange inflight : filtered) {
746            sb.append("\n\tInflightExchange: [exchangeId=").append(inflight.getExchange().getExchangeId())
747                    .append(", fromRouteId=").append(inflight.getExchange().getFromRouteId())
748                    .append(", routeId=").append(inflight.getRouteId())
749                    .append(", nodeId=").append(inflight.getNodeId())
750                    .append(", elapsed=").append(inflight.getElapsed())
751                    .append(", duration=").append(inflight.getDuration())
752                    .append("]");
753        }
754
755        if (infoLevel) {
756            LOG.info(sb.toString());
757        } else {
758            LOG.debug(sb.toString());
759        }
760    }
761
762}