001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import org.apache.camel.AsyncCallback;
020import org.apache.camel.AsyncProcessor;
021import org.apache.camel.AsyncProducerCallback;
022import org.apache.camel.CamelContext;
023import org.apache.camel.CamelContextAware;
024import org.apache.camel.Endpoint;
025import org.apache.camel.Exchange;
026import org.apache.camel.ExchangePattern;
027import org.apache.camel.Expression;
028import org.apache.camel.NoTypeConversionAvailableException;
029import org.apache.camel.Processor;
030import org.apache.camel.Producer;
031import org.apache.camel.ResolveEndpointFailedException;
032import org.apache.camel.impl.EmptyProducerCache;
033import org.apache.camel.impl.ProducerCache;
034import org.apache.camel.spi.EndpointUtilizationStatistics;
035import org.apache.camel.spi.IdAware;
036import org.apache.camel.spi.SendDynamicAware;
037import org.apache.camel.support.ServiceSupport;
038import org.apache.camel.util.AsyncProcessorHelper;
039import org.apache.camel.util.EndpointHelper;
040import org.apache.camel.util.ExchangeHelper;
041import org.apache.camel.util.ServiceHelper;
042import org.apache.camel.util.URISupport;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * Processor for forwarding exchanges to a dynamic endpoint destination.
048 *
049 * @see org.apache.camel.processor.SendProcessor
050 */
051public class SendDynamicProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
052    protected static final Logger LOG = LoggerFactory.getLogger(SendDynamicProcessor.class);
053    protected SendDynamicAware dynamicAware;
054    protected CamelContext camelContext;
055    protected final String uri;
056    protected final Expression expression;
057    protected ExchangePattern pattern;
058    protected ProducerCache producerCache;
059    protected String id;
060    protected boolean ignoreInvalidEndpoint;
061    protected int cacheSize;
062    protected boolean allowOptimisedComponents = true;
063
064    public SendDynamicProcessor(Expression expression) {
065        this.uri = null;
066        this.expression = expression;
067    }
068
069    public SendDynamicProcessor(String uri, Expression expression) {
070        this.uri = uri;
071        this.expression = expression;
072    }
073
074    @Override
075    public String toString() {
076        return "sendTo(" + getExpression() + ")";
077    }
078
079    public String getId() {
080        return id;
081    }
082
083    public void setId(String id) {
084        this.id = id;
085    }
086
087    public void process(final Exchange exchange) throws Exception {
088        AsyncProcessorHelper.process(this, exchange);
089    }
090
091    public boolean process(Exchange exchange, final AsyncCallback callback) {
092        if (!isStarted()) {
093            exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this));
094            callback.done(true);
095            return true;
096        }
097
098        // we should preserve existing MEP so remember old MEP
099        // if you want to permanently to change the MEP then use .setExchangePattern in the DSL
100        final ExchangePattern existingPattern = exchange.getPattern();
101
102        // which endpoint to send to
103        final Endpoint endpoint;
104        final ExchangePattern destinationExchangePattern;
105
106        // use dynamic endpoint so calculate the endpoint to use
107        Object recipient = null;
108        Processor preAwareProcessor = null;
109        Processor postAwareProcessor = null;
110        String staticUri = null;
111        try {
112            recipient = expression.evaluate(exchange, Object.class);
113            if (dynamicAware != null) {
114                // if its the same scheme as the pre-resolved dynamic aware then we can optimise to use it
115                String uri = resolveUri(exchange, recipient);
116                String scheme = resolveScheme(exchange, uri);
117                if (dynamicAware.getScheme().equals(scheme)) {
118                    SendDynamicAware.DynamicAwareEntry entry = dynamicAware.prepare(exchange, uri);
119                    if (entry != null) {
120                        staticUri = dynamicAware.resolveStaticUri(exchange, entry);
121                        preAwareProcessor = dynamicAware.createPreProcessor(exchange, entry);
122                        postAwareProcessor = dynamicAware.createPostProcessor(exchange, entry);
123                        if (staticUri != null) {
124                            if (LOG.isDebugEnabled()) {
125                                LOG.debug("Optimising toD via SendDynamicAware component: {} to use static uri: {}", scheme, URISupport.sanitizeUri(staticUri));
126                            }
127                        }
128                    }
129                }
130            }
131            if (staticUri != null) {
132                endpoint = resolveEndpoint(exchange, staticUri);
133            } else {
134                endpoint = resolveEndpoint(exchange, recipient);
135            }
136            if (endpoint == null) {
137                if (LOG.isDebugEnabled()) {
138                    LOG.debug("Send dynamic evaluated as null so cannot send to any endpoint");
139                }
140                // no endpoint to send to, so ignore
141                callback.done(true);
142                return true;
143            }
144            destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(endpoint.getEndpointUri());
145        } catch (Throwable e) {
146            if (isIgnoreInvalidEndpoint()) {
147                if (LOG.isDebugEnabled()) {
148                    LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
149                }
150            } else {
151                exchange.setException(e);
152            }
153            callback.done(true);
154            return true;
155        }
156
157        // send the exchange to the destination using the producer cache
158        final Processor preProcessor = preAwareProcessor;
159        final Processor postProcessor = postAwareProcessor;
160        return producerCache.doInAsyncProducer(endpoint, exchange, pattern, callback, new AsyncProducerCallback() {
161            public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange,
162                                             ExchangePattern pattern, final AsyncCallback callback) {
163                final Exchange target = configureExchange(exchange, pattern, destinationExchangePattern, endpoint);
164
165                try {
166                    if (preProcessor != null) {
167                        preProcessor.process(target);
168                    }
169                } catch (Throwable e) {
170                    exchange.setException(e);
171                    // restore previous MEP
172                    target.setPattern(existingPattern);
173                    // we failed
174                    callback.done(true);
175                }
176
177                LOG.debug(">>>> {} {}", endpoint, exchange);
178                return asyncProducer.process(target, new AsyncCallback() {
179                    public void done(boolean doneSync) {
180                        // restore previous MEP
181                        target.setPattern(existingPattern);
182                        try {
183                            if (postProcessor != null) {
184                                postProcessor.process(target);
185                            }
186                        } catch (Throwable e) {
187                            target.setException(e);
188                        }
189                        // signal we are done
190                        callback.done(doneSync);
191                    }
192                });
193            }
194        });
195    }
196
197    protected static String resolveUri(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
198        if (recipient == null) {
199            return null;
200        }
201
202        String uri;
203        // trim strings as end users might have added spaces between separators
204        if (recipient instanceof String) {
205            uri = ((String) recipient).trim();
206        } else if (recipient instanceof Endpoint) {
207            uri = ((Endpoint) recipient).getEndpointKey();
208        } else {
209            // convert to a string type we can work with
210            uri = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
211        }
212
213        // in case path has property placeholders then try to let property component resolve those
214        try {
215            uri = exchange.getContext().resolvePropertyPlaceholders(uri);
216        } catch (Exception e) {
217            throw new ResolveEndpointFailedException(uri, e);
218        }
219
220        return uri;
221    }
222
223    protected static String resolveScheme(Exchange exchange, String uri) {
224        return ExchangeHelper.resolveScheme(uri);
225    }
226
227    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
228        // trim strings as end users might have added spaces between separators
229        if (recipient instanceof String) {
230            recipient = ((String) recipient).trim();
231        } else if (recipient instanceof Endpoint) {
232            return (Endpoint) recipient;
233        } else if (recipient != null) {
234            // convert to a string type we can work with
235            recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
236        }
237
238        if (recipient != null) {
239            return ExchangeHelper.resolveEndpoint(exchange, recipient);
240        } else {
241            return null;
242        }
243    }
244
245    protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern, ExchangePattern destinationExchangePattern, Endpoint endpoint) {
246        // destination exchange pattern overrides pattern
247        if (destinationExchangePattern != null) {
248            exchange.setPattern(destinationExchangePattern);
249        } else if (pattern != null) {
250            exchange.setPattern(pattern);
251        }
252        // set property which endpoint we send to
253        exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
254        return exchange;
255    }
256
257    protected void doStart() throws Exception {
258        if (producerCache == null) {
259            if (cacheSize < 0) {
260                producerCache = new EmptyProducerCache(this, camelContext);
261                LOG.debug("DynamicSendTo {} is not using ProducerCache", this);
262            } else if (cacheSize == 0) {
263                producerCache = new ProducerCache(this, camelContext);
264                LOG.debug("DynamicSendTo {} using ProducerCache with default cache size", this);
265            } else {
266                producerCache = new ProducerCache(this, camelContext, cacheSize);
267                LOG.debug("DynamicSendTo {} using ProducerCache with cacheSize={}", this, cacheSize);
268            }
269        }
270
271        if (isAllowOptimisedComponents() && uri != null) {
272            try {
273                // in case path has property placeholders then try to let property component resolve those
274                String u = camelContext.resolvePropertyPlaceholders(uri);
275                // find out which component it is
276                String scheme = ExchangeHelper.resolveScheme(u);
277                if (scheme != null) {
278                    // find out if the component can be optimised for send-dynamic
279                    SendDynamicAwareResolver resolver = new SendDynamicAwareResolver();
280                    dynamicAware = resolver.resolve(camelContext, scheme);
281                    if (dynamicAware != null) {
282                        if (LOG.isDebugEnabled()) {
283                            LOG.debug("Detected SendDynamicAware component: {} optimising toD: {}", scheme, URISupport.sanitizeUri(uri));
284                        }
285                    }
286                }
287            } catch (Throwable e) {
288                // ignore
289                if (LOG.isDebugEnabled()) {
290                    LOG.debug("Error creating optimised SendDynamicAwareResolver for uri: " + URISupport.sanitizeUri(uri)
291                        + " due to " + e.getMessage() + ". This exception is ignored", e);
292                }
293            }
294        }
295
296        ServiceHelper.startServices(producerCache);
297    }
298
299    protected void doStop() throws Exception {
300        ServiceHelper.stopServices(producerCache);
301    }
302
303    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
304        return producerCache.getEndpointUtilizationStatistics();
305    }
306
307    public CamelContext getCamelContext() {
308        return camelContext;
309    }
310
311    public void setCamelContext(CamelContext camelContext) {
312        this.camelContext = camelContext;
313    }
314
315    public SendDynamicAware getDynamicAware() {
316        return dynamicAware;
317    }
318
319    public String getUri() {
320        return uri;
321    }
322
323    public Expression getExpression() {
324        return expression;
325    }
326
327    public ExchangePattern getPattern() {
328        return pattern;
329    }
330
331    public void setPattern(ExchangePattern pattern) {
332        this.pattern = pattern;
333    }
334
335    public boolean isIgnoreInvalidEndpoint() {
336        return ignoreInvalidEndpoint;
337    }
338
339    public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) {
340        this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
341    }
342
343    public int getCacheSize() {
344        return cacheSize;
345    }
346
347    public void setCacheSize(int cacheSize) {
348        this.cacheSize = cacheSize;
349    }
350
351    public boolean isAllowOptimisedComponents() {
352        return allowOptimisedComponents;
353    }
354
355    public void setAllowOptimisedComponents(boolean allowOptimisedComponents) {
356        this.allowOptimisedComponents = allowOptimisedComponents;
357    }
358}