001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.EventObject; 020import java.util.LinkedHashSet; 021import java.util.Set; 022import java.util.concurrent.locks.Lock; 023import java.util.concurrent.locks.ReentrantLock; 024 025import org.apache.camel.CamelContext; 026import org.apache.camel.CamelContextAware; 027import org.apache.camel.Consumer; 028import org.apache.camel.Exchange; 029import org.apache.camel.LoggingLevel; 030import org.apache.camel.NonManagedService; 031import org.apache.camel.Route; 032import org.apache.camel.management.event.ExchangeCompletedEvent; 033import org.apache.camel.support.EventNotifierSupport; 034import org.apache.camel.support.RoutePolicySupport; 035import org.apache.camel.util.CamelLogger; 036import org.apache.camel.util.ObjectHelper; 037import org.apache.camel.util.ServiceHelper; 038import org.slf4j.LoggerFactory; 039 040/** 041 * A throttle based {@link org.apache.camel.spi.RoutePolicy} which is capable of dynamic 042 * throttling a route based on number of current inflight exchanges. 043 * <p/> 044 * This implementation supports two scopes {@link ThrottlingScope#Context} and {@link ThrottlingScope#Route} (is default). 045 * If context scope is selected then this implementation will use a {@link org.apache.camel.spi.EventNotifier} to listen 046 * for events when {@link Exchange}s is done, and trigger the {@link #throttle(org.apache.camel.Route, org.apache.camel.Exchange)} 047 * method. If the route scope is selected then <b>no</b> {@link org.apache.camel.spi.EventNotifier} is in use, as there is already 048 * a {@link org.apache.camel.spi.Synchronization} callback on the current {@link Exchange} which triggers the 049 * {@link #throttle(org.apache.camel.Route, org.apache.camel.Exchange)} when the current {@link Exchange} is done. 050 * 051 * @version 052 */ 053public class ThrottlingInflightRoutePolicy extends RoutePolicySupport implements CamelContextAware, NonManagedService { 054 055 public enum ThrottlingScope { 056 Context, Route 057 } 058 059 private final Set<Route> routes = new LinkedHashSet<Route>(); 060 private ContextScopedEventNotifier eventNotifier; 061 private CamelContext camelContext; 062 private final Lock lock = new ReentrantLock(); 063 private ThrottlingScope scope = ThrottlingScope.Route; 064 private int maxInflightExchanges = 1000; 065 private int resumePercentOfMax = 70; 066 private int resumeInflightExchanges = 700; 067 private LoggingLevel loggingLevel = LoggingLevel.INFO; 068 private CamelLogger logger; 069 070 public ThrottlingInflightRoutePolicy() { 071 } 072 073 @Override 074 public String toString() { 075 return "ThrottlingInflightRoutePolicy[" + maxInflightExchanges + " / " + resumePercentOfMax + "% using scope " + scope + "]"; 076 } 077 078 public CamelContext getCamelContext() { 079 return camelContext; 080 } 081 082 public void setCamelContext(CamelContext camelContext) { 083 this.camelContext = camelContext; 084 } 085 086 @Override 087 public void onInit(Route route) { 088 // we need to remember the routes we apply for 089 routes.add(route); 090 } 091 092 @Override 093 public void onExchangeDone(Route route, Exchange exchange) { 094 // if route scoped then throttle directly 095 // as context scoped is handled using an EventNotifier instead 096 if (scope == ThrottlingScope.Route) { 097 throttle(route, exchange); 098 } 099 } 100 101 /** 102 * Throttles the route when {@link Exchange}s is done. 103 * 104 * @param route the route 105 * @param exchange the exchange 106 */ 107 protected void throttle(Route route, Exchange exchange) { 108 // this works the best when this logic is executed when the exchange is done 109 Consumer consumer = route.getConsumer(); 110 111 int size = getSize(route, exchange); 112 boolean stop = maxInflightExchanges > 0 && size > maxInflightExchanges; 113 if (log.isTraceEnabled()) { 114 log.trace("{} > 0 && {} > {} evaluated as {}", new Object[]{maxInflightExchanges, size, maxInflightExchanges, stop}); 115 } 116 if (stop) { 117 try { 118 lock.lock(); 119 stopConsumer(size, consumer); 120 } catch (Exception e) { 121 handleException(e); 122 } finally { 123 lock.unlock(); 124 } 125 } 126 127 // reload size in case a race condition with too many at once being invoked 128 // so we need to ensure that we read the most current size and start the consumer if we are already to low 129 size = getSize(route, exchange); 130 boolean start = size <= resumeInflightExchanges; 131 if (log.isTraceEnabled()) { 132 log.trace("{} <= {} evaluated as {}", new Object[]{size, resumeInflightExchanges, start}); 133 } 134 if (start) { 135 try { 136 lock.lock(); 137 startConsumer(size, consumer); 138 } catch (Exception e) { 139 handleException(e); 140 } finally { 141 lock.unlock(); 142 } 143 } 144 } 145 146 public int getMaxInflightExchanges() { 147 return maxInflightExchanges; 148 } 149 150 /** 151 * Sets the upper limit of number of concurrent inflight exchanges at which point reached 152 * the throttler should suspend the route. 153 * <p/> 154 * Is default 1000. 155 * 156 * @param maxInflightExchanges the upper limit of concurrent inflight exchanges 157 */ 158 public void setMaxInflightExchanges(int maxInflightExchanges) { 159 this.maxInflightExchanges = maxInflightExchanges; 160 // recalculate, must be at least at 1 161 this.resumeInflightExchanges = Math.max(resumePercentOfMax * maxInflightExchanges / 100, 1); 162 } 163 164 public int getResumePercentOfMax() { 165 return resumePercentOfMax; 166 } 167 168 /** 169 * Sets at which percentage of the max the throttler should start resuming the route. 170 * <p/> 171 * Will by default use 70%. 172 * 173 * @param resumePercentOfMax the percentage must be between 0 and 100 174 */ 175 public void setResumePercentOfMax(int resumePercentOfMax) { 176 if (resumePercentOfMax < 0 || resumePercentOfMax > 100) { 177 throw new IllegalArgumentException("Must be a percentage between 0 and 100, was: " + resumePercentOfMax); 178 } 179 180 this.resumePercentOfMax = resumePercentOfMax; 181 // recalculate, must be at least at 1 182 this.resumeInflightExchanges = Math.max(resumePercentOfMax * maxInflightExchanges / 100, 1); 183 } 184 185 public ThrottlingScope getScope() { 186 return scope; 187 } 188 189 /** 190 * Sets which scope the throttling should be based upon, either route or total scoped. 191 * 192 * @param scope the scope 193 */ 194 public void setScope(ThrottlingScope scope) { 195 this.scope = scope; 196 } 197 198 public LoggingLevel getLoggingLevel() { 199 return loggingLevel; 200 } 201 202 public CamelLogger getLogger() { 203 if (logger == null) { 204 logger = createLogger(); 205 } 206 return logger; 207 } 208 209 /** 210 * Sets the logger to use for logging throttling activity. 211 * 212 * @param logger the logger 213 */ 214 public void setLogger(CamelLogger logger) { 215 this.logger = logger; 216 } 217 218 /** 219 * Sets the logging level to report the throttling activity. 220 * <p/> 221 * Is default <tt>INFO</tt> level. 222 * 223 * @param loggingLevel the logging level 224 */ 225 public void setLoggingLevel(LoggingLevel loggingLevel) { 226 this.loggingLevel = loggingLevel; 227 } 228 229 protected CamelLogger createLogger() { 230 return new CamelLogger(LoggerFactory.getLogger(ThrottlingInflightRoutePolicy.class), getLoggingLevel()); 231 } 232 233 private int getSize(Route route, Exchange exchange) { 234 if (scope == ThrottlingScope.Context) { 235 return exchange.getContext().getInflightRepository().size(); 236 } else { 237 return exchange.getContext().getInflightRepository().size(route.getId()); 238 } 239 } 240 241 private void startConsumer(int size, Consumer consumer) throws Exception { 242 boolean started = super.startConsumer(consumer); 243 if (started) { 244 getLogger().log("Throttling consumer: " + size + " <= " + resumeInflightExchanges + " inflight exchange by resuming consumer: " + consumer); 245 } 246 } 247 248 private void stopConsumer(int size, Consumer consumer) throws Exception { 249 boolean stopped = super.stopConsumer(consumer); 250 if (stopped) { 251 getLogger().log("Throttling consumer: " + size + " > " + maxInflightExchanges + " inflight exchange by suspending consumer: " + consumer); 252 } 253 } 254 255 @Override 256 protected void doStart() throws Exception { 257 ObjectHelper.notNull(camelContext, "CamelContext", this); 258 if (scope == ThrottlingScope.Context) { 259 eventNotifier = new ContextScopedEventNotifier(); 260 // must start the notifier before it can be used 261 ServiceHelper.startService(eventNotifier); 262 // we are in context scope, so we need to use an event notifier to keep track 263 // when any exchanges is done on the camel context. 264 // This ensures we can trigger accordingly to context scope 265 camelContext.getManagementStrategy().addEventNotifier(eventNotifier); 266 } 267 } 268 269 @Override 270 protected void doStop() throws Exception { 271 ObjectHelper.notNull(camelContext, "CamelContext", this); 272 if (scope == ThrottlingScope.Context) { 273 camelContext.getManagementStrategy().removeEventNotifier(eventNotifier); 274 } 275 } 276 277 /** 278 * {@link org.apache.camel.spi.EventNotifier} to keep track on when {@link Exchange} 279 * is done, so we can throttle accordingly. 280 */ 281 private class ContextScopedEventNotifier extends EventNotifierSupport { 282 283 @Override 284 public void notify(EventObject event) throws Exception { 285 ExchangeCompletedEvent completedEvent = (ExchangeCompletedEvent) event; 286 for (Route route : routes) { 287 throttle(route, completedEvent.getExchange()); 288 } 289 } 290 291 @Override 292 public boolean isEnabled(EventObject event) { 293 return event instanceof ExchangeCompletedEvent; 294 } 295 296 @Override 297 protected void doStart() throws Exception { 298 // noop 299 } 300 301 @Override 302 protected void doStop() throws Exception { 303 // noop 304 } 305 306 @Override 307 public String toString() { 308 return "ContextScopedEventNotifier"; 309 } 310 } 311 312}