001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.Map;
020import java.util.concurrent.ConcurrentHashMap;
021import java.util.concurrent.DelayQueue;
022import java.util.concurrent.Delayed;
023import java.util.concurrent.RejectedExecutionException;
024import java.util.concurrent.ScheduledExecutorService;
025import java.util.concurrent.ScheduledFuture;
026import java.util.concurrent.TimeUnit;
027import java.util.concurrent.atomic.AtomicReference;
028
029import org.apache.camel.AsyncCallback;
030import org.apache.camel.CamelContext;
031import org.apache.camel.Exchange;
032import org.apache.camel.Expression;
033import org.apache.camel.Processor;
034import org.apache.camel.RuntimeExchangeException;
035import org.apache.camel.Traceable;
036import org.apache.camel.spi.IdAware;
037import org.apache.camel.util.AsyncProcessorHelper;
038import org.apache.camel.util.ObjectHelper;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * A <a href="http://camel.apache.org/throttler.html">Throttler</a>
044 * will set a limit on the maximum number of message exchanges which can be sent
045 * to a processor within a specific time period. <p/> This pattern can be
046 * extremely useful if you have some external system which meters access; such
047 * as only allowing 100 requests per second; or if huge load can cause a
048 * particular system to malfunction or to reduce its throughput you might want
049 * to introduce some throttling.
050 *
051 * This throttle implementation is thread-safe and is therefore safe to be used
052 * by multiple concurrent threads in a single route.
053 *
054 * The throttling mechanism is a DelayQueue with maxRequestsPerPeriod permits on
055 * it. Each permit is set to be delayed by timePeriodMillis (except when the
056 * throttler is initialized or the throttle rate increased, then there is no delay
057 * for those permits). Callers trying to acquire a permit from the DelayQueue will
 * block if necessary. The end result is a rolling window of time: from the
 * caller's point of view, in the last timePeriodMillis no more than
 * maxRequestsPerPeriod permits have been allowed to be acquired.
