001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.ArrayList;
020    import java.util.HashMap;
021    import java.util.Iterator;
022    import java.util.List;
023    import java.util.Map;
024    import java.util.concurrent.ExecutorService;
025    import java.util.concurrent.ScheduledExecutorService;
026    import java.util.concurrent.ThreadFactory;
027    import java.util.concurrent.ThreadPoolExecutor;
028    import java.util.concurrent.TimeUnit;
029    
030    import org.apache.camel.CamelContext;
031    import org.apache.camel.NamedNode;
032    import org.apache.camel.ThreadPoolRejectedPolicy;
033    import org.apache.camel.model.OptionalIdentifiedDefinition;
034    import org.apache.camel.model.ProcessorDefinition;
035    import org.apache.camel.model.ProcessorDefinitionHelper;
036    import org.apache.camel.model.RouteDefinition;
037    import org.apache.camel.spi.ExecutorServiceManager;
038    import org.apache.camel.spi.LifecycleStrategy;
039    import org.apache.camel.spi.ThreadPoolFactory;
040    import org.apache.camel.spi.ThreadPoolProfile;
041    import org.apache.camel.support.ServiceSupport;
042    import org.apache.camel.util.ObjectHelper;
043    import org.apache.camel.util.URISupport;
044    import org.apache.camel.util.concurrent.CamelThreadFactory;
045    import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
046    import org.apache.camel.util.concurrent.ThreadHelper;
047    import org.slf4j.Logger;
048    import org.slf4j.LoggerFactory;
049    
050    /**
051     * @version 
052     */
053    public class DefaultExecutorServiceManager extends ServiceSupport implements ExecutorServiceManager {
054        private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorServiceManager.class);
055    
056        private final CamelContext camelContext;
057        private ThreadPoolFactory threadPoolFactory = new DefaultThreadPoolFactory();
058        private final List<ExecutorService> executorServices = new ArrayList<ExecutorService>();
059        private String threadNamePattern;
060        private String defaultThreadPoolProfileId = "defaultThreadPoolProfile";
061        private final Map<String, ThreadPoolProfile> threadPoolProfiles = new HashMap<String, ThreadPoolProfile>();
062        private ThreadPoolProfile builtIndefaultProfile;
063    
064        public DefaultExecutorServiceManager(CamelContext camelContext) {
065            this.camelContext = camelContext;
066    
067            builtIndefaultProfile = new ThreadPoolProfile(defaultThreadPoolProfileId);
068            builtIndefaultProfile.setDefaultProfile(true);
069            builtIndefaultProfile.setPoolSize(10);
070            builtIndefaultProfile.setMaxPoolSize(20);
071            builtIndefaultProfile.setKeepAliveTime(60L);
072            builtIndefaultProfile.setTimeUnit(TimeUnit.SECONDS);
073            builtIndefaultProfile.setMaxQueueSize(1000);
074            builtIndefaultProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.CallerRuns);
075    
076            registerThreadPoolProfile(builtIndefaultProfile);
077        }
078    
079        @Override
080        public ThreadPoolFactory getThreadPoolFactory() {
081            return threadPoolFactory;
082        }
083    
084        @Override
085        public void setThreadPoolFactory(ThreadPoolFactory threadPoolFactory) {
086            this.threadPoolFactory = threadPoolFactory;
087        }
088    
089        @Override
090        public void registerThreadPoolProfile(ThreadPoolProfile profile) {
091            ObjectHelper.notNull(profile, "profile");
092            ObjectHelper.notEmpty(profile.getId(), "id", profile);
093            threadPoolProfiles.put(profile.getId(), profile);
094        }
095    
096        @Override
097        public ThreadPoolProfile getThreadPoolProfile(String id) {
098            return threadPoolProfiles.get(id);
099        }
100    
101        @Override
102        public ThreadPoolProfile getDefaultThreadPoolProfile() {
103            return getThreadPoolProfile(defaultThreadPoolProfileId);
104        }
105    
106        @Override
107        public void setDefaultThreadPoolProfile(ThreadPoolProfile defaultThreadPoolProfile) {
108            threadPoolProfiles.remove(defaultThreadPoolProfileId);
109            defaultThreadPoolProfile.addDefaults(builtIndefaultProfile);
110    
111            LOG.info("Using custom DefaultThreadPoolProfile: " + defaultThreadPoolProfile);
112    
113            this.defaultThreadPoolProfileId = defaultThreadPoolProfile.getId();
114            defaultThreadPoolProfile.setDefaultProfile(true);
115            registerThreadPoolProfile(defaultThreadPoolProfile);
116        }
117    
118        @Override
119        public String getThreadNamePattern() {
120            return threadNamePattern;
121        }
122    
123        @Override
124        public void setThreadNamePattern(String threadNamePattern) {
125            // must set camel id here in the pattern and let the other placeholders be resolved on demand
126            String name = threadNamePattern.replaceFirst("#camelId#", this.camelContext.getName());
127            this.threadNamePattern = name;
128        }
129        
130        @Override
131        public String resolveThreadName(String name) {
132            return ThreadHelper.resolveThreadName(threadNamePattern, name);
133        }
134    
135        @Override
136        public ExecutorService newDefaultThreadPool(Object source, String name) {
137            return newThreadPool(source, name, getDefaultThreadPoolProfile());
138        }
139    
140        @Override
141        public ScheduledExecutorService newDefaultScheduledThreadPool(Object source, String name) {
142            return newScheduledThreadPool(source, name, getDefaultThreadPoolProfile());
143        }
144    
145        @Override
146        public ExecutorService newThreadPool(Object source, String name, String profileId) {
147            ThreadPoolProfile profile = getThreadPoolProfile(profileId);
148            if (profile != null) {
149                return newThreadPool(source, name, profile);
150            } else {
151                // no profile with that id
152                return null;
153            }
154        }
155    
156        @Override
157        public ExecutorService newThreadPool(Object source, String name, ThreadPoolProfile profile) {
158            String sanitizedName = URISupport.sanitizeUri(name);
159            ObjectHelper.notNull(profile, "ThreadPoolProfile");
160    
161            ThreadPoolProfile defaultProfile = getDefaultThreadPoolProfile();
162            profile.addDefaults(defaultProfile);
163    
164            ThreadFactory threadFactory = createThreadFactory(sanitizedName, true);
165            ExecutorService executorService = threadPoolFactory.newThreadPool(profile, threadFactory);
166            onThreadPoolCreated(executorService, source, profile.getId());
167            if (LOG.isDebugEnabled()) {
168                LOG.debug("Created new ThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, executorService});
169            }
170    
171            return executorService;
172        }
173    
174        @Override
175        public ExecutorService newThreadPool(Object source, String name, int poolSize, int maxPoolSize) {
176            ThreadPoolProfile profile = new ThreadPoolProfile(name);
177            profile.setPoolSize(poolSize);
178            profile.setMaxPoolSize(maxPoolSize);
179            return  newThreadPool(source, name, profile);
180        }
181    
182        @Override
183        public ExecutorService newSingleThreadExecutor(Object source, String name) {
184            return newFixedThreadPool(source, name, 1);
185        }
186    
187        @Override
188        public ExecutorService newCachedThreadPool(Object source, String name) {
189            String sanitizedName = URISupport.sanitizeUri(name);
190            ExecutorService answer = threadPoolFactory.newCachedThreadPool(createThreadFactory(sanitizedName, true));
191            onThreadPoolCreated(answer, source, null);
192    
193            if (LOG.isDebugEnabled()) {
194                LOG.debug("Created new CachedThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer});
195            }
196            return answer;
197        }
198    
199        @Override
200        public ExecutorService newFixedThreadPool(Object source, String name, int poolSize) {
201            ThreadPoolProfile profile = new ThreadPoolProfile(name);
202            profile.setPoolSize(poolSize);
203            profile.setMaxPoolSize(poolSize);
204            profile.setKeepAliveTime(0L);
205            return newThreadPool(source, name, profile);
206        }
207    
208        @Override
209        public ScheduledExecutorService newSingleThreadScheduledExecutor(Object source, String name) {
210            return newScheduledThreadPool(source, name, 1);
211        }
212        
213        @Override
214        public ScheduledExecutorService newScheduledThreadPool(Object source, String name, ThreadPoolProfile profile) {
215            String sanitizedName = URISupport.sanitizeUri(name);
216            profile.addDefaults(getDefaultThreadPoolProfile());
217            ScheduledExecutorService answer = threadPoolFactory.newScheduledThreadPool(profile, createThreadFactory(sanitizedName, true));
218            onThreadPoolCreated(answer, source, null);
219    
220            if (LOG.isDebugEnabled()) {
221                LOG.debug("Created new ScheduledThreadPool for source: {} with name: {}. -> {}", new Object[]{source, sanitizedName, answer});
222            }
223            return answer;
224        }
225    
226        @Override
227        public ScheduledExecutorService newScheduledThreadPool(Object source, String name, String profileId) {
228            ThreadPoolProfile profile = getThreadPoolProfile(profileId);
229            if (profile != null) {
230                return newScheduledThreadPool(source, name, profile);
231            } else {
232                // no profile with that id
233                return null;
234            }
235        }
236    
237        @Override
238        public ScheduledExecutorService newScheduledThreadPool(Object source, String name, int poolSize) {
239            ThreadPoolProfile profile = new ThreadPoolProfile(name);
240            profile.setPoolSize(poolSize);
241            return newScheduledThreadPool(source, name, profile);
242        }
243    
244        @Override
245        public void shutdown(ExecutorService executorService) {
246            if (executorService == null) {
247                return;
248            }
249    
250    
251            if (!executorService.isShutdown()) {
252                LOG.debug("Shutdown ExecutorService: {}", executorService);
253                executorService.shutdown();
254                LOG.trace("Shutdown ExecutorService: {} complete.", executorService);
255            }
256    
257            if (executorService instanceof ThreadPoolExecutor) {
258                ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService;
259                for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
260                    lifecycle.onThreadPoolRemove(camelContext, threadPool);
261                }
262            }
263    
264            // remove reference as its shutdown
265            executorServices.remove(executorService);
266        }
267    
268        @Override
269        public List<Runnable> shutdownNow(ExecutorService executorService) {
270            return doShutdownNow(executorService, true);
271        }
272    
273        private List<Runnable> doShutdownNow(ExecutorService executorService, boolean remove) {
274            ObjectHelper.notNull(executorService, "executorService");
275    
276            List<Runnable> answer = null;
277            if (!executorService.isShutdown()) {
278                LOG.debug("ShutdownNow ExecutorService: {}", executorService);
279                answer = executorService.shutdownNow();
280                LOG.trace("ShutdownNow ExecutorService: {} complete.", executorService);
281            }
282    
283            if (executorService instanceof ThreadPoolExecutor) {
284                ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executorService;
285                for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
286                    lifecycle.onThreadPoolRemove(camelContext, threadPool);
287                }
288            }
289    
290            // remove reference as its shutdown
291            if (remove) {
292                executorServices.remove(executorService);
293            }
294    
295            return answer;
296        }
297    
298        /**
299         * Strategy callback when a new {@link java.util.concurrent.ExecutorService} have been created.
300         *
301         * @param executorService the created {@link java.util.concurrent.ExecutorService}
302         */
303        protected void onNewExecutorService(ExecutorService executorService) {
304            // noop
305        }
306    
307        @Override
308        protected void doStart() throws Exception {
309            if (threadNamePattern == null) {
310                // set default name pattern which includes the camel context name
311                threadNamePattern = "Camel (" + camelContext.getName() + ") thread ##counter# - #name#";
312            }
313        }
314    
315        @Override
316        protected void doStop() throws Exception {
317            // noop
318        }
319    
320        @Override
321        protected void doShutdown() throws Exception {
322            // shutdown all executor services by looping
323            for (ExecutorService executorService : executorServices) {
324                // only log if something goes wrong as we want to shutdown them all
325                try {
326                    // must not remove during looping, as we clear the list afterwards
327                    doShutdownNow(executorService, false);
328                } catch (Throwable e) {
329                    LOG.warn("Error occurred during shutdown of ExecutorService: "
330                            + executorService + ". This exception will be ignored.", e);
331                }
332            }
333            executorServices.clear();
334    
335            // do not clear the default profile as we could potential be restarted
336            Iterator<ThreadPoolProfile> it = threadPoolProfiles.values().iterator();
337            while (it.hasNext()) {
338                ThreadPoolProfile profile = it.next();
339                if (!profile.isDefaultProfile()) {
340                    it.remove();
341                }
342            }
343        }
344    
345        /**
346         * Invoked when a new thread pool is created.
347         * This implementation will invoke the {@link LifecycleStrategy#onThreadPoolAdd(org.apache.camel.CamelContext,
348         * java.util.concurrent.ThreadPoolExecutor, String, String, String, String) LifecycleStrategy.onThreadPoolAdd} method,
349         * which for example will enlist the thread pool in JMX management.
350         *
351         * @param executorService the thread pool
352         * @param source          the source to use the thread pool
353         * @param threadPoolProfileId profile id, if the thread pool was created from a thread pool profile
354         */
355        private void onThreadPoolCreated(ExecutorService executorService, Object source, String threadPoolProfileId) {
356            // add to internal list of thread pools
357            executorServices.add(executorService);
358    
359            String id;
360            String sourceId = null;
361            String routeId = null;
362    
363            // extract id from source
364            if (source instanceof NamedNode) {
365                id = ((OptionalIdentifiedDefinition<?>) source).idOrCreate(this.camelContext.getNodeIdFactory());
366                // and let source be the short name of the pattern
367                sourceId = ((NamedNode) source).getShortName();
368            } else if (source instanceof String) {
369                id = (String) source;
370            } else if (source != null) {
371                // fallback and use the simple class name with hashcode for the id so its unique for this given source
372                id = source.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(source) + ")";
373            } else {
374                // no source, so fallback and use the simple class name from thread pool and its hashcode identity so its unique
375                id = executorService.getClass().getSimpleName() + "(" + ObjectHelper.getIdentityHashCode(executorService) + ")";
376            }
377    
378            // id is mandatory
379            ObjectHelper.notEmpty(id, "id for thread pool " + executorService);
380    
381            // extract route id if possible
382            if (source instanceof ProcessorDefinition) {
383                RouteDefinition route = ProcessorDefinitionHelper.getRoute((ProcessorDefinition<?>) source);
384                if (route != null) {
385                    routeId = route.idOrCreate(this.camelContext.getNodeIdFactory());
386                }
387            }
388    
389            // let lifecycle strategy be notified as well which can let it be managed in JMX as well
390            ThreadPoolExecutor threadPool = null;
391            if (executorService instanceof ThreadPoolExecutor) {
392                threadPool = (ThreadPoolExecutor) executorService;
393            } else if (executorService instanceof SizedScheduledExecutorService) {
394                threadPool = ((SizedScheduledExecutorService) executorService).getScheduledThreadPoolExecutor();
395            }
396            if (threadPool != null) {
397                for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
398                    lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
399                }
400            }
401    
402            // now call strategy to allow custom logic
403            onNewExecutorService(executorService);
404        }
405    
406        private ThreadFactory createThreadFactory(String name, boolean isDaemon) {
407            ThreadFactory threadFactory = new CamelThreadFactory(threadNamePattern, name, isDaemon);
408            return threadFactory;
409        }
410    
411    }