001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.impl;
018    
019    import java.util.concurrent.BlockingQueue;
020    import java.util.concurrent.ExecutorService;
021    import java.util.concurrent.Executors;
022    import java.util.concurrent.LinkedBlockingQueue;
023    import java.util.concurrent.RejectedExecutionHandler;
024    import java.util.concurrent.ScheduledExecutorService;
025    import java.util.concurrent.ScheduledThreadPoolExecutor;
026    import java.util.concurrent.SynchronousQueue;
027    import java.util.concurrent.ThreadFactory;
028    import java.util.concurrent.ThreadPoolExecutor;
029    import java.util.concurrent.TimeUnit;
030    
031    import org.apache.camel.spi.ThreadPoolFactory;
032    import org.apache.camel.spi.ThreadPoolProfile;
033    import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
034    import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
035    import org.apache.camel.util.concurrent.SizedScheduledExecutorService;
036    
037    /**
038     * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
039     */
040    public class DefaultThreadPoolFactory implements ThreadPoolFactory {
041    
042        public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
043            return Executors.newCachedThreadPool(threadFactory);
044        }
045        
046        @Override
047        public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
048            return newThreadPool(profile.getPoolSize(), 
049                                 profile.getMaxPoolSize(), 
050                                 profile.getKeepAliveTime(),
051                                 profile.getTimeUnit(),
052                                 profile.getMaxQueueSize(), 
053                                 profile.getRejectedExecutionHandler(),
054                                 factory);
055        }
056    
057        public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
058                                             int maxQueueSize, RejectedExecutionHandler rejectedExecutionHandler,
059                                             ThreadFactory threadFactory) throws IllegalArgumentException {
060    
061            // If we set the corePoolSize to be 0, the whole camel application will hang in JDK5
062            // just add a check here to throw the IllegalArgumentException
063            if (corePoolSize < 1) {
064                throw new IllegalArgumentException("The corePoolSize can't be lower than 1");
065            }
066    
067            // validate max >= core
068            if (maxPoolSize < corePoolSize) {
069                throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
070            }
071    
072            BlockingQueue<Runnable> workQueue;
073            if (corePoolSize == 0 && maxQueueSize <= 0) {
074                // use a synchronous queue
075                workQueue = new SynchronousQueue<Runnable>();
076                // and force 1 as pool size to be able to create the thread pool by the JDK
077                corePoolSize = 1;
078                maxPoolSize = 1;
079            } else if (maxQueueSize <= 0) {
080                // unbounded task queue
081                workQueue = new LinkedBlockingQueue<Runnable>();
082            } else {
083                // bounded task queue
084                workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
085            }
086    
087            ThreadPoolExecutor answer = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
088            answer.setThreadFactory(threadFactory);
089            if (rejectedExecutionHandler == null) {
090                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
091            }
092            answer.setRejectedExecutionHandler(rejectedExecutionHandler);
093            return answer;
094        }
095        
096        @Override
097        public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
098            RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
099            if (rejectedExecutionHandler == null) {
100                rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
101            }
102    
103            ScheduledThreadPoolExecutor answer = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
104            // TODO: when JDK7 we should setRemoveOnCancelPolicy(true)
105    
106            // need to wrap the thread pool in a sized to guard against the problem that the
107            // JDK created thread pool has an unbounded queue (see class javadoc), which mean
108            // we could potentially keep adding tasks, and run out of memory.
109            if (profile.getMaxPoolSize() > 0) {
110                return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
111            } else {
112                return answer;
113            }
114        }
115    
116    }