001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import org.apache.camel.AsyncCallback;
020import org.apache.camel.AsyncProcessor;
021import org.apache.camel.AsyncProducerCallback;
022import org.apache.camel.CamelContext;
023import org.apache.camel.CamelContextAware;
024import org.apache.camel.Endpoint;
025import org.apache.camel.Exchange;
026import org.apache.camel.ExchangePattern;
027import org.apache.camel.Expression;
028import org.apache.camel.NoTypeConversionAvailableException;
029import org.apache.camel.Producer;
030import org.apache.camel.impl.EmptyProducerCache;
031import org.apache.camel.impl.ProducerCache;
032import org.apache.camel.spi.EndpointUtilizationStatistics;
033import org.apache.camel.spi.IdAware;
034import org.apache.camel.support.ServiceSupport;
035import org.apache.camel.util.AsyncProcessorHelper;
036import org.apache.camel.util.EndpointHelper;
037import org.apache.camel.util.ExchangeHelper;
038import org.apache.camel.util.ServiceHelper;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042/**
043 * Processor for forwarding exchanges to a dynamic endpoint destination.
044 *
045 * @see org.apache.camel.processor.SendProcessor
046 */
047public class SendDynamicProcessor extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
048    protected static final Logger LOG = LoggerFactory.getLogger(SendDynamicProcessor.class);
049    protected CamelContext camelContext;
050    protected final String uri;
051    protected final Expression expression;
052    protected ExchangePattern pattern;
053    protected ProducerCache producerCache;
054    protected String id;
055    protected boolean ignoreInvalidEndpoint;
056    protected int cacheSize;
057
058    public SendDynamicProcessor(Expression expression) {
059        this.uri = null;
060        this.expression = expression;
061    }
062
063    public SendDynamicProcessor(String uri, Expression expression) {
064        this.uri = uri;
065        this.expression = expression;
066    }
067
068    @Override
069    public String toString() {
070        return "sendTo(" + getExpression() + ")";
071    }
072
073    public String getId() {
074        return id;
075    }
076
077    public void setId(String id) {
078        this.id = id;
079    }
080
081    public void process(final Exchange exchange) throws Exception {
082        AsyncProcessorHelper.process(this, exchange);
083    }
084
085    public boolean process(Exchange exchange, final AsyncCallback callback) {
086        if (!isStarted()) {
087            exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this));
088            callback.done(true);
089            return true;
090        }
091
092        // we should preserve existing MEP so remember old MEP
093        // if you want to permanently to change the MEP then use .setExchangePattern in the DSL
094        final ExchangePattern existingPattern = exchange.getPattern();
095
096        // which endpoint to send to
097        final Endpoint endpoint;
098        final ExchangePattern destinationExchangePattern;
099
100        // use dynamic endpoint so calculate the endpoint to use
101        Object recipient = null;
102        try {
103            recipient = expression.evaluate(exchange, Object.class);
104            endpoint = resolveEndpoint(exchange, recipient);
105            destinationExchangePattern = EndpointHelper.resolveExchangePatternFromUrl(endpoint.getEndpointUri());
106        } catch (Throwable e) {
107            if (isIgnoreInvalidEndpoint()) {
108                if (LOG.isDebugEnabled()) {
109                    LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
110                }
111            } else {
112                exchange.setException(e);
113            }
114            callback.done(true);
115            return true;
116        }
117
118        // send the exchange to the destination using the producer cache
119        return producerCache.doInAsyncProducer(endpoint, exchange, pattern, callback, new AsyncProducerCallback() {
120            public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange,
121                                             ExchangePattern pattern, final AsyncCallback callback) {
122                final Exchange target = configureExchange(exchange, pattern, destinationExchangePattern, endpoint);
123                LOG.debug(">>>> {} {}", endpoint, exchange);
124                return asyncProducer.process(target, new AsyncCallback() {
125                    public void done(boolean doneSync) {
126                        // restore previous MEP
127                        target.setPattern(existingPattern);
128                        // signal we are done
129                        callback.done(doneSync);
130                    }
131                });
132            }
133        });
134    }
135
136    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
137        // trim strings as end users might have added spaces between separators
138        if (recipient instanceof String) {
139            recipient = ((String) recipient).trim();
140        } else if (recipient instanceof Endpoint) {
141            return (Endpoint) recipient;
142        } else {
143            // convert to a string type we can work with
144            recipient = exchange.getContext().getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
145        }
146
147        return ExchangeHelper.resolveEndpoint(exchange, recipient);
148    }
149
150    protected Exchange configureExchange(Exchange exchange, ExchangePattern pattern, ExchangePattern destinationExchangePattern, Endpoint endpoint) {
151        // destination exchange pattern overrides pattern
152        if (destinationExchangePattern != null) {
153            exchange.setPattern(destinationExchangePattern);
154        } else if (pattern != null) {
155            exchange.setPattern(pattern);
156        }
157        // set property which endpoint we send to
158        exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
159        return exchange;
160    }
161
162    protected void doStart() throws Exception {
163        if (producerCache == null) {
164            if (cacheSize < 0) {
165                producerCache = new EmptyProducerCache(this, camelContext);
166                LOG.debug("DynamicSendTo {} is not using ProducerCache", this);
167            } else if (cacheSize == 0) {
168                producerCache = new ProducerCache(this, camelContext);
169                LOG.debug("DynamicSendTo {} using ProducerCache with default cache size", this);
170            } else {
171                producerCache = new ProducerCache(this, camelContext, cacheSize);
172                LOG.debug("DynamicSendTo {} using ProducerCache with cacheSize={}", this, cacheSize);
173            }
174        }
175        ServiceHelper.startService(producerCache);
176    }
177
178    protected void doStop() throws Exception {
179        ServiceHelper.stopServices(producerCache);
180    }
181
182    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
183        return producerCache.getEndpointUtilizationStatistics();
184    }
185
186    public CamelContext getCamelContext() {
187        return camelContext;
188    }
189
190    public void setCamelContext(CamelContext camelContext) {
191        this.camelContext = camelContext;
192    }
193
194    public String getUri() {
195        return uri;
196    }
197
198    public Expression getExpression() {
199        return expression;
200    }
201
202    public ExchangePattern getPattern() {
203        return pattern;
204    }
205
206    public void setPattern(ExchangePattern pattern) {
207        this.pattern = pattern;
208    }
209
210    public boolean isIgnoreInvalidEndpoint() {
211        return ignoreInvalidEndpoint;
212    }
213
214    public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) {
215        this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
216    }
217
218    public int getCacheSize() {
219        return cacheSize;
220    }
221
222    public void setCacheSize(int cacheSize) {
223        this.cacheSize = cacheSize;
224    }
225}