/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor.loadbalancer;

import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.util.AsyncProcessorConverterHelper;

/**
 * Load balancer that guards its first configured processor with a circuit breaker.
 * <p/>
 * Every exchange is either handed to the first processor returned by
 * {@link #getProcessors()} or rejected with a {@link RejectedExecutionException},
 * depending on the current state:
 * <ul>
 *   <li><b>closed</b> - exchanges are processed; once the number of consecutive failures
 *       reaches the threshold the circuit opens</li>
 *   <li><b>opened</b> - exchanges are rejected until {@code halfOpenAfter} milliseconds have
 *       elapsed since the last recorded failure</li>
 *   <li><b>half opened</b> - exchanges are processed as a trial; a failure count of zero
 *       closes the circuit again, anything else re-opens it</li>
 * </ul>
 * A successful exchange resets the failure counter to zero; a failed exchange increments it
 * and records the time of the failure.
 */
public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements Traceable, CamelContextAware {
    private static final int STATE_CLOSED = 0;
    private static final int STATE_HALF_OPEN = 1;
    private static final int STATE_OPEN = 2;

    private final List<Class<?>> exceptions;
    private CamelContext camelContext;
    private int threshold;
    private long halfOpenAfter;
    // written from asynchronous callbacks and read while routing other exchanges, so it
    // must be volatile: guarantees visibility and atomic 64-bit writes
    private volatile long lastFailure;

    // stateful statistics
    private final AtomicInteger failures = new AtomicInteger();
    private final AtomicInteger state = new AtomicInteger(STATE_CLOSED);
    private final ExceptionFailureStatistics statistics = new ExceptionFailureStatistics();

    /**
     * Creates a circuit breaker that treats every exception as a failure.
     */
    public CircuitBreakerLoadBalancer() {
        this(null);
    }

    /**
     * Creates a circuit breaker that only counts the given exception types as failures.
     *
     * @param exceptions the exception types to count; <tt>null</tt> or empty means any
     *                   exception counts as a failure
     */
    public CircuitBreakerLoadBalancer(List<Class<?>> exceptions) {
        this.exceptions = exceptions;
        statistics.init(exceptions);
    }

    /**
     * Sets how long, in milliseconds since the last recorded failure, exchanges keep being
     * rejected before a trial exchange is let through.
     */
    public void setHalfOpenAfter(long halfOpenAfter) {
        this.halfOpenAfter = halfOpenAfter;
    }

    public long getHalfOpenAfter() {
        return halfOpenAfter;
    }

    /**
     * Sets the number of consecutive failures at which the circuit opens.
     */
    public void setThreshold(int threshold) {
        this.threshold = threshold;
    }

    public int getThreshold() {
        return threshold;
    }

    /**
     * Gets the current state as a number: <tt>0</tt> closed, <tt>1</tt> half opened,
     * <tt>2</tt> opened.
     */
    public int getState() {
        return state.get();
    }

    @Override
    public CamelContext getCamelContext() {
        return camelContext;
    }

    @Override
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    public List<Class<?>> getExceptions() {
        return exceptions;
    }

    /**
     * Has the given Exchange failed
     * <p/>
     * An exchange has failed when it carries an exception and either no exception types
     * were configured, or one of the configured types is found on the exchange. A detected
     * failure is also recorded in the {@link ExceptionFailureStatistics}.
     *
     * @param exchange the exchange to inspect, may be <tt>null</tt>
     * @return <tt>true</tt> if the exchange counts as a failure, <tt>false</tt> otherwise
     *         (including for a <tt>null</tt> exchange)
     */
    protected boolean hasFailed(Exchange exchange) {
        if (exchange == null) {
            return false;
        }

        boolean answer = false;

        if (exchange.getException() != null) {
            if (exceptions == null || exceptions.isEmpty()) {
                // always failover if no exceptions defined
                answer = true;
            } else {
                for (Class<?> exception : exceptions) {
                    // will look in exception hierarchy
                    if (exchange.getException(exception) != null) {
                        answer = true;
                        break;
                    }
                }
            }

            if (answer) {
                // record the failure in the statistics
                statistics.onHandledFailure(exchange.getException());
            }
        }

        log.trace("Failed: {} for exchangeId: {}", answer, exchange.getExchangeId());

        return answer;
    }

    /**
     * Running is refused while the shutdown strategy of the CamelContext is forcing a
     * shutdown; otherwise the decision is delegated to the super class.
     */
    @Override
    public boolean isRunAllowed() {
        // without an injected CamelContext there is no shutdown strategy to consult
        boolean forceShutdown = camelContext != null && camelContext.getShutdownStrategy().forceShutdown(this);
        if (forceShutdown) {
            log.trace("Run not allowed as ShutdownStrategy is forcing shutting down");
        }
        return !forceShutdown && super.isRunAllowed();
    }

    /**
     * Routes the exchange through the circuit breaker.
     *
     * @param exchange the exchange to process
     * @param callback notified when processing of the exchange is done
     * @return <tt>true</tt> if the exchange was completed synchronously (which includes
     *         being rejected), <tt>false</tt> if processing continues asynchronously
     */
    public boolean process(final Exchange exchange, final AsyncCallback callback) {

        // can we still run
        if (!isRunAllowed()) {
            log.trace("Run not allowed, will reject executing exchange: {}", exchange);
            if (exchange.getException() == null) {
                exchange.setException(new RejectedExecutionException("Run is not allowed"));
            }
            callback.done(true);
            return true;
        }

        return calculateState(exchange, callback);
    }

    /**
     * Decides which transition applies to this exchange and performs it.
     * <p/>
     * The state, the failure counter and the clock are each read once up front so the
     * decision is based on one consistent snapshot, even if other threads update them
     * while this exchange is being routed.
     */
    private boolean calculateState(final Exchange exchange, final AsyncCallback callback) {
        final int currentState = state.get();
        final int failureCount = failures.get();
        final boolean thresholdReached = failureCount >= threshold;
        final boolean withinOpenPeriod = System.currentTimeMillis() - lastFailure < halfOpenAfter;

        switch (currentState) {
        case STATE_HALF_OPEN:
            // the trial decides: no failures closes the circuit, anything else re-opens it
            return failureCount == 0 ? closeCircuit(exchange, callback) : openCircuit(exchange, callback);
        case STATE_OPEN:
            if (thresholdReached && withinOpenPeriod) {
                return openCircuit(exchange, callback);
            }
            return halfOpenCircuit(exchange, callback);
        case STATE_CLOSED:
            if (!thresholdReached) {
                return closeCircuit(exchange, callback);
            }
            return withinOpenPeriod ? openCircuit(exchange, callback) : halfOpenCircuit(exchange, callback);
        default:
            throw new IllegalStateException("Unrecognised circuitBreaker state " + currentState);
        }
    }

    /**
     * Rejects the exchange and then marks the circuit as opened.
     */
    private boolean openCircuit(final Exchange exchange, final AsyncCallback callback) {
        boolean output = rejectExchange(exchange, callback);
        state.set(STATE_OPEN);
        logState();
        return output;
    }

    /**
     * Lets the exchange through as a trial and then marks the circuit as half opened.
     */
    private boolean halfOpenCircuit(final Exchange exchange, final AsyncCallback callback) {
        boolean output = executeProcessor(exchange, callback);
        state.set(STATE_HALF_OPEN);
        logState();
        return output;
    }

    /**
     * Processes the exchange and then marks the circuit as closed.
     */
    private boolean closeCircuit(final Exchange exchange, final AsyncCallback callback) {
        boolean output = executeProcessor(exchange, callback);
        state.set(STATE_CLOSED);
        logState();
        return output;
    }

    private void logState() {
        if (log.isDebugEnabled()) {
            log.debug(dumpState());
        }
    }

    /**
     * Builds a one line description of the current state and failure count. When a failure
     * has been recorded, the number of milliseconds elapsed since that failure is appended.
     */
    public String dumpState() {
        String stateName = stateAsString(state.get());
        long failedAt = lastFailure;
        if (failedAt > 0) {
            return String.format("State %s, failures %d, closed since %d", stateName, failures.get(), System.currentTimeMillis() - failedAt);
        } else {
            return String.format("State %s, failures %d", stateName, failures.get());
        }
    }

    /**
     * Hands the exchange to the first configured processor and updates the failure
     * bookkeeping when the processor completed synchronously.
     *
     * @return <tt>true</tt> if processing completed synchronously, <tt>false</tt> if it
     *         continues asynchronously
     * @throws IllegalStateException if no processor is configured
     */
    private boolean executeProcessor(final Exchange exchange, final AsyncCallback callback) {
        // check for an empty list first, otherwise get(0) fails with an
        // IndexOutOfBoundsException before the intended error can be raised
        List<? extends Processor> candidates = getProcessors();
        Processor processor = candidates.isEmpty() ? null : candidates.get(0);
        if (processor == null) {
            throw new IllegalStateException("No processors could be chosen to process CircuitBreaker");
        }

        // store state as exchange property; the state field is only updated by the caller
        // after this method returns, so this is the state before the current transition
        exchange.setProperty(Exchange.CIRCUIT_BREAKER_STATE, stateAsString(state.get()));

        AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
        // Added a callback for processing the exchange in the callback
        boolean sync = albp.process(exchange, new CircuitBreakerCallback(exchange, callback));

        if (!sync) {
            // CircuitBreakerCallback takes care of the failure check of the exchange
            log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
            return false;
        }

        // the processor completed synchronously, so the outcome must be checked here
        recordOutcome(exchange);

        log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
        callback.done(true);
        return true;
    }

    /**
     * Updates the failure bookkeeping from the outcome of the exchange: a success resets
     * the failure counter, a failure increments it and records the current time.
     */
    private void recordOutcome(final Exchange exchange) {
        if (hasFailed(exchange)) {
            failures.incrementAndGet();
            lastFailure = System.currentTimeMillis();
        } else {
            failures.set(0);
        }
    }

    /**
     * Fails the exchange with a {@link RejectedExecutionException} without invoking the
     * processor, and completes the callback synchronously.
     */
    private boolean rejectExchange(final Exchange exchange, final AsyncCallback callback) {
        exchange.setException(new RejectedExecutionException("CircuitBreaker Open: failures: " + failures + ", lastFailure: " + lastFailure));
        callback.done(true);
        return true;
    }

    private static String stateAsString(int num) {
        if (num == STATE_CLOSED) {
            return "closed";
        } else if (num == STATE_HALF_OPEN) {
            return "half opened";
        } else {
            return "opened";
        }
    }

    @Override
    public String toString() {
        return "CircuitBreakerLoadBalancer[" + getProcessors() + "]";
    }

    public String getTraceLabel() {
        return "circuitbreaker";
    }

    public ExceptionFailureStatistics getExceptionFailureStatistics() {
        return statistics;
    }

    /**
     * Resets the circuit breaker: closed state, no failures, no recorded failure time and
     * cleared statistics.
     */
    public void reset() {
        // reset state
        failures.set(0);
        lastFailure = 0;
        state.set(STATE_CLOSED);
        statistics.reset();
    }

    @Override
    protected void doStart() throws Exception {
        super.doStart();

        // reset state
        reset();
    }

    @Override
    protected void doStop() throws Exception {
        super.doStop();
        // noop
    }

    /**
     * Callback wrapper that updates the failure bookkeeping when the exchange was
     * completed asynchronously, and then notifies the original callback.
     */
    class CircuitBreakerCallback implements AsyncCallback {
        private final AsyncCallback callback;
        private final Exchange exchange;

        CircuitBreakerCallback(Exchange exchange, AsyncCallback callback) {
            this.callback = callback;
            this.exchange = exchange;
        }

        @Override
        public void done(boolean doneSync) {
            // a synchronous completion is accounted for by executeProcessor itself
            if (!doneSync) {
                recordOutcome(exchange);
            }
            callback.done(doneSync);
        }

    }
}