/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Consumer;
import org.apache.camel.Route;
import org.apache.camel.Service;
import org.apache.camel.ShutdownRoute;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.Suspendable;
import org.apache.camel.SuspendableService;
import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.RouteStartupOrder;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.ShutdownPrepared;
import org.apache.camel.spi.ShutdownStrategy;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.CollectionStringBuffer;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Default {@link org.apache.camel.spi.ShutdownStrategy} which uses graceful shutdown.
 * <p/>
 * Graceful shutdown ensures that any inflight and pending messages will be taken into account
 * and it will wait until these exchanges have been completed.
 * <p/>
 * This strategy will perform graceful shutdown in two steps:
 * <ul>
 *     <li>Graceful - By suspending/stopping consumers, and let any in-flight exchanges complete</li>
 *     <li>Forced - After a given period of time, a timeout occurred and if there are still pending
 *     exchanges to complete, then a more aggressive forced strategy is performed.</li>
 * </ul>
 * The idea of the <tt>graceful</tt> shutdown strategy is to stop taking in new messages,
 * and allow any existing inflight messages to complete. Then when there are no more inflight messages
 * the routes can be fully shutdown. This means that if there are inflight messages then we will have
 * to wait for these messages to complete. If they do not complete after a period of time, then a
 * timeout triggers. And then a more aggressive strategy takes over.
 * <p/>
 * The idea of the <tt>forced</tt> shutdown strategy is to stop processing messages
 * and force routes and their services to shutdown now. There is a risk when shutting down now,
 * that some resources are not properly shutdown, which can cause side effects. The timeout value
 * is by default 300 seconds, but can be customized.
 * <p/>
 * As this strategy will politely wait until all exchanges have been completed it can potentially wait
 * for a long time, which is why a timeout value can be set. When the timeout triggers you can also
 * specify whether the remaining consumers should be shutdown now or ignored.
 * <p/>
 * Will by default use a timeout of 300 seconds (5 minutes) after which it will shutdown now the remaining consumers.
 * This ensures that when shutting down Camel it at some point eventually will shutdown.
 * This behavior can of course be configured using the {@link #setTimeout(long)} and
 * {@link #setShutdownNowOnTimeout(boolean)} methods.
 * <p/>
 * Routes will by default be shutdown in the reverse order of which they were started.
 * You can customize this using the {@link #setShutdownRoutesInReverseOrder(boolean)} method.
 * <p/>
 * After route consumers have been shutdown, then any {@link ShutdownPrepared} services on the routes
 * are prepared for shutdown, by invoking {@link ShutdownPrepared#prepareShutdown(boolean, boolean)} with
 * <tt>force=false</tt>.
 * <p/>
 * Then if a timeout occurred and the strategy has been configured with shutdown-now on timeout, then
 * the strategy performs a more aggressive forced shutdown, by forcing all consumers to shutdown
 * and then invokes {@link ShutdownPrepared#prepareShutdown(boolean, boolean)} with <tt>force=true</tt>
 * on the services. This allows the services to know they should force shutdown now.
 * <p/>
 * When a timeout occurred and a forced shutdown is happening, then there may be threads/tasks which are
 * still inflight which may be rejected from continuing being routed. By default this can cause WARN and ERRORs
 * to be logged. The option {@link #setSuppressLoggingOnTimeout(boolean)} can be used to suppress these
 * logs, so they are logged at TRACE level instead.
 * <p/>
 * Also when a timeout occurred then information about the inflight exchanges is logged, if {@link #isLogInflightExchangesOnTimeout()}
 * is enabled (is by default). This allows end users to know where these inflight exchanges currently are in the route(s),
 * and how long they have been inflight.
 * <p/>
 * This information can also be obtained from the {@link org.apache.camel.spi.InflightRepository}
 * at all times during runtime.
 *
 * @version
 */
public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy, CamelContextAware {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultShutdownStrategy.class);

    private CamelContext camelContext;
    private ExecutorService executor;
    private long timeout = 5 * 60;
    private TimeUnit timeUnit = TimeUnit.SECONDS;
    private boolean shutdownNowOnTimeout = true;
    private boolean shutdownRoutesInReverseOrder = true;
    private boolean suppressLoggingOnTimeout;
    private boolean logInflightExchangesOnTimeout = true;

    private volatile boolean forceShutdown;
    private final AtomicBoolean timeoutOccurred = new AtomicBoolean();
    private volatile Future<?> currentShutdownTaskFuture;

    public DefaultShutdownStrategy() {
    }

    public DefaultShutdownStrategy(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    public void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
        shutdown(context, routes, getTimeout(), getTimeUnit());
    }

    @Override
    public void shutdownForced(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
        doShutdown(context, routes, getTimeout(), getTimeUnit(), false, false, true);
    }

    public void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
        doShutdown(context, routes, getTimeout(), getTimeUnit(), true, false, false);
    }

    public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
        doShutdown(context, routes, timeout, timeUnit, false, false, false);
    }

    public boolean shutdown(CamelContext context, RouteStartupOrder route, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception {
        List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
        routes.add(route);
        return doShutdown(context, routes, timeout, timeUnit, false, abortAfterTimeout, false);
    }

    public void suspend(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
        doShutdown(context, routes, timeout, timeUnit, true, false, false);
    }

    /**
     * Performs the shutdown (or suspension) of the given routes on a separate thread and waits
     * for it to complete within the given timeout.
     *
     * @param context           the camel context
     * @param routes            the routes to shutdown; an empty list returns <tt>true</tt> immediately
     * @param timeout           the timeout, must be a positive value
     * @param timeUnit          the unit of the timeout
     * @param suspendOnly       <tt>true</tt> to suspend the routes, <tt>false</tt> to shut them down
     * @param abortAfterTimeout whether to abort (and return <tt>false</tt>) when the timeout is hit,
     *                          unless <tt>forceShutdown</tt> is set
     * @param forceShutdown     whether to force the routes to shutdown when the timeout is hit
     * @return <tt>false</tt> if the shutdown was aborted due to a timeout, <tt>true</tt> otherwise
     * @throws IllegalArgumentException if the timeout is not a positive value
     * @throws Exception the unwrapped cause if the shutdown task itself failed
     */
    protected boolean doShutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit,
                                 boolean suspendOnly, boolean abortAfterTimeout, boolean forceShutdown) throws Exception {

        // timeout must be a positive value
        if (timeout <= 0) {
            throw new IllegalArgumentException("Timeout must be a positive value");
        }

        // just return if no routes to shutdown
        if (routes.isEmpty()) {
            return true;
        }

        StopWatch watch = new StopWatch();

        // at first sort according to route startup order
        List<RouteStartupOrder> routesOrdered = new ArrayList<RouteStartupOrder>(routes);
        Collections.sort(routesOrdered, new Comparator<RouteStartupOrder>() {
            public int compare(RouteStartupOrder o1, RouteStartupOrder o2) {
                // compare explicitly instead of subtracting, as a subtraction can overflow
                // for orders of opposite sign and large magnitude and yield the wrong ordering
                int first = o1.getStartupOrder();
                int second = o2.getStartupOrder();
                if (first < second) {
                    return -1;
                }
                return first == second ? 0 : 1;
            }
        });
        if (shutdownRoutesInReverseOrder) {
            Collections.reverse(routesOrdered);
        }

        if (suspendOnly) {
            LOG.info("Starting to graceful suspend " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
        } else {
            LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
        }

        // use another thread to perform the shutdowns so we can support timeout
        timeoutOccurred.set(false);
        // work on a local reference so this method never dereferences the shared volatile field
        // after it may have been cleared (set to null) by the finally block of another invocation
        final Future<?> future = getExecutorService().submit(new ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly, abortAfterTimeout, timeoutOccurred));
        currentShutdownTaskFuture = future;
        try {
            future.get(timeout, timeUnit);
        } catch (ExecutionException e) {
            // unwrap execution exception
            throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
        } catch (Exception e) {
            // either timeout or interrupted exception was thrown so this is okay
            // as interrupted would mean cancel was called on the currentShutdownTaskFuture to signal a forced timeout

            // we hit a timeout, so set the flag
            timeoutOccurred.set(true);

            // timeout then cancel the task
            future.cancel(true);

            // signal we are forcing shutdown now, since timeout occurred
            this.forceShutdown = forceShutdown;

            // if set, stop processing and return false to indicate that the shutdown is aborting
            if (!forceShutdown && abortAfterTimeout) {
                LOG.warn("Timeout occurred during graceful shutdown. Aborting the shutdown now."
                        + " Notice: some resources may still be running as graceful shutdown did not complete successfully.");

                // we attempt to force shutdown so lets log the current inflight exchanges which are affected
                logInflightExchanges(context, routes, isLogInflightExchangesOnTimeout());

                return false;
            } else {
                if (forceShutdown || shutdownNowOnTimeout) {
                    LOG.warn("Timeout occurred during graceful shutdown. Forcing the routes to be shutdown now."
                            + " Notice: some resources may still be running as graceful shutdown did not complete successfully.");

                    // we attempt to force shutdown so lets log the current inflight exchanges which are affected
                    logInflightExchanges(context, routes, isLogInflightExchangesOnTimeout());

                    // force the routes to shutdown now
                    shutdownRoutesNow(routesOrdered);

                    // now the route consumers has been shutdown, then prepare route services for shutdown now (forced)
                    for (RouteStartupOrder order : routes) {
                        for (Service service : order.getServices()) {
                            prepareShutdown(service, false, true, true, isSuppressLoggingOnTimeout());
                        }
                    }
                } else {
                    LOG.warn("Timeout occurred during graceful shutdown. Will ignore shutting down the remainder routes."
                            + " Notice: some resources may still be running as graceful shutdown did not complete successfully.");

                    logInflightExchanges(context, routes, isLogInflightExchangesOnTimeout());
                }
            }
        } finally {
            currentShutdownTaskFuture = null;
        }

        // convert to seconds as its easier to read than a big milli seconds number
        long seconds = TimeUnit.SECONDS.convert(watch.stop(), TimeUnit.MILLISECONDS);

        LOG.info("Graceful shutdown of " + routesOrdered.size() + " routes completed in " + seconds + " seconds");
        return true;
    }

    @Override
    public boolean forceShutdown(Service service) {
        return forceShutdown;
    }

    @Override
    public boolean hasTimeoutOccurred() {
        return timeoutOccurred.get();
    }

    /**
     * Sets the default timeout.
     *
     * @param timeout the timeout, must be a positive value
     * @throws IllegalArgumentException if the timeout is not a positive value
     */
    public void setTimeout(long timeout) {
        if (timeout <= 0) {
            throw new IllegalArgumentException("Timeout must be a positive value");
        }
        this.timeout = timeout;
    }

    public long getTimeout() {
        return timeout;
    }

    public void setTimeUnit(TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
    }

    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    public void setShutdownNowOnTimeout(boolean shutdownNowOnTimeout) {
        this.shutdownNowOnTimeout = shutdownNowOnTimeout;
    }

    public boolean isShutdownNowOnTimeout() {
        return shutdownNowOnTimeout;
    }

    public boolean isShutdownRoutesInReverseOrder() {
        return shutdownRoutesInReverseOrder;
    }

    public void setShutdownRoutesInReverseOrder(boolean shutdownRoutesInReverseOrder) {
        this.shutdownRoutesInReverseOrder = shutdownRoutesInReverseOrder;
    }

    public boolean isSuppressLoggingOnTimeout() {
        return suppressLoggingOnTimeout;
    }

    public void setSuppressLoggingOnTimeout(boolean suppressLoggingOnTimeout) {
        this.suppressLoggingOnTimeout = suppressLoggingOnTimeout;
    }

    public boolean isLogInflightExchangesOnTimeout() {
        return logInflightExchangesOnTimeout;
    }

    public void setLogInflightExchangesOnTimeout(boolean logInflightExchangesOnTimeout) {
        this.logInflightExchangesOnTimeout = logInflightExchangesOnTimeout;
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /**
     * Gets the future of the shutdown task currently in progress.
     *
     * @return the future, or <tt>null</tt> if no shutdown task is currently being waited for
     */
    public Future<?> getCurrentShutdownTaskFuture() {
        return currentShutdownTaskFuture;
    }

    /**
     * Shutdown all the consumers immediately.
     *
     * @param routes the routes to shutdown
     */
    protected void shutdownRoutesNow(List<RouteStartupOrder> routes) {
        for (RouteStartupOrder order : routes) {

            // set the route to shutdown as fast as possible by stopping after
            // it has completed its current task
            ShutdownRunningTask current = order.getRoute().getRouteContext().getShutdownRunningTask();
            if (current != ShutdownRunningTask.CompleteCurrentTaskOnly) {
                LOG.debug("Changing shutdownRunningTask from {} to " + ShutdownRunningTask.CompleteCurrentTaskOnly
                        + " on route {} to shutdown faster", current, order.getRoute().getId());
                order.getRoute().getRouteContext().setShutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly);
            }

            for (Consumer consumer : order.getInputs()) {
                shutdownNow(consumer);
            }
        }
    }

    /**
     * Shutdown all the consumers immediately.
     *
     * @param consumers the consumers to shutdown
     */
    protected void shutdownNow(List<Consumer> consumers) {
        for (Consumer consumer : consumers) {
            shutdownNow(consumer);
        }
    }

    /**
     * Shutdown the consumer immediately.
     * <p/>
     * Any failure while stopping is logged at WARN level, reported as a service stop failure event
     * and otherwise ignored.
     *
     * @param consumer the consumer to shutdown
     */
    protected static void shutdownNow(Consumer consumer) {
        LOG.trace("Shutting down: {}", consumer);

        // allow us to do custom work before delegating to service helper
        try {
            ServiceHelper.stopService(consumer);
        } catch (Throwable e) {
            LOG.warn("Error occurred while shutting down route: " + consumer + ". This exception will be ignored.", e);
            // fire event
            EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
        }

        LOG.trace("Shutdown complete for: {}", consumer);
    }

    /**
     * Suspends/stops the consumer immediately.
     * <p/>
     * Any failure while suspending is logged at WARN level, reported as a service stop failure event
     * and otherwise ignored.
     *
     * @param consumer the consumer to suspend
     */
    protected static void suspendNow(Consumer consumer) {
        LOG.trace("Suspending: {}", consumer);

        // allow us to do custom work before delegating to service helper
        try {
            ServiceHelper.suspendService(consumer);
        } catch (Throwable e) {
            LOG.warn("Error occurred while suspending route: " + consumer + ". This exception will be ignored.", e);
            // fire event
            EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
        }

        LOG.trace("Suspend complete for: {}", consumer);
    }

    private ExecutorService getExecutorService() {
        if (executor == null) {
            // use a thread pool that allow to terminate idle threads so they do not hang around forever
            executor = camelContext.getExecutorServiceManager().newThreadPool(this, "ShutdownTask", 0, 1);
        }
        return executor;
    }

    @Override
    protected void doStart() throws Exception {
        ObjectHelper.notNull(camelContext, "CamelContext");
        // reset option
        forceShutdown = false;
        timeoutOccurred.set(false);
    }

    @Override
    protected void doStop() throws Exception {
        // noop
    }

    @Override
    protected void doShutdown() throws Exception {
        if (executor != null) {
            // force shutting down as we are shutting down Camel
            camelContext.getExecutorServiceManager().shutdownNow(executor);
            // should clear executor so we can restart by creating a new thread pool
            executor = null;
        }
    }

    /**
     * Prepares the services for shutdown, by invoking the {@link ShutdownPrepared#prepareShutdown(boolean, boolean)} method
     * on the service if it implements this interface.
     * <p/>
     * Exceptions thrown by a service while being prepared are logged and ignored.
     *
     * @param service         the service
     * @param suspendOnly     whether the service is being suspended rather than shutdown
     * @param forced          whether to force shutdown
     * @param includeChildren whether to prepare the child of the service as well
     * @param suppressLogging whether to log failures at TRACE level (<tt>true</tt>) or at WARN level (<tt>false</tt>)
     */
    private static void prepareShutdown(Service service, boolean suspendOnly, boolean forced, boolean includeChildren, boolean suppressLogging) {
        Set<Service> list;
        if (includeChildren) {
            // include error handlers as we want to prepare them for shutdown as well
            list = ServiceHelper.getChildServices(service, true);
        } else {
            list = new LinkedHashSet<Service>(1);
            list.add(service);
        }

        for (Service child : list) {
            if (child instanceof ShutdownPrepared) {
                try {
                    LOG.trace("Preparing {} shutdown on {}", forced ? "forced" : "", child);
                    ((ShutdownPrepared) child).prepareShutdown(suspendOnly, forced);
                } catch (Exception e) {
                    if (suppressLogging) {
                        LOG.trace("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
                    } else {
                        LOG.warn("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
                    }
                }
            }
        }
    }

    /**
     * Holder for deferred consumers
     */
    static class ShutdownDeferredConsumer {
        private final Route route;
        private final Consumer consumer;

        ShutdownDeferredConsumer(Route route, Consumer consumer) {
            this.route = route;
            this.consumer = consumer;
        }

        Route getRoute() {
            return route;
        }

        Consumer getConsumer() {
            return consumer;
        }
    }

    /**
     * Shutdown task which shutdown all the routes in a graceful manner.
     */
    static class ShutdownTask implements Runnable {

        private final CamelContext context;
        private final List<RouteStartupOrder> routes;
        private final boolean suspendOnly;
        private final boolean abortAfterTimeout;
        private final long timeout;
        private final TimeUnit timeUnit;
        private final AtomicBoolean timeoutOccurred;

        ShutdownTask(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit,
                     boolean suspendOnly, boolean abortAfterTimeout, AtomicBoolean timeoutOccurred) {
            this.context = context;
            this.routes = routes;
            this.suspendOnly = suspendOnly;
            this.abortAfterTimeout = abortAfterTimeout;
            this.timeout = timeout;
            this.timeUnit = timeUnit;
            this.timeoutOccurred = timeoutOccurred;
        }

        public void run() {
            // the strategy in this run method is to
            // 1) go over the routes and shutdown those routes which can be shutdown asap
            //    some routes will be deferred to shutdown at the end, as they are needed
            //    by other routes so they can complete their tasks
            // 2) wait until all inflight and pending exchanges has been completed
            // 3) shutdown the deferred routes

            LOG.debug("There are {} routes to {}", routes.size(), suspendOnly ? "suspend" : "shutdown");

            // list of deferred consumers to shutdown when all exchanges has been completed routed
            // and thus there are no more inflight exchanges so they can be safely shutdown at that time
            List<ShutdownDeferredConsumer> deferredConsumers = new ArrayList<ShutdownDeferredConsumer>();
            for (RouteStartupOrder order : routes) {

                ShutdownRoute shutdownRoute = order.getRoute().getRouteContext().getShutdownRoute();
                ShutdownRunningTask shutdownRunningTask = order.getRoute().getRouteContext().getShutdownRunningTask();

                if (LOG.isTraceEnabled()) {
                    LOG.trace("{}{} with options [{},{}]",
                            new Object[]{suspendOnly ? "Suspending route: " : "Shutting down route: ",
                                order.getRoute().getId(), shutdownRoute, shutdownRunningTask});
                }

                for (Consumer consumer : order.getInputs()) {

                    boolean suspend = false;

                    // assume we should shutdown if we are not deferred
                    boolean shutdown = shutdownRoute != ShutdownRoute.Defer;

                    if (shutdown) {
                        // if we are to shutdown then check whether we can suspend instead as its a more
                        // gentle way to graceful shutdown

                        // some consumers do not support shutting down so let them decide
                        // if a consumer is suspendable then prefer to use that and then shutdown later
                        if (consumer instanceof ShutdownAware) {
                            shutdown = !((ShutdownAware) consumer).deferShutdown(shutdownRunningTask);
                        }
                        if (shutdown && consumer instanceof Suspendable) {
                            // we prefer to suspend over shutdown
                            suspend = true;
                        }
                    }

                    // log at info level when a route has been shutdown (otherwise log at debug level to not be too noisy)
                    if (suspend) {
                        // only suspend it and then later shutdown it
                        suspendNow(consumer);
                        // add it to the deferred list so the route will be shutdown later
                        deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
                        LOG.debug("Route: {} suspended and shutdown deferred, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint());
                    } else if (shutdown) {
                        shutdownNow(consumer);
                        LOG.info("Route: {} shutdown complete, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint());
                    } else {
                        // we will stop it later, but for now it must run to be able to help all inflight messages
                        // be safely completed
                        deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
                        // the deferred operation is a suspension when suspending only, otherwise a shutdown
                        LOG.debug("Route: " + order.getRoute().getId() + (suspendOnly ? " suspension deferred." : " shutdown deferred."));
                    }
                }
            }

            // notify the services we intend to shutdown
            for (RouteStartupOrder order : routes) {
                for (Service service : order.getServices()) {
                    // skip the consumer as we handle that specially
                    if (service instanceof Consumer) {
                        continue;
                    }
                    prepareShutdown(service, suspendOnly, false, true, false);
                }
            }

            // wait till there are no more pending and inflight messages
            boolean done = false;
            long loopDelaySeconds = 1;
            long loopCount = 0;
            while (!done && !timeoutOccurred.get()) {
                int size = 0;
                // number of inflights per route
                final Map<String, Integer> routeInflight = new LinkedHashMap<String, Integer>();

                for (RouteStartupOrder order : routes) {
                    int inflight = context.getInflightRepository().size(order.getRoute().getId());
                    inflight += getPendingInflightExchanges(order);
                    if (inflight > 0) {
                        String routeId = order.getRoute().getId();
                        routeInflight.put(routeId, inflight);
                        size += inflight;
                        LOG.trace("{} inflight and pending exchanges for route: {}", inflight, routeId);
                    }
                }
                if (size > 0) {
                    try {
                        // build a message with inflight per route
                        CollectionStringBuffer csb = new CollectionStringBuffer();
                        for (Map.Entry<String, Integer> entry : routeInflight.entrySet()) {
                            String row = String.format("%s = %s", entry.getKey(), entry.getValue());
                            csb.append(row);
                        }

                        String msg = "Waiting as there are still " + size + " inflight and pending exchanges to complete, timeout in "
                                + (TimeUnit.SECONDS.convert(timeout, timeUnit) - (loopCount++ * loopDelaySeconds)) + " seconds.";
                        msg += " Inflights per route: [" + csb.toString() + "]";

                        LOG.info(msg);

                        // log verbose if DEBUG logging is enabled
                        logInflightExchanges(context, routes, false);

                        Thread.sleep(loopDelaySeconds * 1000);
                    } catch (InterruptedException e) {
                        if (abortAfterTimeout) {
                            LOG.warn("Interrupted while waiting during graceful shutdown, will abort.");
                            return;
                        } else {
                            LOG.warn("Interrupted while waiting during graceful shutdown, will force shutdown now.");
                            break;
                        }
                    }
                } else {
                    done = true;
                }
            }

            // prepare for shutdown
            for (ShutdownDeferredConsumer deferred : deferredConsumers) {
                Consumer consumer = deferred.getConsumer();
                if (consumer instanceof ShutdownAware) {
                    LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId());
                    boolean forced = context.getShutdownStrategy().forceShutdown(consumer);
                    boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
                    prepareShutdown(consumer, suspendOnly, forced, false, suppress);
                    LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId());
                }
            }

            // now all messages has been completed then stop the deferred consumers
            for (ShutdownDeferredConsumer deferred : deferredConsumers) {
                Consumer consumer = deferred.getConsumer();
                if (suspendOnly) {
                    suspendNow(consumer);
                    LOG.info("Route: {} suspend complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
                } else {
                    shutdownNow(consumer);
                    LOG.info("Route: {} shutdown complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
                }
            }

            // now the route consumers has been shutdown, then prepare route services for shutdown
            for (RouteStartupOrder order : routes) {
                for (Service service : order.getServices()) {
                    boolean forced = context.getShutdownStrategy().forceShutdown(service);
                    boolean suppress = context.getShutdownStrategy().isSuppressLoggingOnTimeout();
                    prepareShutdown(service, suspendOnly, forced, true, suppress);
                }
            }
        }

    }

    /**
     * Calculates the total number of inflight exchanges for the given route
     *
     * @param order the route
     * @return number of inflight exchanges
     */
    protected static int getPendingInflightExchanges(RouteStartupOrder order) {
        int inflight = 0;

        // the consumer is the 1st service so we always get the consumer
        // the child services are EIPs in the routes which may also have pending
        // inflight exchanges (such as the aggregator)
        for (Service service : order.getServices()) {
            Set<Service> children = ServiceHelper.getChildServices(service);
            for (Service child : children) {
                if (child instanceof ShutdownAware) {
                    inflight += ((ShutdownAware) child).getPendingExchangesSize();
                }
            }
        }

        return inflight;
    }

    /**
     * Logs information about the inflight exchanges which started from any of the given routes.
     * <p/>
     * Nothing is logged if there are no such inflight exchanges, or if DEBUG level was requested
     * but DEBUG logging is not enabled.
     *
     * @param camelContext the camel context whose inflight repository is browsed
     * @param routes       the routes the inflight exchanges must have started from
     * @param infoLevel    <tt>true</tt> to log at INFO level, <tt>false</tt> to log at DEBUG level
     */
    protected static void logInflightExchanges(CamelContext camelContext, List<RouteStartupOrder> routes, boolean infoLevel) {
        // check if we need to log
        if (!infoLevel && !LOG.isDebugEnabled()) {
            return;
        }

        Collection<InflightRepository.InflightExchange> inflights = camelContext.getInflightRepository().browse();
        int size = inflights.size();
        if (size == 0) {
            return;
        }

        // filter so inflight must start from any of the routes
        Set<String> routeIds = new HashSet<String>();
        for (RouteStartupOrder route : routes) {
            routeIds.add(route.getRoute().getId());
        }
        Collection<InflightRepository.InflightExchange> filtered = new ArrayList<InflightRepository.InflightExchange>();
        for (InflightRepository.InflightExchange inflight : inflights) {
            String routeId = inflight.getExchange().getFromRouteId();
            if (routeIds.contains(routeId)) {
                filtered.add(inflight);
            }
        }

        size = filtered.size();
        if (size == 0) {
            return;
        }

        StringBuilder sb = new StringBuilder("There are " + size + " inflight exchanges:");
        for (InflightRepository.InflightExchange inflight : filtered) {
            sb.append("\n\tInflightExchange: [exchangeId=").append(inflight.getExchange().getExchangeId())
                    .append(", fromRouteId=").append(inflight.getExchange().getFromRouteId())
                    .append(", routeId=").append(inflight.getRouteId())
                    .append(", nodeId=").append(inflight.getNodeId())
                    .append(", elapsed=").append(inflight.getElapsed())
                    .append(", duration=").append(inflight.getDuration())
                    .append("]");
        }

        if (infoLevel) {
            LOG.info(sb.toString());
        } else {
            LOG.debug(sb.toString());
        }
    }

}