001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.concurrent.ExecutorService; 020import java.util.concurrent.ScheduledExecutorService; 021 022import javax.xml.bind.annotation.XmlAccessType; 023import javax.xml.bind.annotation.XmlAccessorType; 024import javax.xml.bind.annotation.XmlAttribute; 025import javax.xml.bind.annotation.XmlElement; 026import javax.xml.bind.annotation.XmlRootElement; 027import javax.xml.bind.annotation.XmlTransient; 028import javax.xml.bind.annotation.XmlType; 029 030import org.apache.camel.Expression; 031import org.apache.camel.Processor; 032import org.apache.camel.builder.ExpressionBuilder; 033import org.apache.camel.model.language.ExpressionDefinition; 034import org.apache.camel.processor.Throttler; 035import org.apache.camel.spi.Metadata; 036import org.apache.camel.spi.RouteContext; 037 038/** 039 * Controls the rate at which messages are passed to the next node in the route 040 * 041 * @version 042 */ 043@Metadata(label = "eip,routing") 044@XmlRootElement(name = "throttle") 045@XmlAccessorType(XmlAccessType.FIELD) 046@XmlType(propOrder = {"expression", "correlationExpression", "outputs"}) 047public class ThrottleDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<ThrottleDefinition> { 048 // TODO: Camel 3.0 Should not support outputs 049 050 @XmlElement(name = "correlationExpression") 051 private ExpressionSubElementDefinition correlationExpression; 052 @XmlTransient 053 private ExecutorService executorService; 054 @XmlAttribute 055 private String executorServiceRef; 056 @XmlAttribute @Metadata(defaultValue = "1000") 057 private Long timePeriodMillis; 058 @XmlAttribute 059 private Boolean asyncDelayed; 060 @XmlAttribute @Metadata(defaultValue = "true") 061 private Boolean callerRunsWhenRejected; 062 @XmlAttribute 063 private Boolean rejectExecution; 064 065 public ThrottleDefinition() { 066 } 067 068 public ThrottleDefinition(Expression maximumRequestsPerPeriod) { 069 super(maximumRequestsPerPeriod); 070 } 071 072 public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression correlationExpression) { 073 this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod), correlationExpression); 074 } 075 076 private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod, Expression correlationExpression) { 077 super(maximumRequestsPerPeriod); 078 079 ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition(); 080 cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression)); 081 setCorrelationExpression(cor); 082 } 083 084 @Override 085 public String toString() { 086 return "Throttle[" + description() + " -> " + getOutputs() + "]"; 087 } 088 089 protected String description() { 090 return getExpression() + " request per " + getTimePeriodMillis() + " millis"; 091 } 092 093 @Override 094 public String getShortName() { 095 return "throttle"; 096 } 097 098 @Override 099 public String getLabel() { 100 return "throttle[" + description() + "]"; 101 } 102 103 @Override 104 public Processor createProcessor(RouteContext routeContext) throws Exception { 105 Processor childProcessor = this.createChildProcessor(routeContext, true); 106 107 boolean async = getAsyncDelayed() != null && getAsyncDelayed(); 108 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true); 109 ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, true); 110 111 // should be default 1000 millis 112 long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L; 113 114 // max requests per period is mandatory 115 Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(routeContext); 116 if (maxRequestsExpression == null) { 117 throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this); 118 } 119 120 Expression correlation = null; 121 if (correlationExpression != null) { 122 correlation = correlationExpression.createExpression(routeContext); 123 } 124 125 boolean reject = getRejectExecution() != null && getRejectExecution(); 126 Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool, reject, correlation); 127 128 answer.setAsyncDelayed(async); 129 if (getCallerRunsWhenRejected() == null) { 130 // should be true by default 131 answer.setCallerRunsWhenRejected(true); 132 } else { 133 answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected()); 134 } 135 136 return answer; 137 } 138 139 private Expression createMaxRequestsPerPeriodExpression(RouteContext routeContext) { 140 ExpressionDefinition expr = getExpression(); 141 if (expr != null) { 142 return expr.createExpression(routeContext); 143 } 144 return null; 145 } 146 147 // Fluent API 148 // ------------------------------------------------------------------------- 149 /** 150 * Sets the time period during which the maximum request count is valid for 151 * 152 * @param timePeriodMillis period in millis 153 * @return the builder 154 */ 155 public ThrottleDefinition timePeriodMillis(long timePeriodMillis) { 156 setTimePeriodMillis(timePeriodMillis); 157 return this; 158 } 159 160 /** 161 * Sets the time period during which the maximum request count per period 162 * 163 * @param maximumRequestsPerPeriod the maximum request count number per time period 164 * @return the builder 165 */ 166 public ThrottleDefinition maximumRequestsPerPeriod(long maximumRequestsPerPeriod) { 167 setExpression(ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.constantExpression(maximumRequestsPerPeriod))); 168 return this; 169 } 170 171 /** 172 * Whether or not the caller should run the task when it was rejected by the thread pool. 173 * <p/> 174 * Is by default <tt>true</tt> 175 * 176 * @param callerRunsWhenRejected whether or not the caller should run 177 * @return the builder 178 */ 179 public ThrottleDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) { 180 setCallerRunsWhenRejected(callerRunsWhenRejected); 181 return this; 182 } 183 184 /** 185 * Enables asynchronous delay which means the thread will <b>not</b> block while delaying. 186 * 187 * @return the builder 188 */ 189 public ThrottleDefinition asyncDelayed() { 190 setAsyncDelayed(true); 191 return this; 192 } 193 194 /** 195 * Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit 196 * <p/> 197 * Is by default <tt>false</tt> 198 * 199 * @param rejectExecution throw the RejectExecutionException if the exchange exceeds the request limit 200 * @return the builder 201 */ 202 public ThrottleDefinition rejectExecution(boolean rejectExecution) { 203 setRejectExecution(rejectExecution); 204 return this; 205 } 206 207 /** 208 * To use a custom thread pool (ScheduledExecutorService) by the throttler. 209 * 210 * @param executorService the custom thread pool (must be scheduled) 211 * @return the builder 212 */ 213 public ThrottleDefinition executorService(ExecutorService executorService) { 214 setExecutorService(executorService); 215 return this; 216 } 217 218 /** 219 * To use a custom thread pool (ScheduledExecutorService) by the throttler. 220 * 221 * @param executorServiceRef the reference id of the thread pool (must be scheduled) 222 * @return the builder 223 */ 224 public ThrottleDefinition executorServiceRef(String executorServiceRef) { 225 setExecutorServiceRef(executorServiceRef); 226 return this; 227 } 228 229 // Properties 230 // ------------------------------------------------------------------------- 231 232 /** 233 * Expression to configure the maximum number of messages to throttle per request 234 */ 235 @Override 236 public void setExpression(ExpressionDefinition expression) { 237 // override to include javadoc what the expression is used for 238 super.setExpression(expression); 239 } 240 241 public Long getTimePeriodMillis() { 242 return timePeriodMillis; 243 } 244 245 public void setTimePeriodMillis(Long timePeriodMillis) { 246 this.timePeriodMillis = timePeriodMillis; 247 } 248 249 public Boolean getAsyncDelayed() { 250 return asyncDelayed; 251 } 252 253 public void setAsyncDelayed(Boolean asyncDelayed) { 254 this.asyncDelayed = asyncDelayed; 255 } 256 257 public Boolean getCallerRunsWhenRejected() { 258 return callerRunsWhenRejected; 259 } 260 261 public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) { 262 this.callerRunsWhenRejected = callerRunsWhenRejected; 263 } 264 265 public ExecutorService getExecutorService() { 266 return executorService; 267 } 268 269 public void setExecutorService(ExecutorService executorService) { 270 this.executorService = executorService; 271 } 272 273 public String getExecutorServiceRef() { 274 return executorServiceRef; 275 } 276 277 public void setExecutorServiceRef(String executorServiceRef) { 278 this.executorServiceRef = executorServiceRef; 279 } 280 281 public Boolean getRejectExecution() { 282 return rejectExecution; 283 } 284 285 public void setRejectExecution(Boolean rejectExecution) { 286 this.rejectExecution = rejectExecution; 287 } 288 289 /** 290 * The expression used to calculate the correlation key to use for throttle grouping. 291 * The Exchange which has the same correlation key is throttled together. 292 */ 293 public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) { 294 this.correlationExpression = correlationExpression; 295 } 296 297 public ExpressionSubElementDefinition getCorrelationExpression() { 298 return correlationExpression; 299 } 300}