001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.ArrayList; 020 import java.util.Collections; 021 import java.util.Comparator; 022 import java.util.LinkedHashSet; 023 import java.util.List; 024 import java.util.Locale; 025 import java.util.Set; 026 import java.util.concurrent.ExecutionException; 027 import java.util.concurrent.ExecutorService; 028 import java.util.concurrent.Future; 029 import java.util.concurrent.TimeUnit; 030 import java.util.concurrent.TimeoutException; 031 032 import org.apache.camel.CamelContext; 033 import org.apache.camel.CamelContextAware; 034 import org.apache.camel.Consumer; 035 import org.apache.camel.Route; 036 import org.apache.camel.Service; 037 import org.apache.camel.ShutdownRoute; 038 import org.apache.camel.ShutdownRunningTask; 039 import org.apache.camel.SuspendableService; 040 import org.apache.camel.spi.RouteStartupOrder; 041 import org.apache.camel.spi.ShutdownAware; 042 import org.apache.camel.spi.ShutdownPrepared; 043 import org.apache.camel.spi.ShutdownStrategy; 044 import org.apache.camel.support.ServiceSupport; 045 import org.apache.camel.util.EventHelper; 046 import 
org.apache.camel.util.ObjectHelper; 047 import org.apache.camel.util.ServiceHelper; 048 import org.apache.camel.util.StopWatch; 049 import org.slf4j.Logger; 050 import org.slf4j.LoggerFactory; 051 052 /** 053 * Default {@link org.apache.camel.spi.ShutdownStrategy} which uses graceful shutdown. 054 * <p/> 055 * Graceful shutdown ensures that any inflight and pending messages will be taken into account 056 * and it will wait until these exchanges has been completed. 057 * <p/> 058 * This strategy will perform graceful shutdown in two steps: 059 * <ul> 060 * <li>Graceful - By suspending/stopping consumers, and let any in-flight exchanges complete</li> 061 * <li>Forced - After a given period of time, a timeout occurred and if there are still pending 062 * exchanges to complete, then a more aggressive forced strategy is performed.</li> 063 * </ul> 064 * The idea by the <tt>graceful</tt> shutdown strategy, is to stop taking in more new messages, 065 * and allow any existing inflight messages to complete. Then when there is no more inflight messages 066 * then the routes can be fully shutdown. This mean that if there is inflight messages then we will have 067 * to wait for these messages to complete. If they do not complete after a period of time, then a 068 * timeout triggers. And then a more aggressive strategy takes over. 069 * <p/> 070 * The idea by the <tt>forced</tt> shutdown strategy, is to stop continue processing messages. 071 * And force routes and its services to shutdown now. There is a risk when shutting down now, 072 * that some resources is not properly shutdown, which can cause side effects. The timeout value 073 * is by default 300 seconds, but can be customized. 074 * <p/> 075 * As this strategy will politely wait until all exchanges has been completed it can potential wait 076 * for a long time, and hence why a timeout value can be set. When the timeout triggers you can also 077 * specify whether the remainder consumers should be shutdown now or ignore. 
 * <p/>
 * Will by default use a timeout of 300 seconds (5 minutes), after which the remaining consumers are shut down immediately.
 * This ensures that shutting down Camel will eventually complete.
 * This behavior can be configured using the {@link #setTimeout(long)} and
 * {@link #setShutdownNowOnTimeout(boolean)} methods.
 * <p/>
 * Routes will by default be shut down in the reverse order in which they were started.
 * You can customize this using the {@link #setShutdownRoutesInReverseOrder(boolean)} method.
 * <p/>
 * After the route consumers have been shut down, any {@link ShutdownPrepared} services on the routes
 * are prepared for shutdown by invoking {@link ShutdownPrepared#prepareShutdown(boolean)} with
 * <tt>force=false</tt>.
 * <p/>
 * Then, if a timeout occurred and the strategy has been configured to shut down now on timeout,
 * the strategy performs a more aggressive forced shutdown. It forces all consumers to shut down
 * and then invokes {@link ShutdownPrepared#prepareShutdown(boolean)} with <tt>force=true</tt>
 * on the services. This allows the services to know they should force shutdown now.
* @version
*/
public class DefaultShutdownStrategy extends ServiceSupport implements ShutdownStrategy, CamelContextAware {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultShutdownStrategy.class);

    private CamelContext camelContext;
    private ExecutorService executor;
    private long timeout = 5 * 60;
    private TimeUnit timeUnit = TimeUnit.SECONDS;
    private boolean shutdownNowOnTimeout = true;
    private boolean shutdownRoutesInReverseOrder = true;
    // set when a timeout occurs during a shutdown; reset in doStart()
    private volatile boolean forceShutdown;

    /**
     * Creates the strategy without a {@link CamelContext}. A context must be set with
     * {@link #setCamelContext(CamelContext)} before this service is started.
     */
    public DefaultShutdownStrategy() {
    }

    /**
     * Creates the strategy bound to the given {@link CamelContext}.
     *
     * @param camelContext the context whose executor service manager provides the shutdown thread
     */
    public DefaultShutdownStrategy(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /**
     * Gracefully shuts down the given routes using the configured timeout and time unit.
     */
    public void shutdown(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
        shutdown(context, routes, getTimeout(), getTimeUnit());
    }

    /**
     * Shuts down the given routes, forcing them down if the configured timeout is exceeded.
     */
    @Override
    public void shutdownForced(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
        doShutdown(context, routes, getTimeout(), getTimeUnit(), false, false, true);
    }

    /**
     * Suspends the given routes using the configured timeout and time unit.
     */
    public void suspend(CamelContext context, List<RouteStartupOrder> routes) throws Exception {
        doShutdown(context, routes, getTimeout(), getTimeUnit(), true, false, false);
    }

    /**
     * Gracefully shuts down the given routes using the given timeout.
     */
    public void shutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
        doShutdown(context, routes, timeout, timeUnit, false, false, false);
    }

    /**
     * Gracefully shuts down a single route.
     *
     * @return <tt>false</tt> if the timeout was hit and <tt>abortAfterTimeout</tt> is set, otherwise <tt>true</tt>
     */
    public boolean shutdown(CamelContext context, RouteStartupOrder route, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception {
        List<RouteStartupOrder> routes = new ArrayList<RouteStartupOrder>(1);
        routes.add(route);
        return doShutdown(context, routes, timeout, timeUnit, false, abortAfterTimeout, false);
    }

    /**
     * Suspends the given routes using the given timeout.
     */
    public void suspend(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit) throws Exception {
        doShutdown(context, routes, timeout, timeUnit, true, false, false);
    }

    /**
     * Performs the shutdown (or suspension) of the given routes on a separate thread so that a timeout can be applied.
     *
     * @param context           the camel context
     * @param routes            the routes to shut down; <tt>null</tt> or empty is a no-op
     * @param timeout           the timeout; a value of 0 or less means wait without a timeout
     * @param timeUnit          the unit of <tt>timeout</tt>
     * @param suspendOnly       whether to suspend instead of shutting down the deferred consumers
     * @param abortAfterTimeout whether to abort (and return <tt>false</tt>) when the timeout is hit
     * @param forceShutdown     whether to force the routes down when the timeout is hit
     * @return <tt>false</tt> if the shutdown was aborted after a timeout, otherwise <tt>true</tt>
     * @throws Exception if waiting for the shutdown task is interrupted. A failure inside the task is rethrown
     *                   wrapped via {@link ObjectHelper#wrapRuntimeCamelException(Throwable)}.
     */
    protected boolean doShutdown(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit,
                                 boolean suspendOnly, boolean abortAfterTimeout, boolean forceShutdown) throws Exception {

        // just return if no routes to shutdown
        if (routes == null || routes.isEmpty()) {
            return true;
        }

        StopWatch watch = new StopWatch();

        // at first sort according to route startup order
        List<RouteStartupOrder> routesOrdered = new ArrayList<RouteStartupOrder>(routes);
        Collections.sort(routesOrdered, new Comparator<RouteStartupOrder>() {
            public int compare(RouteStartupOrder o1, RouteStartupOrder o2) {
                // compare explicitly rather than subtracting, which can overflow for extreme startup orders
                int a = o1.getStartupOrder();
                int b = o2.getStartupOrder();
                return a < b ? -1 : (a == b ? 0 : 1);
            }
        });
        if (shutdownRoutesInReverseOrder) {
            Collections.reverse(routesOrdered);
        }

        if (timeout > 0) {
            LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (timeout " + timeout + " " + timeUnit.toString().toLowerCase(Locale.ENGLISH) + ")");
        } else {
            LOG.info("Starting to graceful shutdown " + routesOrdered.size() + " routes (no timeout)");
        }

        // use another thread to perform the shutdowns so we can support timeout
        Future<?> future = getExecutorService().submit(new ShutdownTask(context, routesOrdered, timeout, timeUnit, suspendOnly, abortAfterTimeout));
        try {
            if (timeout > 0) {
                future.get(timeout, timeUnit);
            } else {
                future.get();
            }
        } catch (TimeoutException e) {
            // timeout then cancel the task; cancel(true) interrupts the shutdown thread if it is still running
            future.cancel(true);

            // record the caller's force flag, which forceShutdown(Service) reports from now on
            this.forceShutdown = forceShutdown;

            // if set, stop processing and return false to indicate that the shutdown is aborting
            if (!forceShutdown && abortAfterTimeout) {
                LOG.warn("Timeout occurred. Aborting the shutdown now.");
                return false;
            }

            if (forceShutdown || shutdownNowOnTimeout) {
                LOG.warn("Timeout occurred. Now forcing the routes to be shutdown now.");
                // force the routes to shutdown now
                shutdownRoutesNow(routesOrdered);

                // now the route consumers have been shutdown, prepare the route services for a forced shutdown,
                // in the same order the routes were shut down
                for (RouteStartupOrder order : routesOrdered) {
                    for (Service service : order.getServices()) {
                        prepareShutdown(service, true, true);
                    }
                }
            } else {
                LOG.warn("Timeout occurred. Will ignore shutting down the remainder routes.");
            }
        } catch (ExecutionException e) {
            // unwrap execution exception
            throw ObjectHelper.wrapRuntimeCamelException(e.getCause());
        }

        // convert to seconds as its easier to read than a big milli seconds number
        long seconds = TimeUnit.SECONDS.convert(watch.stop(), TimeUnit.MILLISECONDS);

        LOG.info("Graceful shutdown of " + routesOrdered.size() + " routes completed in " + seconds + " seconds");
        return true;
    }

    /**
     * Returns the force flag recorded by the most recent shutdown that timed out.
     * The <tt>service</tt> argument is not consulted.
     */
    @Override
    public boolean forceShutdown(Service service) {
        return forceShutdown;
    }

    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    public long getTimeout() {
        return timeout;
    }

    public void setTimeUnit(TimeUnit timeUnit) {
        this.timeUnit = timeUnit;
    }

    public TimeUnit getTimeUnit() {
        return timeUnit;
    }

    public void setShutdownNowOnTimeout(boolean shutdownNowOnTimeout) {
        this.shutdownNowOnTimeout = shutdownNowOnTimeout;
    }

    public boolean isShutdownNowOnTimeout() {
        return shutdownNowOnTimeout;
    }

    public boolean isShutdownRoutesInReverseOrder() {
        return shutdownRoutesInReverseOrder;
    }

    public void setShutdownRoutesInReverseOrder(boolean shutdownRoutesInReverseOrder) {
        this.shutdownRoutesInReverseOrder = shutdownRoutesInReverseOrder;
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /**
     * Shuts down all the consumers of the given routes immediately.
     * <p/>
     * Each route's shutdownRunningTask is first switched to
     * {@link ShutdownRunningTask#CompleteCurrentTaskOnly} so that it stops as fast as possible.
     *
     * @param routes the routes to shutdown
     */
    protected void shutdownRoutesNow(List<RouteStartupOrder> routes) {
        for (RouteStartupOrder order : routes) {

            // set the route to shutdown as fast as possible by stopping after
            // it has completed its current task
            ShutdownRunningTask current = order.getRoute().getRouteContext().getShutdownRunningTask();
            if (current != ShutdownRunningTask.CompleteCurrentTaskOnly) {
                LOG.debug("Changing shutdownRunningTask from {} to {} on route {} to shutdown faster",
                        new Object[]{current, ShutdownRunningTask.CompleteCurrentTaskOnly, order.getRoute().getId()});
                order.getRoute().getRouteContext().setShutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly);
            }

            for (Consumer consumer : order.getInputs()) {
                shutdownNow(consumer);
            }
        }
    }

    /**
     * Shuts down all the given consumers immediately.
     *
     * @param consumers the consumers to shutdown
     */
    protected void shutdownNow(List<Consumer> consumers) {
        for (Consumer consumer : consumers) {
            shutdownNow(consumer);
        }
    }

    /**
     * Shuts down the consumer immediately.
     * <p/>
     * Any failure is logged, reported as a service-stop-failure event and otherwise ignored.
     *
     * @param consumer the consumer to shutdown
     */
    protected static void shutdownNow(Consumer consumer) {
        LOG.trace("Shutting down: {}", consumer);

        // allow us to do custom work before delegating to service helper
        try {
            ServiceHelper.stopService(consumer);
        } catch (Throwable e) {
            LOG.warn("Error occurred while shutting down route: " + consumer + ". This exception will be ignored.", e);
            // fire event
            EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
        }

        LOG.trace("Shutdown complete for: {}", consumer);
    }

    /**
     * Suspends the consumer immediately via {@link ServiceHelper#suspendService}.
     * <p/>
     * Any failure is logged, reported as a service-stop-failure event and otherwise ignored.
     *
     * @param consumer the consumer to suspend
     */
    protected static void suspendNow(Consumer consumer) {
        LOG.trace("Suspending: {}", consumer);

        // allow us to do custom work before delegating to service helper
        try {
            ServiceHelper.suspendService(consumer);
        } catch (Throwable e) {
            LOG.warn("Error occurred while suspending route: " + consumer + ". This exception will be ignored.", e);
            // fire event
            EventHelper.notifyServiceStopFailure(consumer.getEndpoint().getCamelContext(), consumer, e);
        }

        LOG.trace("Suspend complete for: {}", consumer);
    }

    /**
     * Lazily creates the single-thread executor used to run {@link ShutdownTask}s.
     */
    private ExecutorService getExecutorService() {
        if (executor == null) {
            executor = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "ShutdownTask");
        }
        return executor;
    }

    @Override
    protected void doStart() throws Exception {
        ObjectHelper.notNull(camelContext, "CamelContext");
        // reset option
        forceShutdown = false;
    }

    @Override
    protected void doStop() throws Exception {
        // noop
    }

    @Override
    protected void doShutdown() throws Exception {
        if (executor != null) {
            camelContext.getExecutorServiceManager().shutdownNow(executor);
            // should clear executor so we can restart by creating a new thread pool
            executor = null;
        }
    }

    /**
     * Prepares the services for shutdown by invoking {@link ShutdownPrepared#prepareShutdown(boolean)}
     * on each service that implements that interface. Failures are logged and ignored.
     *
     * @param service         the service
     * @param forced          whether to force shutdown
     * @param includeChildren whether to prepare the child services (as returned by
     *                        {@link ServiceHelper#getChildServices}) instead of only the service itself
     */
    private static void prepareShutdown(Service service, boolean forced, boolean includeChildren) {
        Set<Service> list;
        if (includeChildren) {
            list = ServiceHelper.getChildServices(service);
        } else {
            list = new LinkedHashSet<Service>(1);
            list.add(service);
        }

        for (Service child : list) {
            if (child instanceof ShutdownPrepared) {
                try {
                    LOG.trace("Preparing {}shutdown on {}", forced ? "forced " : "", child);
                    ((ShutdownPrepared) child).prepareShutdown(forced);
                } catch (Exception e) {
                    LOG.warn("Error during prepare shutdown on " + child + ". This exception will be ignored.", e);
                }
            }
        }
    }

    /**
     * Holder for a consumer whose shutdown (or suspension) has been deferred, together with its route.
     */
    static class ShutdownDeferredConsumer {
        private final Route route;
        private final Consumer consumer;

        ShutdownDeferredConsumer(Route route, Consumer consumer) {
            this.route = route;
            this.consumer = consumer;
        }

        Route getRoute() {
            return route;
        }

        Consumer getConsumer() {
            return consumer;
        }
    }

    /**
     * Shutdown task which shuts down all the routes in a graceful manner.
     */
    static class ShutdownTask implements Runnable {

        private final CamelContext context;
        private final List<RouteStartupOrder> routes;
        private final boolean suspendOnly;
        private final boolean abortAfterTimeout;
        private final long timeout;
        private final TimeUnit timeUnit;

        public ShutdownTask(CamelContext context, List<RouteStartupOrder> routes, long timeout, TimeUnit timeUnit,
                            boolean suspendOnly, boolean abortAfterTimeout) {
            this.context = context;
            this.routes = routes;
            this.suspendOnly = suspendOnly;
            this.abortAfterTimeout = abortAfterTimeout;
            this.timeout = timeout;
            this.timeUnit = timeUnit;
        }

        /**
         * Runs the graceful shutdown:
         * <ol>
         * <li>shut down the consumers that can go now, and defer those needed by other routes</li>
         * <li>wait until all inflight and pending exchanges have completed</li>
         * <li>prepare the deferred consumers, then stop (or suspend) them</li>
         * <li>prepare the route services for shutdown</li>
         * </ol>
         */
        public void run() {
            LOG.debug("There are {} routes to {}", routes.size(), suspendOnly ? "suspend" : "shutdown");

            List<ShutdownDeferredConsumer> deferredConsumers = shutdownOrDeferConsumers();

            if (!waitForInflightExchanges()) {
                // interrupted and told to abort
                return;
            }

            prepareDeferredConsumers(deferredConsumers);
            stopDeferredConsumers(deferredConsumers);
            prepareRouteServices();
        }

        /**
         * Shuts down (or suspends) each route input now, unless it must be deferred.
         *
         * @return the consumers deferred until all exchanges have completed
         */
        private List<ShutdownDeferredConsumer> shutdownOrDeferConsumers() {
            List<ShutdownDeferredConsumer> deferredConsumers = new ArrayList<ShutdownDeferredConsumer>();
            for (RouteStartupOrder order : routes) {

                ShutdownRoute shutdownRoute = order.getRoute().getRouteContext().getShutdownRoute();
                ShutdownRunningTask shutdownRunningTask = order.getRoute().getRouteContext().getShutdownRunningTask();

                if (LOG.isTraceEnabled()) {
                    LOG.trace("{}{} with options [{},{}]",
                            new Object[]{suspendOnly ? "Suspending route: " : "Shutting down route: ",
                                order.getRoute().getId(), shutdownRoute, shutdownRunningTask});
                }

                for (Consumer consumer : order.getInputs()) {

                    boolean suspend = false;

                    // assume we should shutdown if we are not deferred
                    boolean shutdown = shutdownRoute != ShutdownRoute.Defer;

                    if (shutdown) {
                        // some consumers do not support shutting down so let them decide
                        if (consumer instanceof ShutdownAware) {
                            shutdown = !((ShutdownAware) consumer).deferShutdown(shutdownRunningTask);
                        }
                        // suspending is a gentler way to shut down gracefully, so prefer it when supported
                        if (shutdown && consumer instanceof SuspendableService) {
                            suspend = true;
                        }
                    }

                    // log at info level when a route has been shutdown (otherwise log at debug level to not be too noisy)
                    if (suspend) {
                        // only suspend it now; it is shut down later as a deferred consumer
                        suspendNow(consumer);
                        deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
                        LOG.debug("Route: {} suspended and shutdown deferred, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint());
                    } else if (shutdown) {
                        shutdownNow(consumer);
                        LOG.info("Route: {} shutdown complete, was consuming from: {}", order.getRoute().getId(), order.getRoute().getEndpoint());
                    } else {
                        // we will stop it later, but for now it must keep running so that all inflight
                        // messages can be completed safely
                        deferredConsumers.add(new ShutdownDeferredConsumer(order.getRoute(), consumer));
                        LOG.debug("Route: " + order.getRoute().getId() + (suspendOnly ? " suspension deferred." : " shutdown deferred."));
                    }
                }
            }
            return deferredConsumers;
        }

        /**
         * Polls once per second until there are no inflight or pending exchanges left on the routes.
         *
         * @return <tt>false</tt> if interrupted while <tt>abortAfterTimeout</tt> is set (the task should stop),
         *         otherwise <tt>true</tt>
         */
        private boolean waitForInflightExchanges() {
            final long loopDelaySeconds = 1;
            long loopCount = 0;
            while (true) {
                int size = countInflightAndPendingExchanges();
                if (size <= 0) {
                    return true;
                }

                if (timeout > 0) {
                    long remaining = TimeUnit.SECONDS.convert(timeout, timeUnit) - (loopCount * loopDelaySeconds);
                    LOG.info("Waiting as there are still " + size + " inflight and pending exchanges to complete, timeout in "
                            + remaining + " seconds.");
                } else {
                    LOG.info("Waiting as there are still " + size + " inflight and pending exchanges to complete (no timeout).");
                }
                loopCount++;

                try {
                    Thread.sleep(loopDelaySeconds * 1000);
                } catch (InterruptedException e) {
                    if (abortAfterTimeout) {
                        LOG.warn("Interrupted while waiting during graceful shutdown, will abort.");
                        // the task ends here, so restore the interrupt status for the owning thread
                        Thread.currentThread().interrupt();
                        return false;
                    }
                    // the interrupt status is deliberately not restored: the remaining consumers still have
                    // to be stopped on this thread, and a pending interrupt could disturb that
                    LOG.warn("Interrupted while waiting during graceful shutdown, will force shutdown now.");
                    return true;
                }
            }
        }

        /**
         * Sums the inflight exchanges of every route plus the pending exchanges reported by
         * {@link ShutdownAware} consumers (for example consumers with internal memory queues such as seda).
         */
        private int countInflightAndPendingExchanges() {
            int size = 0;
            for (RouteStartupOrder order : routes) {
                int inflight = context.getInflightRepository().size(order.getRoute().getId());
                for (Consumer consumer : order.getInputs()) {
                    if (consumer instanceof ShutdownAware) {
                        inflight += ((ShutdownAware) consumer).getPendingExchangesSize();
                    }
                }
                if (inflight > 0) {
                    size += inflight;
                    LOG.trace("{} inflight and pending exchanges for route: {}", inflight, order.getRoute().getId());
                }
            }
            return size;
        }

        /**
         * Invokes prepareShutdown on each deferred consumer that is {@link ShutdownAware}.
         */
        private void prepareDeferredConsumers(List<ShutdownDeferredConsumer> deferredConsumers) {
            for (ShutdownDeferredConsumer deferred : deferredConsumers) {
                Consumer consumer = deferred.getConsumer();
                if (consumer instanceof ShutdownAware) {
                    LOG.trace("Route: {} preparing to shutdown.", deferred.getRoute().getId());
                    boolean forced = context.getShutdownStrategy().forceShutdown(consumer);
                    prepareShutdown(consumer, forced, false);
                    LOG.debug("Route: {} preparing to shutdown complete.", deferred.getRoute().getId());
                }
            }
        }

        /**
         * Suspends (when suspending only) or shuts down each deferred consumer now that all messages have completed.
         */
        private void stopDeferredConsumers(List<ShutdownDeferredConsumer> deferredConsumers) {
            for (ShutdownDeferredConsumer deferred : deferredConsumers) {
                Consumer consumer = deferred.getConsumer();
                if (suspendOnly) {
                    suspendNow(consumer);
                    LOG.info("Route: {} suspend complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
                } else {
                    shutdownNow(consumer);
                    LOG.info("Route: {} shutdown complete, was consuming from: {}", deferred.getRoute().getId(), deferred.getConsumer().getEndpoint());
                }
            }
        }

        /**
         * Prepares every route service (and its children) for shutdown once the route consumers are down.
         */
        private void prepareRouteServices() {
            for (RouteStartupOrder order : routes) {
                for (Service service : order.getServices()) {
                    boolean forced = context.getShutdownStrategy().forceShutdown(service);
                    prepareShutdown(service, forced, true);
                }
            }
        }
    }

}