001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import org.apache.camel.AsyncCallback;
020import org.apache.camel.AsyncProcessor;
021import org.apache.camel.AsyncProducerCallback;
022import org.apache.camel.CamelContext;
023import org.apache.camel.CamelContextAware;
024import org.apache.camel.Endpoint;
025import org.apache.camel.Exchange;
026import org.apache.camel.ExchangePattern;
027import org.apache.camel.Expression;
028import org.apache.camel.NoTypeConversionAvailableException;
029import org.apache.camel.Producer;
030import org.apache.camel.impl.EmptyProducerCache;
031import org.apache.camel.impl.ProducerCache;
032import org.apache.camel.spi.EndpointUtilizationStatistics;
033import org.apache.camel.spi.IdAware;
034import org.apache.camel.support.ServiceSupport;
035import org.apache.camel.util.AsyncProcessorHelper;
036import org.apache.camel.util.EndpointHelper;
037import org.apache.camel.util.ExchangeHelper;
038import org.apache.camel.util.ServiceHelper;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * Processor for forwarding exchanges to a dynamic endpoint destination.
044 *
045 * @see org.apache.camel.processor.SendProcessor
046 */
047public class SendDynamicProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
048    protected static final Logger LOG = LoggerFactory.getLogger(SendDynamicProcessor.class);
049    protected CamelContext camelContext;
050    protected final String uri;
051    protected final Expression expression;
052    protected ExchangePattern pattern;
053    protected ProducerCache producerCache;
054    protected String id;
055    protected boolean ignoreInvalidEndpoint;
056    protected int cacheSize;
057
058    public SendDynamicProcessor(Expression expression) {
059        this.uri = null;
060        this.expression = expression;
061    }
062
063    public SendDynamicProcessor(String uri, Expression expression) {
064        this.uri = uri;
065        this.expression = expression;
066    }
067
068    @Override
069    public String toString() {
070        return "sendTo(" + getExpression() + ")";
071    }
072
073    public String getId() {
074        return id;
075    }
076
077    public void setId(String id) {
078        this.id = id;
079    }
080
081    public void process(final Exchange exchange) throws Exception {
082        AsyncProcessorHelper.process(this, exchange);
083    }
084
085    public boolean process(Exchange exchange, final AsyncCallback callback) {
086        if (!isStarted()) {
087            exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this));
088            callback.done(true);
089            return true;
090        }
091
092        // we should preserve existing MEP so remember old MEP
093        // if you want to permanently to change the MEP then use .setExchangePattern in the DSL
094        final ExchangePattern existingPattern = exchange.getPattern();
095
096        // which endpoint to send to
097        final Endpoint endpoint;
098        final ExchangePattern destinationExchangePattern;
099
100        // use dynamic endpoint so calculate the endpoint to use
101        Object recipient = null;
102        try {
103            recipient = expression.evaluate(exchange, Object.class);
104            endpoint = resolveEndpoint(exchange, recipient);
105            if (endpoint == null) {
106                if (LOG.isDebugEnabled()) {
107                    LOG.debug("Send dynamic evaluated as null so cannot send to any endpoint");
108                }
109                // no endpoint to send to, so ignore
110                callback.done(true);
111                return true;
112            }
113            destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(endpoint.getEndpointUri());
114        } catch (Throwable e) {
115            if (isIgnoreInvalidEndpoint()) {
116                if (LOG.isDebugEnabled()) {
117                    LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
118                }
119            } else {
120                exchange.setException(e);
121            }
122            callback.done(true);
123            return true;
124        }
125
126        // send the exchange to the destination using the producer cache
127        return producerCache.doInAsyncProducer(endpoint, exchange, pattern, callback, new AsyncProducerCallback() {
128            public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange,
129                                             ExchangePattern pattern, final AsyncCallback callback) {
130                final Exchange target = configureExchange(exchange, pattern, destinationExchangePattern, endpoint);
131                LOG.debug(">>>> {} {}", endpoint, exchange);
132                return asyncProducer.process(target, new AsyncCallback() {
133                    public void done(boolean doneSync) {
134                        // restore previous MEP
135                        target.setPattern(existingPattern);
136                        // signal we are done
137                        callback.done(doneSync);
138                    }
139                });
140            }
141        });
142    }
143
144    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
145        // trim strings as end users might have added spaces between separators
146        if (recipient instanceof String) {
147            recipient = ((String) recipient).trim();
148        } else if (recipient instanceof Endpoint) {
149            return (Endpoint) recipient;
150        } else if (recipient != null) {
151            // convert to a string type we can work with
152            recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
153        }
154
155        if (recipient != null) {
156            return ExchangeHelper.resolveEndpoint(exchange, recipient);
157        } else {
158            return null;
159        }
160    }
161
162    protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern, ExchangePattern destinationExchangePattern, Endpoint endpoint) {
163        // destination exchange pattern overrides pattern
164        if (destinationExchangePattern != null) {
165            exchange.setPattern(destinationExchangePattern);
166        } else if (pattern != null) {
167            exchange.setPattern(pattern);
168        }
169        // set property which endpoint we send to
170        exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
171        return exchange;
172    }
173
174    protected void doStart() throws Exception {
175        if (producerCache == null) {
176            if (cacheSize < 0) {
177                producerCache = new EmptyProducerCache(this, camelContext);
178                LOG.debug("DynamicSendTo {} is not using ProducerCache", this);
179            } else if (cacheSize == 0) {
180                producerCache = new ProducerCache(this, camelContext);
181                LOG.debug("DynamicSendTo {} using ProducerCache with default cache size", this);
182            } else {
183                producerCache = new ProducerCache(this, camelContext, cacheSize);
184                LOG.debug("DynamicSendTo {} using ProducerCache with cacheSize={}", this, cacheSize);
185            }
186        }
187        ServiceHelper.startService(producerCache);
188    }
189
190    protected void doStop() throws Exception {
191        ServiceHelper.stopServices(producerCache);
192    }
193
194    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
195        return producerCache.getEndpointUtilizationStatistics();
196    }
197
198    public CamelContext getCamelContext() {
199        return camelContext;
200    }
201
202    public void setCamelContext(CamelContext camelContext) {
203        this.camelContext = camelContext;
204    }
205
206    public String getUri() {
207        return uri;
208    }
209
210    public Expression getExpression() {
211        return expression;
212    }
213
214    public ExchangePattern getPattern() {
215        return pattern;
216    }
217
218    public void setPattern(ExchangePattern pattern) {
219        this.pattern = pattern;
220    }
221
222    public boolean isIgnoreInvalidEndpoint() {
223        return ignoreInvalidEndpoint;
224    }
225
226    public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) {
227        this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
228    }
229
230    public int getCacheSize() {
231        return cacheSize;
232    }
233
234    public void setCacheSize(int cacheSize) {
235        this.cacheSize = cacheSize;
236    }
237}