001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.EventObject;
020import java.util.LinkedHashSet;
021import java.util.Set;
022import java.util.concurrent.locks.Lock;
023import java.util.concurrent.locks.ReentrantLock;
024
025import org.apache.camel.CamelContext;
026import org.apache.camel.CamelContextAware;
027import org.apache.camel.Consumer;
028import org.apache.camel.Exchange;
029import org.apache.camel.LoggingLevel;
030import org.apache.camel.NonManagedService;
031import org.apache.camel.Route;
032import org.apache.camel.management.event.ExchangeCompletedEvent;
033import org.apache.camel.support.EventNotifierSupport;
034import org.apache.camel.support.RoutePolicySupport;
035import org.apache.camel.util.CamelLogger;
036import org.apache.camel.util.ObjectHelper;
037import org.apache.camel.util.ServiceHelper;
038import org.slf4j.LoggerFactory;
039
040/**
041 * A throttle based {@link org.apache.camel.spi.RoutePolicy} which is capable of dynamic
042 * throttling a route based on number of current inflight exchanges.
043 * <p/>
044 * This implementation supports two scopes {@link ThrottlingScope#Context} and {@link ThrottlingScope#Route} (is default).
045 * If context scope is selected then this implementation will use a {@link org.apache.camel.spi.EventNotifier} to listen
046 * for events when {@link Exchange}s is done, and trigger the {@link #throttle(org.apache.camel.Route, org.apache.camel.Exchange)}
047 * method. If the route scope is selected then <b>no</b> {@link org.apache.camel.spi.EventNotifier} is in use, as there is already
048 * a {@link org.apache.camel.spi.Synchronization} callback on the current {@link Exchange} which triggers the
049 * {@link #throttle(org.apache.camel.Route, org.apache.camel.Exchange)} when the current {@link Exchange} is done.
050 *
051 * @version 
052 */
053public class ThrottlingInflightRoutePolicy extends RoutePolicySupport implements CamelContextAware, NonManagedService {
054
055    public enum ThrottlingScope {
056        Context, Route
057    }
058
059    private final Set<Route> routes = new LinkedHashSet<Route>();
060    private ContextScopedEventNotifier eventNotifier;
061    private CamelContext camelContext;
062    private final Lock lock = new ReentrantLock();
063    private ThrottlingScope scope = ThrottlingScope.Route;
064    private int maxInflightExchanges = 1000;
065    private int resumePercentOfMax = 70;
066    private int resumeInflightExchanges = 700;
067    private LoggingLevel loggingLevel = LoggingLevel.INFO;
068    private CamelLogger logger;
069
070    public ThrottlingInflightRoutePolicy() {
071    }
072
073    @Override
074    public String toString() {
075        return "ThrottlingInflightRoutePolicy[" + maxInflightExchanges + " / " + resumePercentOfMax + "% using scope " + scope + "]";
076    }
077
078    public CamelContext getCamelContext() {
079        return camelContext;
080    }
081
082    public void setCamelContext(CamelContext camelContext) {
083        this.camelContext = camelContext;
084    }
085
086    @Override
087    public void onInit(Route route) {
088        // we need to remember the routes we apply for
089        routes.add(route);
090    }
091
092    @Override
093    public void onExchangeDone(Route route, Exchange exchange) {
094        // if route scoped then throttle directly
095        // as context scoped is handled using an EventNotifier instead
096        if (scope == ThrottlingScope.Route) {
097            throttle(route, exchange);
098        }
099    }
100
101    /**
102     * Throttles the route when {@link Exchange}s is done.
103     *
104     * @param route  the route
105     * @param exchange the exchange
106     */
107    protected void throttle(Route route, Exchange exchange) {
108        // this works the best when this logic is executed when the exchange is done
109        Consumer consumer = route.getConsumer();
110
111        int size = getSize(route, exchange);
112        boolean stop = maxInflightExchanges > 0 && size > maxInflightExchanges;
113        if (log.isTraceEnabled()) {
114            log.trace("{} > 0 && {} > {} evaluated as {}", new Object[]{maxInflightExchanges, size, maxInflightExchanges, stop});
115        }
116        if (stop) {
117            try {
118                lock.lock();
119                stopConsumer(size, consumer);
120            } catch (Exception e) {
121                handleException(e);
122            } finally {
123                lock.unlock();
124            }
125        }
126
127        // reload size in case a race condition with too many at once being invoked
128        // so we need to ensure that we read the most current size and start the consumer if we are already to low
129        size = getSize(route, exchange);
130        boolean start = size <= resumeInflightExchanges;
131        if (log.isTraceEnabled()) {
132            log.trace("{} <= {} evaluated as {}", new Object[]{size, resumeInflightExchanges, start});
133        }
134        if (start) {
135            try {
136                lock.lock();
137                startConsumer(size, consumer);
138            } catch (Exception e) {
139                handleException(e);
140            } finally {
141                lock.unlock();
142            }
143        }
144    }
145
146    public int getMaxInflightExchanges() {
147        return maxInflightExchanges;
148    }
149
150    /**
151     * Sets the upper limit of number of concurrent inflight exchanges at which point reached
152     * the throttler should suspend the route.
153     * <p/>
154     * Is default 1000.
155     *
156     * @param maxInflightExchanges the upper limit of concurrent inflight exchanges
157     */
158    public void setMaxInflightExchanges(int maxInflightExchanges) {
159        this.maxInflightExchanges = maxInflightExchanges;
160        // recalculate, must be at least at 1
161        this.resumeInflightExchanges = Math.max(resumePercentOfMax * maxInflightExchanges / 100, 1);
162    }
163
164    public int getResumePercentOfMax() {
165        return resumePercentOfMax;
166    }
167
168    /**
169     * Sets at which percentage of the max the throttler should start resuming the route.
170     * <p/>
171     * Will by default use 70%.
172     *
173     * @param resumePercentOfMax the percentage must be between 0 and 100
174     */
175    public void setResumePercentOfMax(int resumePercentOfMax) {
176        if (resumePercentOfMax < 0 || resumePercentOfMax > 100) {
177            throw new IllegalArgumentException("Must be a percentage between 0 and 100, was: " + resumePercentOfMax);
178        }
179
180        this.resumePercentOfMax = resumePercentOfMax;
181        // recalculate, must be at least at 1
182        this.resumeInflightExchanges = Math.max(resumePercentOfMax * maxInflightExchanges / 100, 1);
183    }
184
185    public ThrottlingScope getScope() {
186        return scope;
187    }
188
189    /**
190     * Sets which scope the throttling should be based upon, either route or total scoped.
191     *
192     * @param scope the scope
193     */
194    public void setScope(ThrottlingScope scope) {
195        this.scope = scope;
196    }
197
198    public LoggingLevel getLoggingLevel() {
199        return loggingLevel;
200    }
201
202    public CamelLogger getLogger() {
203        if (logger == null) {
204            logger = createLogger();
205        }
206        return logger;
207    }
208
209    /**
210     * Sets the logger to use for logging throttling activity.
211     *
212     * @param logger the logger
213     */
214    public void setLogger(CamelLogger logger) {
215        this.logger = logger;
216    }
217
218    /**
219     * Sets the logging level to report the throttling activity.
220     * <p/>
221     * Is default <tt>INFO</tt> level.
222     *
223     * @param loggingLevel the logging level
224     */
225    public void setLoggingLevel(LoggingLevel loggingLevel) {
226        this.loggingLevel = loggingLevel;
227    }
228
229    protected CamelLogger createLogger() {
230        return new CamelLogger(LoggerFactory.getLogger(ThrottlingInflightRoutePolicy.class), getLoggingLevel());
231    }
232
233    private int getSize(Route route, Exchange exchange) {
234        if (scope == ThrottlingScope.Context) {
235            return exchange.getContext().getInflightRepository().size();
236        } else {
237            return exchange.getContext().getInflightRepository().size(route.getId());
238        }
239    }
240
241    private void startConsumer(int size, Consumer consumer) throws Exception {
242        boolean started = super.startConsumer(consumer);
243        if (started) {
244            getLogger().log("Throttling consumer: " + size + " <= " + resumeInflightExchanges + " inflight exchange by resuming consumer: " + consumer);
245        }
246    }
247
248    private void stopConsumer(int size, Consumer consumer) throws Exception {
249        boolean stopped = super.stopConsumer(consumer);
250        if (stopped) {
251            getLogger().log("Throttling consumer: " + size + " > " + maxInflightExchanges + " inflight exchange by suspending consumer: " + consumer);
252        }
253    }
254
255    @Override
256    protected void doStart() throws Exception {
257        ObjectHelper.notNull(camelContext, "CamelContext", this);
258        if (scope == ThrottlingScope.Context) {
259            eventNotifier = new ContextScopedEventNotifier();
260            // must start the notifier before it can be used
261            ServiceHelper.startService(eventNotifier);
262            // we are in context scope, so we need to use an event notifier to keep track
263            // when any exchanges is done on the camel context.
264            // This ensures we can trigger accordingly to context scope
265            camelContext.getManagementStrategy().addEventNotifier(eventNotifier);
266        }
267    }
268
269    @Override
270    protected void doStop() throws Exception {
271        ObjectHelper.notNull(camelContext, "CamelContext", this);
272        if (scope == ThrottlingScope.Context) {
273            camelContext.getManagementStrategy().removeEventNotifier(eventNotifier);
274        }
275    }
276
277    /**
278     * {@link org.apache.camel.spi.EventNotifier} to keep track on when {@link Exchange}
279     * is done, so we can throttle accordingly.
280     */
281    private class ContextScopedEventNotifier extends EventNotifierSupport {
282
283        @Override
284        public void notify(EventObject event) throws Exception {
285            ExchangeCompletedEvent completedEvent = (ExchangeCompletedEvent) event;
286            for (Route route : routes) {
287                throttle(route, completedEvent.getExchange());
288            }
289        }
290
291        @Override
292        public boolean isEnabled(EventObject event) {
293            return event instanceof ExchangeCompletedEvent;
294        }
295
296        @Override
297        protected void doStart() throws Exception {
298            // noop
299        }
300
301        @Override
302        protected void doStop() throws Exception {
303            // noop
304        }
305
306        @Override
307        public String toString() {
308            return "ContextScopedEventNotifier";
309        }
310    }
311
312}