/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.util.concurrent;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A sized {@link ScheduledExecutorService} which will reject executing tasks if the task queue is full.
 * <p>
 * The {@link ScheduledThreadPoolExecutor} which is the default implementation of the {@link ScheduledExecutorService}
 * has unbounded task queue, which means you can keep scheduling tasks which may cause the system to run out of memory.
 * <p>
 * This class is a wrapper for {@link ScheduledThreadPoolExecutor} to reject executing tasks if an upper limit of the
 * task queue has been reached.
 * <p>
 * The limit is checked once per call, immediately before the call is handed to the wrapped pool. The check and the
 * hand-over are separate steps with no locking in this class, so the limit should be read as a guard against
 * unbounded growth rather than as an exact cap.
 */
public class SizedScheduledExecutorService implements ScheduledExecutorService {

    private static final Logger LOG = LoggerFactory.getLogger(SizedScheduledExecutorService.class);

    /** Message of the {@link RejectedExecutionException} thrown when the queue limit has been reached. */
    private static final String REJECTED_MESSAGE = "Task rejected due queue size limit reached";

    private final ScheduledThreadPoolExecutor delegate;
    private final long queueSize;

    /**
     * Creates a new sized {@link ScheduledExecutorService} with the given queue size as upper task limit.
     *
     * @param  delegate             the delegate of the actual thread pool implementation, must not be {@code null}
     * @param  queueSize            the upper queue size, use 0 or negative value for unlimited
     * @throws NullPointerException if {@code delegate} is {@code null}
     */
    public SizedScheduledExecutorService(ScheduledThreadPoolExecutor delegate, long queueSize) {
        // fail fast here instead of on the first call that touches the delegate
        this.delegate = Objects.requireNonNull(delegate, "delegate");
        this.queueSize = queueSize;
    }

    /**
     * Gets the wrapped {@link ScheduledThreadPoolExecutor}
     */
    public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() {
        return delegate;
    }

