001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.ExecutorService; 022import java.util.concurrent.TimeUnit; 023 024import javax.xml.bind.annotation.XmlAccessType; 025import javax.xml.bind.annotation.XmlAccessorType; 026import javax.xml.bind.annotation.XmlAttribute; 027import javax.xml.bind.annotation.XmlRootElement; 028import javax.xml.bind.annotation.XmlTransient; 029import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; 030 031import org.apache.camel.Processor; 032import org.apache.camel.ThreadPoolRejectedPolicy; 033import org.apache.camel.builder.ThreadPoolProfileBuilder; 034import org.apache.camel.builder.xml.TimeUnitAdapter; 035import org.apache.camel.processor.Pipeline; 036import org.apache.camel.processor.ThreadsProcessor; 037import org.apache.camel.spi.ExecutorServiceManager; 038import org.apache.camel.spi.Metadata; 039import org.apache.camel.spi.RouteContext; 040import org.apache.camel.spi.ThreadPoolProfile; 041 042/** 043 * Specifies that all steps after this node are processed asynchronously 044 * 045 * @version 046 */ 047@Metadata(label = "eip,routing") 048@XmlRootElement(name = "threads") 049@XmlAccessorType(XmlAccessType.FIELD) 050public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> implements ExecutorServiceAwareDefinition<ThreadsDefinition> { 051 052 // TODO: Camel 3.0 Should extend NoOutputDefinition 053 054 @XmlTransient 055 private ExecutorService executorService; 056 @XmlAttribute 057 private String executorServiceRef; 058 @XmlAttribute 059 private Integer poolSize; 060 @XmlAttribute 061 private Integer maxPoolSize; 062 @XmlAttribute 063 private Long keepAliveTime; 064 @XmlAttribute 065 @XmlJavaTypeAdapter(TimeUnitAdapter.class) 066 private TimeUnit timeUnit; 067 @XmlAttribute 068 private Integer maxQueueSize; 069 @XmlAttribute 070 private Boolean allowCoreThreadTimeOut; 071 @XmlAttribute @Metadata(defaultValue = "Threads") 072 private String threadName; 073 @XmlAttribute 074 private ThreadPoolRejectedPolicy rejectedPolicy; 075 @XmlAttribute @Metadata(defaultValue = "true") 076 private Boolean callerRunsWhenRejected; 077 078 public ThreadsDefinition() { 079 this.threadName = "Threads"; 080 } 081 082 @Override 083 public Processor createProcessor(RouteContext routeContext) throws Exception { 084 // the threads name 085 String name = getThreadName() != null ? getThreadName() : "Threads"; 086 // prefer any explicit configured executor service 087 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true); 088 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false); 089 090 // resolve what rejected policy to use 091 ThreadPoolRejectedPolicy policy = resolveRejectedPolicy(routeContext); 092 if (policy == null) { 093 if (callerRunsWhenRejected == null || callerRunsWhenRejected) { 094 // should use caller runs by default if not configured 095 policy = ThreadPoolRejectedPolicy.CallerRuns; 096 } else { 097 policy = ThreadPoolRejectedPolicy.Abort; 098 } 099 } 100 log.debug("Using ThreadPoolRejectedPolicy: {}", policy); 101 102 // if no explicit then create from the options 103 if (threadPool == null) { 104 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 105 // create the thread pool using a builder 106 ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name) 107 .poolSize(getPoolSize()) 108 .maxPoolSize(getMaxPoolSize()) 109 .keepAliveTime(getKeepAliveTime(), getTimeUnit()) 110 .maxQueueSize(getMaxQueueSize()) 111 .rejectedPolicy(policy) 112 .allowCoreThreadTimeOut(getAllowCoreThreadTimeOut()) 113 .build(); 114 threadPool = manager.newThreadPool(this, name, profile); 115 shutdownThreadPool = true; 116 } else { 117 if (getThreadName() != null && !getThreadName().equals("Threads")) { 118 throw new IllegalArgumentException("ThreadName and executorServiceRef options cannot be used together."); 119 } 120 if (getPoolSize() != null) { 121 throw new IllegalArgumentException("PoolSize and executorServiceRef options cannot be used together."); 122 } 123 if (getMaxPoolSize() != null) { 124 throw new IllegalArgumentException("MaxPoolSize and executorServiceRef options cannot be used together."); 125 } 126 if (getKeepAliveTime() != null) { 127 throw new IllegalArgumentException("KeepAliveTime and executorServiceRef options cannot be used together."); 128 } 129 if (getTimeUnit() != null) { 130 throw new IllegalArgumentException("TimeUnit and executorServiceRef options cannot be used together."); 131 } 132 if (getMaxQueueSize() != null) { 133 throw new IllegalArgumentException("MaxQueueSize and executorServiceRef options cannot be used together."); 134 } 135 if (getRejectedPolicy() != null) { 136 throw new IllegalArgumentException("RejectedPolicy and executorServiceRef options cannot be used together."); 137 } 138 if (getAllowCoreThreadTimeOut() != null) { 139 throw new IllegalArgumentException("AllowCoreThreadTimeOut and executorServiceRef options cannot be used together."); 140 } 141 } 142 143 ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool, policy); 144 145 List<Processor> pipe = new ArrayList<>(2); 146 pipe.add(thread); 147 pipe.add(createChildProcessor(routeContext, true)); 148 // wrap in nested pipeline so this appears as one processor 149 // (recipient list definition does this as well) 150 return new Pipeline(routeContext.getCamelContext(), pipe) { 151 @Override 152 public String toString() { 153 return "Threads[" + getOutputs() + "]"; 154 } 155 }; 156 } 157 158 protected ThreadPoolRejectedPolicy resolveRejectedPolicy(RouteContext routeContext) { 159 if (getExecutorServiceRef() != null && getRejectedPolicy() == null) { 160 ThreadPoolProfile threadPoolProfile = routeContext.getCamelContext().getExecutorServiceManager().getThreadPoolProfile(getExecutorServiceRef()); 161 if (threadPoolProfile != null) { 162 return threadPoolProfile.getRejectedPolicy(); 163 } 164 } 165 return getRejectedPolicy(); 166 } 167 168 @Override 169 public String getShortName() { 170 return "threads"; 171 } 172 173 @Override 174 public String getLabel() { 175 return "threads"; 176 } 177 178 @Override 179 public String toString() { 180 return "Threads[" + getOutputs() + "]"; 181 } 182 183 /** 184 * To use a custom thread pool 185 */ 186 public ThreadsDefinition executorService(ExecutorService executorService) { 187 setExecutorService(executorService); 188 return this; 189 } 190 191 /** 192 * To refer to a custom thread pool or use a thread pool profile (as overlay) 193 */ 194 public ThreadsDefinition executorServiceRef(String executorServiceRef) { 195 setExecutorServiceRef(executorServiceRef); 196 return this; 197 } 198 199 /** 200 * Sets the core pool size 201 * 202 * @param poolSize the core pool size to keep minimum in the pool 203 * @return the builder 204 */ 205 public ThreadsDefinition poolSize(int poolSize) { 206 setPoolSize(poolSize); 207 return this; 208 } 209 210 /** 211 * Sets the maximum pool size 212 * 213 * @param maxPoolSize the maximum pool size 214 * @return the builder 215 */ 216 public ThreadsDefinition maxPoolSize(int maxPoolSize) { 217 setMaxPoolSize(maxPoolSize); 218 return this; 219 } 220 221 /** 222 * Sets the keep alive time for idle threads 223 * 224 * @param keepAliveTime keep alive time 225 * @return the builder 226 */ 227 public ThreadsDefinition keepAliveTime(long keepAliveTime) { 228 setKeepAliveTime(keepAliveTime); 229 return this; 230 } 231 232 /** 233 * Sets the keep alive time unit. 234 * By default SECONDS is used. 235 * 236 * @param keepAliveTimeUnits time unit 237 * @return the builder 238 */ 239 public ThreadsDefinition timeUnit(TimeUnit keepAliveTimeUnits) { 240 setTimeUnit(keepAliveTimeUnits); 241 return this; 242 } 243 244 /** 245 * Sets the maximum number of tasks in the work queue. 246 * <p/> 247 * Use <tt>-1</tt> or <tt>Integer.MAX_VALUE</tt> for an unbounded queue 248 * 249 * @param maxQueueSize the max queue size 250 * @return the builder 251 */ 252 public ThreadsDefinition maxQueueSize(int maxQueueSize) { 253 setMaxQueueSize(maxQueueSize); 254 return this; 255 } 256 257 /** 258 * Sets the handler for tasks which cannot be executed by the thread pool. 259 * 260 * @param rejectedPolicy the policy for the handler 261 * @return the builder 262 */ 263 public ThreadsDefinition rejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) { 264 setRejectedPolicy(rejectedPolicy); 265 return this; 266 } 267 268 /** 269 * Sets the thread name to use. 270 * 271 * @param threadName the thread name 272 * @return the builder 273 */ 274 public ThreadsDefinition threadName(String threadName) { 275 setThreadName(threadName); 276 return this; 277 } 278 279 /** 280 * Whether or not to use as caller runs as <b>fallback</b> when a task is rejected being added to the thread pool (when its full). 281 * This is only used as fallback if no rejectedPolicy has been configured, or the thread pool has no configured rejection handler. 282 * <p/> 283 * Is by default <tt>true</tt> 284 * 285 * @param callerRunsWhenRejected whether or not the caller should run 286 * @return the builder 287 */ 288 public ThreadsDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) { 289 setCallerRunsWhenRejected(callerRunsWhenRejected); 290 return this; 291 } 292 293 /** 294 * Whether idle core threads is allowed to timeout and therefore can shrink the pool size below the core pool size 295 * <p/> 296 * Is by default <tt>false</tt> 297 * 298 * @param allowCoreThreadTimeOut <tt>true</tt> to allow timeout 299 * @return the builder 300 */ 301 public ThreadsDefinition allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { 302 setAllowCoreThreadTimeOut(allowCoreThreadTimeOut); 303 return this; 304 } 305 306 public ExecutorService getExecutorService() { 307 return executorService; 308 } 309 310 public void setExecutorService(ExecutorService executorService) { 311 this.executorService = executorService; 312 } 313 314 public String getExecutorServiceRef() { 315 return executorServiceRef; 316 } 317 318 public void setExecutorServiceRef(String executorServiceRef) { 319 this.executorServiceRef = executorServiceRef; 320 } 321 322 public Integer getPoolSize() { 323 return poolSize; 324 } 325 326 public void setPoolSize(Integer poolSize) { 327 this.poolSize = poolSize; 328 } 329 330 public Integer getMaxPoolSize() { 331 return maxPoolSize; 332 } 333 334 public void setMaxPoolSize(Integer maxPoolSize) { 335 this.maxPoolSize = maxPoolSize; 336 } 337 338 public Long getKeepAliveTime() { 339 return keepAliveTime; 340 } 341 342 public void setKeepAliveTime(Long keepAliveTime) { 343 this.keepAliveTime = keepAliveTime; 344 } 345 346 public TimeUnit getTimeUnit() { 347 return timeUnit; 348 } 349 350 public void setTimeUnit(TimeUnit timeUnit) { 351 this.timeUnit = timeUnit; 352 } 353 354 public Integer getMaxQueueSize() { 355 return maxQueueSize; 356 } 357 358 public void setMaxQueueSize(Integer maxQueueSize) { 359 this.maxQueueSize = maxQueueSize; 360 } 361 362 public String getThreadName() { 363 return threadName; 364 } 365 366 public void setThreadName(String threadName) { 367 this.threadName = threadName; 368 } 369 370 public ThreadPoolRejectedPolicy getRejectedPolicy() { 371 return rejectedPolicy; 372 } 373 374 public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) { 375 this.rejectedPolicy = rejectedPolicy; 376 } 377 378 public Boolean getCallerRunsWhenRejected() { 379 return callerRunsWhenRejected; 380 } 381 382 public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) { 383 this.callerRunsWhenRejected = callerRunsWhenRejected; 384 } 385 386 public Boolean getAllowCoreThreadTimeOut() { 387 return allowCoreThreadTimeOut; 388 } 389 390 public void setAllowCoreThreadTimeOut(Boolean allowCoreThreadTimeOut) { 391 this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; 392 } 393}