001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 import java.util.concurrent.ExecutorService; 022 import java.util.concurrent.TimeUnit; 023 024 import javax.xml.bind.annotation.XmlAccessType; 025 import javax.xml.bind.annotation.XmlAccessorType; 026 import javax.xml.bind.annotation.XmlAttribute; 027 import javax.xml.bind.annotation.XmlRootElement; 028 import javax.xml.bind.annotation.XmlTransient; 029 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; 030 031 import org.apache.camel.Processor; 032 import org.apache.camel.ThreadPoolRejectedPolicy; 033 import org.apache.camel.builder.ThreadPoolProfileBuilder; 034 import org.apache.camel.builder.xml.TimeUnitAdapter; 035 import org.apache.camel.processor.Pipeline; 036 import org.apache.camel.processor.ThreadsProcessor; 037 import org.apache.camel.spi.ExecutorServiceManager; 038 import org.apache.camel.spi.RouteContext; 039 import org.apache.camel.spi.ThreadPoolProfile; 040 041 /** 042 * Represents an XML <threads/> element 043 * 044 * @version 045 */ 046 @XmlRootElement(name = "threads") 047 @XmlAccessorType(XmlAccessType.FIELD) 048 public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> implements ExecutorServiceAwareDefinition<ThreadsDefinition> { 049 050 // TODO: Camel 3.0 Should extend NoOutputDefinition 051 052 @XmlTransient 053 private ExecutorService executorService; 054 @XmlAttribute 055 private String executorServiceRef; 056 @XmlAttribute 057 private Integer poolSize; 058 @XmlAttribute 059 private Integer maxPoolSize; 060 @XmlAttribute 061 private Long keepAliveTime; 062 @XmlAttribute 063 @XmlJavaTypeAdapter(TimeUnitAdapter.class) 064 private TimeUnit timeUnit; 065 @XmlAttribute 066 private Integer maxQueueSize; 067 @XmlAttribute 068 private String threadName; 069 @XmlAttribute 070 private ThreadPoolRejectedPolicy rejectedPolicy; 071 @XmlAttribute 072 private Boolean callerRunsWhenRejected; 073 074 public ThreadsDefinition() { 075 this.threadName = "Threads"; 076 } 077 078 @Override 079 public Processor createProcessor(RouteContext routeContext) throws Exception { 080 // the threads name 081 String name = getThreadName() != null ? getThreadName() : "Threads"; 082 // prefer any explicit configured executor service 083 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true); 084 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false); 085 // if no explicit then create from the options 086 if (threadPool == null) { 087 ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager(); 088 // create the thread pool using a builder 089 ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name) 090 .poolSize(getPoolSize()) 091 .maxPoolSize(getMaxPoolSize()) 092 .keepAliveTime(getKeepAliveTime(), getTimeUnit()) 093 .maxQueueSize(getMaxQueueSize()) 094 .rejectedPolicy(getRejectedPolicy()) 095 .build(); 096 threadPool = manager.newThreadPool(this, name, profile); 097 shutdownThreadPool = true; 098 } 099 100 ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool); 101 if (getCallerRunsWhenRejected() == null) { 102 // should be true by default 103 thread.setCallerRunsWhenRejected(true); 104 } else { 105 thread.setCallerRunsWhenRejected(getCallerRunsWhenRejected()); 106 } 107 thread.setRejectedPolicy(getRejectedPolicy()); 108 109 List<Processor> pipe = new ArrayList<Processor>(2); 110 pipe.add(thread); 111 pipe.add(createChildProcessor(routeContext, true)); 112 // wrap in nested pipeline so this appears as one processor 113 // (recipient list definition does this as well) 114 return new Pipeline(routeContext.getCamelContext(), pipe) { 115 @Override 116 public String toString() { 117 return "Threads[" + getOutputs() + "]"; 118 } 119 }; 120 } 121 122 @Override 123 public String getLabel() { 124 return "threads"; 125 } 126 127 @Override 128 public String getShortName() { 129 return "threads"; 130 } 131 132 @Override 133 public String toString() { 134 return "Threads[" + getOutputs() + "]"; 135 } 136 137 public ThreadsDefinition executorService(ExecutorService executorService) { 138 setExecutorService(executorService); 139 return this; 140 } 141 142 public ThreadsDefinition executorServiceRef(String executorServiceRef) { 143 setExecutorServiceRef(executorServiceRef); 144 return this; 145 } 146 147 /** 148 * Sets the core pool size for the underlying {@link java.util.concurrent.ExecutorService}. 149 * 150 * @param poolSize the core pool size to keep minimum in the pool 151 * @return the builder 152 */ 153 public ThreadsDefinition poolSize(int poolSize) { 154 setPoolSize(poolSize); 155 return this; 156 } 157 158 /** 159 * Sets the maximum pool size for the underlying {@link java.util.concurrent.ExecutorService}. 160 * 161 * @param maxPoolSize the maximum pool size 162 * @return the builder 163 */ 164 public ThreadsDefinition maxPoolSize(int maxPoolSize) { 165 setMaxPoolSize(maxPoolSize); 166 return this; 167 } 168 169 /** 170 * Sets the keep alive time for idle threads 171 * 172 * @param keepAliveTime keep alive time 173 * @return the builder 174 */ 175 public ThreadsDefinition keepAliveTime(long keepAliveTime) { 176 setKeepAliveTime(keepAliveTime); 177 return this; 178 } 179 180 /** 181 * Sets the keep alive time unit. 182 * By default SECONDS is used. 183 * 184 * @param keepAliveTimeUnits time unit 185 * @return the builder 186 */ 187 public ThreadsDefinition timeUnit(TimeUnit keepAliveTimeUnits) { 188 setTimeUnit(keepAliveTimeUnits); 189 return this; 190 } 191 192 /** 193 * Sets the maximum number of tasks in the work queue. 194 * <p/> 195 * Use <tt>-1</tt> or <tt>Integer.MAX_VALUE</tt> for an unbounded queue 196 * 197 * @param maxQueueSize the max queue size 198 * @return the builder 199 */ 200 public ThreadsDefinition maxQueueSize(int maxQueueSize) { 201 setMaxQueueSize(maxQueueSize); 202 return this; 203 } 204 205 /** 206 * Sets the handler for tasks which cannot be executed by the thread pool. 207 * 208 * @param rejectedPolicy the policy for the handler 209 * @return the builder 210 */ 211 public ThreadsDefinition rejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) { 212 setRejectedPolicy(rejectedPolicy); 213 return this; 214 } 215 216 /** 217 * Sets the thread name to use. 218 * 219 * @param threadName the thread name 220 * @return the builder 221 */ 222 public ThreadsDefinition threadName(String threadName) { 223 setThreadName(threadName); 224 return this; 225 } 226 227 /** 228 * Whether or not the caller should run the task when it was rejected by the thread pool. 229 * <p/> 230 * Is by default <tt>true</tt> 231 * 232 * @param callerRunsWhenRejected whether or not the caller should run 233 * @return the builder 234 */ 235 public ThreadsDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) { 236 setCallerRunsWhenRejected(callerRunsWhenRejected); 237 return this; 238 } 239 240 public ExecutorService getExecutorService() { 241 return executorService; 242 } 243 244 public void setExecutorService(ExecutorService executorService) { 245 this.executorService = executorService; 246 } 247 248 public String getExecutorServiceRef() { 249 return executorServiceRef; 250 } 251 252 public void setExecutorServiceRef(String executorServiceRef) { 253 this.executorServiceRef = executorServiceRef; 254 } 255 256 public Integer getPoolSize() { 257 return poolSize; 258 } 259 260 public void setPoolSize(Integer poolSize) { 261 this.poolSize = poolSize; 262 } 263 264 public Integer getMaxPoolSize() { 265 return maxPoolSize; 266 } 267 268 public void setMaxPoolSize(Integer maxPoolSize) { 269 this.maxPoolSize = maxPoolSize; 270 } 271 272 public Long getKeepAliveTime() { 273 return keepAliveTime; 274 } 275 276 public void setKeepAliveTime(Long keepAliveTime) { 277 this.keepAliveTime = keepAliveTime; 278 } 279 280 public TimeUnit getTimeUnit() { 281 return timeUnit; 282 } 283 284 public void setTimeUnit(TimeUnit timeUnit) { 285 this.timeUnit = timeUnit; 286 } 287 288 public Integer getMaxQueueSize() { 289 return maxQueueSize; 290 } 291 292 public void setMaxQueueSize(Integer maxQueueSize) { 293 this.maxQueueSize = maxQueueSize; 294 } 295 296 public String getThreadName() { 297 return threadName; 298 } 299 300 public void setThreadName(String threadName) { 301 this.threadName = threadName; 302 } 303 304 public ThreadPoolRejectedPolicy getRejectedPolicy() { 305 return rejectedPolicy; 306 } 307 308 public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) { 309 this.rejectedPolicy = rejectedPolicy; 310 } 311 312 public Boolean getCallerRunsWhenRejected() { 313 return callerRunsWhenRejected; 314 } 315 316 public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) { 317 this.callerRunsWhenRejected = callerRunsWhenRejected; 318 } 319 }