001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.concurrent.ExecutorService;
020import java.util.concurrent.ScheduledExecutorService;
021
022import javax.xml.bind.annotation.XmlAccessType;
023import javax.xml.bind.annotation.XmlAccessorType;
024import javax.xml.bind.annotation.XmlAttribute;
025import javax.xml.bind.annotation.XmlElement;
026import javax.xml.bind.annotation.XmlRootElement;
027import javax.xml.bind.annotation.XmlTransient;
028import javax.xml.bind.annotation.XmlType;
029
030import org.apache.camel.Expression;
031import org.apache.camel.Processor;
032import org.apache.camel.builder.ExpressionBuilder;
033import org.apache.camel.model.language.ExpressionDefinition;
034import org.apache.camel.processor.Throttler;
035import org.apache.camel.spi.Metadata;
036import org.apache.camel.spi.RouteContext;
037
038/**
039 * Controls the rate at which messages are passed to the next node in the route
040 *
041 * @version 
042 */
043@Metadata(label = "eip,routing")
044@XmlRootElement(name = "throttle")
045@XmlAccessorType(XmlAccessType.FIELD)
046@XmlType(propOrder = {"expression", "correlationExpression", "outputs"})
047public class ThrottleDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<ThrottleDefinition> {
048    // TODO: Camel 3.0 Should not support outputs
049
050    @XmlElement(name = "correlationExpression")
051    private ExpressionSubElementDefinition correlationExpression;
052    @XmlTransient
053    private ExecutorService executorService;
054    @XmlAttribute
055    private String executorServiceRef;
056    @XmlAttribute @Metadata(defaultValue = "1000")
057    private Long timePeriodMillis;
058    @XmlAttribute
059    private Boolean asyncDelayed;
060    @XmlAttribute @Metadata(defaultValue = "true")
061    private Boolean callerRunsWhenRejected;
062    @XmlAttribute
063    private Boolean rejectExecution;
064
065    public ThrottleDefinition() {
066    }
067
068    public ThrottleDefinition(Expression maximumRequestsPerPeriod) {
069        super(maximumRequestsPerPeriod);
070    }
071
072    public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression correlationExpression) {
073        this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), correlationExpression);
074    }
075
076    private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, Expression correlationExpression) {
077        super(maximumRequestsPerPeriod);
078
079        ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition();
080        cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression));
081        setCorrelationExpression(cor);
082    }
083
084    @Override
085    public String toString() {
086        return "Throttle[" + description() + " -> " + getOutputs() + "]";
087    }
088    
089    protected String description() {
090        return getExpression() + " request per " + getTimePeriodMillis() + " millis";
091    }
092
093    @Override
094    public String getShortName() {
095        return "throttle";
096    }
097
098    @Override
099    public String getLabel() {
100        return "throttle[" + description() + "]";
101    }
102
103    @Override
104    public Processor createProcessor(RouteContext routeContext) throws Exception {
105        Processor childProcessor = this.createChildProcessor(routeContext, true);
106
107        boolean async = getAsyncDelayed() != null && getAsyncDelayed();
108        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
109        ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, true);
110        
111        // should be default 1000 millis
112        long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L;
113
114        // max requests per period is mandatory
115        Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(routeContext);
116        if (maxRequestsExpression == null) {
117            throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this);
118        }
119        
120        Expression correlation = null;
121        if (correlationExpression != null) {
122            correlation = correlationExpression.createExpression(routeContext);
123        }
124
125        boolean reject = getRejectExecution() != null && getRejectExecution();
126        Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject, correlation);
127
128        answer.setAsyncDelayed(async);
129        if (getCallerRunsWhenRejected() == null) {
130            // should be true by default
131            answer.setCallerRunsWhenRejected(true);
132        } else {
133            answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
134        }
135
136        return answer;
137    }
138
139    private Expression createMaxRequestsPerPeriodExpression(RouteContext routeContext) {
140        ExpressionDefinition expr = getExpression();
141        if (expr != null) {
142            return expr.createExpression(routeContext);
143        }
144        return null;
145    }
146    
147    // Fluent API
148    // -------------------------------------------------------------------------
149    /**
150     * Sets the time period during which the maximum request count is valid for
151     *
152     * @param timePeriodMillis  period in millis
153     * @return the builder
154     */
155    public ThrottleDefinition timePeriodMillis(long timePeriodMillis) {
156        setTimePeriodMillis(timePeriodMillis);
157        return this;
158    }
159    
160    /**
161     * Sets the time period during which the maximum request count per period
162     *
163     * @param maximumRequestsPerPeriod  the maximum request count number per time period
164     * @return the builder
165     */
166    public ThrottleDefinition maximumRequestsPerPeriod(long maximumRequestsPerPeriod) {
167        setExpression(ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.constantExpression(maximumRequestsPerPeriod)));
168        return this;
169    }
170
171    /**
172     * Whether or not the caller should run the task when it was rejected by the thread pool.
173     * <p/>
174     * Is by default <tt>true</tt>
175     *
176     * @param callerRunsWhenRejected whether or not the caller should run
177     * @return the builder
178     */
179    public ThrottleDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) {
180        setCallerRunsWhenRejected(callerRunsWhenRejected);
181        return this;
182    }
183
184    /**
185     * Enables asynchronous delay which means the thread will <b>not</b> block while delaying.
186     *
187     * @return the builder
188     */
189    public ThrottleDefinition asyncDelayed() {
190        setAsyncDelayed(true);
191        return this;
192    }
193    
194    /**
195     * Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit
196     * <p/>
197     * Is by default <tt>false</tt>
198     *
199     * @param rejectExecution throw the RejectExecutionException if the exchange exceeds the request limit 
200     * @return the builder
201     */
202    public ThrottleDefinition rejectExecution(boolean rejectExecution) {
203        setRejectExecution(rejectExecution);
204        return this;
205    }
206
207    /**
208     * To use a custom thread pool (ScheduledExecutorService) by the throttler.
209     *
210     * @param executorService  the custom thread pool (must be scheduled)
211     * @return the builder
212     */
213    public ThrottleDefinition executorService(ExecutorService executorService) {
214        setExecutorService(executorService);
215        return this;
216    }
217
218    /**
219     * To use a custom thread pool (ScheduledExecutorService) by the throttler.
220     *
221     * @param executorServiceRef the reference id of the thread pool (must be scheduled)
222     * @return the builder
223     */
224    public ThrottleDefinition executorServiceRef(String executorServiceRef) {
225        setExecutorServiceRef(executorServiceRef);
226        return this;
227    }
228
229    // Properties
230    // -------------------------------------------------------------------------
231
232    /**
233     * Expression to configure the maximum number of messages to throttle per request
234     */
235    @Override
236    public void setExpression(ExpressionDefinition expression) {
237        // override to include javadoc what the expression is used for
238        super.setExpression(expression);
239    }
240
241    public Long getTimePeriodMillis() {
242        return timePeriodMillis;
243    }
244
245    public void setTimePeriodMillis(Long timePeriodMillis) {
246        this.timePeriodMillis = timePeriodMillis;
247    }
248
249    public Boolean getAsyncDelayed() {
250        return asyncDelayed;
251    }
252
253    public void setAsyncDelayed(Boolean asyncDelayed) {
254        this.asyncDelayed = asyncDelayed;
255    }
256
257    public Boolean getCallerRunsWhenRejected() {
258        return callerRunsWhenRejected;
259    }
260
261    public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) {
262        this.callerRunsWhenRejected = callerRunsWhenRejected;
263    }
264
265    public ExecutorService getExecutorService() {
266        return executorService;
267    }
268
269    public void setExecutorService(ExecutorService executorService) {
270        this.executorService = executorService;
271    }
272
273    public String getExecutorServiceRef() {
274        return executorServiceRef;
275    }
276
277    public void setExecutorServiceRef(String executorServiceRef) {
278        this.executorServiceRef = executorServiceRef;
279    }
280    
281    public Boolean getRejectExecution() {
282        return rejectExecution;
283    }
284
285    public void setRejectExecution(Boolean rejectExecution) {
286        this.rejectExecution = rejectExecution;
287    }
288
289    /**
290     * The expression used to calculate the correlation key to use for throttle grouping.
291     * The Exchange which has the same correlation key is throttled together.
292     */
293    public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
294        this.correlationExpression = correlationExpression;
295    }
296
297    public ExpressionSubElementDefinition getCorrelationExpression() {
298        return correlationExpression;
299    }
300}