001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.thread; 018 019import java.util.concurrent.Executor; 020import java.util.concurrent.ExecutorService; 021import java.util.concurrent.RejectedExecutionHandler; 022import java.util.concurrent.SynchronousQueue; 023import java.util.concurrent.ThreadFactory; 024import java.util.concurrent.ThreadPoolExecutor; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import java.util.concurrent.atomic.AtomicLong; 028 029import org.apache.activemq.util.ThreadPoolUtils; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Manages the thread pool for long running tasks. Long running tasks are not 035 * always active but when they are active, they may need a few iterations of 036 * processing for them to become idle. The manager ensures that each task is 037 * processes but that no one task overtakes the system. This is somewhat like 038 * cooperative multitasking. 039 * 040 * @org.apache.xbean.XBean 041 */ 042public class TaskRunnerFactory implements Executor { 043 044 private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerFactory.class); 045 private ExecutorService executor; 046 private int maxIterationsPerRun; 047 private String name; 048 private int priority; 049 private boolean daemon; 050 private final AtomicLong id = new AtomicLong(0); 051 private boolean dedicatedTaskRunner; 052 private long shutdownAwaitTermination = 30000; 053 private final AtomicBoolean initDone = new AtomicBoolean(false); 054 private int maxThreadPoolSize = getDefaultMaximumPoolSize(); 055 private RejectedExecutionHandler rejectedTaskHandler = null; 056 private ClassLoader threadClassLoader; 057 058 public TaskRunnerFactory() { 059 this("ActiveMQ Task"); 060 } 061 062 public TaskRunnerFactory(String name) { 063 this(name, Thread.NORM_PRIORITY, true, 1000); 064 } 065 066 private TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun) { 067 this(name, priority, daemon, maxIterationsPerRun, false); 068 } 069 070 public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner) { 071 this(name, priority, daemon, maxIterationsPerRun, dedicatedTaskRunner, getDefaultMaximumPoolSize()); 072 } 073 074 public TaskRunnerFactory(String name, int priority, boolean daemon, int maxIterationsPerRun, boolean dedicatedTaskRunner, int maxThreadPoolSize) { 075 this.name = name; 076 this.priority = priority; 077 this.daemon = daemon; 078 this.maxIterationsPerRun = maxIterationsPerRun; 079 this.dedicatedTaskRunner = dedicatedTaskRunner; 080 this.maxThreadPoolSize = maxThreadPoolSize; 081 } 082 083 public void init() { 084 if (initDone.compareAndSet(false, true)) { 085 // If your OS/JVM combination has a good thread model, you may want to 086 // avoid using a thread pool to run tasks and use a DedicatedTaskRunner instead. 087 if (dedicatedTaskRunner || "true".equalsIgnoreCase(System.getProperty("org.apache.activemq.UseDedicatedTaskRunner"))) { 088 executor = null; 089 } else if (executor == null) { 090 executor = createDefaultExecutor(); 091 } 092 LOG.debug("Initialized TaskRunnerFactory[{}] using ExecutorService: {}", name, executor); 093 } 094 } 095 096 /** 097 * Performs a shutdown only, by which the thread pool is shutdown by not graceful nor aggressively. 098 * 099 * @see ThreadPoolUtils#shutdown(java.util.concurrent.ExecutorService) 100 */ 101 public void shutdown() { 102 if (executor != null) { 103 ThreadPoolUtils.shutdown(executor); 104 executor = null; 105 } 106 initDone.set(false); 107 } 108 109 /** 110 * Performs a shutdown now (aggressively) on the thread pool. 111 * 112 * @see ThreadPoolUtils#shutdownNow(java.util.concurrent.ExecutorService) 113 */ 114 public void shutdownNow() { 115 if (executor != null) { 116 ThreadPoolUtils.shutdownNow(executor); 117 executor = null; 118 } 119 initDone.set(false); 120 } 121 122 /** 123 * Performs a graceful shutdown. 124 * 125 * @see ThreadPoolUtils#shutdownGraceful(java.util.concurrent.ExecutorService) 126 */ 127 public void shutdownGraceful() { 128 if (executor != null) { 129 ThreadPoolUtils.shutdownGraceful(executor, shutdownAwaitTermination); 130 executor = null; 131 } 132 initDone.set(false); 133 } 134 135 public TaskRunner createTaskRunner(Task task, String name) { 136 init(); 137 if (executor != null) { 138 return new PooledTaskRunner(executor, task, maxIterationsPerRun); 139 } else { 140 return new DedicatedTaskRunner(task, name, priority, daemon); 141 } 142 } 143 144 @Override 145 public void execute(Runnable runnable) { 146 execute(runnable, name); 147 } 148 149 public void execute(Runnable runnable, String name) { 150 init(); 151 LOG.trace("Execute[{}] runnable: {}", name, runnable); 152 if (executor != null) { 153 executor.execute(runnable); 154 } else { 155 doExecuteNewThread(runnable, name); 156 } 157 } 158 159 private void doExecuteNewThread(Runnable runnable, String name) { 160 String threadName = name + "-" + id.incrementAndGet(); 161 Thread thread = new Thread(runnable, threadName); 162 thread.setDaemon(daemon); 163 164 LOG.trace("Created and running thread[{}]: {}", threadName, thread); 165 thread.start(); 166 } 167 168 protected ExecutorService createDefaultExecutor() { 169 ThreadPoolExecutor rc = new ThreadPoolExecutor(getDefaultCorePoolSize(), getMaxThreadPoolSize(), getDefaultKeepAliveTime(), TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() { 170 @Override 171 public Thread newThread(Runnable runnable) { 172 String threadName = name + "-" + id.incrementAndGet(); 173 Thread thread = new Thread(runnable, threadName); 174 thread.setDaemon(daemon); 175 thread.setPriority(priority); 176 if (threadClassLoader != null) { 177 thread.setContextClassLoader(threadClassLoader); 178 } 179 thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 180 @Override 181 public void uncaughtException(final Thread t, final Throwable e) { 182 LOG.error("Error in thread '{}'", t.getName(), e); 183 } 184 }); 185 186 LOG.trace("Created thread[{}]: {}", threadName, thread); 187 return thread; 188 } 189 }); 190 191 if (rejectedTaskHandler != null) { 192 rc.setRejectedExecutionHandler(rejectedTaskHandler); 193 } 194 195 return rc; 196 } 197 198 public ExecutorService getExecutor() { 199 return executor; 200 } 201 202 public void setExecutor(ExecutorService executor) { 203 this.executor = executor; 204 } 205 206 public int getMaxIterationsPerRun() { 207 return maxIterationsPerRun; 208 } 209 210 public void setMaxIterationsPerRun(int maxIterationsPerRun) { 211 this.maxIterationsPerRun = maxIterationsPerRun; 212 } 213 214 public String getName() { 215 return name; 216 } 217 218 public void setName(String name) { 219 this.name = name; 220 } 221 222 public int getPriority() { 223 return priority; 224 } 225 226 public void setPriority(int priority) { 227 this.priority = priority; 228 } 229 230 public boolean isDaemon() { 231 return daemon; 232 } 233 234 public void setDaemon(boolean daemon) { 235 this.daemon = daemon; 236 } 237 238 public boolean isDedicatedTaskRunner() { 239 return dedicatedTaskRunner; 240 } 241 242 public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) { 243 this.dedicatedTaskRunner = dedicatedTaskRunner; 244 } 245 246 public int getMaxThreadPoolSize() { 247 return maxThreadPoolSize; 248 } 249 250 public void setMaxThreadPoolSize(int maxThreadPoolSize) { 251 this.maxThreadPoolSize = maxThreadPoolSize; 252 } 253 254 public void setThreadClassLoader(ClassLoader threadClassLoader) { 255 this.threadClassLoader = threadClassLoader; 256 } 257 258 public RejectedExecutionHandler getRejectedTaskHandler() { 259 return rejectedTaskHandler; 260 } 261 262 public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) { 263 this.rejectedTaskHandler = rejectedTaskHandler; 264 } 265 266 public long getShutdownAwaitTermination() { 267 return shutdownAwaitTermination; 268 } 269 270 public void setShutdownAwaitTermination(long shutdownAwaitTermination) { 271 this.shutdownAwaitTermination = shutdownAwaitTermination; 272 } 273 274 private static int getDefaultCorePoolSize() { 275 return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.corePoolSize", 0); 276 } 277 278 private static int getDefaultMaximumPoolSize() { 279 return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.maximumPoolSize", Integer.MAX_VALUE); 280 } 281 282 private static int getDefaultKeepAliveTime() { 283 return Integer.getInteger("org.apache.activemq.thread.TaskRunnerFactory.keepAliveTime", 30); 284 } 285}