001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.concurrent.ExecutorService;
020    import java.util.concurrent.ScheduledExecutorService;
021    import javax.xml.bind.annotation.XmlAccessType;
022    import javax.xml.bind.annotation.XmlAccessorType;
023    import javax.xml.bind.annotation.XmlAttribute;
024    import javax.xml.bind.annotation.XmlRootElement;
025    import javax.xml.bind.annotation.XmlTransient;
026    
027    import org.apache.camel.Expression;
028    import org.apache.camel.Processor;
029    import org.apache.camel.builder.ExpressionBuilder;
030    import org.apache.camel.model.language.ExpressionDefinition;
031    import org.apache.camel.processor.Throttler;
032    import org.apache.camel.spi.RouteContext;
033    import org.apache.camel.util.ObjectHelper;
034    
035    /**
036     * Represents an XML <throttle/> element
037     *
038     * @version 
039     */
040    @XmlRootElement(name = "throttle")
041    @XmlAccessorType(XmlAccessType.FIELD)
042    public class ThrottleDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<ThrottleDefinition> {
043        // TODO: Camel 3.0 Should not support outputs
044    
045        @XmlTransient
046        private ExecutorService executorService;
047        @XmlAttribute
048        private String executorServiceRef;
049        @XmlAttribute
050        private Long timePeriodMillis;
051        @XmlAttribute
052        private Boolean asyncDelayed;
053        @XmlAttribute
054        private Boolean callerRunsWhenRejected;
055        
056        public ThrottleDefinition() {
057        }
058    
059        public ThrottleDefinition(Expression maximumRequestsPerPeriod) {
060            super(maximumRequestsPerPeriod);
061        }
062    
063        @Override
064        public String toString() {
065            return "Throttle[" + description() + " -> " + getOutputs() + "]";
066        }
067        
068        protected String description() {
069            return getExpression() + " request per " + getTimePeriodMillis() + " millis";
070        }
071    
072        @Override
073        public String getShortName() {
074            return "throttle";
075        }
076    
077        @Override
078        public String getLabel() {
079            return "throttle[" + description() + "]";
080        }
081    
082        @Override
083        public Processor createProcessor(RouteContext routeContext) throws Exception {
084            Processor childProcessor = this.createChildProcessor(routeContext, true);
085    
086            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isAsyncDelayed());
087            ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, isAsyncDelayed());
088    
089            // should be default 1000 millis
090            long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L;
091    
092            // max requests per period is mandatory
093            Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(routeContext);
094            if (maxRequestsExpression == null) {
095                throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
096            }
097    
098            Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool);
099    
100            if (getAsyncDelayed() != null) {
101                answer.setAsyncDelayed(getAsyncDelayed());
102            }
103            
104            if (getCallerRunsWhenRejected() == null) {
105                // should be true by default
106                answer.setCallerRunsWhenRejected(true);
107            } else {
108                answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
109            }
110            return answer;
111        }
112    
113        private Expression createMaxRequestsPerPeriodExpression(RouteContext routeContext) {
114            if (getExpression() != null) {
115                if (ObjectHelper.isNotEmpty(getExpression().getExpression()) || getExpression().getExpressionValue() != null) {
116                    return getExpression().createExpression(routeContext);
117                } 
118            } 
119            return null;
120        }
121        
122        // Fluent API
123        // -------------------------------------------------------------------------
124        /**
125         * Sets the time period during which the maximum request count is valid for
126         *
127         * @param timePeriodMillis  period in millis
128         * @return the builder
129         */
130        public ThrottleDefinition timePeriodMillis(long timePeriodMillis) {
131            setTimePeriodMillis(timePeriodMillis);
132            return this;
133        }
134        
135        /**
136         * Sets the time period during which the maximum request count per period
137         *
138         * @param maximumRequestsPerPeriod  the maximum request count number per time period
139         * @return the builder
140         */
141        public ThrottleDefinition maximumRequestsPerPeriod(Long maximumRequestsPerPeriod) {
142            setExpression(new ExpressionDefinition(ExpressionBuilder.constantExpression(maximumRequestsPerPeriod)));
143            return this;
144        }
145    
146        /**
147         * Whether or not the caller should run the task when it was rejected by the thread pool.
148         * <p/>
149         * Is by default <tt>true</tt>
150         *
151         * @param callerRunsWhenRejected whether or not the caller should run
152         * @return the builder
153         */
154        public ThrottleDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) {
155            setCallerRunsWhenRejected(callerRunsWhenRejected);
156            return this;
157        }
158    
159        /**
160         * Enables asynchronous delay which means the thread will <b>noy</b> block while delaying.
161         *
162         * @return the builder
163         */
164        public ThrottleDefinition asyncDelayed() {
165            setAsyncDelayed(true);
166            return this;
167        }
168    
169        public ThrottleDefinition executorService(ExecutorService executorService) {
170            setExecutorService(executorService);
171            return this;
172        }
173    
174        public ThrottleDefinition executorServiceRef(String executorServiceRef) {
175            setExecutorServiceRef(executorServiceRef);
176            return this;
177        }
178    
179        // Properties
180        // -------------------------------------------------------------------------
181    
182        public Long getTimePeriodMillis() {
183            return timePeriodMillis;
184        }
185    
186        public void setTimePeriodMillis(Long timePeriodMillis) {
187            this.timePeriodMillis = timePeriodMillis;
188        }
189    
190        public Boolean getAsyncDelayed() {
191            return asyncDelayed;
192        }
193    
194        public void setAsyncDelayed(Boolean asyncDelayed) {
195            this.asyncDelayed = asyncDelayed;
196        }
197    
198        public boolean isAsyncDelayed() {
199            return asyncDelayed != null && asyncDelayed;
200        }
201    
202        public Boolean getCallerRunsWhenRejected() {
203            return callerRunsWhenRejected;
204        }
205    
206        public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) {
207            this.callerRunsWhenRejected = callerRunsWhenRejected;
208        }
209    
210        public ExecutorService getExecutorService() {
211            return executorService;
212        }
213    
214        public void setExecutorService(ExecutorService executorService) {
215            this.executorService = executorService;
216        }
217    
218        public String getExecutorServiceRef() {
219            return executorServiceRef;
220        }
221    
222        public void setExecutorServiceRef(String executorServiceRef) {
223            this.executorServiceRef = executorServiceRef;
224        }
225    }