061 *
062 * @version
063 */
064public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAware {
065
066    private static final String DEFAULT_KEY = "CamelThrottlerDefaultKey";
067
068    private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = "CamelThrottlerExchangeQueuedTimestamp";
069    private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState";
070
071    private enum State { SYNC, ASYNC, ASYNC_REJECTED }
072
073    private final Logger log = LoggerFactory.getLogger(Throttler.class);
074    private final CamelContext camelContext;
075    private final ScheduledExecutorService asyncExecutor;
076    private final boolean shutdownAsyncExecutor;
077
078    private volatile long timePeriodMillis;
079    private volatile long cleanPeriodMillis;
080    private String id;
081    private Expression maxRequestsPerPeriodExpression;
082    private boolean rejectExecution;
083    private boolean asyncDelayed;
084    private boolean callerRunsWhenRejected = true;
085    private Expression correlationExpression;
086    private Map<String, ThrottlingState> states = new ConcurrentHashMap<>();
087
088    public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis,
089                     final ScheduledExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) {
090        super(processor);
091        this.camelContext = camelContext;
092        this.rejectExecution = rejectExecution;
093        this.shutdownAsyncExecutor = shutdownAsyncExecutor;
094
095        ObjectHelper.notNull(maxRequestsPerPeriodExpression, "maxRequestsPerPeriodExpression");
096        this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
097
098        if (timePeriodMillis <= 0) {
099            throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis);
100        }
101        this.timePeriodMillis = timePeriodMillis;
102        this.cleanPeriodMillis = timePeriodMillis * 10;
103        this.asyncExecutor = asyncExecutor;
104        this.correlationExpression = correlation;
105    }
106
    /**
     * Acquires a throttle permit for the exchange and then forwards the exchange to the wrapped processor.
     * <p/>
     * If no permit is immediately available the exchange is either rejected (rejectExecution), handed to the
     * async executor (asyncDelayed, non-transacted, first pass only), or the calling thread blocks until a
     * permit becomes available. This method is re-entered by processAsynchronously() with the exchange state
     * property set to ASYNC or ASYNC_REJECTED.
     *
     * @param exchange the exchange to throttle
     * @param callback invoked with the same value this method returns, unless the wrapped processor or the
     *                 async hand-over takes over completion
     * @return true if processing completed on the calling thread, false if it continues asynchronously
     */
    @Override
    public boolean process(final Exchange exchange, final AsyncCallback callback) {
        // the queued timestamp is only written when trace logging is enabled, so only read it then
        long queuedStart = 0;
        if (log.isTraceEnabled()) {
            queuedStart = exchange.getProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP, 0L, Long.class);
            exchange.removeProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP);
        }
        // absent property means this is the first (caller driven) pass; the property is consumed here
        State state = exchange.getProperty(PROPERTY_EXCHANGE_STATE, State.SYNC, State.class);
        exchange.removeProperty(PROPERTY_EXCHANGE_STATE);
        // SYNC and ASYNC_REJECTED both run on the original caller thread; only ASYNC runs on the executor
        boolean doneSync = state == State.SYNC || state == State.ASYNC_REJECTED;

        try {
            if (!isRunAllowed()) {
                throw new RejectedExecutionException("Run is not allowed");
            }

            // pick (or lazily create) the throttling state for this exchange's correlation key
            String key = DEFAULT_KEY;
            if (correlationExpression != null) {
                key = correlationExpression.evaluate(exchange, String.class);
            }
            ThrottlingState throttlingState = states.computeIfAbsent(key, ThrottlingState::new);
            // re-evaluate the rate for every exchange so it can change at runtime
            throttlingState.calculateAndSetMaxRequestsPerPeriod(exchange);

            // non-blocking attempt: null means no permit has expired yet
            ThrottlePermit permit = throttlingState.poll();

            if (permit == null) {
                if (isRejectExecution()) {
                    throw new ThrottlerRejectedExecutionException("Exceeded the max throttle rate of "
                            + throttlingState.getThrottleRate() + " within " + timePeriodMillis + "ms");
                } else {
                    // delegate to async pool
                    // (only on the first pass: state is SYNC, so a re-entered exchange is never queued twice)
                    if (isAsyncDelayed() && !exchange.isTransacted() && state == State.SYNC) {
                        log.debug("Throttle rate exceeded but AsyncDelayed enabled, so queueing for async processing, exchangeId: {}", exchange.getExchangeId());
                        return processAsynchronously(exchange, callback, throttlingState);
                    }

                    // block waiting for a permit
                    long start = 0;
                    long elapsed = 0;
                    if (log.isTraceEnabled()) {
                        start = System.currentTimeMillis();
                    }
                    permit = throttlingState.take();
                    if (log.isTraceEnabled()) {
                        elapsed = System.currentTimeMillis() - start;
                    }
                    // the permit goes straight back on the queue, delayed by the time period
                    throttlingState.enqueue(permit, exchange);

                    if (state == State.ASYNC) {
                        if (log.isTraceEnabled()) {
                            long queuedTime = start - queuedStart;
                            log.trace("Queued for {}ms, Throttled for {}ms, exchangeId: {}", queuedTime, elapsed, exchange.getExchangeId());
                        }
                    } else {
                        log.trace("Throttled for {}ms, exchangeId: {}", elapsed, exchange.getExchangeId());
                    }
                }
            } else {
                // a permit was immediately available; return it to the queue with a fresh delay
                throttlingState.enqueue(permit, exchange);

                if (state == State.ASYNC) {
                    if (log.isTraceEnabled()) {
                        long queuedTime = System.currentTimeMillis() - queuedStart;
                        log.trace("Queued for {}ms, No throttling applied (throttle cleared while queued), for exchangeId: {}", queuedTime, exchange.getExchangeId());
                    }
                } else {
                    log.trace("No throttling applied to exchangeId: {}", exchange.getExchangeId());
                }
            }

            if (processor != null) {
                if (doneSync) {
                    // on the caller thread the wrapped processor takes over the callback
                    return processor.process(exchange, callback);
                } else {
                    // if we are executing async, then we have to call the nested processor synchronously, and we
                    // must not share our AsyncCallback, because the nested processing has no way of knowing that
                    // we are already executing asynchronously.
                    AsyncProcessorHelper.process(processor, exchange);
                }
            }

            callback.done(doneSync);
            return doneSync;

        } catch (final InterruptedException e) {
            // NOTE(review): the thread's interrupt flag is not restored here - confirm this is intended
            // determine if we can still run, or the camel context is forcing a shutdown
            boolean forceShutdown = exchange.getContext().getShutdownStrategy().forceShutdown(this);
            if (forceShutdown) {
                String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: " + exchange;
                log.debug(msg);
                exchange.setException(new RejectedExecutionException(msg, e));
            } else {
                exchange.setException(e);
            }
            callback.done(doneSync);
            return doneSync;
        } catch (final Throwable t) {
            // any other failure (including a throttle rejection) is reported on the exchange, never thrown
            exchange.setException(t);
            callback.done(doneSync);
            return doneSync;
        }
    }
