001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.impl; 018 019 import java.util.ArrayList; 020 import java.util.HashMap; 021 import java.util.Iterator; 022 import java.util.List; 023 import java.util.Map; 024 import java.util.concurrent.ExecutorService; 025 import java.util.concurrent.ScheduledExecutorService; 026 import java.util.concurrent.ThreadFactory; 027 import java.util.concurrent.ThreadPoolExecutor; 028 import java.util.concurrent.TimeUnit; 029 030 import org.apache.camel.CamelContext; 031 import org.apache.camel.NamedNode; 032 import org.apache.camel.ThreadPoolRejectedPolicy; 033 import org.apache.camel.model.OptionalIdentifiedDefinition; 034 import org.apache.camel.model.ProcessorDefinition; 035 import org.apache.camel.model.ProcessorDefinitionHelper; 036 import org.apache.camel.model.RouteDefinition; 037 import org.apache.camel.spi.ExecutorServiceManager; 038 import org.apache.camel.spi.LifecycleStrategy; 039 import org.apache.camel.spi.ThreadPoolFactory; 040 import org.apache.camel.spi.ThreadPoolProfile; 041 import org.apache.camel.support.ServiceSupport; 042 import org.apache.camel.util.ObjectHelper; 043 import org.apache.camel.util.URISupport; 044 import org.apache.camel.util.concurrent.CamelThreadFactory; 045 import org.apache.camel.util.concurrent.SizedScheduledExecutorService; 046 import org.apache.camel.util.concurrent.ThreadHelper; 047 import org.slf4j.Logger; 048 import org.slf4j.LoggerFactory; 049 050 /** 051 * @version 052 */ 053 public class DefaultExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager { 054 private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceManager.class); 055 056 private final CamelContext camelContext; 057 private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory(); 058 private final List<ExecutorService> executorServices = new ArrayList<ExecutorService>(); 059 private String threadNamePattern; 060 private String defaultThreadPoolProfileId = "defaultThreadPoolProfile"; 061 private final Map<String, ThreadPoolProfile> threadPoolProfiles = new HashMap<String, ThreadPoolProfile>(); 062 private ThreadPoolProfile builtIndefaultProfile; 063 064 public DefaultExecutorServiceManager(CamelContext camelContext) { 065 this.camelContext = camelContext; 066 067 builtIndefaultProfile = new ThreadPoolProfile(defaultThreadPoolProfileId); 068 builtIndefaultProfile.setDefaultProfile(true); 069 builtIndefaultProfile.setPoolSize(10); 070 builtIndefaultProfile.setMaxPoolSize(20); 071 builtIndefaultProfile.setKeepAliveTime(60L); 072 builtIndefaultProfile.setTimeUnit(TimeUnit.SECONDS); 073 builtIndefaultProfile.setMaxQueueSize(1000); 074 builtIndefaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns); 075 076 registerThreadPoolProfile(builtIndefaultProfile); 077 } 078 079 @Override 080 public ThreadPoolFactory getThreadPoolFactory() { 081 return threadPoolFactory; 082 } 083 084 @Override 085 public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) { 086 this.threadPoolFactory = threadPoolFactory; 087 } 088 089 @Override 090 public void registerThreadPoolProfile(ThreadPoolProfile profile) { 091 ObjectHelper.notNull(profile, "profile"); 092 ObjectHelper.notEmpty(profile.getId(), "id", profile); 093 threadPoolProfiles.put(profile.getId(), profile); 094 } 095 096 @Override 097 public ThreadPoolProfile getThreadPoolProfile(String id) { 098 return threadPoolProfiles.get(id); 099 } 100 101 @Override 102 public ThreadPoolProfile getDefaultThreadPoolProfile() { 103 return getThreadPoolProfile(defaultThreadPoolProfileId); 104 } 105 106 @Override 107 public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) { 108 threadPoolProfiles.remove(defaultThreadPoolProfileId); 109 defaultThreadPoolProfile.addDefaults(builtIndefaultProfile); 110 111 LOG.info("Using custom DefaultThreadPoolProfile: " + defaultThreadPoolProfile); 112 113 this.defaultThreadPoolProfileId = defaultThreadPoolProfile.getId(); 114 defaultThreadPoolProfile.setDefaultProfile(true); 115 registerThreadPoolProfile(defaultThreadPoolProfile); 116 } 117 118 @Override 119 public String getThreadNamePattern() { 120 return threadNamePattern; 121 } 122 123 @Override 124 public void setThreadNamePattern(String threadNamePattern) { 125 // must set camel id here in the pattern and let the other placeholders be resolved on demand 126 String name = threadNamePattern.replaceFirst("#camelId#", this.camelContext.getName()); 127 this.threadNamePattern = name; 128 } 129 130 @Override 131 public String resolveThreadName(String name) { 132 return ThreadHelper.resolveThreadName(threadNamePattern, name); 133 } 134 135 @Override 136 public ExecutorService newDefaultThreadPool(Object source, String name) { 137 return newThreadPool(source, name, getDefaultThreadPoolProfile()); 138 } 139 140 @Override 141 public ScheduledExecutorService newDefaultScheduledThreadPool(Object source, String name) { 142 return newScheduledThreadPool(source, name, getDefaultThreadPoolProfile()); 143 } 144 145 @Override 146 public ExecutorService newThreadPool(Object source, String name, String profileId) { 147 ThreadPoolProfile profile = getThreadPoolProfile(profileId); 148 if (profile != null) { 149 return newThreadPool(source, name, profile); 150 } else { 151 // no profile with that id 152 return null; 153 } 154 } 155 156 @Override 157 public ExecutorService newThreadPool(Object source, String name, ThreadPoolProfile profile) { 158 String sanitizedName = URISupport.sanitizeUri(name); 159 ObjectHelper.notNull(profile, "ThreadPoolProfile"); 160 161 ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile(); 162 profile.addDefaults(defaultProfile); 163 164 ThreadFactory threadFactory = createThreadFactory(sanitizedName, true); 165 ExecutorService executorService = threadPoolFactory.newThreadPool(profile, threadFactory); 166 onThreadPoolCreated(executorService, source, profile.getId()); 167 if (LOG.isDebugEnabled()) { 168 LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, executorService}); 169 } 170 171 return executorService; 172 } 173 174 @Override 175 public ExecutorService newThreadPool(Object source, String name, int poolSize, int maxPoolSize) { 176 ThreadPoolProfile profile = new ThreadPoolProfile(name); 177 profile.setPoolSize(poolSize); 178 profile.setMaxPoolSize(maxPoolSize); 179 return newThreadPool(source, name, profile); 180 } 181 182 @Override 183 public ExecutorService newSingleThreadExecutor(Object source, String name) { 184 return newFixedThreadPool(source, name, 1); 185 } 186 187 @Override 188 public ExecutorService newCachedThreadPool(Object source, String name) { 189 String sanitizedName = URISupport.sanitizeUri(name); 190 ExecutorService answer = threadPoolFactory.newCachedThreadPool(createThreadFactory(sanitizedName, true)); 191 onThreadPoolCreated(answer, source, null); 192 193 if (LOG.isDebugEnabled()) { 194 LOG.debug("Created new CachedThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer}); 195 } 196 return answer; 197 } 198 199 @Override 200 public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) { 201 ThreadPoolProfile profile = new ThreadPoolProfile(name); 202 profile.setPoolSize(poolSize); 203 profile.setMaxPoolSize(poolSize); 204 profile.setKeepAliveTime(0L); 205 return newThreadPool(source, name, profile); 206 } 207 208 @Override 209 public ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name) { 210 return newScheduledThreadPool(source, name, 1); 211 } 212 213 @Override 214 public ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile) { 215 String sanitizedName = URISupport.sanitizeUri(name); 216 profile.addDefaults(getDefaultThreadPoolProfile()); 217 ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(profile, createThreadFactory(sanitizedName, true)); 218 onThreadPoolCreated(answer, source, null); 219 220 if (LOG.isDebugEnabled()) { 221 LOG.debug("Created new ScheduledThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer}); 222 } 223 return answer; 224 } 225 226 @Override 227 public ScheduledExecutorService newScheduledThreadPool(Object source, String name, String profileId) { 228 ThreadPoolProfile profile = getThreadPoolProfile(profileId); 229 if (profile != null) { 230 return newScheduledThreadPool(source, name, profile); 231 } else { 232 // no profile with that id 233 return null; 234 } 235 } 236 237 @Override 238 public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) { 239 ThreadPoolProfile profile = new ThreadPoolProfile(name); 240 profile.setPoolSize(poolSize); 241 return newScheduledThreadPool(source, name, profile); 242 } 243 244 @Override 245 public void shutdown(ExecutorService executorService) { 246 if (executorService == null) { 247 return; 248 } 249 250 251 if (!executorService.isShutdown()) { 252 LOG.debug("Shutdown ExecutorService: {}", executorService); 253 executorService.shutdown(); 254 LOG.trace("Shutdown ExecutorService: {} complete.", executorService); 255 } 256 257 if (executorService instanceof ThreadPoolExecutor) { 258 ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService; 259 for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { 260 lifecycle.onThreadPoolRemove(camelContext, threadPool); 261 } 262 } 263 264 // remove reference as its shutdown 265 executorServices.remove(executorService); 266 } 267 268 @Override 269 public List<Runnable> shutdownNow(ExecutorService executorService) { 270 return doShutdownNow(executorService, true); 271 } 272 273 private List<Runnable> doShutdownNow(ExecutorService executorService, boolean remove) { 274 ObjectHelper.notNull(executorService, "executorService"); 275 276 List<Runnable> answer = null; 277 if (!executorService.isShutdown()) { 278 LOG.debug("ShutdownNow ExecutorService: {}", executorService); 279 answer = executorService.shutdownNow(); 280 LOG.trace("ShutdownNow ExecutorService: {} complete.", executorService); 281 } 282 283 if (executorService instanceof ThreadPoolExecutor) { 284 ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService; 285 for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { 286 lifecycle.onThreadPoolRemove(camelContext, threadPool); 287 } 288 } 289 290 // remove reference as its shutdown 291 if (remove) { 292 executorServices.remove(executorService); 293 } 294 295 return answer; 296 } 297 298 /** 299 * Strategy callback when a new {@link java.util.concurrent.ExecutorService} have been created. 300 * 301 * @param executorService the created {@link java.util.concurrent.ExecutorService} 302 */ 303 protected void onNewExecutorService(ExecutorService executorService) { 304 // noop 305 } 306 307 @Override 308 protected void doStart() throws Exception { 309 if (threadNamePattern == null) { 310 // set default name pattern which includes the camel context name 311 threadNamePattern = "Camel (" + camelContext.getName() + ") thread ##counter# - #name#"; 312 } 313 } 314 315 @Override 316 protected void doStop() throws Exception { 317 // noop 318 } 319 320 @Override 321 protected void doShutdown() throws Exception { 322 // shutdown all executor services by looping 323 for (ExecutorService executorService : executorServices) { 324 // only log if something goes wrong as we want to shutdown them all 325 try { 326 // must not remove during looping, as we clear the list afterwards 327 doShutdownNow(executorService, false); 328 } catch (Throwable e) { 329 LOG.warn("Error occurred during shutdown of ExecutorService: " 330 + executorService + ". This exception will be ignored.", e); 331 } 332 } 333 executorServices.clear(); 334 335 // do not clear the default profile as we could potential be restarted 336 Iterator<ThreadPoolProfile> it = threadPoolProfiles.values().iterator(); 337 while (it.hasNext()) { 338 ThreadPoolProfile profile = it.next(); 339 if (!profile.isDefaultProfile()) { 340 it.remove(); 341 } 342 } 343 } 344 345 /** 346 * Invoked when a new thread pool is created. 347 * This implementation will invoke the {@link LifecycleStrategy#onThreadPoolAdd(org.apache.camel.CamelContext, 348 * java.util.concurrent.ThreadPoolExecutor, String, String, String, String) LifecycleStrategy.onThreadPoolAdd} method, 349 * which for example will enlist the thread pool in JMX management. 350 * 351 * @param executorService the thread pool 352 * @param source the source to use the thread pool 353 * @param threadPoolProfileId profile id, if the thread pool was created from a thread pool profile 354 */ 355 private void onThreadPoolCreated(ExecutorService executorService, Object source, String threadPoolProfileId) { 356 // add to internal list of thread pools 357 executorServices.add(executorService); 358 359 String id; 360 String sourceId = null; 361 String routeId = null; 362 363 // extract id from source 364 if (source instanceof NamedNode) { 365 id = ((OptionalIdentifiedDefinition<?>) source).idOrCreate(this.camelContext.getNodeIdFactory()); 366 // and let source be the short name of the pattern 367 sourceId = ((NamedNode) source).getShortName(); 368 } else if (source instanceof String) { 369 id = (String) source; 370 } else if (source != null) { 371 // fallback and use the simple class name with hashcode for the id so its unique for this given source 372 id = source.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(source) + ")"; 373 } else { 374 // no source, so fallback and use the simple class name from thread pool and its hashcode identity so its unique 375 id = executorService.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(executorService) + ")"; 376 } 377 378 // id is mandatory 379 ObjectHelper.notEmpty(id, "id for thread pool " + executorService); 380 381 // extract route id if possible 382 if (source instanceof ProcessorDefinition) { 383 RouteDefinition route = ProcessorDefinitionHelper.getRoute((ProcessorDefinition<?>) source); 384 if (route != null) { 385 routeId = route.idOrCreate(this.camelContext.getNodeIdFactory()); 386 } 387 } 388 389 // let lifecycle strategy be notified as well which can let it be managed in JMX as well 390 ThreadPoolExecutor threadPool = null; 391 if (executorService instanceof ThreadPoolExecutor) { 392 threadPool = (ThreadPoolExecutor) executorService; 393 } else if (executorService instanceof SizedScheduledExecutorService) { 394 threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor(); 395 } 396 if (threadPool != null) { 397 for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) { 398 lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId); 399 } 400 } 401 402 // now call strategy to allow custom logic 403 onNewExecutorService(executorService); 404 } 405 406 private ThreadFactory createThreadFactory(String name, boolean isDaemon) { 407 ThreadFactory threadFactory = new CamelThreadFactory(threadNamePattern, name, isDaemon); 408 return threadFactory; 409 } 410 411 }