001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import org.apache.camel.AsyncCallback;
020import org.apache.camel.AsyncProcessor;
021import org.apache.camel.CamelContext;
022import org.apache.camel.CamelContextAware;
023import org.apache.camel.CamelExchangeException;
024import org.apache.camel.Endpoint;
025import org.apache.camel.Exchange;
026import org.apache.camel.Expression;
027import org.apache.camel.PollingConsumer;
028import org.apache.camel.impl.ConsumerCache;
029import org.apache.camel.impl.EmptyConsumerCache;
030import org.apache.camel.processor.aggregate.AggregationStrategy;
031import org.apache.camel.spi.EndpointUtilizationStatistics;
032import org.apache.camel.spi.IdAware;
033import org.apache.camel.support.ServiceSupport;
034import org.apache.camel.util.AsyncProcessorHelper;
035import org.apache.camel.util.ExchangeHelper;
036import org.apache.camel.util.ServiceHelper;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
041
042/**
043 * A content enricher that enriches input data by first obtaining additional
044 * data from a <i>resource</i> represented by an endpoint <code>producer</code>
045 * and second by aggregating input data and additional data. Aggregation of
046 * input data and additional data is delegated to an {@link org.apache.camel.processor.aggregate.AggregationStrategy}
047 * object.
048 * <p/>
049 * Uses a {@link org.apache.camel.PollingConsumer} to obtain the additional data as opposed to {@link Enricher}
050 * that uses a {@link org.apache.camel.Producer}.
051 *
052 * @see Enricher
053 */
054public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
055
056    private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class);
057    private CamelContext camelContext;
058    private ConsumerCache consumerCache;
059    private String id;
060    private AggregationStrategy aggregationStrategy;
061    private final Expression expression;
062    private long timeout;
063    private boolean aggregateOnException;
064    private int cacheSize;
065    private boolean ignoreInvalidEndpoint;
066
067    /**
068     * Creates a new {@link PollEnricher}.
069     *
070     * @param expression expression to use to compute the endpoint to poll from.
071     * @param timeout timeout in millis
072     */
073    public PollEnricher(Expression expression, long timeout) {
074        this.expression = expression;
075        this.timeout = timeout;
076    }
077
078    public CamelContext getCamelContext() {
079        return camelContext;
080    }
081
082    public void setCamelContext(CamelContext camelContext) {
083        this.camelContext = camelContext;
084    }
085
086    public String getId() {
087        return id;
088    }
089
090    public void setId(String id) {
091        this.id = id;
092    }
093
094    public Expression getExpression() {
095        return expression;
096    }
097
098    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
099        return consumerCache.getEndpointUtilizationStatistics();
100    }
101
102    public AggregationStrategy getAggregationStrategy() {
103        return aggregationStrategy;
104    }
105
106    /**
107     * Sets the aggregation strategy for this poll enricher.
108     *
109     * @param aggregationStrategy the aggregationStrategy to set
110     */
111    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
112        this.aggregationStrategy = aggregationStrategy;
113    }
114
115    public long getTimeout() {
116        return timeout;
117    }
118
119    /**
120     * Sets the timeout to use when polling.
121     * <p/>
122     * Use 0 to use receiveNoWait,
123     * Use -1 to use receive with no timeout (which will block until data is available).
124     *
125     * @param timeout timeout in millis.
126     */
127    public void setTimeout(long timeout) {
128        this.timeout = timeout;
129    }
130
131    public boolean isAggregateOnException() {
132        return aggregateOnException;
133    }
134
135    public void setAggregateOnException(boolean aggregateOnException) {
136        this.aggregateOnException = aggregateOnException;
137    }
138
139    /**
140     * Sets the default aggregation strategy for this poll enricher.
141     */
142    public void setDefaultAggregationStrategy() {
143        this.aggregationStrategy = defaultAggregationStrategy();
144    }
145
146    public int getCacheSize() {
147        return cacheSize;
148    }
149
150    public void setCacheSize(int cacheSize) {
151        this.cacheSize = cacheSize;
152    }
153
154    public boolean isIgnoreInvalidEndpoint() {
155        return ignoreInvalidEndpoint;
156    }
157
158    public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) {
159        this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
160    }
161
162    public void process(Exchange exchange) throws Exception {
163        AsyncProcessorHelper.process(this, exchange);
164    }
165
166    /**
167     * Enriches the input data (<code>exchange</code>) by first obtaining
168     * additional data from an endpoint represented by an endpoint
169     * <code>producer</code> and second by aggregating input data and additional
170     * data. Aggregation of input data and additional data is delegated to an
171     * {@link org.apache.camel.processor.aggregate.AggregationStrategy} object set at construction time. If the
172     * message exchange with the resource endpoint fails then no aggregation
173     * will be done and the failed exchange content is copied over to the
174     * original message exchange.
175     *
176     * @param exchange input data.
177     */
178    @Override
179    public boolean process(Exchange exchange, AsyncCallback callback) {
180        try {
181            preCheckPoll(exchange);
182        } catch (Exception e) {
183            exchange.setException(new CamelExchangeException("Error during pre poll check", exchange, e));
184            callback.done(true);
185            return true;
186        }
187
188        // which consumer to use
189        PollingConsumer consumer;
190        Endpoint endpoint;
191
192        // use dynamic endpoint so calculate the endpoint to use
193        Object recipient = null;
194        try {
195            recipient = expression.evaluate(exchange, Object.class);
196            endpoint = resolveEndpoint(exchange, recipient);
197            // acquire the consumer from the cache
198            consumer = consumerCache.acquirePollingConsumer(endpoint);
199        } catch (Throwable e) {
200            if (isIgnoreInvalidEndpoint()) {
201                if (LOG.isDebugEnabled()) {
202                    LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
203                }
204            } else {
205                exchange.setException(e);
206            }
207            callback.done(true);
208            return true;
209        }
210
211        Exchange resourceExchange;
212        try {
213            if (timeout < 0) {
214                LOG.debug("Consumer receive: {}", consumer);
215                resourceExchange = consumer.receive();
216            } else if (timeout == 0) {
217                LOG.debug("Consumer receiveNoWait: {}", consumer);
218                resourceExchange = consumer.receiveNoWait();
219            } else {
220                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
221                resourceExchange = consumer.receive(timeout);
222            }
223
224            if (resourceExchange == null) {
225                LOG.debug("Consumer received no exchange");
226            } else {
227                LOG.debug("Consumer received: {}", resourceExchange);
228            }
229        } catch (Exception e) {
230            exchange.setException(new CamelExchangeException("Error during poll", exchange, e));
231            callback.done(true);
232            return true;
233        } finally {
234            // return the consumer back to the cache
235            consumerCache.releasePollingConsumer(endpoint, consumer);
236        }
237
238        try {
239            if (!isAggregateOnException() && (resourceExchange != null && resourceExchange.isFailed())) {
240                // copy resource exchange onto original exchange (preserving pattern)
241                copyResultsPreservePattern(exchange, resourceExchange);
242            } else {
243                prepareResult(exchange);
244
245                // prepare the exchanges for aggregation
246                ExchangeHelper.prepareAggregation(exchange, resourceExchange);
247                // must catch any exception from aggregation
248                Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
249                if (aggregatedExchange != null) {
250                    // copy aggregation result onto original exchange (preserving pattern)
251                    copyResultsPreservePattern(exchange, aggregatedExchange);
252                    // handover any synchronization
253                    if (resourceExchange != null) {
254                        resourceExchange.handoverCompletions(exchange);
255                    }
256                }
257            }
258
259            // set header with the uri of the endpoint enriched so we can use that for tracing etc
260            if (exchange.hasOut()) {
261                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
262            } else {
263                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
264            }
265        } catch (Throwable e) {
266            exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
267            callback.done(true);
268            return true;
269        }
270
271        callback.done(true);
272        return true;
273    }
274
275    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
276        // trim strings as end users might have added spaces between separators
277        if (recipient instanceof String) {
278            recipient = ((String)recipient).trim();
279        }
280        return ExchangeHelper.resolveEndpoint(exchange, recipient);
281    }
282
283    /**
284     * Strategy to pre check polling.
285     * <p/>
286     * Is currently used to prevent doing poll enrich from a file based endpoint when the current route also
287     * started from a file based endpoint as that is not currently supported.
288     *
289     * @param exchange the current exchange
290     */
291    protected void preCheckPoll(Exchange exchange) throws Exception {
292        // noop
293    }
294
295    private static void prepareResult(Exchange exchange) {
296        if (exchange.getPattern().isOutCapable()) {
297            exchange.getOut().copyFrom(exchange.getIn());
298        }
299    }
300
301    private static AggregationStrategy defaultAggregationStrategy() {
302        return new CopyAggregationStrategy();
303    }
304
305    @Override
306    public String toString() {
307        return "PollEnrich[" + expression + "]";
308    }
309
310    protected void doStart() throws Exception {
311        if (consumerCache == null) {
312            // create consumer cache if we use dynamic expressions for computing the endpoints to poll
313            if (cacheSize < 0) {
314                consumerCache = new EmptyConsumerCache(this, camelContext);
315                LOG.debug("PollEnrich {} is not using ConsumerCache", this);
316            } else if (cacheSize == 0) {
317                consumerCache = new ConsumerCache(this, camelContext);
318                LOG.debug("PollEnrich {} using ConsumerCache with default cache size", this);
319            } else {
320                consumerCache = new ConsumerCache(this, camelContext, cacheSize);
321                LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize);
322            }
323        }
324        ServiceHelper.startServices(consumerCache, aggregationStrategy);
325    }
326
327    protected void doStop() throws Exception {
328        ServiceHelper.stopServices(aggregationStrategy, consumerCache);
329    }
330
331    protected void doShutdown() throws Exception {
332        ServiceHelper.stopAndShutdownServices(aggregationStrategy, consumerCache);
333    }
334
335    private static class CopyAggregationStrategy implements AggregationStrategy {
336
337        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
338            if (newExchange != null) {
339                copyResultsPreservePattern(oldExchange, newExchange);
340            } else {
341                // if no newExchange then there was no message from the external resource
342                // and therefore we should set an empty body to indicate this fact
343                // but keep headers/attachments as we want to propagate those
344                oldExchange.getIn().setBody(null);
345                oldExchange.setOut(null);
346            }
347            return oldExchange;
348        }
349
350    }
351
352}