001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import java.util.concurrent.ExecutorService;
022    import java.util.concurrent.TimeUnit;
023    
024    import javax.xml.bind.annotation.XmlAccessType;
025    import javax.xml.bind.annotation.XmlAccessorType;
026    import javax.xml.bind.annotation.XmlAttribute;
027    import javax.xml.bind.annotation.XmlRootElement;
028    import javax.xml.bind.annotation.XmlTransient;
029    import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
030    
031    import org.apache.camel.Processor;
032    import org.apache.camel.ThreadPoolRejectedPolicy;
033    import org.apache.camel.builder.ThreadPoolProfileBuilder;
034    import org.apache.camel.builder.xml.TimeUnitAdapter;
035    import org.apache.camel.processor.Pipeline;
036    import org.apache.camel.processor.ThreadsProcessor;
037    import org.apache.camel.spi.ExecutorServiceManager;
038    import org.apache.camel.spi.RouteContext;
039    import org.apache.camel.spi.ThreadPoolProfile;
040    
041    /**
042     * Represents an XML <threads/> element
043     *
044     * @version 
045     */
046    @XmlRootElement(name = "threads")
047    @XmlAccessorType(XmlAccessType.FIELD)
048    public class ThreadsDefinition extends OutputDefinition<ThreadsDefinition> implements ExecutorServiceAwareDefinition<ThreadsDefinition> {
049    
050        // TODO: Camel 3.0 Should extend NoOutputDefinition
051    
052        @XmlTransient
053        private ExecutorService executorService;
054        @XmlAttribute
055        private String executorServiceRef;
056        @XmlAttribute
057        private Integer poolSize;
058        @XmlAttribute
059        private Integer maxPoolSize;
060        @XmlAttribute
061        private Long keepAliveTime;
062        @XmlAttribute
063        @XmlJavaTypeAdapter(TimeUnitAdapter.class)
064        private TimeUnit timeUnit;
065        @XmlAttribute
066        private Integer maxQueueSize;
067        @XmlAttribute
068        private String threadName;
069        @XmlAttribute
070        private ThreadPoolRejectedPolicy rejectedPolicy;
071        @XmlAttribute
072        private Boolean callerRunsWhenRejected;
073        
074        public ThreadsDefinition() {
075            this.threadName =  "Threads";
076        }
077    
078        @Override
079        public Processor createProcessor(RouteContext routeContext) throws Exception {
080            // the threads name
081            String name = getThreadName() != null ? getThreadName() : "Threads";
082            // prefer any explicit configured executor service
083            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
084            ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, name, this, false);
085            // if no explicit then create from the options
086            if (threadPool == null) {
087                ExecutorServiceManager manager = routeContext.getCamelContext().getExecutorServiceManager();
088                // create the thread pool using a builder
089                ThreadPoolProfile profile = new ThreadPoolProfileBuilder(name)
090                        .poolSize(getPoolSize())
091                        .maxPoolSize(getMaxPoolSize())
092                        .keepAliveTime(getKeepAliveTime(), getTimeUnit())
093                        .maxQueueSize(getMaxQueueSize())
094                        .rejectedPolicy(getRejectedPolicy())
095                        .build();
096                threadPool = manager.newThreadPool(this, name, profile);
097                shutdownThreadPool = true;
098            }
099    
100            ThreadsProcessor thread = new ThreadsProcessor(routeContext.getCamelContext(), threadPool, shutdownThreadPool);
101            if (getCallerRunsWhenRejected() == null) {
102                // should be true by default
103                thread.setCallerRunsWhenRejected(true);
104            } else {
105                thread.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
106            }
107            thread.setRejectedPolicy(getRejectedPolicy());
108    
109            List<Processor> pipe = new ArrayList<Processor>(2);
110            pipe.add(thread);
111            pipe.add(createChildProcessor(routeContext, true));
112            // wrap in nested pipeline so this appears as one processor
113            // (recipient list definition does this as well)
114            return new Pipeline(routeContext.getCamelContext(), pipe) {
115                @Override
116                public String toString() {
117                    return "Threads[" + getOutputs() + "]";
118                }
119            };
120        }
121    
122        @Override
123        public String getLabel() {
124            return "threads";
125        }
126    
127        @Override
128        public String getShortName() {
129            return "threads";
130        }
131    
132        @Override
133        public String toString() {
134            return "Threads[" + getOutputs() + "]";
135        }
136    
137        public ThreadsDefinition executorService(ExecutorService executorService) {
138            setExecutorService(executorService);
139            return this;
140        }
141    
142        public ThreadsDefinition executorServiceRef(String executorServiceRef) {
143            setExecutorServiceRef(executorServiceRef);
144            return this;
145        }
146    
147        /**
148         * Sets the core pool size for the underlying {@link java.util.concurrent.ExecutorService}.
149         *
150         * @param poolSize the core pool size to keep minimum in the pool
151         * @return the builder
152         */
153        public ThreadsDefinition poolSize(int poolSize) {
154            setPoolSize(poolSize);
155            return this;
156        }
157    
158        /**
159         * Sets the maximum pool size for the underlying {@link java.util.concurrent.ExecutorService}.
160         *
161         * @param maxPoolSize the maximum pool size
162         * @return the builder
163         */
164        public ThreadsDefinition maxPoolSize(int maxPoolSize) {
165            setMaxPoolSize(maxPoolSize);
166            return this;
167        }
168    
169        /**
170         * Sets the keep alive time for idle threads
171         *
172         * @param keepAliveTime keep alive time
173         * @return the builder
174         */
175        public ThreadsDefinition keepAliveTime(long keepAliveTime) {
176            setKeepAliveTime(keepAliveTime);
177            return this;
178        }
179    
180        /**
181         * Sets the keep alive time unit.
182         * By default SECONDS is used.
183         *
184         * @param keepAliveTimeUnits time unit
185         * @return the builder
186         */
187        public ThreadsDefinition timeUnit(TimeUnit keepAliveTimeUnits) {
188            setTimeUnit(keepAliveTimeUnits);
189            return this;
190        }
191    
192        /**
193         * Sets the maximum number of tasks in the work queue.
194         * <p/>
195         * Use <tt>-1</tt> or <tt>Integer.MAX_VALUE</tt> for an unbounded queue
196         *
197         * @param maxQueueSize the max queue size
198         * @return the builder
199         */
200        public ThreadsDefinition maxQueueSize(int maxQueueSize) {
201            setMaxQueueSize(maxQueueSize);
202            return this;
203        }
204    
205        /**
206         * Sets the handler for tasks which cannot be executed by the thread pool.
207         *
208         * @param rejectedPolicy  the policy for the handler
209         * @return the builder
210         */
211        public ThreadsDefinition rejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
212            setRejectedPolicy(rejectedPolicy);
213            return this;
214        }
215    
216        /**
217         * Sets the thread name to use.
218         *
219         * @param threadName the thread name
220         * @return the builder
221         */
222        public ThreadsDefinition threadName(String threadName) {
223            setThreadName(threadName);
224            return this;
225        }
226    
227        /**
228         * Whether or not the caller should run the task when it was rejected by the thread pool.
229         * <p/>
230         * Is by default <tt>true</tt>
231         *
232         * @param callerRunsWhenRejected whether or not the caller should run
233         * @return the builder
234         */
235        public ThreadsDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) {
236            setCallerRunsWhenRejected(callerRunsWhenRejected);
237            return this;
238        }
239    
240        public ExecutorService getExecutorService() {
241            return executorService;
242        }
243    
244        public void setExecutorService(ExecutorService executorService) {
245            this.executorService = executorService;
246        }
247    
248        public String getExecutorServiceRef() {
249            return executorServiceRef;
250        }
251    
252        public void setExecutorServiceRef(String executorServiceRef) {
253            this.executorServiceRef = executorServiceRef;
254        }
255    
256        public Integer getPoolSize() {
257            return poolSize;
258        }
259    
260        public void setPoolSize(Integer poolSize) {
261            this.poolSize = poolSize;
262        }
263    
264        public Integer getMaxPoolSize() {
265            return maxPoolSize;
266        }
267    
268        public void setMaxPoolSize(Integer maxPoolSize) {
269            this.maxPoolSize = maxPoolSize;
270        }
271    
272        public Long getKeepAliveTime() {
273            return keepAliveTime;
274        }
275    
276        public void setKeepAliveTime(Long keepAliveTime) {
277            this.keepAliveTime = keepAliveTime;
278        }
279    
280        public TimeUnit getTimeUnit() {
281            return timeUnit;
282        }
283    
284        public void setTimeUnit(TimeUnit timeUnit) {
285            this.timeUnit = timeUnit;
286        }
287    
288        public Integer getMaxQueueSize() {
289            return maxQueueSize;
290        }
291    
292        public void setMaxQueueSize(Integer maxQueueSize) {
293            this.maxQueueSize = maxQueueSize;
294        }
295    
296        public String getThreadName() {
297            return threadName;
298        }
299    
300        public void setThreadName(String threadName) {
301            this.threadName = threadName;
302        }
303    
304        public ThreadPoolRejectedPolicy getRejectedPolicy() {
305            return rejectedPolicy;
306        }
307    
308        public void setRejectedPolicy(ThreadPoolRejectedPolicy rejectedPolicy) {
309            this.rejectedPolicy = rejectedPolicy;
310        }
311    
312        public Boolean getCallerRunsWhenRejected() {
313            return callerRunsWhenRejected;
314        }
315    
316        public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) {
317            this.callerRunsWhenRejected = callerRunsWhenRejected;
318        }
319    }