    /**
     * Schedules the task on the wrapped pool, provided the queue limit has not been reached.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> task, long delay, TimeUnit timeUnit) {
        ensureCapacity();
        return delegate.schedule(task, delay, timeUnit);
    }

    /**
     * Schedules the task on the wrapped pool, provided the queue limit has not been reached.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    @Override
    public ScheduledFuture<?> schedule(Runnable task, long delay, TimeUnit timeUnit) {
        ensureCapacity();
        return delegate.schedule(task, delay, timeUnit);
    }

    /**
     * Schedules the periodic task on the wrapped pool, provided the queue limit has not been reached.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit timeUnit) {
        ensureCapacity();
        return delegate.scheduleAtFixedRate(task, initialDelay, period, timeUnit);
    }

    /**
     * Schedules the periodic task on the wrapped pool, provided the queue limit has not been reached.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long period, TimeUnit timeUnit) {
        ensureCapacity();
        return delegate.scheduleWithFixedDelay(task, initialDelay, period, timeUnit);
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#awaitTermination(long, TimeUnit)}. */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException {
        return delegate.awaitTermination(timeout, timeUnit);
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#getActiveCount()}. */
    public int getActiveCount() {
        return delegate.getActiveCount();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#getCompletedTaskCount()}. */
    public long getCompletedTaskCount() {
        return delegate.getCompletedTaskCount();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#getCorePoolSize()}. */
    public int getCorePoolSize() {
        return delegate.getCorePoolSize();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#getKeepAliveTime(TimeUnit)}. */
    public long getKeepAliveTime(TimeUnit timeUnit) {
        return delegate.getKeepAliveTime(timeUnit);
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#getLargestPoolSize()}. */
    public int getLargestPoolSize() {
        return delegate.getLargestPoolSize();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#getMaximumPoolSize()}. */
    public int getMaximumPoolSize() {
        return delegate.getMaximumPoolSize();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#getPoolSize()}. */
    public int getPoolSize() {
        return delegate.getPoolSize();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#getRejectedExecutionHandler()}. */
    public RejectedExecutionHandler getRejectedExecutionHandler() {
        return delegate.getRejectedExecutionHandler();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#getTaskCount()}. */
    public long getTaskCount() {
        return delegate.getTaskCount();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#getThreadFactory()}. */
    public ThreadFactory getThreadFactory() {
        return delegate.getThreadFactory();
    }

    /**
     * Invokes all tasks on the wrapped pool, provided the queue limit has not been reached.
     * <p>
     * The limit is checked once for the whole call, not once per task in the collection.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
        ensureCapacity();
        return delegate.invokeAll(tasks);
    }

    /**
     * Invokes all tasks on the wrapped pool with a timeout, provided the queue limit has not been reached.
     * <p>
     * The limit is checked once for the whole call, not once per task in the collection.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    @Override
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit timeUnit)
            throws InterruptedException {
        ensureCapacity();
        return delegate.invokeAll(tasks, timeout, timeUnit);
    }

    /**
     * Invokes the tasks on the wrapped pool via {@code invokeAny}, provided the queue limit has not been reached.
     * <p>
     * The limit is checked once for the whole call, not once per task in the collection.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        ensureCapacity();
        return delegate.invokeAny(tasks);
    }

    /**
     * Invokes the tasks on the wrapped pool via {@code invokeAny} with a timeout, provided the queue limit has not
     * been reached.
     * <p>
     * The limit is checked once for the whole call, not once per task in the collection.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit timeUnit)
            throws InterruptedException, ExecutionException, TimeoutException {
        ensureCapacity();
        return delegate.invokeAny(tasks, timeout, timeUnit);
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#isShutdown()}. */
    @Override
    public boolean isShutdown() {
        return delegate.isShutdown();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#isTerminated()}. */
    @Override
    public boolean isTerminated() {
        return delegate.isTerminated();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#isTerminating()}. */
    public boolean isTerminating() {
        return delegate.isTerminating();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#prestartAllCoreThreads()}. */
    public int prestartAllCoreThreads() {
        return delegate.prestartAllCoreThreads();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#prestartCoreThread()}. */
    public boolean prestartCoreThread() {
        return delegate.prestartCoreThread();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#purge()}. */
    public void purge() {
        delegate.purge();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#setCorePoolSize(int)}. */
    public void setCorePoolSize(int corePoolSize) {
        delegate.setCorePoolSize(corePoolSize);
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#setKeepAliveTime(long, TimeUnit)}. */
    public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
        delegate.setKeepAliveTime(keepAliveTime, timeUnit);
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#setMaximumPoolSize(int)}. */
    public void setMaximumPoolSize(int maximumPoolSize) {
        delegate.setMaximumPoolSize(maximumPoolSize);
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#setRejectedExecutionHandler(RejectedExecutionHandler)}. */
    public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
        delegate.setRejectedExecutionHandler(rejectedExecutionHandler);
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#setThreadFactory(ThreadFactory)}. */
    public void setThreadFactory(ThreadFactory threadFactory) {
        delegate.setThreadFactory(threadFactory);
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#shutdown()}. */
    @Override
    public void shutdown() {
        delegate.shutdown();
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#shutdownNow()}. */
    @Override
    public List<Runnable> shutdownNow() {
        return delegate.shutdownNow();
    }

    /**
     * Submits the task to the wrapped pool, provided the queue limit has not been reached.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        ensureCapacity();
        return delegate.submit(task);
    }

    /**
     * Submits the task to the wrapped pool, provided the queue limit has not been reached.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    @Override
    public Future<?> submit(Runnable task) {
        ensureCapacity();
        return delegate.submit(task);
    }

    /**
     * Submits the task to the wrapped pool, provided the queue limit has not been reached.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        ensureCapacity();
        return delegate.submit(task, result);
    }

    /**
     * Executes the task on the wrapped pool, provided the queue limit has not been reached.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    @Override
    public void execute(Runnable task) {
        ensureCapacity();
        delegate.execute(task);
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#allowCoreThreadTimeOut(boolean)}. */
    public void allowCoreThreadTimeOut(boolean value) {
        delegate.allowCoreThreadTimeOut(value);
    }

    /** Delegates to {@link ScheduledThreadPoolExecutor#allowsCoreThreadTimeOut()}. */
    public boolean allowsCoreThreadTimeOut() {
        return delegate.allowsCoreThreadTimeOut();
    }

    /**
     * Can the task be scheduled or executed?
     * <p>
     * Always accepts when the configured queue size is 0 or negative (unlimited); otherwise accepts only while the
     * current size of the delegate's queue is strictly below the configured queue size.
     *
     * @return {@code true} to accept, {@code false} to not accept
     */
    protected boolean canScheduleOrExecute() {
        if (queueSize <= 0) {
            return true;
        }

        int size = delegate.getQueue().size();
        boolean answer = size < queueSize;
        if (LOG.isTraceEnabled()) {
            LOG.trace("canScheduleOrExecute {} < {} -> {}", size, queueSize, answer);
        }
        return answer;
    }

    /**
     * Single place for the admission guard shared by every scheduling, submitting and invoking method.
     * <p>
     * Goes through {@link #canScheduleOrExecute()} so that subclasses overriding that hook keep control of the
     * decision.
     *
     * @throws RejectedExecutionException if {@link #canScheduleOrExecute()} returns {@code false}
     */
    private void ensureCapacity() {
        if (!canScheduleOrExecute()) {
            throw new RejectedExecutionException(REJECTED_MESSAGE);
        }
    }

    /**
     * Returns the default {@link Object#toString()} text, suffixed with the thread factory name in square brackets
     * when the delegate's thread factory is a {@link CamelThreadFactory}.
     */
    @Override
    public String toString() {
        // the thread factory often have more precise details what the thread pool is used for;
        // read it once so the instanceof test and the cast are guaranteed to see the same object
        ThreadFactory factory = delegate.getThreadFactory();
        if (factory instanceof CamelThreadFactory) {
            String name = ((CamelThreadFactory) factory).getName();
            return super.toString() + "[" + name + "]";
        }
        return super.toString();
    }
}