001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.Iterator; 020import java.util.LinkedHashSet; 021import java.util.List; 022import java.util.Map; 023import java.util.Set; 024import java.util.concurrent.ConcurrentHashMap; 025import java.util.concurrent.CopyOnWriteArrayList; 026import java.util.concurrent.ExecutorService; 027import java.util.concurrent.ScheduledExecutorService; 028import java.util.concurrent.ThreadFactory; 029import java.util.concurrent.ThreadPoolExecutor; 030import java.util.concurrent.TimeUnit; 031 032import org.apache.camel.CamelContext; 033import org.apache.camel.NamedNode; 034import org.apache.camel.StaticService; 035import org.apache.camel.ThreadPoolRejectedPolicy; 036import org.apache.camel.model.OptionalIdentifiedDefinition; 037import org.apache.camel.model.ProcessorDefinition; 038import org.apache.camel.model.ProcessorDefinitionHelper; 039import org.apache.camel.model.RouteDefinition; 040import org.apache.camel.spi.ExecutorServiceManager; 041import org.apache.camel.spi.LifecycleStrategy; 042import org.apache.camel.spi.ThreadPoolFactory; 043import org.apache.camel.spi.ThreadPoolProfile; 044import org.apache.camel.support.ServiceSupport; 045import org.apache.camel.util.ObjectHelper; 046import org.apache.camel.util.StopWatch; 047import org.apache.camel.util.TimeUtils; 048import org.apache.camel.util.URISupport; 049import org.apache.camel.util.concurrent.CamelThreadFactory; 050import org.apache.camel.util.concurrent.SizedScheduledExecutorService; 051import org.apache.camel.util.concurrent.ThreadHelper; 052import org.slf4j.Logger; 053import org.slf4j.LoggerFactory; 054 055/** 056 * Default {@link org.apache.camel.spi.ExecutorServiceManager}. 057 * 058 * @version 059 */ 060public class DefaultExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager { 061 private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceManager.class); 062 063 private final CamelContext camelContext; 064 private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory(); 065 private final List<ExecutorService> executorServices = new CopyOnWriteArrayList<ExecutorService>(); 066 private String threadNamePattern; 067 private long shutdownAwaitTermination = 10000; 068 private String defaultThreadPoolProfileId = "defaultThreadPoolProfile"; 069 private final Map<String, ThreadPoolProfile> threadPoolProfiles = new ConcurrentHashMap<String, ThreadPoolProfile>(); 070 private ThreadPoolProfile defaultProfile; 071 072 public DefaultExecutorServiceManager(CamelContext camelContext) { 073 this.camelContext = camelContext; 074 075 defaultProfile = new ThreadPoolProfile(defaultThreadPoolProfileId); 076 defaultProfile.setDefaultProfile(true); 077 defaultProfile.setPoolSize(10); 078 defaultProfile.setMaxPoolSize(20); 079 defaultProfile.setKeepAliveTime(60L); 080 defaultProfile.setTimeUnit(TimeUnit.SECONDS); 081 defaultProfile.setMaxQueueSize(1000); 082 defaultProfile.setAllowCoreThreadTimeOut(false); 083 defaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns); 084 085 registerThreadPoolProfile(defaultProfile); 086 } 087 088 @Override 089 public ThreadPoolFactory getThreadPoolFactory() { 090 return threadPoolFactory; 091 } 092 093 @Override 094 public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) { 095 this.threadPoolFactory = threadPoolFactory; 096 } 097 098 @Override 099 public void registerThreadPoolProfile(ThreadPoolProfile profile) { 100 ObjectHelper.notNull(profile, "profile"); 101 ObjectHelper.notEmpty(profile.getId(), "id", profile); 102 threadPoolProfiles.put(profile.getId(), profile); 103 } 104 105 @Override 106 public ThreadPoolProfile getThreadPoolProfile(String id) { 107 return threadPoolProfiles.get(id); 108 } 109 110 @Override 111 public ThreadPoolProfile getDefaultThreadPoolProfile() { 112 return getThreadPoolProfile(defaultThreadPoolProfileId); 113 } 114 115 @Override 116 public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) { 117 threadPoolProfiles.remove(defaultThreadPoolProfileId); 118 defaultThreadPoolProfile.addDefaults(defaultProfile); 119 120 LOG.info("Using custom DefaultThreadPoolProfile: " + defaultThreadPoolProfile); 121 122 this.defaultThreadPoolProfileId = defaultThreadPoolProfile.getId(); 123 defaultThreadPoolProfile.setDefaultProfile(true); 124 registerThreadPoolProfile(defaultThreadPoolProfile); 125 } 126 127 @Override 128 public String getThreadNamePattern() { 129 return threadNamePattern; 130 } 131 132 @Override 133 public void setThreadNamePattern(String threadNamePattern) { 134 // must set camel id here in the pattern and let the other placeholders be resolved on demand 135 String name = threadNamePattern.replaceFirst("#camelId#", this.camelContext.getName()); 136 this.threadNamePattern = name; 137 } 138 139 @Override 140 public long getShutdownAwaitTermination() { 141 return shutdownAwaitTermination; 142 } 143 144 @Override 145 public void setShutdownAwaitTermination(long shutdownAwaitTermination) { 146 this.shutdownAwaitTermination = shutdownAwaitTermination; 147 } 148 149 @Override 150 public String resolveThreadName(String name) { 151 return ThreadHelper.resolveThreadName(threadNamePattern, name); 152 } 153 154 @Override 155 public Thread newThread(String name, Runnable runnable) { 156 ThreadFactory factory = createThreadFactory(name, true); 157 return factory.newThread(runnable); 158 } 159 160 @Override 161 public ExecutorService newDefaultThreadPool(Object source, String name) { 162 return newThreadPool(source, name, getDefaultThreadPoolProfile()); 163 } 164 165 @Override 166 public ScheduledExecutorService newDefaultScheduledThreadPool(Object source, String name) { 167 return newScheduledThreadPool(source, name, getDefaultThreadPoolProfile()); 168 } 169 170 @Override 171 public ExecutorService newThreadPool(Object source, String name, String profileId) { 172 ThreadPoolProfile profile = getThreadPoolProfile(profileId); 173 if (profile != null) { 174 return newThreadPool(source, name, profile); 175 } else { 176 // no profile with that id 177 return null; 178 } 179 } 180 181 @Override 182 public ExecutorService newThreadPool(Object source, String name, ThreadPoolProfile profile) { 183 String sanitizedName = URISupport.sanitizeUri(name); 184 ObjectHelper.notNull(profile, "ThreadPoolProfile"); 185 186 ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile(); 187 profile.addDefaults(defaultProfile); 188 189 ThreadFactory threadFactory = createThreadFactory(sanitizedName, true); 190 ExecutorService executorService = threadPoolFactory.newThreadPool(profile, threadFactory); 191 onThreadPoolCreated(executorService, source, profile.getId()); 192 if (LOG.isDebugEnabled()) { 193 LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, executorService}); 194 } 195 196 return executorService; 197 } 198 199 @Override 200 public ExecutorService newThreadPool(Object source, String name, int poolSize, int maxPoolSize) { 201 ThreadPoolProfile profile = new ThreadPoolProfile(name); 202 profile.setPoolSize(poolSize); 203 profile.setMaxPoolSize(maxPoolSize); 204 return newThreadPool(source, name, profile); 205 } 206 207 @Override 208 public ExecutorService newSingleThreadExecutor(Object source, String name) { 209 return newFixedThreadPool(source, name, 1); 210 } 211 212 @Override 213 public ExecutorService newCachedThreadPool(Object source, String name) { 214 String sanitizedName = URISupport.sanitizeUri(name); 215 ExecutorService answer = threadPoolFactory.newCachedThreadPool(createThreadFactory(sanitizedName, true)); 216 onThreadPoolCreated(answer, source, null); 217 218 if (LOG.isDebugEnabled()) { 219 LOG.debug("Created new CachedThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer}); 220 } 221 return answer; 222 } 223 224 @Override 225 public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) { 226 ThreadPoolProfile profile = new ThreadPoolProfile(name); 227 profile.setPoolSize(poolSize); 228 profile.setMaxPoolSize(poolSize); 229 profile.setKeepAliveTime(0L); 230 return newThreadPool(source, name, profile); 231 } 232 233 @Override 234 public ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name) { 235 return newScheduledThreadPool(source, name, 1); 236 } 237 238 @Override 239 public ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile) { 240 String sanitizedName = URISupport.sanitizeUri(name); 241 profile.addDefaults(getDefaultThreadPoolProfile()); 242 ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(profile, createThreadFactory(sanitizedName, true)); 243 onThreadPoolCreated(answer, source, null); 244 245 if (LOG.isDebugEnabled()) { 246 LOG.debug("Created new ScheduledThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer}); 247 } 248 return answer; 249 } 250 251 @Override 252 public ScheduledExecutorService newScheduledThreadPool(Object source, String name, String profileId) { 253 ThreadPoolProfile profile = getThreadPoolProfile(profileId); 254 if (profile != null) { 255 return newScheduledThreadPool(source, name, profile); 256 } else { 257 // no profile with that id 258 return null; 259 } 260 } 261 262 @Override 263 public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) { 264 ThreadPoolProfile profile = new ThreadPoolProfile(name); 265 profile.setPoolSize(poolSize); 266 return newScheduledThreadPool(source, name, profile); 267 } 268 269 @Override 270 public void shutdown(ExecutorService executorService) { 271 doShutdown(executorService, 0, false); 272 } 273 274 @Override 275 public void shutdownGraceful(ExecutorService executorService) { 276 doShutdown(executorService, getShutdownAwaitTermination(), false); 277 } 278 279 @Override 280 public void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) { 281 doShutdown(executorService, shutdownAwaitTermination, false); 282 } 283 284 private boolean doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean failSafe) { 285 if (executorService == null) { 286 return false; 287 } 288 289 boolean warned = false; 290 291 // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively 292 // and try shutting down again. In both cases we wait at most the given shutdown timeout value given 293 // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus 294 // we ought to shutdown much faster) 295 if (!executorService.isShutdown()) { 296 StopWatch watch = new StopWatch(); 297 298 LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination); 299 executorService.shutdown(); 300 301 if (shutdownAwaitTermination > 0) { 302 try { 303 if (!awaitTermination(executorService, shutdownAwaitTermination)) { 304 warned = true; 305 LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService); 306 executorService.shutdownNow(); 307 // we are now shutting down aggressively, so wait to see if we can completely shutdown or not 308 if (!awaitTermination(executorService, shutdownAwaitTermination)) { 309 LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService); 310 } 311 } 312 } catch (InterruptedException e) { 313 warned = true; 314 LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService); 315 // we were interrupted during shutdown, so force shutdown 316 executorService.shutdownNow(); 317 } 318 } 319 320 // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log 321 if (warned) { 322 LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.", 323 new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())}); 324 } else if (LOG.isDebugEnabled()) { 325 LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.", 326 new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())}); 327 } 328 } 329 330 // let lifecycle strategy be notified as well which can let it be managed in JMX as well 331 ThreadPoolExecutor threadPool = null; 332 if (executorService instanceof ThreadPoolExecutor) { 333 threadPool = (ThreadPoolExecutor) executorService; 334 } else if (executorService instanceof SizedScheduledExecutorService) { 335 threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor(); 336 } 337 if (threadPool != null) { 338 for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { 339 lifecycle.onThreadPoolRemove(camelContext, threadPool); 340 } 341 } 342 343 // remove reference as its shutdown (do not remove if fail-safe) 344 if (!failSafe) { 345 executorServices.remove(executorService); 346 } 347 348 return warned; 349 } 350 351 @Override 352 public List<Runnable> shutdownNow(ExecutorService executorService) { 353 return doShutdownNow(executorService, false); 354 } 355 356 private List<Runnable> doShutdownNow(ExecutorService executorService, boolean failSafe) { 357 ObjectHelper.notNull(executorService, "executorService"); 358 359 List<Runnable> answer = null; 360 if (!executorService.isShutdown()) { 361 if (failSafe) { 362 // log as warn, as we shutdown as fail-safe, so end user should see more details in the log. 363 LOG.warn("Forcing shutdown of ExecutorService: {}", executorService); 364 } else { 365 LOG.debug("Forcing shutdown of ExecutorService: {}", executorService); 366 } 367 answer = executorService.shutdownNow(); 368 if (LOG.isTraceEnabled()) { 369 LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.", 370 new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()}); 371 } 372 } 373 374 // let lifecycle strategy be notified as well which can let it be managed in JMX as well 375 ThreadPoolExecutor threadPool = null; 376 if (executorService instanceof ThreadPoolExecutor) { 377 threadPool = (ThreadPoolExecutor) executorService; 378 } else if (executorService instanceof SizedScheduledExecutorService) { 379 threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor(); 380 } 381 if (threadPool != null) { 382 for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { 383 lifecycle.onThreadPoolRemove(camelContext, threadPool); 384 } 385 } 386 387 // remove reference as its shutdown (do not remove if fail-safe) 388 if (!failSafe) { 389 executorServices.remove(executorService); 390 } 391 392 return answer; 393 } 394 395 @Override 396 public boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException { 397 // log progress every 2nd second so end user is aware of we are shutting down 398 StopWatch watch = new StopWatch(); 399 long interval = Math.min(2000, shutdownAwaitTermination); 400 boolean done = false; 401 while (!done && interval > 0) { 402 if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) { 403 done = true; 404 } else { 405 LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService); 406 // recalculate interval 407 interval = Math.min(2000, shutdownAwaitTermination - watch.taken()); 408 } 409 } 410 411 return done; 412 } 413 414 /** 415 * Strategy callback when a new {@link java.util.concurrent.ExecutorService} have been created. 416 * 417 * @param executorService the created {@link java.util.concurrent.ExecutorService} 418 */ 419 protected void onNewExecutorService(ExecutorService executorService) { 420 // noop 421 } 422 423 @Override 424 protected void doStart() throws Exception { 425 if (threadNamePattern == null) { 426 // set default name pattern which includes the camel context name 427 threadNamePattern = "Camel (" + camelContext.getName() + ") thread ##counter# - #name#"; 428 } 429 } 430 431 @Override 432 protected void doStop() throws Exception { 433 // noop 434 } 435 436 @Override 437 protected void doShutdown() throws Exception { 438 // shutdown all remainder executor services by looping and doing this aggressively 439 // as by normal all threads pool should have been shutdown using proper lifecycle 440 // by their EIPs, components etc. This is acting as a fail-safe during shutdown 441 // of CamelContext itself. 442 Set<ExecutorService> forced = new LinkedHashSet<ExecutorService>(); 443 if (!executorServices.isEmpty()) { 444 // at first give a bit of time to shutdown nicely as the thread pool is most likely in the process of being shutdown also 445 LOG.debug("Giving time for {} ExecutorService's to shutdown properly (acting as fail-safe)", executorServices.size()); 446 for (ExecutorService executorService : executorServices) { 447 try { 448 boolean warned = doShutdown(executorService, getShutdownAwaitTermination(), true); 449 // remember the thread pools that was forced to shutdown (eg warned) 450 if (warned) { 451 forced.add(executorService); 452 } 453 } catch (Throwable e) { 454 // only log if something goes wrong as we want to shutdown them all 455 LOG.warn("Error occurred during shutdown of ExecutorService: " 456 + executorService + ". This exception will be ignored.", e); 457 } 458 } 459 } 460 461 // log the thread pools which was forced to shutdown so it may help the user to identify a problem of his 462 if (!forced.isEmpty()) { 463 LOG.warn("Forced shutdown of {} ExecutorService's which has not been shutdown properly (acting as fail-safe)", forced.size()); 464 for (ExecutorService executorService : forced) { 465 LOG.warn(" forced -> {}", executorService); 466 } 467 } 468 forced.clear(); 469 470 // clear list 471 executorServices.clear(); 472 473 // do not clear the default profile as we could potential be restarted 474 Iterator<ThreadPoolProfile> it = threadPoolProfiles.values().iterator(); 475 while (it.hasNext()) { 476 ThreadPoolProfile profile = it.next(); 477 if (!profile.isDefaultProfile()) { 478 it.remove(); 479 } 480 } 481 } 482 483 /** 484 * Invoked when a new thread pool is created. 485 * This implementation will invoke the {@link LifecycleStrategy#onThreadPoolAdd(org.apache.camel.CamelContext, 486 * java.util.concurrent.ThreadPoolExecutor, String, String, String, String) LifecycleStrategy.onThreadPoolAdd} method, 487 * which for example will enlist the thread pool in JMX management. 488 * 489 * @param executorService the thread pool 490 * @param source the source to use the thread pool 491 * @param threadPoolProfileId profile id, if the thread pool was created from a thread pool profile 492 */ 493 private void onThreadPoolCreated(ExecutorService executorService, Object source, String threadPoolProfileId) { 494 // add to internal list of thread pools 495 executorServices.add(executorService); 496 497 String id; 498 String sourceId = null; 499 String routeId = null; 500 501 // extract id from source 502 if (source instanceof NamedNode) { 503 id = ((OptionalIdentifiedDefinition<?>) source).idOrCreate(this.camelContext.getNodeIdFactory()); 504 // and let source be the short name of the pattern 505 sourceId = ((NamedNode) source).getShortName(); 506 } else if (source instanceof String) { 507 id = (String) source; 508 } else if (source != null) { 509 if (source instanceof StaticService) { 510 // the source is static service so its name would be unique 511 id = source.getClass().getSimpleName(); 512 } else { 513 // fallback and use the simple class name with hashcode for the id so its unique for this given source 514 id = source.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(source) + ")"; 515 } 516 } else { 517 // no source, so fallback and use the simple class name from thread pool and its hashcode identity so its unique 518 id = executorService.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(executorService) + ")"; 519 } 520 521 // id is mandatory 522 ObjectHelper.notEmpty(id, "id for thread pool " + executorService); 523 524 // extract route id if possible 525 if (source instanceof ProcessorDefinition) { 526 RouteDefinition route = ProcessorDefinitionHelper.getRoute((ProcessorDefinition<?>) source); 527 if (route != null) { 528 routeId = route.idOrCreate(this.camelContext.getNodeIdFactory()); 529 } 530 } 531 532 // let lifecycle strategy be notified as well which can let it be managed in JMX as well 533 ThreadPoolExecutor threadPool = null; 534 if (executorService instanceof ThreadPoolExecutor) { 535 threadPool = (ThreadPoolExecutor) executorService; 536 } else if (executorService instanceof SizedScheduledExecutorService) { 537 threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor(); 538 } 539 if (threadPool != null) { 540 for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { 541 lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId); 542 } 543 } 544 545 // now call strategy to allow custom logic 546 onNewExecutorService(executorService); 547 } 548 549 private ThreadFactory createThreadFactory(String name, boolean isDaemon) { 550 ThreadFactory threadFactory = new CamelThreadFactory(threadNamePattern, name, isDaemon); 551 return threadFactory; 552 } 553 554}