001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl; 018 019import java.util.List; 020import java.util.Timer; 021import java.util.TimerTask; 022import java.util.concurrent.atomic.AtomicBoolean; 023import java.util.concurrent.atomic.AtomicInteger; 024import java.util.concurrent.locks.Lock; 025import java.util.concurrent.locks.ReentrantLock; 026 027import org.apache.camel.CamelContext; 028import org.apache.camel.CamelContextAware; 029import org.apache.camel.Exchange; 030import org.apache.camel.Route; 031import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer; 032import org.apache.camel.spi.RoutePolicy; 033import org.apache.camel.support.RoutePolicySupport; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * Modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy} 039 * this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are 040 * thrown and the threshold setting. 041 * 042 * the scenario: if a route cannot process data from an endpoint due to problems with resources used by the route 043 * (ie database down) then it will stop consuming new messages from the endpoint by stopping the consumer. 044 * The implementation is comparable to the Circuit Breaker pattern. After a set amount of time, it will move 045 * to a half open state and attempt to determine if the consumer can be started. 046 * There are two ways to determine if a route can be closed after being opened 047 * (1) start the consumer and check the failure threshold 048 * (2) call the {@link ThrottlingExceptionHalfOpenHandler} 049 * The second option allows a custom check to be performed without having to take on the possibility of 050 * multiple messages from the endpoint. The idea is that a handler could run a simple test (ie select 1 from dual) 051 * to determine if the processes that cause the route to be open are now available 052 */ 053public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implements CamelContextAware { 054 private static final Logger LOG = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class); 055 056 private static final int STATE_CLOSED = 0; 057 private static final int STATE_HALF_OPEN = 1; 058 private static final int STATE_OPEN = 2; 059 060 private CamelContext camelContext; 061 private final Lock lock = new ReentrantLock(); 062 063 // configuration 064 private int failureThreshold; 065 private long failureWindow; 066 private long halfOpenAfter; 067 private final List<Class<?>> throttledExceptions; 068 069 // handler for half open circuit 070 // can be used instead of resuming route 071 // to check on resources 072 private ThrottlingExceptionHalfOpenHandler halfOpenHandler; 073 074 // stateful information 075 private final AtomicInteger failures = new AtomicInteger(); 076 private final AtomicInteger state = new AtomicInteger(STATE_CLOSED); 077 private final AtomicBoolean keepOpen = new AtomicBoolean(false); 078 private volatile Timer halfOpenTimer; 079 private volatile long lastFailure; 080 private volatile long openedAt; 081 082 public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions) { 083 this(threshold, failureWindow, halfOpenAfter, handledExceptions, false); 084 } 085 086 public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions, boolean keepOpen) { 087 this.throttledExceptions = handledExceptions; 088 this.failureWindow = failureWindow; 089 this.halfOpenAfter = halfOpenAfter; 090 this.failureThreshold = threshold; 091 this.keepOpen.set(keepOpen); 092 } 093 094 @Override 095 public void setCamelContext(CamelContext camelContext) { 096 this.camelContext = camelContext; 097 } 098 099 @Override 100 public CamelContext getCamelContext() { 101 return camelContext; 102 } 103 104 @Override 105 public void onInit(Route route) { 106 LOG.debug("Initializing ThrottlingExceptionRoutePolicy route policy..."); 107 logState(); 108 } 109 110 @Override 111 public void onStart(Route route) { 112 // if keepOpen then start w/ the circuit open 113 if (keepOpen.get()) { 114 openCircuit(route); 115 } 116 } 117 118 @Override 119 public void onExchangeDone(Route route, Exchange exchange) { 120 if (keepOpen.get()) { 121 if (state.get() != STATE_OPEN) { 122 LOG.debug("opening circuit b/c keepOpen is on"); 123 openCircuit(route); 124 } 125 } else { 126 if (hasFailed(exchange)) { 127 // record the failure 128 failures.incrementAndGet(); 129 lastFailure = System.currentTimeMillis(); 130 } 131 132 // check for state change 133 calculateState(route); 134 } 135 } 136 137 /** 138 * uses similar approach as {@link CircuitBreakerLoadBalancer} 139 * if the exchange has an exception that we are watching 140 * then we count that as a failure otherwise we ignore it 141 */ 142 private boolean hasFailed(Exchange exchange) { 143 if (exchange == null) { 144 return false; 145 } 146 147 boolean answer = false; 148 149 if (exchange.getException() != null) { 150 if (throttledExceptions == null || throttledExceptions.isEmpty()) { 151 // if no exceptions defined then always fail 152 // (ie) assume we throttle on all exceptions 153 answer = true; 154 } else { 155 for (Class<?> exception : throttledExceptions) { 156 // will look in exception hierarchy 157 if (exchange.getException(exception) != null) { 158 answer = true; 159 break; 160 } 161 } 162 } 163 } 164 165 if (LOG.isDebugEnabled()) { 166 String exceptionName = exchange.getException() == null ? "none" : exchange.getException().getClass().getSimpleName(); 167 LOG.debug("hasFailed ({}) with Throttled Exception: {} for exchangeId: {}", answer, exceptionName, exchange.getExchangeId()); 168 } 169 return answer; 170 } 171 172 private void calculateState(Route route) { 173 174 // have we reached the failure limit? 175 boolean failureLimitReached = isThresholdExceeded(); 176 177 if (state.get() == STATE_CLOSED) { 178 if (failureLimitReached) { 179 LOG.debug("Opening circuit..."); 180 openCircuit(route); 181 } 182 } else if (state.get() == STATE_HALF_OPEN) { 183 if (failureLimitReached) { 184 LOG.debug("Opening circuit..."); 185 openCircuit(route); 186 } else { 187 LOG.debug("Closing circuit..."); 188 closeCircuit(route); 189 } 190 } else if (state.get() == STATE_OPEN) { 191 if (!keepOpen.get()) { 192 long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt; 193 if (halfOpenAfter <= elapsedTimeSinceOpened) { 194 LOG.debug("Checking an open circuit..."); 195 if (halfOpenHandler != null) { 196 if (halfOpenHandler.isReadyToBeClosed()) { 197 LOG.debug("Closing circuit..."); 198 closeCircuit(route); 199 } else { 200 LOG.debug("Opening circuit..."); 201 openCircuit(route); 202 } 203 } else { 204 LOG.debug("Half opening circuit..."); 205 halfOpenCircuit(route); 206 } 207 } else { 208 log.debug("keeping circuit open (time not elapsed)..."); 209 } 210 } else { 211 log.debug("keeping circuit open (keepOpen is true)..."); 212 this.addHalfOpenTimer(route); 213 } 214 } 215 216 } 217 218 protected boolean isThresholdExceeded() { 219 boolean output = false; 220 logState(); 221 // failures exceed the threshold 222 // AND the last of those failures occurred within window 223 if ((failures.get() >= failureThreshold) && (lastFailure >= System.currentTimeMillis() - failureWindow)) { 224 output = true; 225 } 226 227 return output; 228 } 229 230 protected void openCircuit(Route route) { 231 try { 232 lock.lock(); 233 suspendOrStopConsumer(route.getConsumer()); 234 state.set(STATE_OPEN); 235 openedAt = System.currentTimeMillis(); 236 this.addHalfOpenTimer(route); 237 logState(); 238 } catch (Exception e) { 239 handleException(e); 240 } finally { 241 lock.unlock(); 242 } 243 } 244 245 protected void addHalfOpenTimer(Route route) { 246 halfOpenTimer = new Timer(); 247 halfOpenTimer.schedule(new HalfOpenTask(route), halfOpenAfter); 248 } 249 250 protected void halfOpenCircuit(Route route) { 251 try { 252 lock.lock(); 253 resumeOrStartConsumer(route.getConsumer()); 254 state.set(STATE_HALF_OPEN); 255 logState(); 256 } catch (Exception e) { 257 handleException(e); 258 } finally { 259 lock.unlock(); 260 } 261 } 262 263 protected void closeCircuit(Route route) { 264 try { 265 lock.lock(); 266 resumeOrStartConsumer(route.getConsumer()); 267 failures.set(0); 268 lastFailure = 0; 269 openedAt = 0; 270 state.set(STATE_CLOSED); 271 logState(); 272 } catch (Exception e) { 273 handleException(e); 274 } finally { 275 lock.unlock(); 276 } 277 } 278 279 private void logState() { 280 if (LOG.isDebugEnabled()) { 281 LOG.debug(dumpState()); 282 } 283 } 284 285 public String dumpState() { 286 int num = state.get(); 287 String routeState = stateAsString(num); 288 if (failures.get() > 0) { 289 return String.format("State %s, failures %d, last failure %d ms ago", routeState, failures.get(), System.currentTimeMillis() - lastFailure); 290 } else { 291 return String.format("State %s, failures %d", routeState, failures.get()); 292 } 293 } 294 295 private static String stateAsString(int num) { 296 if (num == STATE_CLOSED) { 297 return "closed"; 298 } else if (num == STATE_HALF_OPEN) { 299 return "half opened"; 300 } else { 301 return "opened"; 302 } 303 } 304 305 class HalfOpenTask extends TimerTask { 306 private final Route route; 307 308 HalfOpenTask(Route route) { 309 this.route = route; 310 } 311 312 @Override 313 public void run() { 314 halfOpenTimer.cancel(); 315 calculateState(route); 316 } 317 } 318 319 public ThrottlingExceptionHalfOpenHandler getHalfOpenHandler() { 320 return halfOpenHandler; 321 } 322 323 public void setHalfOpenHandler(ThrottlingExceptionHalfOpenHandler halfOpenHandler) { 324 this.halfOpenHandler = halfOpenHandler; 325 } 326 327 public boolean getKeepOpen() { 328 return this.keepOpen.get(); 329 } 330 331 public void setKeepOpen(boolean keepOpen) { 332 log.debug("keep open: {}", keepOpen); 333 this.keepOpen.set(keepOpen); 334 } 335 336 public int getFailureThreshold() { 337 return failureThreshold; 338 } 339 340 public void setFailureThreshold(int failureThreshold) { 341 this.failureThreshold = failureThreshold; 342 } 343 344 public long getFailureWindow() { 345 return failureWindow; 346 } 347 348 public void setFailureWindow(long failureWindow) { 349 this.failureWindow = failureWindow; 350 } 351 352 public long getHalfOpenAfter() { 353 return halfOpenAfter; 354 } 355 356 public void setHalfOpenAfter(long halfOpenAfter) { 357 this.halfOpenAfter = halfOpenAfter; 358 } 359 360 public int getFailures() { 361 return failures.get(); 362 } 363 364 public long getLastFailure() { 365 return lastFailure; 366 } 367 368 public long getOpenedAt() { 369 return openedAt; 370 } 371 372}