001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl.cloud; 018 019import java.util.Map; 020 021import org.apache.camel.AsyncCallback; 022import org.apache.camel.AsyncProcessor; 023import org.apache.camel.CamelContext; 024import org.apache.camel.Exchange; 025import org.apache.camel.ExchangePattern; 026import org.apache.camel.Expression; 027import org.apache.camel.Message; 028import org.apache.camel.cloud.ServiceDefinition; 029import org.apache.camel.cloud.ServiceLoadBalancer; 030import org.apache.camel.language.simple.SimpleLanguage; 031import org.apache.camel.processor.SendDynamicProcessor; 032import org.apache.camel.support.ServiceSupport; 033import org.apache.camel.util.AsyncProcessorHelper; 034import org.apache.camel.util.ObjectHelper; 035import org.apache.camel.util.ServiceHelper; 036import org.apache.camel.util.StringHelper; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040public class DefaultServiceCallProcessor extends ServiceSupport implements AsyncProcessor { 041 private static final Logger LOGGER = LoggerFactory.getLogger(DefaultServiceCallProcessor.class); 042 043 private final ExchangePattern exchangePattern; 044 private final String name; 045 private final String scheme; 046 private final String uri; 047 private final String contextPath; 048 private final CamelContext camelContext; 049 private final ServiceLoadBalancer loadBalancer; 050 private final Expression expression; 051 private SendDynamicProcessor processor; 052 053 public DefaultServiceCallProcessor( 054 CamelContext camelContext, String name, String scheme, String uri, ExchangePattern exchangePattern, 055 ServiceLoadBalancer loadBalancer, Expression expression) { 056 057 this.uri = uri; 058 this.exchangePattern = exchangePattern; 059 this.camelContext = camelContext; 060 this.loadBalancer = loadBalancer; 061 062 // setup from the provided name which can contain scheme and context-path information as well 063 String serviceName; 064 if (name.contains("/")) { 065 serviceName = StringHelper.before(name, "/"); 066 this.contextPath = StringHelper.after(name, "/"); 067 } else if (name.contains("?")) { 068 serviceName = StringHelper.before(name, "?"); 069 this.contextPath = StringHelper.after(name, "?"); 070 } else { 071 serviceName = name; 072 this.contextPath = null; 073 } 074 if (serviceName.contains(":")) { 075 this.scheme = StringHelper.before(serviceName, ":"); 076 this.name = StringHelper.after(serviceName, ":"); 077 } else { 078 this.scheme = scheme; 079 this.name = serviceName; 080 } 081 082 this.expression = expression; 083 } 084 085 // ************************************* 086 // Properties 087 // ************************************* 088 089 090 public ExchangePattern getExchangePattern() { 091 return exchangePattern; 092 } 093 094 public String getName() { 095 return name; 096 } 097 098 public String getScheme() { 099 return scheme; 100 } 101 102 public String getUri() { 103 return uri; 104 } 105 106 public String getContextPath() { 107 return contextPath; 108 } 109 110 public ServiceLoadBalancer getLoadBalancer() { 111 return loadBalancer; 112 } 113 114 public Expression getExpression() { 115 return expression; 116 } 117 118 // ************************************* 119 // Lifecycle 120 // ************************************* 121 122 @Override 123 protected void doStart() throws Exception { 124 StringHelper.notEmpty(name, "name", "service name"); 125 ObjectHelper.notNull(camelContext, "camel context"); 126 ObjectHelper.notNull(expression, "expression"); 127 ObjectHelper.notNull(loadBalancer, "load balancer"); 128 129 processor = new SendDynamicProcessor(uri, expression); 130 processor.setCamelContext(camelContext); 131 if (exchangePattern != null) { 132 processor.setPattern(exchangePattern); 133 } 134 135 // Start services if needed 136 ServiceHelper.startService(processor); 137 ServiceHelper.startService(loadBalancer); 138 } 139 140 @Override 141 protected void doStop() throws Exception { 142 // Stop services if needed 143 ServiceHelper.stopService(loadBalancer); 144 ServiceHelper.stopService(processor); 145 } 146 147 // ************************************* 148 // Processor 149 // ************************************* 150 151 152 @Override 153 public void process(Exchange exchange) throws Exception { 154 AsyncProcessorHelper.process(this, exchange); 155 } 156 157 @Override 158 public boolean process(final Exchange exchange, final AsyncCallback callback) { 159 final Message message = exchange.getIn(); 160 161 // the values can be dynamic using simple language so compute those 162 final String serviceName = applySimpleLanguage(name, exchange); 163 final String serviceUri = applySimpleLanguage(uri, exchange); 164 final String servicePath = applySimpleLanguage(contextPath, exchange); 165 final String serviceScheme = applySimpleLanguage(scheme, exchange); 166 167 message.setHeader(ServiceCallConstants.SERVICE_CALL_URI, serviceUri); 168 message.setHeader(ServiceCallConstants.SERVICE_CALL_CONTEXT_PATH, servicePath); 169 message.setHeader(ServiceCallConstants.SERVICE_CALL_SCHEME, serviceScheme); 170 message.setHeader(ServiceCallConstants.SERVICE_NAME, serviceName); 171 172 try { 173 return loadBalancer.process(serviceName, server -> execute(server, exchange, callback)); 174 } catch (Exception e) { 175 exchange.setException(e); 176 return true; 177 } 178 } 179 180 private boolean execute(ServiceDefinition service, Exchange exchange, AsyncCallback callback) throws Exception { 181 final Message message = exchange.getIn(); 182 final String host = service.getHost(); 183 final int port = service.getPort(); 184 final Map<String, String> meta = service.getMetadata(); 185 186 LOGGER.debug("Service {} active at server: {}:{}", name, host, port); 187 188 // set selected server as header 189 message.setHeader(ServiceCallConstants.SERVICE_HOST, host); 190 message.setHeader(ServiceCallConstants.SERVICE_PORT, port > 0 ? port : null); 191 message.setHeader(ServiceCallConstants.SERVICE_NAME, service.getName()); 192 message.setHeader(ServiceCallConstants.SERVICE_META, meta); 193 194 // If context path is not set on service call definition, reuse the one from 195 // ServiceDefinition, if any 196 message.getHeaders().compute(ServiceCallConstants.SERVICE_CALL_CONTEXT_PATH, (k, v) -> 197 v == null ? meta.get(ServiceDefinition.SERVICE_META_PATH) : v 198 ); 199 200 // If port is not set on service call definition, reuse the one from 201 // ServiceDefinition, if any 202 message.getHeaders().compute(ServiceCallConstants.SERVICE_PORT, (k, v) -> 203 v == null ? meta.get(ServiceDefinition.SERVICE_META_PORT) : v 204 ); 205 206 // use the dynamic send processor to call the service 207 return processor.process(exchange, callback); 208 } 209 210 /** 211 * This function applies the simple language to the given expression. 212 * 213 * @param expression the expression 214 * @param exchange the exchange 215 * @return the computed expression 216 */ 217 private String applySimpleLanguage(String expression, Exchange exchange) { 218 return SimpleLanguage.hasSimpleFunction(expression) 219 ? SimpleLanguage.simple(expression).evaluate(exchange, String.class) 220 : expression; 221 } 222}