001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.impl.cloud; 018 019import org.apache.camel.AsyncCallback; 020import org.apache.camel.AsyncProcessor; 021import org.apache.camel.CamelContext; 022import org.apache.camel.Exchange; 023import org.apache.camel.ExchangePattern; 024import org.apache.camel.Expression; 025import org.apache.camel.Message; 026import org.apache.camel.cloud.ServiceDefinition; 027import org.apache.camel.cloud.ServiceLoadBalancer; 028import org.apache.camel.language.simple.SimpleLanguage; 029import org.apache.camel.processor.SendDynamicProcessor; 030import org.apache.camel.support.ServiceSupport; 031import org.apache.camel.util.AsyncProcessorHelper; 032import org.apache.camel.util.ObjectHelper; 033import org.apache.camel.util.ServiceHelper; 034import org.apache.camel.util.StringHelper; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038public class DefaultServiceCallProcessor extends ServiceSupport implements AsyncProcessor { 039 private static final Logger LOGGER = LoggerFactory.getLogger(DefaultServiceCallProcessor.class); 040 041 private final ExchangePattern exchangePattern; 042 private final String name; 043 private final String scheme; 044 private final String uri; 045 private final String contextPath; 046 private final CamelContext camelContext; 047 private final ServiceLoadBalancer loadBalancer; 048 private final Expression expression; 049 private SendDynamicProcessor processor; 050 051 public DefaultServiceCallProcessor( 052 CamelContext camelContext, String name, String scheme, String uri, ExchangePattern exchangePattern, 053 ServiceLoadBalancer loadBalancer, Expression expression) { 054 055 this.uri = uri; 056 this.exchangePattern = exchangePattern; 057 this.camelContext = camelContext; 058 this.loadBalancer = loadBalancer; 059 060 // setup from the provided name which can contain scheme and context-path information as well 061 String serviceName; 062 if (name.contains("/")) { 063 serviceName = StringHelper.before(name, "/"); 064 this.contextPath = StringHelper.after(name, "/"); 065 } else if (name.contains("?")) { 066 serviceName = StringHelper.before(name, "?"); 067 this.contextPath = StringHelper.after(name, "?"); 068 } else { 069 serviceName = name; 070 this.contextPath = null; 071 } 072 if (serviceName.contains(":")) { 073 this.scheme = StringHelper.before(serviceName, ":"); 074 this.name = StringHelper.after(serviceName, ":"); 075 } else { 076 this.scheme = scheme; 077 this.name = serviceName; 078 } 079 080 this.expression = expression; 081 } 082 083 // ************************************* 084 // Properties 085 // ************************************* 086 087 088 public ExchangePattern getExchangePattern() { 089 return exchangePattern; 090 } 091 092 public String getName() { 093 return name; 094 } 095 096 public String getScheme() { 097 return scheme; 098 } 099 100 public String getUri() { 101 return uri; 102 } 103 104 public String getContextPath() { 105 return contextPath; 106 } 107 108 public ServiceLoadBalancer getLoadBalancer() { 109 return loadBalancer; 110 } 111 112 public Expression getExpression() { 113 return expression; 114 } 115 116 // ************************************* 117 // Lifecycle 118 // ************************************* 119 120 @Override 121 protected void doStart() throws Exception { 122 StringHelper.notEmpty(name, "name", "service name"); 123 ObjectHelper.notNull(camelContext, "camel context"); 124 ObjectHelper.notNull(expression, "expression"); 125 ObjectHelper.notNull(loadBalancer, "load balancer"); 126 127 processor = new SendDynamicProcessor(uri, expression); 128 processor.setCamelContext(camelContext); 129 if (exchangePattern != null) { 130 processor.setPattern(exchangePattern); 131 } 132 133 // Start services if needed 134 ServiceHelper.startService(processor); 135 ServiceHelper.startService(loadBalancer); 136 } 137 138 @Override 139 protected void doStop() throws Exception { 140 // Stop services if needed 141 ServiceHelper.stopService(loadBalancer); 142 ServiceHelper.stopService(processor); 143 } 144 145 // ************************************* 146 // Processor 147 // ************************************* 148 149 150 @Override 151 public void process(Exchange exchange) throws Exception { 152 AsyncProcessorHelper.process(this, exchange); 153 } 154 155 @Override 156 public boolean process(final Exchange exchange, final AsyncCallback callback) { 157 Message message = exchange.getIn(); 158 159 // the values can be dynamic using simple language so compute those 160 String val = uri; 161 if (SimpleLanguage.hasSimpleFunction(val)) { 162 val = SimpleLanguage.simple(val).evaluate(exchange, String.class); 163 } 164 message.setHeader(ServiceCallConstants.SERVICE_CALL_URI, val); 165 166 val = contextPath; 167 if (SimpleLanguage.hasSimpleFunction(val)) { 168 val = SimpleLanguage.simple(val).evaluate(exchange, String.class); 169 } 170 message.setHeader(ServiceCallConstants.SERVICE_CALL_CONTEXT_PATH, val); 171 172 val = scheme; 173 if (SimpleLanguage.hasSimpleFunction(val)) { 174 val = SimpleLanguage.simple(val).evaluate(exchange, String.class); 175 } 176 message.setHeader(ServiceCallConstants.SERVICE_CALL_SCHEME, val); 177 178 String serviceName = name; 179 if (SimpleLanguage.hasSimpleFunction(serviceName)) { 180 serviceName = SimpleLanguage.simple(serviceName).evaluate(exchange, String.class); 181 } 182 message.setHeader(ServiceCallConstants.SERVICE_NAME, serviceName); 183 184 try { 185 return loadBalancer.process(serviceName, server -> execute(server, exchange, callback)); 186 } catch (Exception e) { 187 exchange.setException(e); 188 return true; 189 } 190 } 191 192 private boolean execute(ServiceDefinition server, Exchange exchange, AsyncCallback callback) throws Exception { 193 String host = server.getHost(); 194 int port = server.getPort(); 195 196 LOGGER.debug("Service {} active at server: {}:{}", name, host, port); 197 198 // set selected server as header 199 exchange.getIn().setHeader(ServiceCallConstants.SERVICE_HOST, host); 200 exchange.getIn().setHeader(ServiceCallConstants.SERVICE_PORT, port > 0 ? port : null); 201 exchange.getIn().setHeader(ServiceCallConstants.SERVICE_NAME, server.getName()); 202 exchange.getIn().setHeader(ServiceCallConstants.SERVICE_META, server.getMetadata()); 203 204 // use the dynamic send processor to call the service 205 return processor.process(exchange, callback); 206 } 207}