/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.camel.spi.ThreadPoolFactory;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.util.concurrent.RejectableScheduledThreadPoolExecutor;
import org.apache.camel.util.concurrent.RejectableThreadPoolExecutor;
import org.apache.camel.util.concurrent.SizedScheduledExecutorService;

/**
 * Factory for thread pools that uses the JDK {@link Executors} for creating the thread pools.
 */
public class DefaultThreadPoolFactory implements ThreadPoolFactory {

    /**
     * Creates a cached thread pool by delegating to {@link Executors#newCachedThreadPool(ThreadFactory)}.
     *
     * @param threadFactory factory used to create the pool's threads
     * @return the created thread pool
     */
    public ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return Executors.newCachedThreadPool(threadFactory);
    }

    /**
     * Creates a thread pool from the settings of the given profile.
     * <p/>
     * The pool size, max pool size, keep alive time, time unit, max queue size and rejected
     * execution handler are read from the profile and handed to
     * {@link #newThreadPool(int, int, long, TimeUnit, int, RejectedExecutionHandler, ThreadFactory)}.
     *
     * @param profile the profile holding the thread pool settings
     * @param factory factory used to create the pool's threads
     * @return the created thread pool
     * @throws IllegalArgumentException if the pool sizes in the profile are rejected by the delegate method
     */
    @Override
    public ExecutorService newThreadPool(ThreadPoolProfile profile, ThreadFactory factory) {
        return newThreadPool(profile.getPoolSize(),
                             profile.getMaxPoolSize(),
                             profile.getKeepAliveTime(),
                             profile.getTimeUnit(),
                             profile.getMaxQueueSize(),
                             profile.getRejectedExecutionHandler(),
                             factory);
    }

    /**
     * Creates a {@link RejectableThreadPoolExecutor} with the given settings.
     *
     * @param corePoolSize             the core pool size, must be at least 1
     * @param maxPoolSize              the maximum pool size, must be at least <tt>corePoolSize</tt>
     * @param keepAliveTime            keep alive time passed to the executor
     * @param timeUnit                 unit of <tt>keepAliveTime</tt>
     * @param maxQueueSize             capacity of the task queue; a value of 0 or less selects an unbounded queue
     * @param rejectedExecutionHandler handler for rejected tasks; when <tt>null</tt> a
     *                                 {@link ThreadPoolExecutor.CallerRunsPolicy} is used
     * @param threadFactory            factory used to create the pool's threads
     * @return the created thread pool
     * @throws IllegalArgumentException if <tt>corePoolSize</tt> is lower than 1, or
     *                                  <tt>maxPoolSize</tt> is lower than <tt>corePoolSize</tt>
     */
    public ExecutorService newThreadPool(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit timeUnit,
                                         int maxQueueSize, RejectedExecutionHandler rejectedExecutionHandler,
                                         ThreadFactory threadFactory) throws IllegalArgumentException {

        // If we set the corePoolSize to be 0, the whole camel application will hang in JDK5
        // just add a check here to throw the IllegalArgumentException
        if (corePoolSize < 1) {
            throw new IllegalArgumentException("The corePoolSize can't be lower than 1");
        }

        // validate max >= core
        if (maxPoolSize < corePoolSize) {
            throw new IllegalArgumentException("MaxPoolSize must be >= corePoolSize, was " + maxPoolSize + " >= " + corePoolSize);
        }

        BlockingQueue<Runnable> workQueue;
        if (corePoolSize == 0 && maxQueueSize <= 0) {
            // NOTE(review): this branch cannot be reached while the corePoolSize < 1 guard above
            // is in place; it is kept as-is pending a decision to either relax the guard or drop it.
            // use a synchronous queue
            workQueue = new SynchronousQueue<Runnable>();
            // and force 1 as pool size to be able to create the thread pool by the JDK
            corePoolSize = 1;
            maxPoolSize = 1;
        } else if (maxQueueSize <= 0) {
            // unbounded task queue
            workQueue = new LinkedBlockingQueue<Runnable>();
        } else {
            // bounded task queue
            workQueue = new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }

        ThreadPoolExecutor answer = new RejectableThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, timeUnit, workQueue);
        answer.setThreadFactory(threadFactory);
        // fall back to running rejected tasks in the caller thread when no handler was supplied
        if (rejectedExecutionHandler == null) {
            rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        }
        answer.setRejectedExecutionHandler(rejectedExecutionHandler);
        return answer;
    }

    /**
     * Creates a scheduled thread pool from the settings of the given profile.
     * <p/>
     * The pool is a {@link RejectableScheduledThreadPoolExecutor} sized by the profile's pool size.
     * When the profile configures a positive max queue size, the pool is wrapped in a
     * {@link SizedScheduledExecutorService} that is given that queue size; otherwise the
     * pool is returned unwrapped.
     *
     * @param profile       the profile holding the thread pool settings
     * @param threadFactory factory used to create the pool's threads
     * @return the created scheduled thread pool
     */
    @Override
    public ScheduledExecutorService newScheduledThreadPool(ThreadPoolProfile profile, ThreadFactory threadFactory) {
        RejectedExecutionHandler rejectedExecutionHandler = profile.getRejectedExecutionHandler();
        // fall back to running rejected tasks in the caller thread when the profile has no handler
        if (rejectedExecutionHandler == null) {
            rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
        }

        ScheduledThreadPoolExecutor answer = new RejectableScheduledThreadPoolExecutor(profile.getPoolSize(), threadFactory, rejectedExecutionHandler);
        // TODO: when JDK7 we should setRemoveOnCancelPolicy(true)

        // need to wrap the thread pool in a sized to guard against the problem that the
        // JDK created thread pool has an unbounded queue (see class javadoc), which mean
        // we could potentially keep adding tasks, and run out of memory.
        // The decision must be based on the queue size (the value handed to the wrapper) and
        // not on the max pool size, otherwise a profile with a non-positive queue size would
        // end up with a wrapper bounded at zero or less.
        if (profile.getMaxQueueSize() > 0) {
            return new SizedScheduledExecutorService(answer, profile.getMaxQueueSize());
        } else {
            return answer;
        }
    }

}