001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import org.apache.camel.AsyncCallback; 020import org.apache.camel.AsyncProcessor; 021import org.apache.camel.AsyncProducerCallback; 022import org.apache.camel.CamelContext; 023import org.apache.camel.CamelContextAware; 024import org.apache.camel.Endpoint; 025import org.apache.camel.Exchange; 026import org.apache.camel.ExchangePattern; 027import org.apache.camel.Expression; 028import org.apache.camel.NoTypeConversionAvailableException; 029import org.apache.camel.Processor; 030import org.apache.camel.Producer; 031import org.apache.camel.ResolveEndpointFailedException; 032import org.apache.camel.impl.EmptyProducerCache; 033import org.apache.camel.impl.ProducerCache; 034import org.apache.camel.spi.EndpointUtilizationStatistics; 035import org.apache.camel.spi.IdAware; 036import org.apache.camel.spi.SendDynamicAware; 037import org.apache.camel.support.ServiceSupport; 038import org.apache.camel.util.AsyncProcessorHelper; 039import org.apache.camel.util.EndpointHelper; 040import org.apache.camel.util.ExchangeHelper; 041import org.apache.camel.util.ServiceHelper; 042import org.apache.camel.util.URISupport; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * Processor for forwarding exchanges to a dynamic endpoint destination. 048 * 049 * @see org.apache.camel.processor.SendProcessor 050 */ 051public class SendDynamicProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware { 052 protected static final Logger LOG = LoggerFactory.getLogger(SendDynamicProcessor.class); 053 protected SendDynamicAware dynamicAware; 054 protected CamelContext camelContext; 055 protected final String uri; 056 protected final Expression expression; 057 protected ExchangePattern pattern; 058 protected ProducerCache producerCache; 059 protected String id; 060 protected boolean ignoreInvalidEndpoint; 061 protected int cacheSize; 062 protected boolean allowOptimisedComponents = true; 063 064 public SendDynamicProcessor(Expression expression) { 065 this.uri = null; 066 this.expression = expression; 067 } 068 069 public SendDynamicProcessor(String uri, Expression expression) { 070 this.uri = uri; 071 this.expression = expression; 072 } 073 074 @Override 075 public String toString() { 076 return "sendTo(" + getExpression() + ")"; 077 } 078 079 public String getId() { 080 return id; 081 } 082 083 public void setId(String id) { 084 this.id = id; 085 } 086 087 public void process(final Exchange exchange) throws Exception { 088 AsyncProcessorHelper.process(this, exchange); 089 } 090 091 public boolean process(Exchange exchange, final AsyncCallback callback) { 092 if (!isStarted()) { 093 exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this)); 094 callback.done(true); 095 return true; 096 } 097 098 // we should preserve existing MEP so remember old MEP 099 // if you want to permanently to change the MEP then use .setExchangePattern in the DSL 100 final ExchangePattern existingPattern = exchange.getPattern(); 101 102 // which endpoint to send to 103 final Endpoint endpoint; 104 final ExchangePattern destinationExchangePattern; 105 106 // use dynamic endpoint so calculate the endpoint to use 107 Object recipient = null; 108 Processor preAwareProcessor = null; 109 Processor postAwareProcessor = null; 110 String staticUri = null; 111 try { 112 recipient = expression.evaluate(exchange, Object.class); 113 if (dynamicAware != null) { 114 // if its the same scheme as the pre-resolved dynamic aware then we can optimise to use it 115 String uri = resolveUri(exchange, recipient); 116 String scheme = resolveScheme(exchange, uri); 117 if (dynamicAware.getScheme().equals(scheme)) { 118 SendDynamicAware.DynamicAwareEntry entry = dynamicAware.prepare(exchange, uri); 119 if (entry != null) { 120 staticUri = dynamicAware.resolveStaticUri(exchange, entry); 121 preAwareProcessor = dynamicAware.createPreProcessor(exchange, entry); 122 postAwareProcessor = dynamicAware.createPostProcessor(exchange, entry); 123 if (staticUri != null) { 124 if (LOG.isDebugEnabled()) { 125 LOG.debug("Optimising toD via SendDynamicAware component: {} to use static uri: {}", scheme, URISupport.sanitizeUri(staticUri)); 126 } 127 } 128 } 129 } 130 } 131 if (staticUri != null) { 132 endpoint = resolveEndpoint(exchange, staticUri); 133 } else { 134 endpoint = resolveEndpoint(exchange, recipient); 135 } 136 if (endpoint == null) { 137 if (LOG.isDebugEnabled()) { 138 LOG.debug("Send dynamic evaluated as null so cannot send to any endpoint"); 139 } 140 // no endpoint to send to, so ignore 141 callback.done(true); 142 return true; 143 } 144 destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(endpoint.getEndpointUri()); 145 } catch (Throwable e) { 146 if (isIgnoreInvalidEndpoint()) { 147 if (LOG.isDebugEnabled()) { 148 LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e); 149 } 150 } else { 151 exchange.setException(e); 152 } 153 callback.done(true); 154 return true; 155 } 156 157 // send the exchange to the destination using the producer cache 158 final Processor preProcessor = preAwareProcessor; 159 final Processor postProcessor = postAwareProcessor; 160 return producerCache.doInAsyncProducer(endpoint, exchange, pattern, callback, new AsyncProducerCallback() { 161 public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange, 162 ExchangePattern pattern, final AsyncCallback callback) { 163 final Exchange target = configureExchange(exchange, pattern, destinationExchangePattern, endpoint); 164 165 try { 166 if (preProcessor != null) { 167 preProcessor.process(target); 168 } 169 } catch (Throwable e) { 170 exchange.setException(e); 171 // restore previous MEP 172 target.setPattern(existingPattern); 173 // we failed 174 callback.done(true); 175 } 176 177 LOG.debug(">>>> {} {}", endpoint, exchange); 178 return asyncProducer.process(target, new AsyncCallback() { 179 public void done(boolean doneSync) { 180 // restore previous MEP 181 target.setPattern(existingPattern); 182 try { 183 if (postProcessor != null) { 184 postProcessor.process(target); 185 } 186 } catch (Throwable e) { 187 target.setException(e); 188 } 189 // signal we are done 190 callback.done(doneSync); 191 } 192 }); 193 } 194 }); 195 } 196 197 protected static String resolveUri(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { 198 if (recipient == null) { 199 return null; 200 } 201 202 String uri; 203 // trim strings as end users might have added spaces between separators 204 if (recipient instanceof String) { 205 uri = ((String) recipient).trim(); 206 } else if (recipient instanceof Endpoint) { 207 uri = ((Endpoint) recipient).getEndpointKey(); 208 } else { 209 // convert to a string type we can work with 210 uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); 211 } 212 213 // in case path has property placeholders then try to let property component resolve those 214 try { 215 uri = exchange.getContext().resolvePropertyPlaceholders(uri); 216 } catch (Exception e) { 217 throw new ResolveEndpointFailedException(uri, e); 218 } 219 220 return uri; 221 } 222 223 protected static String resolveScheme(Exchange exchange, String uri) { 224 return ExchangeHelper.resolveScheme(uri); 225 } 226 227 protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException { 228 // trim strings as end users might have added spaces between separators 229 if (recipient instanceof String) { 230 recipient = ((String) recipient).trim(); 231 } else if (recipient instanceof Endpoint) { 232 return (Endpoint) recipient; 233 } else if (recipient != null) { 234 // convert to a string type we can work with 235 recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient); 236 } 237 238 if (recipient != null) { 239 return ExchangeHelper.resolveEndpoint(exchange, recipient); 240 } else { 241 return null; 242 } 243 } 244 245 protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern, ExchangePattern destinationExchangePattern, Endpoint endpoint) { 246 // destination exchange pattern overrides pattern 247 if (destinationExchangePattern != null) { 248 exchange.setPattern(destinationExchangePattern); 249 } else if (pattern != null) { 250 exchange.setPattern(pattern); 251 } 252 // set property which endpoint we send to 253 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); 254 return exchange; 255 } 256 257 protected void doStart() throws Exception { 258 if (producerCache == null) { 259 if (cacheSize < 0) { 260 producerCache = new EmptyProducerCache(this, camelContext); 261 LOG.debug("DynamicSendTo {} is not using ProducerCache", this); 262 } else if (cacheSize == 0) { 263 producerCache = new ProducerCache(this, camelContext); 264 LOG.debug("DynamicSendTo {} using ProducerCache with default cache size", this); 265 } else { 266 producerCache = new ProducerCache(this, camelContext, cacheSize); 267 LOG.debug("DynamicSendTo {} using ProducerCache with cacheSize={}", this, cacheSize); 268 } 269 } 270 271 if (isAllowOptimisedComponents() && uri != null) { 272 try { 273 // in case path has property placeholders then try to let property component resolve those 274 String u = camelContext.resolvePropertyPlaceholders(uri); 275 // find out which component it is 276 String scheme = ExchangeHelper.resolveScheme(u); 277 if (scheme != null) { 278 // find out if the component can be optimised for send-dynamic 279 SendDynamicAwareResolver resolver = new SendDynamicAwareResolver(); 280 dynamicAware = resolver.resolve(camelContext, scheme); 281 if (dynamicAware != null) { 282 if (LOG.isDebugEnabled()) { 283 LOG.debug("Detected SendDynamicAware component: {} optimising toD: {}", scheme, URISupport.sanitizeUri(uri)); 284 } 285 } 286 } 287 } catch (Throwable e) { 288 // ignore 289 if (LOG.isDebugEnabled()) { 290 LOG.debug("Error creating optimised SendDynamicAwareResolver for uri: " + URISupport.sanitizeUri(uri) 291 + " due to " + e.getMessage() + ". This exception is ignored", e); 292 } 293 } 294 } 295 296 ServiceHelper.startServices(producerCache); 297 } 298 299 protected void doStop() throws Exception { 300 ServiceHelper.stopServices(producerCache); 301 } 302 303 public EndpointUtilizationStatistics getEndpointUtilizationStatistics() { 304 return producerCache.getEndpointUtilizationStatistics(); 305 } 306 307 public CamelContext getCamelContext() { 308 return camelContext; 309 } 310 311 public void setCamelContext(CamelContext camelContext) { 312 this.camelContext = camelContext; 313 } 314 315 public SendDynamicAware getDynamicAware() { 316 return dynamicAware; 317 } 318 319 public String getUri() { 320 return uri; 321 } 322 323 public Expression getExpression() { 324 return expression; 325 } 326 327 public ExchangePattern getPattern() { 328 return pattern; 329 } 330 331 public void setPattern(ExchangePattern pattern) { 332 this.pattern = pattern; 333 } 334 335 public boolean isIgnoreInvalidEndpoint() { 336 return ignoreInvalidEndpoint; 337 } 338 339 public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) { 340 this.ignoreInvalidEndpoint = ignoreInvalidEndpoint; 341 } 342 343 public int getCacheSize() { 344 return cacheSize; 345 } 346 347 public void setCacheSize(int cacheSize) { 348 this.cacheSize = cacheSize; 349 } 350 351 public boolean isAllowOptimisedComponents() { 352 return allowOptimisedComponents; 353 } 354 355 public void setAllowOptimisedComponents(boolean allowOptimisedComponents) { 356 this.allowOptimisedComponents = allowOptimisedComponents; 357 } 358}