001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import org.apache.camel.AsyncCallback;
020import org.apache.camel.AsyncProcessor;
021import org.apache.camel.CamelContext;
022import org.apache.camel.CamelContextAware;
023import org.apache.camel.CamelExchangeException;
024import org.apache.camel.Endpoint;
025import org.apache.camel.Exchange;
026import org.apache.camel.ExchangePattern;
027import org.apache.camel.Expression;
028import org.apache.camel.Producer;
029import org.apache.camel.impl.DefaultExchange;
030import org.apache.camel.impl.EmptyProducerCache;
031import org.apache.camel.impl.ProducerCache;
032import org.apache.camel.processor.aggregate.AggregationStrategy;
033import org.apache.camel.spi.EndpointUtilizationStatistics;
034import org.apache.camel.spi.IdAware;
035import org.apache.camel.support.ServiceSupport;
036import org.apache.camel.util.AsyncProcessorConverterHelper;
037import org.apache.camel.util.AsyncProcessorHelper;
038import org.apache.camel.util.EventHelper;
039import org.apache.camel.util.ExchangeHelper;
040import org.apache.camel.util.ServiceHelper;
041import org.apache.camel.util.StopWatch;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
046
047/**
048 * A content enricher that enriches input data by first obtaining additional
049 * data from a <i>resource</i> represented by an endpoint <code>producer</code>
050 * and second by aggregating input data and additional data. Aggregation of
051 * input data and additional data is delegated to an {@link AggregationStrategy}
052 * object.
053 * <p/>
054 * Uses a {@link org.apache.camel.Producer} to obtain the additional data as opposed to {@link PollEnricher}
055 * that uses a {@link org.apache.camel.PollingConsumer}.
056 *
057 * @see PollEnricher
058 */
059public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
060
061    private static final Logger LOG = LoggerFactory.getLogger(Enricher.class);
062    private CamelContext camelContext;
063    private String id;
064    private ProducerCache producerCache;
065    private final Expression expression;
066    private AggregationStrategy aggregationStrategy;
067    private boolean aggregateOnException;
068    private boolean shareUnitOfWork;
069    private int cacheSize;
070    private boolean ignoreInvalidEndpoint;
071
072    public Enricher(Expression expression) {
073        this.expression = expression;
074    }
075
076    public CamelContext getCamelContext() {
077        return camelContext;
078    }
079
080    public void setCamelContext(CamelContext camelContext) {
081        this.camelContext = camelContext;
082    }
083
084    public String getId() {
085        return id;
086    }
087
088    public void setId(String id) {
089        this.id = id;
090    }
091
092    public Expression getExpression() {
093        return expression;
094    }
095
096    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
097        return producerCache.getEndpointUtilizationStatistics();
098    }
099
100    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
101        this.aggregationStrategy = aggregationStrategy;
102    }
103
104    public AggregationStrategy getAggregationStrategy() {
105        return aggregationStrategy;
106    }
107
108    public boolean isAggregateOnException() {
109        return aggregateOnException;
110    }
111
112    public void setAggregateOnException(boolean aggregateOnException) {
113        this.aggregateOnException = aggregateOnException;
114    }
115
116    public boolean isShareUnitOfWork() {
117        return shareUnitOfWork;
118    }
119
120    public void setShareUnitOfWork(boolean shareUnitOfWork) {
121        this.shareUnitOfWork = shareUnitOfWork;
122    }
123
124    public int getCacheSize() {
125        return cacheSize;
126    }
127
128    public void setCacheSize(int cacheSize) {
129        this.cacheSize = cacheSize;
130    }
131
132    public boolean isIgnoreInvalidEndpoint() {
133        return ignoreInvalidEndpoint;
134    }
135
136    public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) {
137        this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
138    }
139
140    public void process(Exchange exchange) throws Exception {
141        AsyncProcessorHelper.process(this, exchange);
142    }
143
144    /**
145     * Enriches the input data (<code>exchange</code>) by first obtaining
146     * additional data from an endpoint represented by an endpoint
147     * <code>producer</code> and second by aggregating input data and additional
148     * data. Aggregation of input data and additional data is delegated to an
149     * {@link AggregationStrategy} object set at construction time. If the
150     * message exchange with the resource endpoint fails then no aggregation
151     * will be done and the failed exchange content is copied over to the
152     * original message exchange.
153     *
154     * @param exchange input data.
155     */
156    public boolean process(final Exchange exchange, final AsyncCallback callback) {
157        // which producer to use
158        final Producer producer;
159        final Endpoint endpoint;
160
161        // use dynamic endpoint so calculate the endpoint to use
162        Object recipient = null;
163        try {
164            recipient = expression.evaluate(exchange, Object.class);
165            endpoint = resolveEndpoint(exchange, recipient);
166            // acquire the consumer from the cache
167            producer = producerCache.acquireProducer(endpoint);
168        } catch (Throwable e) {
169            if (isIgnoreInvalidEndpoint()) {
170                if (LOG.isDebugEnabled()) {
171                    LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
172                }
173            } else {
174                exchange.setException(e);
175            }
176            callback.done(true);
177            return true;
178        }
179
180        final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
181        final Endpoint destination = producer.getEndpoint();
182
183        EventHelper.notifyExchangeSending(exchange.getContext(), resourceExchange, destination);
184        // record timing for sending the exchange using the producer
185        final StopWatch watch = new StopWatch();
186        AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer);
187        boolean sync = ap.process(resourceExchange, new AsyncCallback() {
188            public void done(boolean doneSync) {
189                // we only have to handle async completion of the routing slip
190                if (doneSync) {
191                    return;
192                }
193
194                // emit event that the exchange was sent to the endpoint
195                long timeTaken = watch.stop();
196                EventHelper.notifyExchangeSent(resourceExchange.getContext(), resourceExchange, destination, timeTaken);
197                
198                if (!isAggregateOnException() && resourceExchange.isFailed()) {
199                    // copy resource exchange onto original exchange (preserving pattern)
200                    copyResultsPreservePattern(exchange, resourceExchange);
201                } else {
202                    prepareResult(exchange);
203                    try {
204                        // prepare the exchanges for aggregation
205                        ExchangeHelper.prepareAggregation(exchange, resourceExchange);
206
207                        Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
208                        if (aggregatedExchange != null) {
209                            // copy aggregation result onto original exchange (preserving pattern)
210                            copyResultsPreservePattern(exchange, aggregatedExchange);
211                        }
212                    } catch (Throwable e) {
213                        // if the aggregationStrategy threw an exception, set it on the original exchange
214                        exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
215                        callback.done(false);
216                        // we failed so break out now
217                        return;
218                    }
219                }
220
221                // set property with the uri of the endpoint enriched so we can use that for tracing etc
222                exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
223
224                // return the producer back to the cache
225                try {
226                    producerCache.releaseProducer(endpoint, producer);
227                } catch (Exception e) {
228                    // ignore
229                }
230
231                callback.done(false);
232            }
233        });
234
235        if (!sync) {
236            LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
237            // the remainder of the routing slip will be completed async
238            // so we break out now, then the callback will be invoked which then continue routing from where we left here
239            return false;
240        }
241
242        LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
243
244        // emit event that the exchange was sent to the endpoint
245        long timeTaken = watch.stop();
246        EventHelper.notifyExchangeSent(resourceExchange.getContext(), resourceExchange, destination, timeTaken);
247        
248        if (!isAggregateOnException() && resourceExchange.isFailed()) {
249            // copy resource exchange onto original exchange (preserving pattern)
250            copyResultsPreservePattern(exchange, resourceExchange);
251        } else {
252            prepareResult(exchange);
253
254            try {
255                // prepare the exchanges for aggregation
256                ExchangeHelper.prepareAggregation(exchange, resourceExchange);
257
258                Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
259                if (aggregatedExchange != null) {
260                    // copy aggregation result onto original exchange (preserving pattern)
261                    copyResultsPreservePattern(exchange, aggregatedExchange);
262                }
263            } catch (Throwable e) {
264                // if the aggregationStrategy threw an exception, set it on the original exchange
265                exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
266                callback.done(true);
267                // we failed so break out now
268                return true;
269            }
270        }
271
272        // set property with the uri of the endpoint enriched so we can use that for tracing etc
273        exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
274
275        // return the producer back to the cache
276        try {
277            producerCache.releaseProducer(endpoint, producer);
278        } catch (Exception e) {
279            // ignore
280        }
281
282        callback.done(true);
283        return true;
284    }
285
286    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
287        // trim strings as end users might have added spaces between separators
288        if (recipient instanceof String) {
289            recipient = ((String)recipient).trim();
290        }
291        return ExchangeHelper.resolveEndpoint(exchange, recipient);
292    }
293
294    /**
295     * Creates a new {@link DefaultExchange} instance from the given
296     * <code>exchange</code>. The resulting exchange's pattern is defined by
297     * <code>pattern</code>.
298     *
299     * @param source  exchange to copy from.
300     * @param pattern exchange pattern to set.
301     * @return created exchange.
302     */
303    protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) {
304        // copy exchange, and do not share the unit of work
305        Exchange target = ExchangeHelper.createCorrelatedCopy(source, false);
306        target.setPattern(pattern);
307
308        // if we share unit of work, we need to prepare the resource exchange
309        if (isShareUnitOfWork()) {
310            target.setProperty(Exchange.PARENT_UNIT_OF_WORK, source.getUnitOfWork());
311            // and then share the unit of work
312            target.setUnitOfWork(source.getUnitOfWork());
313        }
314        return target;
315    }
316
317    private static void prepareResult(Exchange exchange) {
318        if (exchange.getPattern().isOutCapable()) {
319            exchange.getOut().copyFrom(exchange.getIn());
320        }
321    }
322
323    private static AggregationStrategy defaultAggregationStrategy() {
324        return new CopyAggregationStrategy();
325    }
326
327    @Override
328    public String toString() {
329        return "Enrich[" + expression + "]";
330    }
331
332    protected void doStart() throws Exception {
333        if (aggregationStrategy == null) {
334            aggregationStrategy = defaultAggregationStrategy();
335        }
336
337        if (producerCache == null) {
338            if (cacheSize < 0) {
339                producerCache = new EmptyProducerCache(this, camelContext);
340                LOG.debug("Enricher {} is not using ProducerCache", this);
341            } else if (cacheSize == 0) {
342                producerCache = new ProducerCache(this, camelContext);
343                LOG.debug("Enricher {} using ProducerCache with default cache size", this);
344            } else {
345                producerCache = new ProducerCache(this, camelContext, cacheSize);
346                LOG.debug("Enricher {} using ProducerCache with cacheSize={}", this, cacheSize);
347            }
348        }
349
350        ServiceHelper.startServices(producerCache, aggregationStrategy);
351    }
352
353    protected void doStop() throws Exception {
354        ServiceHelper.stopServices(aggregationStrategy, producerCache);
355    }
356
357    private static class CopyAggregationStrategy implements AggregationStrategy {
358
359        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
360            if (newExchange != null) {
361                copyResultsPreservePattern(oldExchange, newExchange);
362            }
363            return oldExchange;
364        }
365
366    }
367
368}