001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl.cloud;
018
019import org.apache.camel.AsyncCallback;
020import org.apache.camel.AsyncProcessor;
021import org.apache.camel.CamelContext;
022import org.apache.camel.Exchange;
023import org.apache.camel.ExchangePattern;
024import org.apache.camel.Expression;
025import org.apache.camel.Message;
026import org.apache.camel.cloud.ServiceDefinition;
027import org.apache.camel.cloud.ServiceLoadBalancer;
028import org.apache.camel.language.simple.SimpleLanguage;
029import org.apache.camel.processor.SendDynamicProcessor;
030import org.apache.camel.support.ServiceSupport;
031import org.apache.camel.util.AsyncProcessorHelper;
032import org.apache.camel.util.ObjectHelper;
033import org.apache.camel.util.ServiceHelper;
034import org.apache.camel.util.StringHelper;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038public class DefaultServiceCallProcessor extends ServiceSupport implements AsyncProcessor {
039    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultServiceCallProcessor.class);
040
041    private final ExchangePattern exchangePattern;
042    private final String name;
043    private final String scheme;
044    private final String uri;
045    private final String contextPath;
046    private final CamelContext camelContext;
047    private final ServiceLoadBalancer loadBalancer;
048    private final Expression expression;
049    private SendDynamicProcessor processor;
050
051    public DefaultServiceCallProcessor(
052        CamelContext camelContext, String name, String scheme, String uri, ExchangePattern exchangePattern,
053        ServiceLoadBalancer loadBalancer, Expression expression) {
054
055        this.uri = uri;
056        this.exchangePattern = exchangePattern;
057        this.camelContext = camelContext;
058        this.loadBalancer = loadBalancer;
059
060        // setup from the provided name which can contain scheme and context-path information as well
061        String serviceName;
062        if (name.contains("/")) {
063            serviceName = StringHelper.before(name, "/");
064            this.contextPath = StringHelper.after(name, "/");
065        } else if (name.contains("?")) {
066            serviceName = StringHelper.before(name, "?");
067            this.contextPath = StringHelper.after(name, "?");
068        } else {
069            serviceName = name;
070            this.contextPath = null;
071        }
072        if (serviceName.contains(":")) {
073            this.scheme = StringHelper.before(serviceName, ":");
074            this.name = StringHelper.after(serviceName, ":");
075        } else {
076            this.scheme = scheme;
077            this.name = serviceName;
078        }
079
080        this.expression = expression;
081    }
082
083    // *************************************
084    // Properties
085    // *************************************
086
087
088    public ExchangePattern getExchangePattern() {
089        return exchangePattern;
090    }
091
092    public String getName() {
093        return name;
094    }
095
096    public String getScheme() {
097        return scheme;
098    }
099
100    public String getUri() {
101        return uri;
102    }
103
104    public String getContextPath() {
105        return contextPath;
106    }
107
108    public ServiceLoadBalancer getLoadBalancer() {
109        return loadBalancer;
110    }
111
112    public Expression getExpression() {
113        return expression;
114    }
115
116    // *************************************
117    // Lifecycle
118    // *************************************
119
120    @Override
121    protected void doStart() throws Exception {
122        StringHelper.notEmpty(name, "name", "service name");
123        ObjectHelper.notNull(camelContext, "camel context");
124        ObjectHelper.notNull(expression, "expression");
125        ObjectHelper.notNull(loadBalancer, "load balancer");
126
127        processor = new SendDynamicProcessor(uri, expression);
128        processor.setCamelContext(camelContext);
129        if (exchangePattern != null) {
130            processor.setPattern(exchangePattern);
131        }
132
133        // Start services if needed
134        ServiceHelper.startService(processor);
135        ServiceHelper.startService(loadBalancer);
136    }
137
138    @Override
139    protected void doStop() throws Exception {
140        // Stop services if needed
141        ServiceHelper.stopService(loadBalancer);
142        ServiceHelper.stopService(processor);
143    }
144
145    // *************************************
146    // Processor
147    // *************************************
148
149
150    @Override
151    public void process(Exchange exchange) throws Exception {
152        AsyncProcessorHelper.process(this, exchange);
153    }
154
155    @Override
156    public boolean process(final Exchange exchange, final AsyncCallback callback) {
157        Message message = exchange.getIn();
158
159        // the values can be dynamic using simple language so compute those
160        String val = uri;
161        if (SimpleLanguage.hasSimpleFunction(val)) {
162            val = SimpleLanguage.simple(val).evaluate(exchange, String.class);
163        }
164        message.setHeader(ServiceCallConstants.SERVICE_CALL_URI, val);
165
166        val = contextPath;
167        if (SimpleLanguage.hasSimpleFunction(val)) {
168            val = SimpleLanguage.simple(val).evaluate(exchange, String.class);
169        }
170        message.setHeader(ServiceCallConstants.SERVICE_CALL_CONTEXT_PATH, val);
171
172        val = scheme;
173        if (SimpleLanguage.hasSimpleFunction(val)) {
174            val = SimpleLanguage.simple(val).evaluate(exchange, String.class);
175        }
176        message.setHeader(ServiceCallConstants.SERVICE_CALL_SCHEME, val);
177
178        String serviceName = name;
179        if (SimpleLanguage.hasSimpleFunction(serviceName)) {
180            serviceName = SimpleLanguage.simple(serviceName).evaluate(exchange, String.class);
181        }
182        message.setHeader(ServiceCallConstants.SERVICE_NAME, serviceName);
183
184        try {
185            return loadBalancer.process(serviceName, server -> execute(server, exchange, callback));
186        } catch (Exception e) {
187            exchange.setException(e);
188            return true;
189        }
190    }
191
192    private boolean execute(ServiceDefinition server, Exchange exchange, AsyncCallback callback) throws Exception {
193        String host = server.getHost();
194        int port = server.getPort();
195
196        LOGGER.debug("Service {} active at server: {}:{}", name, host, port);
197
198        // set selected server as header
199        exchange.getIn().setHeader(ServiceCallConstants.SERVICE_HOST, host);
200        exchange.getIn().setHeader(ServiceCallConstants.SERVICE_PORT, port > 0 ? port : null);
201        exchange.getIn().setHeader(ServiceCallConstants.SERVICE_NAME, server.getName());
202        exchange.getIn().setHeader(ServiceCallConstants.SERVICE_META, server.getMetadata());
203
204        // use the dynamic send processor to call the service
205        return processor.process(exchange, callback);
206    }
207}