001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.concurrent.ExecutorService; 020 import java.util.concurrent.ScheduledExecutorService; 021 import javax.xml.bind.annotation.XmlAccessType; 022 import javax.xml.bind.annotation.XmlAccessorType; 023 import javax.xml.bind.annotation.XmlAttribute; 024 import javax.xml.bind.annotation.XmlRootElement; 025 import javax.xml.bind.annotation.XmlTransient; 026 027 import org.apache.camel.Expression; 028 import org.apache.camel.Processor; 029 import org.apache.camel.builder.ExpressionBuilder; 030 import org.apache.camel.model.language.ExpressionDefinition; 031 import org.apache.camel.processor.Throttler; 032 import org.apache.camel.spi.RouteContext; 033 import org.apache.camel.util.ObjectHelper; 034 035 /** 036 * Represents an XML <throttle/> element 037 * 038 * @version 039 */ 040 @XmlRootElement(name = "throttle") 041 @XmlAccessorType(XmlAccessType.FIELD) 042 public class ThrottleDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<ThrottleDefinition> { 043 // TODO: Camel 3.0 Should not support outputs 044 045 @XmlTransient 046 private ExecutorService executorService; 047 @XmlAttribute 048 private String executorServiceRef; 049 @XmlAttribute 050 private Long timePeriodMillis; 051 @XmlAttribute 052 private Boolean asyncDelayed; 053 @XmlAttribute 054 private Boolean callerRunsWhenRejected; 055 056 public ThrottleDefinition() { 057 } 058 059 public ThrottleDefinition(Expression maximumRequestsPerPeriod) { 060 super(maximumRequestsPerPeriod); 061 } 062 063 @Override 064 public String toString() { 065 return "Throttle[" + description() + " -> " + getOutputs() + "]"; 066 } 067 068 protected String description() { 069 return getExpression() + " request per " + getTimePeriodMillis() + " millis"; 070 } 071 072 @Override 073 public String getShortName() { 074 return "throttle"; 075 } 076 077 @Override 078 public String getLabel() { 079 return "throttle[" + description() + "]"; 080 } 081 082 @Override 083 public Processor createProcessor(RouteContext routeContext) throws Exception { 084 Processor childProcessor = this.createChildProcessor(routeContext, true); 085 086 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isAsyncDelayed()); 087 ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, isAsyncDelayed()); 088 089 // should be default 1000 millis 090 long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L; 091 092 // max requests per period is mandatory 093 Expression maxRequestsExpression = createMaxRequestsPerPeriodExpression(routeContext); 094 if (maxRequestsExpression == null) { 095 throw new IllegalArgumentException("MaxRequestsPerPeriod expression must be provided on " + this); 096 } 097 098 Throttler answer = new Throttler(routeContext.getCamelContext(), childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool); 099 100 if (getAsyncDelayed() != null) { 101 answer.setAsyncDelayed(getAsyncDelayed()); 102 } 103 104 if (getCallerRunsWhenRejected() == null) { 105 // should be true by default 106 answer.setCallerRunsWhenRejected(true); 107 } else { 108 answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected()); 109 } 110 return answer; 111 } 112 113 private Expression createMaxRequestsPerPeriodExpression(RouteContext routeContext) { 114 if (getExpression() != null) { 115 if (ObjectHelper.isNotEmpty(getExpression().getExpression()) || getExpression().getExpressionValue() != null) { 116 return getExpression().createExpression(routeContext); 117 } 118 } 119 return null; 120 } 121 122 // Fluent API 123 // ------------------------------------------------------------------------- 124 /** 125 * Sets the time period during which the maximum request count is valid for 126 * 127 * @param timePeriodMillis period in millis 128 * @return the builder 129 */ 130 public ThrottleDefinition timePeriodMillis(long timePeriodMillis) { 131 setTimePeriodMillis(timePeriodMillis); 132 return this; 133 } 134 135 /** 136 * Sets the time period during which the maximum request count per period 137 * 138 * @param maximumRequestsPerPeriod the maximum request count number per time period 139 * @return the builder 140 */ 141 public ThrottleDefinition maximumRequestsPerPeriod(Long maximumRequestsPerPeriod) { 142 setExpression(new ExpressionDefinition(ExpressionBuilder.constantExpression(maximumRequestsPerPeriod))); 143 return this; 144 } 145 146 /** 147 * Whether or not the caller should run the task when it was rejected by the thread pool. 148 * <p/> 149 * Is by default <tt>true</tt> 150 * 151 * @param callerRunsWhenRejected whether or not the caller should run 152 * @return the builder 153 */ 154 public ThrottleDefinition callerRunsWhenRejected(boolean callerRunsWhenRejected) { 155 setCallerRunsWhenRejected(callerRunsWhenRejected); 156 return this; 157 } 158 159 /** 160 * Enables asynchronous delay which means the thread will <b>noy</b> block while delaying. 161 * 162 * @return the builder 163 */ 164 public ThrottleDefinition asyncDelayed() { 165 setAsyncDelayed(true); 166 return this; 167 } 168 169 public ThrottleDefinition executorService(ExecutorService executorService) { 170 setExecutorService(executorService); 171 return this; 172 } 173 174 public ThrottleDefinition executorServiceRef(String executorServiceRef) { 175 setExecutorServiceRef(executorServiceRef); 176 return this; 177 } 178 179 // Properties 180 // ------------------------------------------------------------------------- 181 182 public Long getTimePeriodMillis() { 183 return timePeriodMillis; 184 } 185 186 public void setTimePeriodMillis(Long timePeriodMillis) { 187 this.timePeriodMillis = timePeriodMillis; 188 } 189 190 public Boolean getAsyncDelayed() { 191 return asyncDelayed; 192 } 193 194 public void setAsyncDelayed(Boolean asyncDelayed) { 195 this.asyncDelayed = asyncDelayed; 196 } 197 198 public boolean isAsyncDelayed() { 199 return asyncDelayed != null && asyncDelayed; 200 } 201 202 public Boolean getCallerRunsWhenRejected() { 203 return callerRunsWhenRejected; 204 } 205 206 public void setCallerRunsWhenRejected(Boolean callerRunsWhenRejected) { 207 this.callerRunsWhenRejected = callerRunsWhenRejected; 208 } 209 210 public ExecutorService getExecutorService() { 211 return executorService; 212 } 213 214 public void setExecutorService(ExecutorService executorService) { 215 this.executorService = executorService; 216 } 217 218 public String getExecutorServiceRef() { 219 return executorServiceRef; 220 } 221 222 public void setExecutorServiceRef(String executorServiceRef) { 223 this.executorServiceRef = executorServiceRef; 224 } 225 }