001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor.loadbalancer;
018
019import java.util.List;
020import java.util.concurrent.RejectedExecutionException;
021import java.util.concurrent.atomic.AtomicInteger;
022
023import org.apache.camel.AsyncCallback;
024import org.apache.camel.AsyncProcessor;
025import org.apache.camel.CamelContext;
026import org.apache.camel.CamelContextAware;
027import org.apache.camel.Exchange;
028import org.apache.camel.Processor;
029import org.apache.camel.Traceable;
030import org.apache.camel.util.AsyncProcessorConverterHelper;
031
032public class CircuitBreakerLoadBalancer extends LoadBalancerSupport implements Traceable, CamelContextAware {
033    private static final int STATE_CLOSED = 0;
034    private static final int STATE_HALF_OPEN = 1;
035    private static final int STATE_OPEN = 2;
036
037    private final List<Class<?>> exceptions;
038    private CamelContext camelContext;
039    private int threshold;
040    private long halfOpenAfter;
041    private long lastFailure;
042
043    // stateful statistics
044    private AtomicInteger failures = new AtomicInteger();
045    private AtomicInteger state = new AtomicInteger(STATE_CLOSED);
046    private final ExceptionFailureStatistics statistics = new ExceptionFailureStatistics();
047
048    public CircuitBreakerLoadBalancer() {
049        this(null);
050    }
051
052    public CircuitBreakerLoadBalancer(List<Class<?>> exceptions) {
053        this.exceptions = exceptions;
054        statistics.init(exceptions);
055    }
056
057    public void setHalfOpenAfter(long halfOpenAfter) {
058        this.halfOpenAfter = halfOpenAfter;
059    }
060
061    public long getHalfOpenAfter() {
062        return halfOpenAfter;
063    }
064
065    public void setThreshold(int threshold) {
066        this.threshold = threshold;
067    }
068
069    public int getThreshold() {
070        return threshold;
071    }
072
073    public int getState() {
074        return state.get();
075    }
076
077    @Override
078    public CamelContext getCamelContext() {
079        return camelContext;
080    }
081
082    @Override
083    public void setCamelContext(CamelContext camelContext) {
084        this.camelContext = camelContext;
085    }
086
087    public List<Class<?>> getExceptions() {
088        return exceptions;
089    }
090
091    /**
092     * Has the given Exchange failed
093     */
094    protected boolean hasFailed(Exchange exchange) {
095        if (exchange == null) {
096            return false;
097        }
098
099        boolean answer = false;
100
101        if (exchange.getException() != null) {
102            if (exceptions == null || exceptions.isEmpty()) {
103                // always failover if no exceptions defined
104                answer = true;
105            } else {
106                for (Class<?> exception : exceptions) {
107                    // will look in exception hierarchy
108                    if (exchange.getException(exception) != null) {
109                        answer = true;
110                        break;
111                    }
112                }
113            }
114
115            if (answer) {
116                // record the failure in the statistics
117                statistics.onHandledFailure(exchange.getException());
118            }
119        }
120
121        log.trace("Failed: {} for exchangeId: {}", answer, exchange.getExchangeId());
122
123        return answer;
124    }
125
126    @Override
127    public boolean isRunAllowed() {
128        boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
129        if (forceShutdown) {
130            log.trace("Run not allowed as ShutdownStrategy is forcing shutting down");
131        }
132        return !forceShutdown && super.isRunAllowed();
133    }
134
135    public boolean process(final Exchange exchange, final AsyncCallback callback) {
136
137        // can we still run
138        if (!isRunAllowed()) {
139            log.trace("Run not allowed, will reject executing exchange: {}", exchange);
140            if (exchange.getException() == null) {
141                exchange.setException(new RejectedExecutionException("Run is not allowed"));
142            }
143            callback.done(true);
144            return true;
145        }
146
147        return calculateState(exchange, callback);
148    }
149
150    private boolean calculateState(final Exchange exchange, final AsyncCallback callback) {
151        boolean output;
152        if (state.get() == STATE_HALF_OPEN) {
153            if (failures.get() == 0) {
154                output = closeCircuit(exchange, callback);
155            } else {
156                output = openCircuit(exchange, callback);
157            }
158        } else if (state.get() == STATE_OPEN) {
159            if (failures.get() >= threshold && System.currentTimeMillis() - lastFailure < halfOpenAfter) {
160                output = openCircuit(exchange, callback);
161            } else {
162                output = halfOpenCircuit(exchange, callback);
163            }
164        } else if (state.get() == STATE_CLOSED) {
165            if (failures.get() >= threshold && System.currentTimeMillis() - lastFailure < halfOpenAfter) {
166                output = openCircuit(exchange, callback);
167            } else if (failures.get() >= threshold && System.currentTimeMillis() - lastFailure >= halfOpenAfter) {
168                output = halfOpenCircuit(exchange, callback);
169            } else {
170                output = closeCircuit(exchange, callback);
171            }
172        } else {
173            throw new IllegalStateException("Unrecognised circuitBreaker state " + state.get());
174        }
175        return output;
176    }
177
178    private boolean openCircuit(final Exchange exchange, final AsyncCallback callback) {
179        boolean output = rejectExchange(exchange, callback);
180        state.set(STATE_OPEN);
181        logState();
182        return output;
183    }
184
185    private boolean halfOpenCircuit(final Exchange exchange, final AsyncCallback callback) {
186        boolean output = executeProcessor(exchange, callback);
187        state.set(STATE_HALF_OPEN);
188        logState();
189        return output;
190    }
191
192    private boolean closeCircuit(final Exchange exchange, final AsyncCallback callback) {
193        boolean output = executeProcessor(exchange, callback);
194        state.set(STATE_CLOSED);
195        logState();
196        return output;
197    }
198
199    private void logState() {
200        if (log.isDebugEnabled()) {
201            log.debug(dumpState());
202        }
203    }
204
205    public String dumpState() {
206        int num = state.get();
207        String state = stateAsString(num);
208        if (lastFailure > 0) {
209            return String.format("State %s, failures %d, closed since %d", state, failures.get(), System.currentTimeMillis() - lastFailure);
210        } else {
211            return String.format("State %s, failures %d", state, failures.get());
212        }
213    }
214
215    private boolean executeProcessor(final Exchange exchange, final AsyncCallback callback) {
216        Processor processor = getProcessors().get(0);
217        if (processor == null) {
218            throw new IllegalStateException("No processors could be chosen to process CircuitBreaker");
219        }
220
221        // store state as exchange property
222        exchange.setProperty(Exchange.CIRCUIT_BREAKER_STATE, stateAsString(state.get()));
223
224        AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
225        // Added a callback for processing the exchange in the callback
226        boolean sync = albp.process(exchange, new CircuitBreakerCallback(exchange, callback));
227
228        // We need to check the exception here as albp is use sync call
229        if (sync) {
230            boolean failed = hasFailed(exchange);
231            if (!failed) {
232                failures.set(0);
233            } else {
234                failures.incrementAndGet();
235                lastFailure = System.currentTimeMillis();
236            }
237        } else {
238            // CircuitBreakerCallback can take care of failure check of the
239            // exchange
240            log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
241            return false;
242        }
243
244        log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
245        callback.done(true);
246        return true;
247    }
248
249    private boolean rejectExchange(final Exchange exchange, final AsyncCallback callback) {
250        exchange.setException(new RejectedExecutionException("CircuitBreaker Open: failures: " + failures + ", lastFailure: " + lastFailure));
251        callback.done(true);
252        return true;
253    }
254
255    private static String stateAsString(int num) {
256        if (num == STATE_CLOSED) {
257            return "closed";
258        } else if (num == STATE_HALF_OPEN) {
259            return "half opened";
260        } else {
261            return "opened";
262        }
263    }
264
265    public String toString() {
266        return "CircuitBreakerLoadBalancer[" + getProcessors() + "]";
267    }
268
269    public String getTraceLabel() {
270        return "circuitbreaker";
271    }
272
273    public ExceptionFailureStatistics getExceptionFailureStatistics() {
274        return statistics;
275    }
276
277    public void reset() {
278        // reset state
279        failures.set(0);
280        state.set(STATE_CLOSED);
281        statistics.reset();
282    }
283
284    @Override
285    protected void doStart() throws Exception {
286        super.doStart();
287
288        // reset state
289        reset();
290    }
291
292    @Override
293    protected void doStop() throws Exception {
294        super.doStop();
295        // noop
296    }
297
298
299    class CircuitBreakerCallback implements AsyncCallback {
300        private final AsyncCallback callback;
301        private final Exchange exchange;
302
303        CircuitBreakerCallback(Exchange exchange, AsyncCallback callback) {
304            this.callback = callback;
305            this.exchange = exchange;
306        }
307
308        @Override
309        public void done(boolean doneSync) {
310            if (!doneSync) {
311                boolean failed = hasFailed(exchange);
312                if (!failed) {
313                    failures.set(0);
314                } else {
315                    failures.incrementAndGet();
316                    lastFailure = System.currentTimeMillis();
317                }
318            }
319            callback.done(doneSync);
320        }
321
322    }
323}