001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.loadbalancer;
018    
019    import java.util.List;
020    import java.util.concurrent.RejectedExecutionException;
021    import java.util.concurrent.atomic.AtomicInteger;
022    
023    import org.apache.camel.AsyncCallback;
024    import org.apache.camel.AsyncProcessor;
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.CamelContextAware;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.Processor;
029    import org.apache.camel.Traceable;
030    import org.apache.camel.util.AsyncProcessorConverterHelper;
031    import org.apache.camel.util.AsyncProcessorHelper;
032    import org.apache.camel.util.ExchangeHelper;
033    import org.apache.camel.util.ObjectHelper;
034    
035    /**
036     * This FailOverLoadBalancer will failover to use next processor when an exception occurred
037     * <p/>
038     * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation
039     * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the
040     * pipeline to ensure it works the same and the async routing engine is flawless.
041     */
042    public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceable, CamelContextAware {
043    
044        private final List<Class<?>> exceptions;
045        private CamelContext camelContext;
046        private boolean roundRobin;
047        private int maximumFailoverAttempts = -1;
048    
049        // stateful counter
050        private final AtomicInteger counter = new AtomicInteger(-1);
051    
052        public FailOverLoadBalancer() {
053            this.exceptions = null;
054        }
055    
056        public FailOverLoadBalancer(List<Class<?>> exceptions) {
057            this.exceptions = exceptions;
058    
059            // validate its all exception types
060            for (Class<?> type : exceptions) {
061                if (!ObjectHelper.isAssignableFrom(Throwable.class, type)) {
062                    throw new IllegalArgumentException("Class is not an instance of Throwable: " + type);
063                }
064            }
065        }
066    
067        @Override
068        public CamelContext getCamelContext() {
069            return camelContext;
070        }
071    
072        @Override
073        public void setCamelContext(CamelContext camelContext) {
074            this.camelContext = camelContext;
075        }
076    
077        public List<Class<?>> getExceptions() {
078            return exceptions;
079        }
080    
081        public boolean isRoundRobin() {
082            return roundRobin;
083        }
084    
085        public void setRoundRobin(boolean roundRobin) {
086            this.roundRobin = roundRobin;
087        }
088    
089        public int getMaximumFailoverAttempts() {
090            return maximumFailoverAttempts;
091        }
092    
093        public void setMaximumFailoverAttempts(int maximumFailoverAttempts) {
094            this.maximumFailoverAttempts = maximumFailoverAttempts;
095        }
096    
097        /**
098         * Should the given failed Exchange failover?
099         *
100         * @param exchange the exchange that failed
101         * @return <tt>true</tt> to failover
102         */
103        protected boolean shouldFailOver(Exchange exchange) {
104            if (exchange == null) {
105                return false;
106            }
107    
108            boolean answer = false;
109    
110            if (exchange.getException() != null) {
111                if (exceptions == null || exceptions.isEmpty()) {
112                    // always failover if no exceptions defined
113                    answer = true;
114                } else {
115                    for (Class<?> exception : exceptions) {
116                        // will look in exception hierarchy
117                        if (exchange.getException(exception) != null) {
118                            answer = true;
119                            break;
120                        }
121                    }
122                }
123            }
124    
125            log.trace("Should failover: {} for exchangeId: {}", answer, exchange.getExchangeId());
126    
127            return answer;
128        }
129    
130        @Override
131        public boolean isRunAllowed() {
132            // determine if we can still run, or the camel context is forcing a shutdown
133            boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this);
134            if (forceShutdown) {
135                log.trace("Run not allowed as ShutdownStrategy is forcing shutting down");
136            }
137            return !forceShutdown && super.isRunAllowed();
138        }
139    
140        public boolean process(final Exchange exchange, final AsyncCallback callback) {
141            final List<Processor> processors = getProcessors();
142    
143            final AtomicInteger index = new AtomicInteger();
144            final AtomicInteger attempts = new AtomicInteger();
145            boolean first = true;
146            // use a copy of the original exchange before failover to avoid populating side effects
147            // directly into the original exchange
148            Exchange copy = null;
149    
150            // get the next processor
151            if (isRoundRobin()) {
152                if (counter.incrementAndGet() >= processors.size()) {
153                    counter.set(0);
154                }
155                index.set(counter.get());
156            }
157            log.trace("Failover starting with endpoint index {}", index);
158    
159            while (first || shouldFailOver(copy)) {
160    
161                // can we still run
162                if (!isRunAllowed()) {
163                    log.trace("Run not allowed, will reject executing exchange: {}", exchange);
164                    if (exchange.getException() == null) {
165                        exchange.setException(new RejectedExecutionException());
166                    }
167                    // we cannot process so invoke callback
168                    callback.done(true);
169                    return true;
170                }
171    
172                if (!first) {
173                    attempts.incrementAndGet();
174                    // are we exhausted by attempts?
175                    if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) {
176                        log.debug("Breaking out of failover after {} failover attempts", attempts);
177                        break;
178                    }
179    
180                    index.incrementAndGet();
181                    counter.incrementAndGet();
182                } else {
183                    // flip first switch
184                    first = false;
185                }
186    
187                if (index.get() >= processors.size()) {
188                    // out of bounds
189                    if (isRoundRobin()) {
190                        log.trace("Failover is round robin enabled and therefore starting from the first endpoint");
191                        index.set(0);
192                        counter.set(0);
193                    } else {
194                        // no more processors to try
195                        log.trace("Breaking out of failover as we reached the end of endpoints to use for failover");
196                        break;
197                    }
198                }
199    
200                // try again but copy original exchange before we failover
201                copy = prepareExchangeForFailover(exchange);
202                Processor processor = processors.get(index.get());
203    
204                // process the exchange
205                boolean sync = processExchange(processor, exchange, copy, attempts, index, callback, processors);
206    
207                // continue as long its being processed synchronously
208                if (!sync) {
209                    log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
210                    // the remainder of the failover will be completed async
211                    // so we break out now, then the callback will be invoked which then continue routing from where we left here
212                    return false;
213                }
214    
215                log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
216            }
217    
218            // and copy the current result to original so it will contain this result of this eip
219            if (copy != null) {
220                ExchangeHelper.copyResults(exchange, copy);
221            }
222            log.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
223            callback.done(true);
224            return true;
225        }
226    
227        /**
228         * Prepares the exchange for failover
229         *
230         * @param exchange the exchange
231         * @return a copy of the exchange to use for failover
232         */
233        protected Exchange prepareExchangeForFailover(Exchange exchange) {
234            // use a copy of the exchange to avoid side effects on the original exchange
235            return ExchangeHelper.createCopy(exchange, true);
236        }
237    
238        private boolean processExchange(Processor processor, Exchange exchange, Exchange copy,
239                                        AtomicInteger attempts, AtomicInteger index,
240                                        AsyncCallback callback, List<Processor> processors) {
241            if (processor == null) {
242                throw new IllegalStateException("No processors could be chosen to process " + copy);
243            }
244            log.debug("Processing failover at attempt {} for {}", attempts, copy);
245    
246            AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
247            return AsyncProcessorHelper.process(albp, copy, new FailOverAsyncCallback(exchange, copy, attempts, index, callback, processors));
248        }
249    
250        /**
251         * Failover logic to be executed asynchronously if one of the failover endpoints
252         * is a real {@link AsyncProcessor}.
253         */
254        private final class FailOverAsyncCallback implements AsyncCallback {
255    
256            private final Exchange exchange;
257            private Exchange copy;
258            private final AtomicInteger attempts;
259            private final AtomicInteger index;
260            private final AsyncCallback callback;
261            private final List<Processor> processors;
262    
263            private FailOverAsyncCallback(Exchange exchange, Exchange copy, AtomicInteger attempts, AtomicInteger index, AsyncCallback callback, List<Processor> processors) {
264                this.exchange = exchange;
265                this.copy = copy;
266                this.attempts = attempts;
267                this.index = index;
268                this.callback = callback;
269                this.processors = processors;
270            }
271    
272            public void done(boolean doneSync) {
273                // we only have to handle async completion of the pipeline
274                if (doneSync) {
275                    return;
276                }
277    
278                while (shouldFailOver(copy)) {
279    
280                    // can we still run
281                    if (!isRunAllowed()) {
282                        log.trace("Run not allowed, will reject executing exchange: {}", exchange);
283                        if (exchange.getException() == null) {
284                            exchange.setException(new RejectedExecutionException());
285                        }
286                        // we cannot process so invoke callback
287                        callback.done(false);
288                    }
289    
290                    attempts.incrementAndGet();
291                    // are we exhausted by attempts?
292                    if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) {
293                        log.trace("Breaking out of failover after {} failover attempts", attempts);
294                        break;
295                    }
296    
297                    index.incrementAndGet();
298                    counter.incrementAndGet();
299    
300                    if (index.get() >= processors.size()) {
301                        // out of bounds
302                        if (isRoundRobin()) {
303                            log.trace("Failover is round robin enabled and therefore starting from the first endpoint");
304                            index.set(0);
305                            counter.set(0);
306                        } else {
307                            // no more processors to try
308                            log.trace("Breaking out of failover as we reached the end of endpoints to use for failover");
309                            break;
310                        }
311                    }
312    
313                    // try again but prepare exchange before we failover
314                    copy = prepareExchangeForFailover(exchange);
315                    Processor processor = processors.get(index.get());
316    
317                    // try to failover using the next processor
318                    doneSync = processExchange(processor, exchange, copy, attempts, index, callback, processors);
319                    if (!doneSync) {
320                        log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
321                        // the remainder of the failover will be completed async
322                        // so we break out now, then the callback will be invoked which then continue routing from where we left here
323                        return;
324                    }
325                }
326    
327                // and copy the current result to original so it will contain this result of this eip
328                if (copy != null) {
329                    ExchangeHelper.copyResults(exchange, copy);
330                }
331                log.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
332                // signal callback we are done
333                callback.done(false);
334            };
335        }
336    
337        public String toString() {
338            return "FailoverLoadBalancer[" + getProcessors() + "]";
339        }
340    
341        public String getTraceLabel() {
342            return "failover";
343        }
344    }