001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.Iterator;
020import java.util.LinkedHashSet;
021import java.util.List;
022import java.util.Map;
023import java.util.Set;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.ScheduledExecutorService;
028import java.util.concurrent.ThreadFactory;
029import java.util.concurrent.ThreadPoolExecutor;
030import java.util.concurrent.TimeUnit;
031
032import org.apache.camel.CamelContext;
033import org.apache.camel.NamedNode;
034import org.apache.camel.StaticService;
035import org.apache.camel.ThreadPoolRejectedPolicy;
036import org.apache.camel.model.OptionalIdentifiedDefinition;
037import org.apache.camel.model.ProcessorDefinition;
038import org.apache.camel.model.ProcessorDefinitionHelper;
039import org.apache.camel.model.RouteDefinition;
040import org.apache.camel.spi.ExecutorServiceManager;
041import org.apache.camel.spi.LifecycleStrategy;
042import org.apache.camel.spi.ThreadPoolFactory;
043import org.apache.camel.spi.ThreadPoolProfile;
044import org.apache.camel.support.ServiceSupport;
045import org.apache.camel.util.ObjectHelper;
046import org.apache.camel.util.StopWatch;
047import org.apache.camel.util.TimeUtils;
048import org.apache.camel.util.URISupport;
049import org.apache.camel.util.concurrent.CamelThreadFactory;
050import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
051import org.apache.camel.util.concurrent.ThreadHelper;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * Default {@link org.apache.camel.spi.ExecutorServiceManager}.
057 *
058 * @version 
059 */
060public class DefaultExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager {
061    private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceManager.class);
062
063    private final CamelContext camelContext;
064    private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory();
065    private final List<ExecutorService> executorServices = new CopyOnWriteArrayList<ExecutorService>();
066    private String threadNamePattern;
067    private long shutdownAwaitTermination = 10000;
068    private String defaultThreadPoolProfileId = "defaultThreadPoolProfile";
069    private final Map<String, ThreadPoolProfile> threadPoolProfiles = new ConcurrentHashMap<String, ThreadPoolProfile>();
070    private ThreadPoolProfile defaultProfile;
071
072    public DefaultExecutorServiceManager(CamelContext camelContext) {
073        this.camelContext = camelContext;
074
075        defaultProfile = new ThreadPoolProfile(defaultThreadPoolProfileId);
076        defaultProfile.setDefaultProfile(true);
077        defaultProfile.setPoolSize(10);
078        defaultProfile.setMaxPoolSize(20);
079        defaultProfile.setKeepAliveTime(60L);
080        defaultProfile.setTimeUnit(TimeUnit.SECONDS);
081        defaultProfile.setMaxQueueSize(1000);
082        defaultProfile.setAllowCoreThreadTimeOut(false);
083        defaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns);
084
085        registerThreadPoolProfile(defaultProfile);
086    }
087
088    @Override
089    public ThreadPoolFactory getThreadPoolFactory() {
090        return threadPoolFactory;
091    }
092
093    @Override
094    public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) {
095        this.threadPoolFactory = threadPoolFactory;
096    }
097
098    @Override
099    public void registerThreadPoolProfile(ThreadPoolProfile profile) {
100        ObjectHelper.notNull(profile, "profile");
101        ObjectHelper.notEmpty(profile.getId(), "id", profile);
102        threadPoolProfiles.put(profile.getId(), profile);
103    }
104
105    @Override
106    public ThreadPoolProfile getThreadPoolProfile(String id) {
107        return threadPoolProfiles.get(id);
108    }
109
110    @Override
111    public ThreadPoolProfile getDefaultThreadPoolProfile() {
112        return getThreadPoolProfile(defaultThreadPoolProfileId);
113    }
114
115    @Override
116    public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) {
117        threadPoolProfiles.remove(defaultThreadPoolProfileId);
118        defaultThreadPoolProfile.addDefaults(defaultProfile);
119
120        LOG.info("Using custom DefaultThreadPoolProfile: " + defaultThreadPoolProfile);
121
122        this.defaultThreadPoolProfileId = defaultThreadPoolProfile.getId();
123        defaultThreadPoolProfile.setDefaultProfile(true);
124        registerThreadPoolProfile(defaultThreadPoolProfile);
125    }
126
127    @Override
128    public String getThreadNamePattern() {
129        return threadNamePattern;
130    }
131
132    @Override
133    public void setThreadNamePattern(String threadNamePattern) {
134        // must set camel id here in the pattern and let the other placeholders be resolved on demand
135        String name = threadNamePattern.replaceFirst("#camelId#", this.camelContext.getName());
136        this.threadNamePattern = name;
137    }
138
139    @Override
140    public long getShutdownAwaitTermination() {
141        return shutdownAwaitTermination;
142    }
143
144    @Override
145    public void setShutdownAwaitTermination(long shutdownAwaitTermination) {
146        this.shutdownAwaitTermination = shutdownAwaitTermination;
147    }
148
149    @Override
150    public String resolveThreadName(String name) {
151        return ThreadHelper.resolveThreadName(threadNamePattern, name);
152    }
153
154    @Override
155    public Thread newThread(String name, Runnable runnable) {
156        ThreadFactory factory = createThreadFactory(name, true);
157        return factory.newThread(runnable);
158    }
159
160    @Override
161    public ExecutorService newDefaultThreadPool(Object source, String name) {
162        return newThreadPool(source, name, getDefaultThreadPoolProfile());
163    }
164
165    @Override
166    public ScheduledExecutorService newDefaultScheduledThreadPool(Object source, String name) {
167        return newScheduledThreadPool(source, name, getDefaultThreadPoolProfile());
168    }
169
170    @Override
171    public ExecutorService newThreadPool(Object source, String name, String profileId) {
172        ThreadPoolProfile profile = getThreadPoolProfile(profileId);
173        if (profile != null) {
174            return newThreadPool(source, name, profile);
175        } else {
176            // no profile with that id
177            return null;
178        }
179    }
180
181    @Override
182    public ExecutorService newThreadPool(Object source, String name, ThreadPoolProfile profile) {
183        String sanitizedName = URISupport.sanitizeUri(name);
184        ObjectHelper.notNull(profile, "ThreadPoolProfile");
185
186        ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile();
187        profile.addDefaults(defaultProfile);
188
189        ThreadFactory threadFactory = createThreadFactory(sanitizedName, true);
190        ExecutorService executorService = threadPoolFactory.newThreadPool(profile, threadFactory);
191        onThreadPoolCreated(executorService, source, profile.getId());
192        if (LOG.isDebugEnabled()) {
193            LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, executorService});
194        }
195
196        return executorService;
197    }
198
199    @Override
200    public ExecutorService newThreadPool(Object source, String name, int poolSize, int maxPoolSize) {
201        ThreadPoolProfile profile = new ThreadPoolProfile(name);
202        profile.setPoolSize(poolSize);
203        profile.setMaxPoolSize(maxPoolSize);
204        return newThreadPool(source, name, profile);
205    }
206
207    @Override
208    public ExecutorService newSingleThreadExecutor(Object source, String name) {
209        return newFixedThreadPool(source, name, 1);
210    }
211
212    @Override
213    public ExecutorService newCachedThreadPool(Object source, String name) {
214        String sanitizedName = URISupport.sanitizeUri(name);
215        ExecutorService answer = threadPoolFactory.newCachedThreadPool(createThreadFactory(sanitizedName, true));
216        onThreadPoolCreated(answer, source, null);
217
218        if (LOG.isDebugEnabled()) {
219            LOG.debug("Created new CachedThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer});
220        }
221        return answer;
222    }
223
224    @Override
225    public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
226        ThreadPoolProfile profile = new ThreadPoolProfile(name);
227        profile.setPoolSize(poolSize);
228        profile.setMaxPoolSize(poolSize);
229        profile.setKeepAliveTime(0L);
230        return newThreadPool(source, name, profile);
231    }
232
233    @Override
234    public ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name) {
235        return newScheduledThreadPool(source, name, 1);
236    }
237    
238    @Override
239    public ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile) {
240        String sanitizedName = URISupport.sanitizeUri(name);
241        profile.addDefaults(getDefaultThreadPoolProfile());
242        ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(profile, createThreadFactory(sanitizedName, true));
243        onThreadPoolCreated(answer, source, null);
244
245        if (LOG.isDebugEnabled()) {
246            LOG.debug("Created new ScheduledThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer});
247        }
248        return answer;
249    }
250
251    @Override
252    public ScheduledExecutorService newScheduledThreadPool(Object source, String name, String profileId) {
253        ThreadPoolProfile profile = getThreadPoolProfile(profileId);
254        if (profile != null) {
255            return newScheduledThreadPool(source, name, profile);
256        } else {
257            // no profile with that id
258            return null;
259        }
260    }
261
262    @Override
263    public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
264        ThreadPoolProfile profile = new ThreadPoolProfile(name);
265        profile.setPoolSize(poolSize);
266        return newScheduledThreadPool(source, name, profile);
267    }
268
269    @Override
270    public void shutdown(ExecutorService executorService) {
271        doShutdown(executorService, 0, false);
272    }
273
274    @Override
275    public void shutdownGraceful(ExecutorService executorService) {
276        doShutdown(executorService, getShutdownAwaitTermination(), false);
277    }
278
279    @Override
280    public void shutdownGraceful(ExecutorService executorService, long shutdownAwaitTermination) {
281        doShutdown(executorService, shutdownAwaitTermination, false);
282    }
283
284    private boolean doShutdown(ExecutorService executorService, long shutdownAwaitTermination, boolean failSafe) {
285        if (executorService == null) {
286            return false;
287        }
288
289        boolean warned = false;
290
291        // shutting down a thread pool is a 2 step process. First we try graceful, and if that fails, then we go more aggressively
292        // and try shutting down again. In both cases we wait at most the given shutdown timeout value given
293        // (total wait could then be 2 x shutdownAwaitTermination, but when we shutdown the 2nd time we are aggressive and thus
294        // we ought to shutdown much faster)
295        if (!executorService.isShutdown()) {
296            StopWatch watch = new StopWatch();
297
298            LOG.trace("Shutdown of ExecutorService: {} with await termination: {} millis", executorService, shutdownAwaitTermination);
299            executorService.shutdown();
300
301            if (shutdownAwaitTermination > 0) {
302                try {
303                    if (!awaitTermination(executorService, shutdownAwaitTermination)) {
304                        warned = true;
305                        LOG.warn("Forcing shutdown of ExecutorService: {} due first await termination elapsed.", executorService);
306                        executorService.shutdownNow();
307                        // we are now shutting down aggressively, so wait to see if we can completely shutdown or not
308                        if (!awaitTermination(executorService, shutdownAwaitTermination)) {
309                            LOG.warn("Cannot completely force shutdown of ExecutorService: {} due second await termination elapsed.", executorService);
310                        }
311                    }
312                } catch (InterruptedException e) {
313                    warned = true;
314                    LOG.warn("Forcing shutdown of ExecutorService: {} due interrupted.", executorService);
315                    // we were interrupted during shutdown, so force shutdown
316                    executorService.shutdownNow();
317                }
318            }
319
320            // if we logged at WARN level, then report at INFO level when we are complete so the end user can see this in the log
321            if (warned) {
322                LOG.info("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
323                        new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
324            } else if (LOG.isDebugEnabled()) {
325                LOG.debug("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {} took: {}.",
326                    new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated(), TimeUtils.printDuration(watch.taken())});
327            }
328        }
329
330        // let lifecycle strategy be notified as well which can let it be managed in JMX as well
331        ThreadPoolExecutor threadPool = null;
332        if (executorService instanceof ThreadPoolExecutor) {
333            threadPool = (ThreadPoolExecutor) executorService;
334        } else if (executorService instanceof SizedScheduledExecutorService) {
335            threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
336        }
337        if (threadPool != null) {
338            for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
339                lifecycle.onThreadPoolRemove(camelContext, threadPool);
340            }
341        }
342
343        // remove reference as its shutdown (do not remove if fail-safe)
344        if (!failSafe) {
345            executorServices.remove(executorService);
346        }
347
348        return warned;
349    }
350
351    @Override
352    public List<Runnable> shutdownNow(ExecutorService executorService) {
353        return doShutdownNow(executorService, false);
354    }
355
356    private List<Runnable> doShutdownNow(ExecutorService executorService, boolean failSafe) {
357        ObjectHelper.notNull(executorService, "executorService");
358
359        List<Runnable> answer = null;
360        if (!executorService.isShutdown()) {
361            if (failSafe) {
362                // log as warn, as we shutdown as fail-safe, so end user should see more details in the log.
363                LOG.warn("Forcing shutdown of ExecutorService: {}", executorService);
364            } else {
365                LOG.debug("Forcing shutdown of ExecutorService: {}", executorService);
366            }
367            answer = executorService.shutdownNow();
368            if (LOG.isTraceEnabled()) {
369                LOG.trace("Shutdown of ExecutorService: {} is shutdown: {} and terminated: {}.",
370                        new Object[]{executorService, executorService.isShutdown(), executorService.isTerminated()});
371            }
372        }
373
374        // let lifecycle strategy be notified as well which can let it be managed in JMX as well
375        ThreadPoolExecutor threadPool = null;
376        if (executorService instanceof ThreadPoolExecutor) {
377            threadPool = (ThreadPoolExecutor) executorService;
378        } else if (executorService instanceof SizedScheduledExecutorService) {
379            threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
380        }
381        if (threadPool != null) {
382            for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
383                lifecycle.onThreadPoolRemove(camelContext, threadPool);
384            }
385        }
386
387        // remove reference as its shutdown (do not remove if fail-safe)
388        if (!failSafe) {
389            executorServices.remove(executorService);
390        }
391
392        return answer;
393    }
394
395    @Override
396    public boolean awaitTermination(ExecutorService executorService, long shutdownAwaitTermination) throws InterruptedException {
397        // log progress every 2nd second so end user is aware of we are shutting down
398        StopWatch watch = new StopWatch();
399        long interval = Math.min(2000, shutdownAwaitTermination);
400        boolean done = false;
401        while (!done && interval > 0) {
402            if (executorService.awaitTermination(interval, TimeUnit.MILLISECONDS)) {
403                done = true;
404            } else {
405                LOG.info("Waited {} for ExecutorService: {} to terminate...", TimeUtils.printDuration(watch.taken()), executorService);
406                // recalculate interval
407                interval = Math.min(2000, shutdownAwaitTermination - watch.taken());
408            }
409        }
410
411        return done;
412    }
413
414    /**
415     * Strategy callback when a new {@link java.util.concurrent.ExecutorService} have been created.
416     *
417     * @param executorService the created {@link java.util.concurrent.ExecutorService}
418     */
419    protected void onNewExecutorService(ExecutorService executorService) {
420        // noop
421    }
422
423    @Override
424    protected void doStart() throws Exception {
425        if (threadNamePattern == null) {
426            // set default name pattern which includes the camel context name
427            threadNamePattern = "Camel (" + camelContext.getName() + ") thread ##counter# - #name#";
428        }
429    }
430
431    @Override
432    protected void doStop() throws Exception {
433        // noop
434    }
435
436    @Override
437    protected void doShutdown() throws Exception {
438        // shutdown all remainder executor services by looping and doing this aggressively
439        // as by normal all threads pool should have been shutdown using proper lifecycle
440        // by their EIPs, components etc. This is acting as a fail-safe during shutdown
441        // of CamelContext itself.
442        Set<ExecutorService> forced = new LinkedHashSet<ExecutorService>();
443        if (!executorServices.isEmpty()) {
444            // at first give a bit of time to shutdown nicely as the thread pool is most likely in the process of being shutdown also
445            LOG.debug("Giving time for {} ExecutorService's to shutdown properly (acting as fail-safe)", executorServices.size());
446            for (ExecutorService executorService : executorServices) {
447                try {
448                    boolean warned = doShutdown(executorService, getShutdownAwaitTermination(), true);
449                    // remember the thread pools that was forced to shutdown (eg warned)
450                    if (warned) {
451                        forced.add(executorService);
452                    }
453                } catch (Throwable e) {
454                    // only log if something goes wrong as we want to shutdown them all
455                    LOG.warn("Error occurred during shutdown of ExecutorService: "
456                            + executorService + ". This exception will be ignored.", e);
457                }
458            }
459        }
460
461        // log the thread pools which was forced to shutdown so it may help the user to identify a problem of his
462        if (!forced.isEmpty()) {
463            LOG.warn("Forced shutdown of {} ExecutorService's which has not been shutdown properly (acting as fail-safe)", forced.size());
464            for (ExecutorService executorService : forced) {
465                LOG.warn("  forced -> {}", executorService);
466            }
467        }
468        forced.clear();
469
470        // clear list
471        executorServices.clear();
472
473        // do not clear the default profile as we could potential be restarted
474        Iterator<ThreadPoolProfile> it = threadPoolProfiles.values().iterator();
475        while (it.hasNext()) {
476            ThreadPoolProfile profile = it.next();
477            if (!profile.isDefaultProfile()) {
478                it.remove();
479            }
480        }
481    }
482
483    /**
484     * Invoked when a new thread pool is created.
485     * This implementation will invoke the {@link LifecycleStrategy#onThreadPoolAdd(org.apache.camel.CamelContext,
486     * java.util.concurrent.ThreadPoolExecutor, String, String, String, String) LifecycleStrategy.onThreadPoolAdd} method,
487     * which for example will enlist the thread pool in JMX management.
488     *
489     * @param executorService the thread pool
490     * @param source          the source to use the thread pool
491     * @param threadPoolProfileId profile id, if the thread pool was created from a thread pool profile
492     */
493    private void onThreadPoolCreated(ExecutorService executorService, Object source, String threadPoolProfileId) {
494        // add to internal list of thread pools
495        executorServices.add(executorService);
496
497        String id;
498        String sourceId = null;
499        String routeId = null;
500
501        // extract id from source
502        if (source instanceof NamedNode) {
503            id = ((OptionalIdentifiedDefinition<?>) source).idOrCreate(this.camelContext.getNodeIdFactory());
504            // and let source be the short name of the pattern
505            sourceId = ((NamedNode) source).getShortName();
506        } else if (source instanceof String) {
507            id = (String) source;
508        } else if (source != null) {
509            if (source instanceof StaticService) {
510                // the source is static service so its name would be unique
511                id = source.getClass().getSimpleName();
512            } else {
513                // fallback and use the simple class name with hashcode for the id so its unique for this given source
514                id = source.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(source) + ")";
515            }
516        } else {
517            // no source, so fallback and use the simple class name from thread pool and its hashcode identity so its unique
518            id = executorService.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(executorService) + ")";
519        }
520
521        // id is mandatory
522        ObjectHelper.notEmpty(id, "id for thread pool " + executorService);
523
524        // extract route id if possible
525        if (source instanceof ProcessorDefinition) {
526            RouteDefinition route = ProcessorDefinitionHelper.getRoute((ProcessorDefinition<?>) source);
527            if (route != null) {
528                routeId = route.idOrCreate(this.camelContext.getNodeIdFactory());
529            }
530        }
531
532        // let lifecycle strategy be notified as well which can let it be managed in JMX as well
533        ThreadPoolExecutor threadPool = null;
534        if (executorService instanceof ThreadPoolExecutor) {
535            threadPool = (ThreadPoolExecutor) executorService;
536        } else if (executorService instanceof SizedScheduledExecutorService) {
537            threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
538        }
539        if (threadPool != null) {
540            for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
541                lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
542            }
543        }
544
545        // now call strategy to allow custom logic
546        onNewExecutorService(executorService);
547    }
548
549    private ThreadFactory createThreadFactory(String name, boolean isDaemon) {
550        ThreadFactory threadFactory = new CamelThreadFactory(threadNamePattern, name, isDaemon);
551        return threadFactory;
552    }
553
554}