001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl.remote;
018
019import java.util.List;
020import java.util.concurrent.RejectedExecutionException;
021
022import org.apache.camel.AsyncCallback;
023import org.apache.camel.AsyncProcessor;
024import org.apache.camel.CamelContext;
025import org.apache.camel.CamelContextAware;
026import org.apache.camel.Exchange;
027import org.apache.camel.ExchangePattern;
028import org.apache.camel.Expression;
029import org.apache.camel.Traceable;
030import org.apache.camel.processor.SendDynamicProcessor;
031import org.apache.camel.spi.IdAware;
032import org.apache.camel.spi.ServiceCallLoadBalancer;
033import org.apache.camel.spi.ServiceCallServer;
034import org.apache.camel.spi.ServiceCallServerListStrategy;
035import org.apache.camel.support.ServiceSupport;
036import org.apache.camel.util.AsyncProcessorHelper;
037import org.apache.camel.util.ObjectHelper;
038import org.apache.camel.util.ServiceHelper;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042public class DefaultServiceCallProcessor<S extends ServiceCallServer> extends ServiceSupport implements AsyncProcessor, CamelContextAware, Traceable, IdAware {
043    private static final Logger LOG = LoggerFactory.getLogger(DefaultServiceCallProcessor.class);
044
045    private final ExchangePattern exchangePattern;
046    private final String name;
047    private final String scheme;
048    private final String uri;
049    private final String contextPath;
050    private CamelContext camelContext;
051    private String id;
052    private ServiceCallServerListStrategy<S> serverListStrategy;
053    private ServiceCallLoadBalancer<S> loadBalancer;
054    private Expression serviceCallExpression;
055    private SendDynamicProcessor processor;
056
057    public DefaultServiceCallProcessor(String name, String scheme, String uri, ExchangePattern exchangePattern) {
058        this.uri = uri;
059        this.exchangePattern = exchangePattern;
060
061        // setup from the provided name which can contain scheme and context-path information as well
062        String serviceName;
063        if (name.contains("/")) {
064            serviceName = ObjectHelper.before(name, "/");
065            this.contextPath = ObjectHelper.after(name, "/");
066        } else if (name.contains("?")) {
067            serviceName = ObjectHelper.before(name, "?");
068            this.contextPath = ObjectHelper.after(name, "?");
069        } else {
070            serviceName = name;
071            this.contextPath = null;
072        }
073        if (serviceName.contains(":")) {
074            this.scheme = ObjectHelper.before(serviceName, ":");
075            this.name = ObjectHelper.after(serviceName, ":");
076        } else {
077            this.scheme = scheme;
078            this.name = serviceName;
079        }
080
081        this.serviceCallExpression = new DefaultServiceCallExpression(
082            this.name,
083            this.scheme,
084            this.contextPath,
085            this.uri);
086    }
087
088    @Override
089    public CamelContext getCamelContext() {
090        return camelContext;
091    }
092
093    @Override
094    public void setCamelContext(CamelContext camelContext) {
095        this.camelContext = camelContext;
096    }
097
098    @Override
099    public String getId() {
100        return id;
101    }
102
103    @Override
104    public void setId(String id) {
105        this.id = id;
106    }
107
108    @Override
109    public String getTraceLabel() {
110        return id;
111    }
112
113    public String getName() {
114        return name;
115    }
116
117    public String getScheme() {
118        return scheme;
119    }
120
121    public String getContextPath() {
122        return contextPath;
123    }
124
125    public String getUri() {
126        return uri;
127    }
128
129    public ExchangePattern getExchangePattern() {
130        return exchangePattern;
131    }
132
133    public ServiceCallLoadBalancer<S> getLoadBalancer() {
134        return loadBalancer;
135    }
136
137    public void setLoadBalancer(ServiceCallLoadBalancer<S> loadBalancer) {
138        this.loadBalancer = loadBalancer;
139    }
140
141    public DefaultServiceCallProcessor loadBalancer(ServiceCallLoadBalancer<S> loadBalancer) {
142        setLoadBalancer(loadBalancer);
143        return this;
144    }
145
146    public ServiceCallServerListStrategy<S> getServerListStrategy() {
147        return serverListStrategy;
148    }
149
150    public void setServerListStrategy(ServiceCallServerListStrategy<S> serverListStrategy) {
151        this.serverListStrategy = serverListStrategy;
152    }
153
154    public DefaultServiceCallProcessor serverListStrategy(ServiceCallServerListStrategy<S> serverListStrategy) {
155        setServerListStrategy(serverListStrategy);
156        return this;
157    }
158
159    public void setServiceCallExpression(Expression serviceCallExpression) {
160        this.serviceCallExpression = serviceCallExpression;
161    }
162
163    public Expression getServiceCallExpression() {
164        return serviceCallExpression;
165    }
166
167    public DefaultServiceCallProcessor serviceCallExpression(Expression serviceCallExpression) {
168        setServiceCallExpression(serviceCallExpression);
169        return this;
170    }
171
172    public AsyncProcessor getProcessor() {
173        return processor;
174    }
175
176    @Override
177    protected void doStart() throws Exception {
178        ObjectHelper.notEmpty(getName(), "name", "serviceName");
179        ObjectHelper.notNull(camelContext, "camelContext");
180        ObjectHelper.notNull(serviceCallExpression, "serviceCallExpression");
181
182        LOG.info("ServiceCall with service name: {} is using load balancer: {} and service discovery: {}",
183            name, loadBalancer, serverListStrategy);
184
185        processor = new SendDynamicProcessor(uri, serviceCallExpression);
186        processor.setCamelContext(getCamelContext());
187        if (exchangePattern != null) {
188            processor.setPattern(exchangePattern);
189        }
190
191        ServiceHelper.startServices(serverListStrategy, processor);
192    }
193
194    @Override
195    protected void doStop() throws Exception {
196        ServiceHelper.stopServices(processor, serverListStrategy);
197    }
198
199    @Override
200    public void process(Exchange exchange) throws Exception {
201        AsyncProcessorHelper.process(this, exchange);
202    }
203
204    @Override
205    public boolean process(Exchange exchange, AsyncCallback callback) {
206        final String serviceName = exchange.getIn().getHeader(ServiceCallConstants.SERVICE_NAME, name, String.class);
207        final ServiceCallServer server = chooseServer(exchange, serviceName);
208
209        if (exchange.getException() != null) {
210            callback.done(true);
211            return true;
212        }
213
214        String ip = server.getIp();
215        int port = server.getPort();
216        LOG.debug("Service {} active at server: {}:{}", name, ip, port);
217
218        // set selected server as header
219        exchange.getIn().setHeader(ServiceCallConstants.SERVER_IP, ip);
220        exchange.getIn().setHeader(ServiceCallConstants.SERVER_PORT, port);
221        exchange.getIn().setHeader(ServiceCallConstants.SERVICE_NAME, serviceName);
222
223        // use the dynamic send processor to call the service
224        return processor.process(exchange, callback);
225    }
226
227    protected S chooseServer(Exchange exchange, String serviceName) {
228        ObjectHelper.notNull(serverListStrategy, "serverListStrategy");
229        ObjectHelper.notNull(loadBalancer, "loadBalancer");
230
231        S server = null;
232
233        try {
234            List<S> servers = serverListStrategy.getUpdatedListOfServers(serviceName);
235            if (servers == null || servers.isEmpty()) {
236                exchange.setException(new RejectedExecutionException("No active services with name " + name));
237            } else {
238                // let the client load balancer chose which server to use
239                server = servers.size() > 1 ? loadBalancer.chooseServer(servers) : servers.get(0);
240                if (server == null) {
241                    exchange.setException(new RejectedExecutionException("No active services with name " + name));
242                }
243            }
244        } catch (Throwable e) {
245            exchange.setException(e);
246        }
247
248        return server;
249    }
250}