001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor.loadbalancer; 018 019 import java.util.List; 020 import java.util.concurrent.RejectedExecutionException; 021 import java.util.concurrent.atomic.AtomicInteger; 022 023 import org.apache.camel.AsyncCallback; 024 import org.apache.camel.AsyncProcessor; 025 import org.apache.camel.CamelContext; 026 import org.apache.camel.CamelContextAware; 027 import org.apache.camel.Exchange; 028 import org.apache.camel.Processor; 029 import org.apache.camel.Traceable; 030 import org.apache.camel.util.AsyncProcessorConverterHelper; 031 import org.apache.camel.util.AsyncProcessorHelper; 032 import org.apache.camel.util.ExchangeHelper; 033 import org.apache.camel.util.ObjectHelper; 034 035 /** 036 * This FailOverLoadBalancer will failover to use next processor when an exception occurred 037 * <p/> 038 * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation 039 * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the 040 * pipeline to ensure it works the same and the async routing engine is flawless. 041 */ 042 public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceable, CamelContextAware { 043 044 private final List<Class<?>> exceptions; 045 private CamelContext camelContext; 046 private boolean roundRobin; 047 private int maximumFailoverAttempts = -1; 048 049 // stateful counter 050 private final AtomicInteger counter = new AtomicInteger(-1); 051 052 public FailOverLoadBalancer() { 053 this.exceptions = null; 054 } 055 056 public FailOverLoadBalancer(List<Class<?>> exceptions) { 057 this.exceptions = exceptions; 058 059 // validate its all exception types 060 for (Class<?> type : exceptions) { 061 if (!ObjectHelper.isAssignableFrom(Throwable.class, type)) { 062 throw new IllegalArgumentException("Class is not an instance of Throwable: " + type); 063 } 064 } 065 } 066 067 @Override 068 public CamelContext getCamelContext() { 069 return camelContext; 070 } 071 072 @Override 073 public void setCamelContext(CamelContext camelContext) { 074 this.camelContext = camelContext; 075 } 076 077 public List<Class<?>> getExceptions() { 078 return exceptions; 079 } 080 081 public boolean isRoundRobin() { 082 return roundRobin; 083 } 084 085 public void setRoundRobin(boolean roundRobin) { 086 this.roundRobin = roundRobin; 087 } 088 089 public int getMaximumFailoverAttempts() { 090 return maximumFailoverAttempts; 091 } 092 093 public void setMaximumFailoverAttempts(int maximumFailoverAttempts) { 094 this.maximumFailoverAttempts = maximumFailoverAttempts; 095 } 096 097 /** 098 * Should the given failed Exchange failover? 099 * 100 * @param exchange the exchange that failed 101 * @return <tt>true</tt> to failover 102 */ 103 protected boolean shouldFailOver(Exchange exchange) { 104 if (exchange == null) { 105 return false; 106 } 107 108 boolean answer = false; 109 110 if (exchange.getException() != null) { 111 if (exceptions == null || exceptions.isEmpty()) { 112 // always failover if no exceptions defined 113 answer = true; 114 } else { 115 for (Class<?> exception : exceptions) { 116 // will look in exception hierarchy 117 if (exchange.getException(exception) != null) { 118 answer = true; 119 break; 120 } 121 } 122 } 123 } 124 125 log.trace("Should failover: {} for exchangeId: {}", answer, exchange.getExchangeId()); 126 127 return answer; 128 } 129 130 @Override 131 public boolean isRunAllowed() { 132 // determine if we can still run, or the camel context is forcing a shutdown 133 boolean forceShutdown = camelContext.getShutdownStrategy().forceShutdown(this); 134 if (forceShutdown) { 135 log.trace("Run not allowed as ShutdownStrategy is forcing shutting down"); 136 } 137 return !forceShutdown && super.isRunAllowed(); 138 } 139 140 public boolean process(final Exchange exchange, final AsyncCallback callback) { 141 final List<Processor> processors = getProcessors(); 142 143 final AtomicInteger index = new AtomicInteger(); 144 final AtomicInteger attempts = new AtomicInteger(); 145 boolean first = true; 146 // use a copy of the original exchange before failover to avoid populating side effects 147 // directly into the original exchange 148 Exchange copy = null; 149 150 // get the next processor 151 if (isRoundRobin()) { 152 if (counter.incrementAndGet() >= processors.size()) { 153 counter.set(0); 154 } 155 index.set(counter.get()); 156 } 157 log.trace("Failover starting with endpoint index {}", index); 158 159 while (first || shouldFailOver(copy)) { 160 161 // can we still run 162 if (!isRunAllowed()) { 163 log.trace("Run not allowed, will reject executing exchange: {}", exchange); 164 if (exchange.getException() == null) { 165 exchange.setException(new RejectedExecutionException()); 166 } 167 // we cannot process so invoke callback 168 callback.done(true); 169 return true; 170 } 171 172 if (!first) { 173 attempts.incrementAndGet(); 174 // are we exhausted by attempts? 175 if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) { 176 log.debug("Breaking out of failover after {} failover attempts", attempts); 177 break; 178 } 179 180 index.incrementAndGet(); 181 counter.incrementAndGet(); 182 } else { 183 // flip first switch 184 first = false; 185 } 186 187 if (index.get() >= processors.size()) { 188 // out of bounds 189 if (isRoundRobin()) { 190 log.trace("Failover is round robin enabled and therefore starting from the first endpoint"); 191 index.set(0); 192 counter.set(0); 193 } else { 194 // no more processors to try 195 log.trace("Breaking out of failover as we reached the end of endpoints to use for failover"); 196 break; 197 } 198 } 199 200 // try again but copy original exchange before we failover 201 copy = prepareExchangeForFailover(exchange); 202 Processor processor = processors.get(index.get()); 203 204 // process the exchange 205 boolean sync = processExchange(processor, exchange, copy, attempts, index, callback, processors); 206 207 // continue as long its being processed synchronously 208 if (!sync) { 209 log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); 210 // the remainder of the failover will be completed async 211 // so we break out now, then the callback will be invoked which then continue routing from where we left here 212 return false; 213 } 214 215 log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); 216 } 217 218 // and copy the current result to original so it will contain this result of this eip 219 if (copy != null) { 220 ExchangeHelper.copyResults(exchange, copy); 221 } 222 log.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 223 callback.done(true); 224 return true; 225 } 226 227 /** 228 * Prepares the exchange for failover 229 * 230 * @param exchange the exchange 231 * @return a copy of the exchange to use for failover 232 */ 233 protected Exchange prepareExchangeForFailover(Exchange exchange) { 234 // use a copy of the exchange to avoid side effects on the original exchange 235 return ExchangeHelper.createCopy(exchange, true); 236 } 237 238 private boolean processExchange(Processor processor, Exchange exchange, Exchange copy, 239 AtomicInteger attempts, AtomicInteger index, 240 AsyncCallback callback, List<Processor> processors) { 241 if (processor == null) { 242 throw new IllegalStateException("No processors could be chosen to process " + copy); 243 } 244 log.debug("Processing failover at attempt {} for {}", attempts, copy); 245 246 AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor); 247 return AsyncProcessorHelper.process(albp, copy, new FailOverAsyncCallback(exchange, copy, attempts, index, callback, processors)); 248 } 249 250 /** 251 * Failover logic to be executed asynchronously if one of the failover endpoints 252 * is a real {@link AsyncProcessor}. 253 */ 254 private final class FailOverAsyncCallback implements AsyncCallback { 255 256 private final Exchange exchange; 257 private Exchange copy; 258 private final AtomicInteger attempts; 259 private final AtomicInteger index; 260 private final AsyncCallback callback; 261 private final List<Processor> processors; 262 263 private FailOverAsyncCallback(Exchange exchange, Exchange copy, AtomicInteger attempts, AtomicInteger index, AsyncCallback callback, List<Processor> processors) { 264 this.exchange = exchange; 265 this.copy = copy; 266 this.attempts = attempts; 267 this.index = index; 268 this.callback = callback; 269 this.processors = processors; 270 } 271 272 public void done(boolean doneSync) { 273 // we only have to handle async completion of the pipeline 274 if (doneSync) { 275 return; 276 } 277 278 while (shouldFailOver(copy)) { 279 280 // can we still run 281 if (!isRunAllowed()) { 282 log.trace("Run not allowed, will reject executing exchange: {}", exchange); 283 if (exchange.getException() == null) { 284 exchange.setException(new RejectedExecutionException()); 285 } 286 // we cannot process so invoke callback 287 callback.done(false); 288 } 289 290 attempts.incrementAndGet(); 291 // are we exhausted by attempts? 292 if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) { 293 log.trace("Breaking out of failover after {} failover attempts", attempts); 294 break; 295 } 296 297 index.incrementAndGet(); 298 counter.incrementAndGet(); 299 300 if (index.get() >= processors.size()) { 301 // out of bounds 302 if (isRoundRobin()) { 303 log.trace("Failover is round robin enabled and therefore starting from the first endpoint"); 304 index.set(0); 305 counter.set(0); 306 } else { 307 // no more processors to try 308 log.trace("Breaking out of failover as we reached the end of endpoints to use for failover"); 309 break; 310 } 311 } 312 313 // try again but prepare exchange before we failover 314 copy = prepareExchangeForFailover(exchange); 315 Processor processor = processors.get(index.get()); 316 317 // try to failover using the next processor 318 doneSync = processExchange(processor, exchange, copy, attempts, index, callback, processors); 319 if (!doneSync) { 320 log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); 321 // the remainder of the failover will be completed async 322 // so we break out now, then the callback will be invoked which then continue routing from where we left here 323 return; 324 } 325 } 326 327 // and copy the current result to original so it will contain this result of this eip 328 if (copy != null) { 329 ExchangeHelper.copyResults(exchange, copy); 330 } 331 log.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 332 // signal callback we are done 333 callback.done(false); 334 }; 335 } 336 337 public String toString() { 338 return "FailoverLoadBalancer[" + getProcessors() + "]"; 339 } 340 341 public String getTraceLabel() { 342 return "failover"; 343 } 344 }