209
210    /**
211     * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission
212     * and isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not
213     * before changing the exchange state to stop any recursion.
214     */
215    protected boolean processAsynchronously(final Exchange exchange, final AsyncCallback callback, ThrottlingState throttlingState) {
216        try {
217            if (log.isTraceEnabled()) {
218                exchange.setProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP, System.currentTimeMillis());
219            }
220            exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC);
221            long delay = throttlingState.peek().getDelay(TimeUnit.NANOSECONDS);
222            asyncExecutor.schedule(() -> process(exchange, callback), delay, TimeUnit.NANOSECONDS);
223            return false;
224        } catch (final RejectedExecutionException e) {
225            if (isCallerRunsWhenRejected()) {
226                log.debug("AsyncExecutor is full, rejected exchange will run in the current thread, exchangeId: {}", exchange.getExchangeId());
227                exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC_REJECTED);
228                return process(exchange, callback);
229            }
230            throw e;
231        }
232    }
233
234    @SuppressWarnings("unchecked")
235    @Override
236    protected void doStart() throws Exception {
237        if (isAsyncDelayed()) {
238            ObjectHelper.notNull(asyncExecutor, "executorService", this);
239        }
240        super.doStart();
241    }
242
243    @SuppressWarnings("rawtypes")
244    @Override
245    protected void doShutdown() throws Exception {
246        if (shutdownAsyncExecutor && asyncExecutor != null) {
247            camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
248        }
249        states.clear();
250        super.doShutdown();
251    }
252
253    private class ThrottlingState {
254        private final String key;
255        private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
256        private final AtomicReference<ScheduledFuture<?>> cleanFuture = new AtomicReference<>();
257        private volatile int throttleRate;
258
259        ThrottlingState(String key) {
260            this.key = key;
261        }
262
263        public String getKey() {
264            return key;
265        }
266
267        public int getThrottleRate() {
268            return throttleRate;
269        }
270
271        public ThrottlePermit poll() {
272            return delayQueue.poll();
273        }
274
275        public ThrottlePermit peek() {
276            return delayQueue.peek();
277        }
278
279        public ThrottlePermit take() throws InterruptedException {
280            return delayQueue.take();
281        }
282
283        public void clean() {
284            states.remove(key);
285        }
286
287        /**
288         * Returns a permit to the DelayQueue, first resetting it's delay to be relative to now.
289         */
290        public void enqueue(final ThrottlePermit permit, final Exchange exchange) {
291            permit.setDelayMs(getTimePeriodMillis());
292            delayQueue.put(permit);
293            try {
294                ScheduledFuture<?> next = asyncExecutor.schedule(this::clean, cleanPeriodMillis, TimeUnit.MILLISECONDS);
295                ScheduledFuture<?> prev = cleanFuture.getAndSet(next);
296                if (prev != null) {
297                    prev.cancel(false);
298                }
299                // try and incur the least amount of overhead while releasing permits back to the queue
300                if (log.isTraceEnabled()) {
301                    log.trace("Permit released, for exchangeId: {}", exchange.getExchangeId());
302                }
303            } catch (RejectedExecutionException e) {
304                log.debug("Throttling queue cleaning rejected", e);
305            }
306        }
307
308        /**
309         * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down.
310         */
311        public synchronized void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception {
312            Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
313
314            if (newThrottle != null && newThrottle < 0) {
315                throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + newThrottle);
316            }
317
318            if (newThrottle == null && throttleRate == 0) {
319                throw new RuntimeExchangeException("The maxRequestsPerPeriodExpression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange);
320            }
321
322            if (newThrottle != null) {
323                if (newThrottle != throttleRate) {
324                    // decrease
325                    if (throttleRate > newThrottle) {
326                        int delta = throttleRate - newThrottle;
327
328                        // discard any permits that are needed to decrease throttling
329                        while (delta > 0) {
330                            delayQueue.take();
331                            delta--;
332                            log.trace("Permit discarded due to throttling rate decrease, triggered by ExchangeId: {}", exchange.getExchangeId());
333                        }
334                        log.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());
335
336                        // increase
337                    } else if (newThrottle > throttleRate) {
338                        int delta = newThrottle - throttleRate;
339                        for (int i = 0; i < delta; i++) {
340                            delayQueue.put(new ThrottlePermit(-1));
341                        }
342                        if (throttleRate == 0) {
343                            log.debug("Initial throttle rate set to {}, triggered by ExchangeId: {}", newThrottle, exchange.getExchangeId());
344                        } else {
345                            log.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());
346                        }
347                    }
348                    throttleRate = newThrottle;
349                }
350            }
351        }
352    }
353
354    /**
355     * Permit that implements the Delayed interface needed by DelayQueue.
356     */
357    private class ThrottlePermit implements Delayed {
358        private volatile long scheduledTime;
359
360        ThrottlePermit(final long delayMs) {
361            setDelayMs(delayMs);
362        }
363
364        public void setDelayMs(final long delayMs) {
365            this.scheduledTime = System.currentTimeMillis() + delayMs;
366        }
367
368        @Override
369        public long getDelay(final TimeUnit unit) {
370            return unit.convert(scheduledTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
371        }
372
373        @Override
374        public int compareTo(final Delayed o) {
375            return (int)(getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
376        }
377    }
378
    /**
     * Whether an exchange that finds no permit available is failed with a
     * ThrottlerRejectedExecutionException instead of being delayed.
     */
    public boolean isRejectExecution() {
        return rejectExecution;
    }

    /**
     * Sets whether exchanges exceeding the throttle rate are rejected rather than delayed.
     */
    public void setRejectExecution(boolean rejectExecution) {
        this.rejectExecution = rejectExecution;
    }
386
    /**
     * Whether an exchange that finds no permit available is handed to the async executor
     * instead of blocking the calling thread.
     */
    public boolean isAsyncDelayed() {
        return asyncDelayed;
    }

    /**
     * Sets whether delayed exchanges are queued on the async executor; when enabled an
     * executor must be present at start-up.
     */
    public void setAsyncDelayed(boolean asyncDelayed) {
        this.asyncDelayed = asyncDelayed;
    }
394
    /**
     * Whether an exchange rejected by the async executor is processed on the calling thread
     * instead; enabled by default.
     */
    public boolean isCallerRunsWhenRejected() {
        return callerRunsWhenRejected;
    }

    /**
     * Sets whether the calling thread processes an exchange that the async executor rejected;
     * when disabled the RejectedExecutionException is rethrown by processAsynchronously().
     */
    public void setCallerRunsWhenRejected(boolean callerRunsWhenRejected) {
        this.callerRunsWhenRejected = callerRunsWhenRejected;
    }
402
403    public String getId() {
404        return id;
405    }
406
407    public void setId(final String id) {
408        this.id = id;
409    }
410
411    /**
412     * Sets the maximum number of requests per time period expression
413     */
414    public void setMaximumRequestsPerPeriodExpression(Expression maxRequestsPerPeriodExpression) {
415        this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
416    }
417
    /**
     * Gets the expression that is evaluated per exchange to obtain the maximum number of requests per time period.
     */
    public Expression getMaximumRequestsPerPeriodExpression() {
        return maxRequestsPerPeriodExpression;
    }
421
422    /**
423     * Gets the current maximum request per period value.
424     * If it is grouped throttling applied with correlationExpression
425     * than the max per period within the group will return
426     */
427    public int getCurrentMaximumRequestsPerPeriod() {
428        return states.values().stream().mapToInt(ThrottlingState::getThrottleRate).max().orElse(0);
429    }
430
431    /**
432     * Sets the time period during which the maximum number of requests apply
433     */
434    public void setTimePeriodMillis(final long timePeriodMillis) {
435        this.timePeriodMillis = timePeriodMillis;
436    }
437
    /**
     * Gets the time period, in milliseconds, during which the maximum number of requests apply.
     */
    public long getTimePeriodMillis() {
        return timePeriodMillis;
    }
441
442    public String getTraceLabel() {
443        return "throttle[" + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + "]";
444    }
445
446    @Override
447    public String toString() {
448        return "Throttler[requests: " + maxRequestsPerPeriodExpression + " per: " + timePeriodMillis + " (ms) to: "
449                + getProcessor() + "]";
450    }
451}