001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.List; 022import javax.xml.bind.annotation.XmlAccessType; 023import javax.xml.bind.annotation.XmlAccessorType; 024import javax.xml.bind.annotation.XmlElement; 025import javax.xml.bind.annotation.XmlElementRef; 026import javax.xml.bind.annotation.XmlElements; 027import javax.xml.bind.annotation.XmlRootElement; 028 029import org.apache.camel.Expression; 030import org.apache.camel.Processor; 031import org.apache.camel.model.loadbalancer.CircuitBreakerLoadBalancerDefinition; 032import org.apache.camel.model.loadbalancer.CustomLoadBalancerDefinition; 033import org.apache.camel.model.loadbalancer.FailoverLoadBalancerDefinition; 034import org.apache.camel.model.loadbalancer.RandomLoadBalancerDefinition; 035import org.apache.camel.model.loadbalancer.RoundRobinLoadBalancerDefinition; 036import org.apache.camel.model.loadbalancer.StickyLoadBalancerDefinition; 037import org.apache.camel.model.loadbalancer.TopicLoadBalancerDefinition; 038import org.apache.camel.model.loadbalancer.WeightedLoadBalancerDefinition; 039import org.apache.camel.processor.loadbalancer.LoadBalancer; 040import org.apache.camel.spi.Metadata; 041import org.apache.camel.spi.RouteContext; 042import org.apache.camel.util.CollectionStringBuffer; 043 044/** 045 * Balances message processing among a number of nodes 046 */ 047@Metadata(label = "eip,routing") 048@XmlRootElement(name = "loadBalance") 049@XmlAccessorType(XmlAccessType.FIELD) 050public class LoadBalanceDefinition extends ProcessorDefinition<LoadBalanceDefinition> { 051 @XmlElements({ 052 @XmlElement(required = false, name = "failover", type = FailoverLoadBalancerDefinition.class), 053 @XmlElement(required = false, name = "random", type = RandomLoadBalancerDefinition.class), 054 // TODO: Camel 3.0 - Should be named customLoadBalancer to avoid naming clash with custom dataformat 055 @XmlElement(required = false, name = "custom", type = CustomLoadBalancerDefinition.class), 056 @XmlElement(required = false, name = "roundRobin", type = RoundRobinLoadBalancerDefinition.class), 057 @XmlElement(required = false, name = "sticky", type = StickyLoadBalancerDefinition.class), 058 @XmlElement(required = false, name = "topic", type = TopicLoadBalancerDefinition.class), 059 @XmlElement(required = false, name = "weighted", type = WeightedLoadBalancerDefinition.class), 060 @XmlElement(required = false, name = "circuitBreaker", type = CircuitBreakerLoadBalancerDefinition.class)} 061 ) 062 private LoadBalancerDefinition loadBalancerType; 063 @XmlElementRef 064 private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); 065 066 public LoadBalanceDefinition() { 067 } 068 069 @Override 070 public List<ProcessorDefinition<?>> getOutputs() { 071 return outputs; 072 } 073 074 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 075 this.outputs = outputs; 076 if (outputs != null) { 077 for (ProcessorDefinition<?> output : outputs) { 078 configureChild(output); 079 } 080 } 081 } 082 083 public boolean isOutputSupported() { 084 return true; 085 } 086 087 public LoadBalancerDefinition getLoadBalancerType() { 088 return loadBalancerType; 089 } 090 091 /** 092 * The load balancer to be used 093 */ 094 public void setLoadBalancerType(LoadBalancerDefinition loadbalancer) { 095 if (loadBalancerType != null) { 096 throw new IllegalArgumentException("Loadbalancer already configured to: " + loadBalancerType + ". Cannot set it to: " + loadbalancer); 097 } 098 loadBalancerType = loadbalancer; 099 } 100 101 @Override 102 public Processor createProcessor(RouteContext routeContext) throws Exception { 103 // the load balancer is stateful so we should only create it once in case its used from a context scoped error handler 104 105 LoadBalancer loadBalancer = loadBalancerType.getLoadBalancer(routeContext); 106 if (loadBalancer == null) { 107 // then create it and reuse it 108 loadBalancer = loadBalancerType.createLoadBalancer(routeContext); 109 loadBalancerType.setLoadBalancer(loadBalancer); 110 111 // some load balancers can only support a fixed number of outputs 112 int max = loadBalancerType.getMaximumNumberOfOutputs(); 113 int size = getOutputs().size(); 114 if (size > max) { 115 throw new IllegalArgumentException("To many outputs configured on " + loadBalancerType + ": " + size + " > " + max); 116 } 117 118 for (ProcessorDefinition<?> processorType : getOutputs()) { 119 // output must not be another load balancer 120 // check for instanceof as the code below as there is compilation errors on earlier versions of JDK6 121 // on Windows boxes or with IBM JDKs etc. 122 if (LoadBalanceDefinition.class.isInstance(processorType)) { 123 throw new IllegalArgumentException("Loadbalancer already configured to: " + loadBalancerType + ". Cannot set it to: " + processorType); 124 } 125 Processor processor = createProcessor(routeContext, processorType); 126 processor = wrapChannel(routeContext, processor, processorType); 127 loadBalancer.addProcessor(processor); 128 } 129 } 130 return loadBalancer; 131 } 132 133 // Fluent API 134 // ------------------------------------------------------------------------- 135 136 /** 137 * Uses a custom load balancer 138 * 139 * @param loadBalancer the load balancer 140 * @return the builder 141 */ 142 public LoadBalanceDefinition loadBalance(LoadBalancer loadBalancer) { 143 CustomLoadBalancerDefinition def = new CustomLoadBalancerDefinition(); 144 def.setLoadBalancer(loadBalancer); 145 setLoadBalancerType(def); 146 return this; 147 } 148 149 /** 150 * Uses fail over load balancer 151 * <p/> 152 * Will not round robin and inherit the error handler. 153 * 154 * @return the builder 155 */ 156 public LoadBalanceDefinition failover() { 157 return failover(-1, true, false); 158 } 159 160 /** 161 * Uses fail over load balancer 162 * <p/> 163 * Will not round robin and inherit the error handler. 164 * 165 * @param exceptions exception classes which we want to failover if one of them was thrown 166 * @return the builder 167 */ 168 public LoadBalanceDefinition failover(Class<?>... exceptions) { 169 return failover(-1, true, false, exceptions); 170 } 171 172 /** 173 * Uses fail over load balancer 174 * 175 * @param maximumFailoverAttempts maximum number of failover attempts before exhausting. 176 * Use -1 to newer exhaust when round robin is also enabled. 177 * If round robin is disabled then it will exhaust when there are no more endpoints to failover 178 * @param inheritErrorHandler whether or not to inherit error handler. 179 * If <tt>false</tt> then it will failover immediately in case of an exception 180 * @param roundRobin whether or not to use round robin (which keeps state) 181 * @param exceptions exception classes which we want to failover if one of them was thrown 182 * @return the builder 183 */ 184 public LoadBalanceDefinition failover(int maximumFailoverAttempts, boolean inheritErrorHandler, boolean roundRobin, Class<?>... exceptions) { 185 return failover(maximumFailoverAttempts, inheritErrorHandler, roundRobin, false, exceptions); 186 } 187 188 /** 189 * Uses fail over load balancer 190 * 191 * @param maximumFailoverAttempts maximum number of failover attempts before exhausting. 192 * Use -1 to newer exhaust when round robin is also enabled. 193 * If round robin is disabled then it will exhaust when there are no more endpoints to failover 194 * @param inheritErrorHandler whether or not to inherit error handler. 195 * If <tt>false</tt> then it will failover immediately in case of an exception 196 * @param roundRobin whether or not to use round robin (which keeps state) 197 * @param sticky whether or not to use sticky (which keeps state) 198 * @param exceptions exception classes which we want to failover if one of them was thrown 199 * @return the builder 200 */ 201 public LoadBalanceDefinition failover(int maximumFailoverAttempts, boolean inheritErrorHandler, boolean roundRobin, boolean sticky, Class<?>... exceptions) { 202 FailoverLoadBalancerDefinition def = new FailoverLoadBalancerDefinition(); 203 def.setExceptionTypes(Arrays.asList(exceptions)); 204 def.setMaximumFailoverAttempts(maximumFailoverAttempts); 205 def.setRoundRobin(roundRobin); 206 def.setSticky(sticky); 207 setLoadBalancerType(def); 208 this.setInheritErrorHandler(inheritErrorHandler); 209 return this; 210 } 211 212 /** 213 * Uses weighted load balancer 214 * 215 * @param roundRobin used to set the processor selection algorithm. 216 * @param distributionRatio String of weighted ratios for distribution of messages. 217 * @return the builder 218 */ 219 public LoadBalanceDefinition weighted(boolean roundRobin, String distributionRatio) { 220 return weighted(roundRobin, distributionRatio, ","); 221 } 222 223 /** 224 * Uses circuitBreaker load balancer 225 * 226 * @param threshold number of errors before failure. 227 * @param halfOpenAfter time interval in milliseconds for half open state. 228 * @param exceptions exception classes which we want to break if one of them was thrown 229 * @return the builder 230 */ 231 public LoadBalanceDefinition circuitBreaker(int threshold, long halfOpenAfter, Class<?>... exceptions) { 232 CircuitBreakerLoadBalancerDefinition def = new CircuitBreakerLoadBalancerDefinition(); 233 def.setExceptionTypes(Arrays.asList(exceptions)); 234 def.setThreshold(threshold); 235 def.setHalfOpenAfter(halfOpenAfter); 236 setLoadBalancerType(def); 237 return this; 238 } 239 240 /** 241 * Uses weighted load balancer 242 * 243 * @param roundRobin used to set the processor selection algorithm. 244 * @param distributionRatio String of weighted ratios for distribution of messages. 245 * @param distributionRatioDelimiter String containing delimiter to be used for ratios 246 * @return the builder 247 */ 248 public LoadBalanceDefinition weighted(boolean roundRobin, String distributionRatio, String distributionRatioDelimiter) { 249 WeightedLoadBalancerDefinition def = new WeightedLoadBalancerDefinition(); 250 def.setRoundRobin(roundRobin); 251 def.setDistributionRatio(distributionRatio); 252 def.setDistributionRatioDelimiter(distributionRatioDelimiter); 253 setLoadBalancerType(def); 254 return this; 255 } 256 257 /** 258 * Uses round robin load balancer 259 * 260 * @return the builder 261 */ 262 public LoadBalanceDefinition roundRobin() { 263 setLoadBalancerType(new RoundRobinLoadBalancerDefinition()); 264 return this; 265 } 266 267 /** 268 * Uses random load balancer 269 * 270 * @return the builder 271 */ 272 public LoadBalanceDefinition random() { 273 setLoadBalancerType(new RandomLoadBalancerDefinition()); 274 return this; 275 } 276 277 /** 278 * Uses the custom load balancer 279 * 280 * @param ref reference to lookup a custom load balancer from the {@link org.apache.camel.spi.Registry} to be used. 281 * @return the builder 282 */ 283 public LoadBalanceDefinition custom(String ref) { 284 CustomLoadBalancerDefinition balancer = new CustomLoadBalancerDefinition(); 285 balancer.setRef(ref); 286 setLoadBalancerType(balancer); 287 return this; 288 } 289 290 /** 291 * Uses sticky load balancer 292 * 293 * @param correlationExpression the expression for correlation 294 * @return the builder 295 */ 296 public LoadBalanceDefinition sticky(Expression correlationExpression) { 297 StickyLoadBalancerDefinition def = new StickyLoadBalancerDefinition(); 298 def.setCorrelationExpression(correlationExpression); 299 setLoadBalancerType(def); 300 return this; 301 } 302 303 /** 304 * Uses topic load balancer 305 * 306 * @return the builder 307 */ 308 public LoadBalanceDefinition topic() { 309 setLoadBalancerType(new TopicLoadBalancerDefinition()); 310 return this; 311 } 312 313 @Override 314 public String getLabel() { 315 CollectionStringBuffer buffer = new CollectionStringBuffer("loadBalance["); 316 List<ProcessorDefinition<?>> list = getOutputs(); 317 for (ProcessorDefinition<?> processorType : list) { 318 buffer.append(processorType.getLabel()); 319 } 320 buffer.append("]"); 321 return buffer.toString(); 322 } 323 324 @Override 325 public String toString() { 326 return "LoadBalanceType[" + loadBalancerType + ", " + getOutputs() + "]"; 327 } 328}