001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.util.List;
020import java.util.Timer;
021import java.util.TimerTask;
022import java.util.concurrent.atomic.AtomicBoolean;
023import java.util.concurrent.atomic.AtomicInteger;
024import java.util.concurrent.locks.Lock;
025import java.util.concurrent.locks.ReentrantLock;
026
027import org.apache.camel.CamelContext;
028import org.apache.camel.CamelContextAware;
029import org.apache.camel.Exchange;
030import org.apache.camel.Route;
031import org.apache.camel.processor.loadbalancer.CircuitBreakerLoadBalancer;
032import org.apache.camel.spi.RoutePolicy;
033import org.apache.camel.support.RoutePolicySupport;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Modeled after the {@link CircuitBreakerLoadBalancer} and {@link ThrottlingInflightRoutePolicy}
039 * this {@link RoutePolicy} will stop consuming from an endpoint based on the type of exceptions that are
040 * thrown and the threshold setting.
041 *
042 * the scenario: if a route cannot process data from an endpoint due to problems with resources used by the route
043 * (ie database down) then it will stop consuming new messages from the endpoint by stopping the consumer.
044 * The implementation is comparable to the Circuit Breaker pattern. After a set amount of time, it will move
045 * to a half open state and attempt to determine if the consumer can be started.
046 * There are two ways to determine if a route can be closed after being opened
047 * (1) start the consumer and check the failure threshold
048 * (2) call the {@link ThrottlingExceptionHalfOpenHandler}
049 * The second option allows a custom check to be performed without having to take on the possibility of
050 * multiple messages from the endpoint. The idea is that a handler could run a simple test (ie select 1 from dual)
051 * to determine if the processes that cause the route to be open are now available
052 */
053public class ThrottlingExceptionRoutePolicy extends RoutePolicySupport implements CamelContextAware {
054    private static final Logger LOG = LoggerFactory.getLogger(ThrottlingExceptionRoutePolicy.class);
055
056    private static final int STATE_CLOSED = 0;
057    private static final int STATE_HALF_OPEN = 1;
058    private static final int STATE_OPEN = 2;
059
060    private CamelContext camelContext;
061    private final Lock lock = new ReentrantLock();
062
063    // configuration
064    private int failureThreshold;
065    private long failureWindow;
066    private long halfOpenAfter;
067    private final List<Class<?>> throttledExceptions;
068
069    // handler for half open circuit
070    // can be used instead of resuming route
071    // to check on resources
072    private ThrottlingExceptionHalfOpenHandler halfOpenHandler;
073
074    // stateful information
075    private final AtomicInteger failures = new AtomicInteger();
076    private final AtomicInteger state = new AtomicInteger(STATE_CLOSED);
077    private final AtomicBoolean keepOpen = new AtomicBoolean(false);
078    private volatile Timer halfOpenTimer;
079    private volatile long lastFailure;
080    private volatile long openedAt;
081
082    public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions) {
083        this(threshold, failureWindow, halfOpenAfter, handledExceptions, false);
084    }
085
086    public ThrottlingExceptionRoutePolicy(int threshold, long failureWindow, long halfOpenAfter, List<Class<?>> handledExceptions, boolean keepOpen) {
087        this.throttledExceptions = handledExceptions;
088        this.failureWindow = failureWindow;
089        this.halfOpenAfter = halfOpenAfter;
090        this.failureThreshold = threshold;
091        this.keepOpen.set(keepOpen);
092    }
093
094    @Override
095    public void setCamelContext(CamelContext camelContext) {
096        this.camelContext = camelContext;
097    }
098
099    @Override
100    public CamelContext getCamelContext() {
101        return camelContext;
102    }
103
104    @Override
105    public void onInit(Route route) {
106        LOG.debug("Initializing ThrottlingExceptionRoutePolicy route policy...");
107        logState();
108    }
109
110    @Override
111    public void onStart(Route route) {
112        // if keepOpen then start w/ the circuit open
113        if (keepOpen.get()) {
114            openCircuit(route);
115        }
116    }
117
118    @Override
119    public void onExchangeDone(Route route, Exchange exchange) {
120        if (keepOpen.get()) {
121            if (state.get() != STATE_OPEN) {
122                LOG.debug("opening circuit b/c keepOpen is on");
123                openCircuit(route);
124            }
125        } else {
126            if (hasFailed(exchange)) {
127                // record the failure
128                failures.incrementAndGet();
129                lastFailure = System.currentTimeMillis();
130            }
131
132            // check for state change
133            calculateState(route);
134        }
135    }
136
137    /**
138     * uses similar approach as {@link CircuitBreakerLoadBalancer}
139     * if the exchange has an exception that we are watching
140     * then we count that as a failure otherwise we ignore it
141     */
142    private boolean hasFailed(Exchange exchange) {
143        if (exchange == null) {
144            return false;
145        }
146
147        boolean answer = false;
148
149        if (exchange.getException() != null) {
150            if (throttledExceptions == null || throttledExceptions.isEmpty()) {
151                // if no exceptions defined then always fail
152                // (ie) assume we throttle on all exceptions
153                answer = true;
154            } else {
155                for (Class<?> exception : throttledExceptions) {
156                    // will look in exception hierarchy
157                    if (exchange.getException(exception) != null) {
158                        answer = true;
159                        break;
160                    }
161                }
162            }
163        }
164
165        if (LOG.isDebugEnabled()) {
166            String exceptionName = exchange.getException() == null ? "none" : exchange.getException().getClass().getSimpleName();
167            LOG.debug("hasFailed ({}) with Throttled Exception: {} for exchangeId: {}", answer, exceptionName, exchange.getExchangeId());
168        }
169        return answer;
170    }
171
172    private void calculateState(Route route) {
173
174        // have we reached the failure limit?
175        boolean failureLimitReached = isThresholdExceeded();
176
177        if (state.get() == STATE_CLOSED) {
178            if (failureLimitReached) {
179                LOG.debug("Opening circuit...");
180                openCircuit(route);
181            }
182        } else if (state.get() == STATE_HALF_OPEN) {
183            if (failureLimitReached) {
184                LOG.debug("Opening circuit...");
185                openCircuit(route);
186            } else {
187                LOG.debug("Closing circuit...");
188                closeCircuit(route);
189            }
190        } else if (state.get() == STATE_OPEN) {
191            if (!keepOpen.get()) {
192                long elapsedTimeSinceOpened = System.currentTimeMillis() - openedAt;
193                if (halfOpenAfter <= elapsedTimeSinceOpened) {
194                    LOG.debug("Checking an open circuit...");
195                    if (halfOpenHandler != null) {
196                        if (halfOpenHandler.isReadyToBeClosed()) {
197                            LOG.debug("Closing circuit...");
198                            closeCircuit(route);
199                        } else {
200                            LOG.debug("Opening circuit...");
201                            openCircuit(route);
202                        }
203                    } else {
204                        LOG.debug("Half opening circuit...");
205                        halfOpenCircuit(route);
206                    }
207                } else {
208                    log.debug("keeping circuit open (time not elapsed)...");
209                }
210            } else {
211                log.debug("keeping circuit open (keepOpen is true)...");
212                this.addHalfOpenTimer(route);
213            }
214        }
215
216    }
217
218    protected boolean isThresholdExceeded() {
219        boolean output = false;
220        logState();
221        // failures exceed the threshold
222        // AND the last of those failures occurred within window
223        if ((failures.get() >= failureThreshold) && (lastFailure >= System.currentTimeMillis() - failureWindow)) {
224            output = true;
225        }
226
227        return output;
228    }
229
230    protected void openCircuit(Route route) {
231        try {
232            lock.lock();
233            suspendOrStopConsumer(route.getConsumer());
234            state.set(STATE_OPEN);
235            openedAt = System.currentTimeMillis();
236            this.addHalfOpenTimer(route);
237            logState();
238        } catch (Exception e) {
239            handleException(e);
240        } finally {
241            lock.unlock();
242        }
243    }
244
245    protected void addHalfOpenTimer(Route route) {
246        halfOpenTimer = new Timer();
247        halfOpenTimer.schedule(new HalfOpenTask(route), halfOpenAfter);
248    }
249
250    protected void halfOpenCircuit(Route route) {
251        try {
252            lock.lock();
253            resumeOrStartConsumer(route.getConsumer());
254            state.set(STATE_HALF_OPEN);
255            logState();
256        } catch (Exception e) {
257            handleException(e);
258        } finally {
259            lock.unlock();
260        }
261    }
262
263    protected void closeCircuit(Route route) {
264        try {
265            lock.lock();
266            resumeOrStartConsumer(route.getConsumer());
267            failures.set(0);
268            lastFailure = 0;
269            openedAt = 0;
270            state.set(STATE_CLOSED);
271            logState();
272        } catch (Exception e) {
273            handleException(e);
274        } finally {
275            lock.unlock();
276        }
277    }
278
279    private void logState() {
280        if (LOG.isDebugEnabled()) {
281            LOG.debug(dumpState());
282        }
283    }
284
285    public String dumpState() {
286        int num = state.get();
287        String routeState = stateAsString(num);
288        if (failures.get() > 0) {
289            return String.format("State %s, failures %d, last failure %d ms ago", routeState, failures.get(), System.currentTimeMillis() - lastFailure);
290        } else {
291            return String.format("State %s, failures %d", routeState, failures.get());
292        }
293    }
294
295    private static String stateAsString(int num) {
296        if (num == STATE_CLOSED) {
297            return "closed";
298        } else if (num == STATE_HALF_OPEN) {
299            return "half opened";
300        } else {
301            return "opened";
302        }
303    }
304
305    class HalfOpenTask extends TimerTask {
306        private final Route route;
307
308        HalfOpenTask(Route route) {
309            this.route = route;
310        }
311
312        @Override
313        public void run() {
314            halfOpenTimer.cancel();
315            calculateState(route);
316        }
317    }
318
319    public ThrottlingExceptionHalfOpenHandler getHalfOpenHandler() {
320        return halfOpenHandler;
321    }
322
323    public void setHalfOpenHandler(ThrottlingExceptionHalfOpenHandler halfOpenHandler) {
324        this.halfOpenHandler = halfOpenHandler;
325    }
326
327    public boolean getKeepOpen() {
328        return this.keepOpen.get();
329    }
330
331    public void setKeepOpen(boolean keepOpen) {
332        log.debug("keep open: {}", keepOpen);
333        this.keepOpen.set(keepOpen);
334    }
335
336    public int getFailureThreshold() {
337        return failureThreshold;
338    }
339
340    public void setFailureThreshold(int failureThreshold) {
341        this.failureThreshold = failureThreshold;
342    }
343
344    public long getFailureWindow() {
345        return failureWindow;
346    }
347
348    public void setFailureWindow(long failureWindow) {
349        this.failureWindow = failureWindow;
350    }
351
352    public long getHalfOpenAfter() {
353        return halfOpenAfter;
354    }
355
356    public void setHalfOpenAfter(long halfOpenAfter) {
357        this.halfOpenAfter = halfOpenAfter;
358    }
359
360    public int getFailures() {
361        return failures.get();
362    }
363
364    public long getLastFailure() {
365        return lastFailure;
366    }
367
368    public long getOpenedAt() {
369        return openedAt;
370    }
371
372}