/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A <a href="http://camel.apache.org/throttler.html">Throttler</a>
 * will set a limit on the maximum number of message exchanges which can be sent
 * to a processor within a specific time period.
 * <p>
 * This pattern can be extremely useful if you have some external system which
 * meters access; such as only allowing 100 requests per second; or if huge load
 * can cause a particular system to malfunction or to reduce its throughput you
 * might want to introduce some throttling.
 * <p>
 * This throttle implementation is thread-safe and is therefore safe to be used
 * by multiple concurrent threads in a single route.
 * <p>
 * The throttling mechanism is a DelayQueue with maxRequestsPerPeriod permits on
 * it. Each permit is set to be delayed by timePeriodMillis (except when the
 * throttler is initialized or the throttle rate increased, then there is no delay
 * for those permits). Callers trying to acquire a permit from the DelayQueue will
 * block if necessary. The end result is a rolling window of time, where from the
 * caller's point of view no more than maxRequestsPerPeriod permits have been
 * allowed to be acquired in the last timePeriodMillis.
 * <p>
 * When a correlation expression is configured, a separate set of permits is kept
 * per correlation key; otherwise a single default key is used.
 *
 * @version
 */
public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAware {

    private static final String DEFAULT_KEY = "CamelThrottlerDefaultKey";

    private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = "CamelThrottlerExchangeQueuedTimestamp";
    private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState";

    /**
     * How the current invocation of {@link #process(Exchange, AsyncCallback)} was reached:
     * directly by the caller (SYNC), re-entered from the async executor (ASYNC), or re-entered on the
     * caller thread because the async executor rejected the task (ASYNC_REJECTED).
     */
    private enum State { SYNC, ASYNC, ASYNC_REJECTED }

    private final Logger log = LoggerFactory.getLogger(Throttler.class);
    private final CamelContext camelContext;
    private final ScheduledExecutorService asyncExecutor;
    private final boolean shutdownAsyncExecutor;

    private volatile long timePeriodMillis;
    // idle time after which the per-key throttling state is dropped; kept at 10x the time period
    private volatile long cleanPeriodMillis;
    private String id;
    private Expression maxRequestsPerPeriodExpression;
    private boolean rejectExecution;
    private boolean asyncDelayed;
    private boolean callerRunsWhenRejected = true;
    private Expression correlationExpression;
    private final Map<String, ThrottlingState> states = new ConcurrentHashMap<>();

    /**
     * Creates a new throttler.
     *
     * @param camelContext                   the camel context, used to shut down the executor on shutdown
     * @param processor                      the processor to which throttled exchanges are sent
     * @param maxRequestsPerPeriodExpression expression evaluated per exchange to obtain the throttle rate, must not be null
     * @param timePeriodMillis               length of the rolling window in milliseconds, must be positive
     * @param asyncExecutor                  executor used for async delaying and for cleaning idle state, may be null
     *                                       unless async delayed is enabled
     * @param shutdownAsyncExecutor          whether this throttler shuts down the executor when it is itself shut down
     * @param rejectExecution                whether to fail the exchange instead of waiting when no permit is available
     * @param correlation                    optional expression yielding the key that groups exchanges, may be null
     * @throws IllegalArgumentException if timePeriodMillis is not positive
     */
    public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis,
                     final ScheduledExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) {
        super(processor);
        this.camelContext = camelContext;
        this.rejectExecution = rejectExecution;
        this.shutdownAsyncExecutor = shutdownAsyncExecutor;

        ObjectHelper.notNull(maxRequestsPerPeriodExpression, "maxRequestsPerPeriodExpression");
        this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;

        if (timePeriodMillis <= 0) {
            throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis);
        }
        this.timePeriodMillis = timePeriodMillis;
        this.cleanPeriodMillis = timePeriodMillis * 10;
        this.asyncExecutor = asyncExecutor;
        this.correlationExpression = correlation;
    }

    /**
     * Acquires a permit for the exchange (waiting, rejecting or deferring to the async executor as
     * configured) and then hands the exchange to the nested processor.
     * <p>
     * Any failure is stored on the exchange and the callback is still invoked.
     *
     * @param exchange the exchange to throttle
     * @param callback the callback to notify when processing is done
     * @return {@code true} if processing continued on the calling thread, {@code false} if the exchange
     *         was handed off to, or is being run by, the async executor
     */
    @Override
    public boolean process(final Exchange exchange, final AsyncCallback callback) {
        long queuedStart = 0;
        if (log.isTraceEnabled()) {
            queuedStart = exchange.getProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP, 0L, Long.class);
            exchange.removeProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP);
        }
        // the state property is set by processAsynchronously before re-entering this method
        State state = exchange.getProperty(PROPERTY_EXCHANGE_STATE, State.SYNC, State.class);
        exchange.removeProperty(PROPERTY_EXCHANGE_STATE);
        boolean doneSync = state == State.SYNC || state == State.ASYNC_REJECTED;

        try {
            if (!isRunAllowed()) {
                throw new RejectedExecutionException("Run is not allowed");
            }

            String key = DEFAULT_KEY;
            if (correlationExpression != null) {
                key = correlationExpression.evaluate(exchange, String.class);
            }
            ThrottlingState throttlingState = states.computeIfAbsent(key, ThrottlingState::new);
            throttlingState.calculateAndSetMaxRequestsPerPeriod(exchange);

            // non-blocking attempt first; null means no permit has expired yet
            ThrottlePermit permit = throttlingState.poll();

            if (permit == null) {
                if (isRejectExecution()) {
                    throw new ThrottlerRejectedExecutionException("Exceeded the max throttle rate of "
                            + throttlingState.getThrottleRate() + " within " + timePeriodMillis + "ms");
                } else {
                    // delegate to async pool
                    if (isAsyncDelayed() && !exchange.isTransacted() && state == State.SYNC) {
                        log.debug("Throttle rate exceeded but AsyncDelayed enabled, so queueing for async processing, exchangeId: {}", exchange.getExchangeId());
                        return processAsynchronously(exchange, callback, throttlingState);
                    }

                    // block waiting for a permit
                    long start = 0;
                    long elapsed = 0;
                    if (log.isTraceEnabled()) {
                        start = System.currentTimeMillis();
                    }
                    permit = throttlingState.take();
                    if (log.isTraceEnabled()) {
                        elapsed = System.currentTimeMillis() - start;
                    }
                    throttlingState.enqueue(permit, exchange);

                    if (state == State.ASYNC) {
                        if (log.isTraceEnabled()) {
                            long queuedTime = start - queuedStart;
                            log.trace("Queued for {}ms, Throttled for {}ms, exchangeId: {}", queuedTime, elapsed, exchange.getExchangeId());
                        }
                    } else {
                        log.trace("Throttled for {}ms, exchangeId: {}", elapsed, exchange.getExchangeId());
                    }
                }
            } else {
                throttlingState.enqueue(permit, exchange);

                if (state == State.ASYNC) {
                    if (log.isTraceEnabled()) {
                        long queuedTime = System.currentTimeMillis() - queuedStart;
                        log.trace("Queued for {}ms, No throttling applied (throttle cleared while queued), for exchangeId: {}", queuedTime, exchange.getExchangeId());
                    }
                } else {
                    log.trace("No throttling applied to exchangeId: {}", exchange.getExchangeId());
                }
            }

            if (processor != null) {
                if (doneSync) {
                    return processor.process(exchange, callback);
                } else {
                    // if we are executing async, then we have to call the nested processor synchronously, and we
                    // must not share our AsyncCallback, because the nested processing has no way of knowing that
                    // we are already executing asynchronously.
                    AsyncProcessorHelper.process(processor, exchange);
                }
            }

            callback.done(doneSync);
            return doneSync;

        } catch (final InterruptedException e) {
            // determine if we can still run, or the camel context is forcing a shutdown
            boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown(this);
            if (forceShutdown) {
                String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
                log.debug(msg);
                exchange.setException(new RejectedExecutionException(msg, e));
            } else {
                exchange.setException(e);
            }
            callback.done(doneSync);
            return doneSync;
        } catch (final Throwable t) {
            exchange.setException(t);
            callback.done(doneSync);
            return doneSync;
        }
    }

    /**
     * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission
     * and isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not
     * before changing the exchange state to stop any recursion.
     *
     * @param exchange        the exchange to process later
     * @param callback        the callback to pass on to the deferred {@code process} call
     * @param throttlingState the state whose next permit determines the scheduling delay
     * @return {@code false} when the exchange was scheduled, otherwise the result of the caller-runs
     *         {@code process} call
     * @throws RejectedExecutionException if the executor rejects the task and caller-runs is disabled
     */
    protected boolean processAsynchronously(final Exchange exchange, final AsyncCallback callback, ThrottlingState throttlingState) {
        try {
            if (log.isTraceEnabled()) {
                exchange.setProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP, System.currentTimeMillis());
            }
            exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC);
            // The queue can be momentarily empty (another thread holds the permit between poll() and
            // enqueue()), in which case peek() returns null. Schedule immediately then; the scheduled
            // task blocks in take() until a permit becomes available.
            ThrottlePermit next = throttlingState.peek();
            long delay = next != null ? next.getDelay(TimeUnit.NANOSECONDS) : 0L;
            asyncExecutor.schedule(() -> process(exchange, callback), delay, TimeUnit.NANOSECONDS);
            return false;
        } catch (final RejectedExecutionException e) {
            if (isCallerRunsWhenRejected()) {
                log.debug("AsyncExecutor is full, rejected exchange will run in the current thread, exchangeId: {}", exchange.getExchangeId());
                exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC_REJECTED);
                return process(exchange, callback);
            }
            throw e;
        }
    }

    /**
     * Verifies that an executor is present when async delaying is enabled, then starts the delegate.
     */
    @Override
    protected void doStart() throws Exception {
        if (isAsyncDelayed()) {
            ObjectHelper.notNull(asyncExecutor, "executorService", this);
        }
        super.doStart();
    }

    /**
     * Shuts down the async executor if this throttler was asked to do so, and drops all throttling state.
     */
    @Override
    protected void doShutdown() throws Exception {
        if (shutdownAsyncExecutor && asyncExecutor != null) {
            camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
        }
        states.clear();
        super.doShutdown();
    }

    /**
     * The permits and current throttle rate for one correlation key.
     */
    private class ThrottlingState {
        private final String key;
        private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
        // the pending task that will drop this state; replaced on every enqueue
        private final AtomicReference<ScheduledFuture<?>> cleanFuture = new AtomicReference<>();
        private volatile int throttleRate;

        ThrottlingState(String key) {
            this.key = key;
        }

        public String getKey() {
            return key;
        }

        public int getThrottleRate() {
            return throttleRate;
        }

        /**
         * Returns an expired permit without blocking, or null if none has expired.
         */
        public ThrottlePermit poll() {
            return delayQueue.poll();
        }

        /**
         * Returns, without removing it, the head of the queue, or null if the queue is empty.
         */
        public ThrottlePermit peek() {
            return delayQueue.peek();
        }

        /**
         * Blocks until a permit has expired and returns it.
         */
        public ThrottlePermit take() throws InterruptedException {
            return delayQueue.take();
        }

        /**
         * Removes this state from the enclosing throttler's state map.
         */
        public void clean() {
            states.remove(key);
        }

        /**
         * Returns a permit to the DelayQueue, first resetting its delay to be relative to now, and
         * (re)schedules the task that drops this state after cleanPeriodMillis without further use.
         */
        public void enqueue(final ThrottlePermit permit, final Exchange exchange) {
            permit.setDelayMs(getTimePeriodMillis());
            delayQueue.put(permit);

            // The executor is only mandatory when async delaying is enabled (see doStart), so it may be
            // null here; without it the state is simply never cleaned up.
            if (asyncExecutor != null) {
                try {
                    ScheduledFuture<?> next = asyncExecutor.schedule(this::clean, cleanPeriodMillis, TimeUnit.MILLISECONDS);
                    ScheduledFuture<?> prev = cleanFuture.getAndSet(next);
                    if (prev != null) {
                        prev.cancel(false);
                    }
                } catch (RejectedExecutionException e) {
                    log.debug("Throttling queue cleaning rejected", e);
                }
            }

            // try and incur the least amount of overhead while releasing permits back to the queue
            if (log.isTraceEnabled()) {
                log.trace("Permit released, for exchangeId: {}", exchange.getExchangeId());
            }
        }

        /**
         * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down.
         * <p>
         * Decreasing the rate removes permits from the queue, waiting for them to expire if necessary;
         * increasing it adds permits that are available immediately.
         *
         * @throws IllegalStateException    if the expression evaluates to a negative number
         * @throws RuntimeExchangeException if the expression evaluates to null and no rate has been set yet
         * @throws InterruptedException     if interrupted while waiting for a permit to discard
         */
        public synchronized void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception {
            Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);

            if (newThrottle != null && newThrottle < 0) {
                throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + newThrottle);
            }

            if (newThrottle == null && throttleRate == 0) {
                throw new RuntimeExchangeException("The maxRequestsPerPeriodExpression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange);
            }

            if (newThrottle != null) {
                if (newThrottle != throttleRate) {
                    // decrease
                    if (throttleRate > newThrottle) {
                        int delta = throttleRate - newThrottle;

                        // discard any permits that are needed to decrease throttling
                        while (delta > 0) {
                            delayQueue.take();
                            delta--;
                            log.trace("Permit discarded due to throttling rate decrease, triggered by ExchangeId: {}", exchange.getExchangeId());
                        }
                        log.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());

                    // increase
                    } else if (newThrottle > throttleRate) {
                        int delta = newThrottle - throttleRate;
                        for (int i = 0; i < delta; i++) {
                            // negative delay: the new permit is already expired, i.e. immediately usable
                            delayQueue.put(new ThrottlePermit(-1));
                        }
                        if (throttleRate == 0) {
                            log.debug("Initial throttle rate set to {}, triggered by ExchangeId: {}", newThrottle, exchange.getExchangeId());
                        } else {
                            log.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());
                        }
                    }
                    throttleRate = newThrottle;
                }
            }
        }
    }

    /**
     * Permit that implements the Delayed interface needed by DelayQueue.
     */
    private static final class ThrottlePermit implements Delayed {
        // wall-clock time (ms) at which this permit becomes available
        private volatile long scheduledTime;

        ThrottlePermit(final long delayMs) {
            setDelayMs(delayMs);
        }

        public void setDelayMs(final long delayMs) {
            this.scheduledTime = System.currentTimeMillis() + delayMs;
        }

        @Override
        public long getDelay(final TimeUnit unit) {
            return unit.convert(scheduledTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(final Delayed o) {
            // Long.compare avoids the sign errors that narrowing a long difference to int can produce
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public boolean isRejectExecution() {
        return rejectExecution;
    }

    public void setRejectExecution(boolean rejectExecution) {
        this.rejectExecution = rejectExecution;
    }

    public boolean isAsyncDelayed() {
        return asyncDelayed;
    }

    public void setAsyncDelayed(boolean asyncDelayed) {
        this.asyncDelayed = asyncDelayed;
    }

    public boolean isCallerRunsWhenRejected() {
        return callerRunsWhenRejected;
    }

    public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
        this.callerRunsWhenRejected = callerRunsWhenRejected;
    }

    public String getId() {
        return id;
    }

    public void setId(final String id) {
        this.id = id;
    }

    /**
     * Sets the maximum number of requests per time period expression
     */
    public void setMaximumRequestsPerPeriodExpression(Expression maxRequestsPerPeriodExpression) {
        this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
    }

    public Expression getMaximumRequestsPerPeriodExpression() {
        return maxRequestsPerPeriodExpression;
    }

    /**
     * Gets the current maximum request per period value.
     * If grouped throttling is applied with a correlationExpression
     * then the highest max per period among the groups is returned,
     * or 0 if there is no throttling state yet.
     */
    public int getCurrentMaximumRequestsPerPeriod() {
        return states.values().stream().mapToInt(ThrottlingState::getThrottleRate).max().orElse(0);
    }

    /**
     * Sets the time period during which the maximum number of requests apply
     */
    public void setTimePeriodMillis(final long timePeriodMillis) {
        this.timePeriodMillis = timePeriodMillis;
        // keep the idle-state cleanup interval in step with the period, as the constructor does;
        // otherwise state could be dropped (and its permits reset) before the permits have expired
        this.cleanPeriodMillis = timePeriodMillis * 10;
    }

    public long getTimePeriodMillis() {
        return timePeriodMillis;
    }

    public String getTraceLabel() {
        return "throttle[" + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + "]";
    }

    @Override
    public String toString() {
        return "Throttler[requests: " + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + " (ms) to: "
               + getProcessor() + "]";
    }
}