001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl.remote; 018 019import java.util.List; 020import java.util.concurrent.RejectedExecutionException; 021 022import org.apache.camel.AsyncCallback; 023import org.apache.camel.AsyncProcessor; 024import org.apache.camel.CamelContext; 025import org.apache.camel.CamelContextAware; 026import org.apache.camel.Exchange; 027import org.apache.camel.ExchangePattern; 028import org.apache.camel.Expression; 029import org.apache.camel.Traceable; 030import org.apache.camel.processor.SendDynamicProcessor; 031import org.apache.camel.spi.IdAware; 032import org.apache.camel.spi.ServiceCallLoadBalancer; 033import org.apache.camel.spi.ServiceCallServer; 034import org.apache.camel.spi.ServiceCallServerListStrategy; 035import org.apache.camel.support.ServiceSupport; 036import org.apache.camel.util.AsyncProcessorHelper; 037import org.apache.camel.util.ObjectHelper; 038import org.apache.camel.util.ServiceHelper; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042public class DefaultServiceCallProcessor<S extends ServiceCallServer> extends ServiceSupport implements AsyncProcessor, CamelContextAware, Traceable, IdAware { 043 private static final Logger LOG = LoggerFactory.getLogger(DefaultServiceCallProcessor.class); 044 045 private final ExchangePattern exchangePattern; 046 private final String name; 047 private final String scheme; 048 private final String uri; 049 private final String contextPath; 050 private CamelContext camelContext; 051 private String id; 052 private ServiceCallServerListStrategy<S> serverListStrategy; 053 private ServiceCallLoadBalancer<S> loadBalancer; 054 private Expression serviceCallExpression; 055 private SendDynamicProcessor processor; 056 057 public DefaultServiceCallProcessor(String name, String scheme, String uri, ExchangePattern exchangePattern) { 058 this.uri = uri; 059 this.exchangePattern = exchangePattern; 060 061 // setup from the provided name which can contain scheme and context-path information as well 062 String serviceName; 063 if (name.contains("/")) { 064 serviceName = ObjectHelper.before(name, "/"); 065 this.contextPath = ObjectHelper.after(name, "/"); 066 } else if (name.contains("?")) { 067 serviceName = ObjectHelper.before(name, "?"); 068 this.contextPath = ObjectHelper.after(name, "?"); 069 } else { 070 serviceName = name; 071 this.contextPath = null; 072 } 073 if (serviceName.contains(":")) { 074 this.scheme = ObjectHelper.before(serviceName, ":"); 075 this.name = ObjectHelper.after(serviceName, ":"); 076 } else { 077 this.scheme = scheme; 078 this.name = serviceName; 079 } 080 081 this.serviceCallExpression = new DefaultServiceCallExpression( 082 this.name, 083 this.scheme, 084 this.contextPath, 085 this.uri); 086 } 087 088 @Override 089 public CamelContext getCamelContext() { 090 return camelContext; 091 } 092 093 @Override 094 public void setCamelContext(CamelContext camelContext) { 095 this.camelContext = camelContext; 096 } 097 098 @Override 099 public String getId() { 100 return id; 101 } 102 103 @Override 104 public void setId(String id) { 105 this.id = id; 106 } 107 108 @Override 109 public String getTraceLabel() { 110 return id; 111 } 112 113 public String getName() { 114 return name; 115 } 116 117 public String getScheme() { 118 return scheme; 119 } 120 121 public String getContextPath() { 122 return contextPath; 123 } 124 125 public String getUri() { 126 return uri; 127 } 128 129 public ExchangePattern getExchangePattern() { 130 return exchangePattern; 131 } 132 133 public ServiceCallLoadBalancer<S> getLoadBalancer() { 134 return loadBalancer; 135 } 136 137 public void setLoadBalancer(ServiceCallLoadBalancer<S> loadBalancer) { 138 this.loadBalancer = loadBalancer; 139 } 140 141 public DefaultServiceCallProcessor loadBalancer(ServiceCallLoadBalancer<S> loadBalancer) { 142 setLoadBalancer(loadBalancer); 143 return this; 144 } 145 146 public ServiceCallServerListStrategy<S> getServerListStrategy() { 147 return serverListStrategy; 148 } 149 150 public void setServerListStrategy(ServiceCallServerListStrategy<S> serverListStrategy) { 151 this.serverListStrategy = serverListStrategy; 152 } 153 154 public DefaultServiceCallProcessor serverListStrategy(ServiceCallServerListStrategy<S> serverListStrategy) { 155 setServerListStrategy(serverListStrategy); 156 return this; 157 } 158 159 public void setServiceCallExpression(Expression serviceCallExpression) { 160 this.serviceCallExpression = serviceCallExpression; 161 } 162 163 public Expression getServiceCallExpression() { 164 return serviceCallExpression; 165 } 166 167 public DefaultServiceCallProcessor serviceCallExpression(Expression serviceCallExpression) { 168 setServiceCallExpression(serviceCallExpression); 169 return this; 170 } 171 172 public AsyncProcessor getProcessor() { 173 return processor; 174 } 175 176 @Override 177 protected void doStart() throws Exception { 178 ObjectHelper.notEmpty(getName(), "name", "serviceName"); 179 ObjectHelper.notNull(camelContext, "camelContext"); 180 ObjectHelper.notNull(serviceCallExpression, "serviceCallExpression"); 181 182 LOG.info("ServiceCall with service name: {} is using load balancer: {} and service discovery: {}", 183 name, loadBalancer, serverListStrategy); 184 185 processor = new SendDynamicProcessor(uri, serviceCallExpression); 186 processor.setCamelContext(getCamelContext()); 187 if (exchangePattern != null) { 188 processor.setPattern(exchangePattern); 189 } 190 191 ServiceHelper.startServices(serverListStrategy, processor); 192 } 193 194 @Override 195 protected void doStop() throws Exception { 196 ServiceHelper.stopServices(processor, serverListStrategy); 197 } 198 199 @Override 200 public void process(Exchange exchange) throws Exception { 201 AsyncProcessorHelper.process(this, exchange); 202 } 203 204 @Override 205 public boolean process(Exchange exchange, AsyncCallback callback) { 206 final String serviceName = exchange.getIn().getHeader(ServiceCallConstants.SERVICE_NAME, name, String.class); 207 final ServiceCallServer server = chooseServer(exchange, serviceName); 208 209 if (exchange.getException() != null) { 210 callback.done(true); 211 return true; 212 } 213 214 String ip = server.getIp(); 215 int port = server.getPort(); 216 LOG.debug("Service {} active at server: {}:{}", name, ip, port); 217 218 // set selected server as header 219 exchange.getIn().setHeader(ServiceCallConstants.SERVER_IP, ip); 220 exchange.getIn().setHeader(ServiceCallConstants.SERVER_PORT, port); 221 exchange.getIn().setHeader(ServiceCallConstants.SERVICE_NAME, serviceName); 222 223 // use the dynamic send processor to call the service 224 return processor.process(exchange, callback); 225 } 226 227 protected S chooseServer(Exchange exchange, String serviceName) { 228 ObjectHelper.notNull(serverListStrategy, "serverListStrategy"); 229 ObjectHelper.notNull(loadBalancer, "loadBalancer"); 230 231 S server = null; 232 233 try { 234 List<S> servers = serverListStrategy.getUpdatedListOfServers(serviceName); 235 if (servers == null || servers.isEmpty()) { 236 exchange.setException(new RejectedExecutionException("No active services with name " + name)); 237 } else { 238 // let the client load balancer chose which server to use 239 server = servers.size() > 1 ? loadBalancer.chooseServer(servers) : servers.get(0); 240 if (server == null) { 241 exchange.setException(new RejectedExecutionException("No active services with name " + name)); 242 } 243 } 244 } catch (Throwable e) { 245 exchange.setException(e); 246 } 247 248 return server; 249 } 250}