001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import org.apache.camel.AsyncCallback;
020import org.apache.camel.AsyncProcessor;
021import org.apache.camel.CamelContext;
022import org.apache.camel.CamelContextAware;
023import org.apache.camel.CamelExchangeException;
024import org.apache.camel.Consumer;
025import org.apache.camel.Endpoint;
026import org.apache.camel.Exchange;
027import org.apache.camel.Expression;
028import org.apache.camel.PollingConsumer;
029import org.apache.camel.impl.BridgeExceptionHandlerToErrorHandler;
030import org.apache.camel.impl.ConsumerCache;
031import org.apache.camel.impl.DefaultConsumer;
032import org.apache.camel.impl.EmptyConsumerCache;
033import org.apache.camel.impl.EventDrivenPollingConsumer;
034import org.apache.camel.processor.aggregate.AggregationStrategy;
035import org.apache.camel.spi.EndpointUtilizationStatistics;
036import org.apache.camel.spi.ExceptionHandler;
037import org.apache.camel.spi.IdAware;
038import org.apache.camel.support.ServiceSupport;
039import org.apache.camel.util.AsyncProcessorHelper;
040import org.apache.camel.util.ExchangeHelper;
041import org.apache.camel.util.ServiceHelper;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
046
047/**
048 * A content enricher that enriches input data by first obtaining additional
049 * data from a <i>resource</i> represented by an endpoint <code>producer</code>
050 * and second by aggregating input data and additional data. Aggregation of
051 * input data and additional data is delegated to an {@link org.apache.camel.processor.aggregate.AggregationStrategy}
052 * object.
053 * <p/>
054 * Uses a {@link org.apache.camel.PollingConsumer} to obtain the additional data as opposed to {@link Enricher}
055 * that uses a {@link org.apache.camel.Producer}.
056 *
057 * @see Enricher
058 */
059public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware {
060
061    private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class);
062    private CamelContext camelContext;
063    private ConsumerCache consumerCache;
064    private String id;
065    private AggregationStrategy aggregationStrategy;
066    private final Expression expression;
067    private long timeout;
068    private boolean aggregateOnException;
069    private int cacheSize;
070    private boolean ignoreInvalidEndpoint;
071
072    /**
073     * Creates a new {@link PollEnricher}.
074     *
075     * @param expression expression to use to compute the endpoint to poll from.
076     * @param timeout timeout in millis
077     */
078    public PollEnricher(Expression expression, long timeout) {
079        this.expression = expression;
080        this.timeout = timeout;
081    }
082
083    public CamelContext getCamelContext() {
084        return camelContext;
085    }
086
087    public void setCamelContext(CamelContext camelContext) {
088        this.camelContext = camelContext;
089    }
090
091    public String getId() {
092        return id;
093    }
094
095    public void setId(String id) {
096        this.id = id;
097    }
098
099    public Expression getExpression() {
100        return expression;
101    }
102
103    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
104        return consumerCache.getEndpointUtilizationStatistics();
105    }
106
107    public AggregationStrategy getAggregationStrategy() {
108        return aggregationStrategy;
109    }
110
111    /**
112     * Sets the aggregation strategy for this poll enricher.
113     *
114     * @param aggregationStrategy the aggregationStrategy to set
115     */
116    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
117        this.aggregationStrategy = aggregationStrategy;
118    }
119
120    public long getTimeout() {
121        return timeout;
122    }
123
124    /**
125     * Sets the timeout to use when polling.
126     * <p/>
127     * Use 0 to use receiveNoWait,
128     * Use -1 to use receive with no timeout (which will block until data is available).
129     *
130     * @param timeout timeout in millis.
131     */
132    public void setTimeout(long timeout) {
133        this.timeout = timeout;
134    }
135
136    public boolean isAggregateOnException() {
137        return aggregateOnException;
138    }
139
140    public void setAggregateOnException(boolean aggregateOnException) {
141        this.aggregateOnException = aggregateOnException;
142    }
143
144    /**
145     * Sets the default aggregation strategy for this poll enricher.
146     */
147    public void setDefaultAggregationStrategy() {
148        this.aggregationStrategy = defaultAggregationStrategy();
149    }
150
151    public int getCacheSize() {
152        return cacheSize;
153    }
154
155    public void setCacheSize(int cacheSize) {
156        this.cacheSize = cacheSize;
157    }
158
159    public boolean isIgnoreInvalidEndpoint() {
160        return ignoreInvalidEndpoint;
161    }
162
163    public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) {
164        this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
165    }
166
167    public void process(Exchange exchange) throws Exception {
168        AsyncProcessorHelper.process(this, exchange);
169    }
170
171    /**
172     * Enriches the input data (<code>exchange</code>) by first obtaining
173     * additional data from an endpoint represented by an endpoint
174     * <code>producer</code> and second by aggregating input data and additional
175     * data. Aggregation of input data and additional data is delegated to an
176     * {@link org.apache.camel.processor.aggregate.AggregationStrategy} object set at construction time. If the
177     * message exchange with the resource endpoint fails then no aggregation
178     * will be done and the failed exchange content is copied over to the
179     * original message exchange.
180     *
181     * @param exchange input data.
182     */
183    @Override
184    public boolean process(Exchange exchange, AsyncCallback callback) {
185        try {
186            preCheckPoll(exchange);
187        } catch (Exception e) {
188            exchange.setException(new CamelExchangeException("Error during pre poll check", exchange, e));
189            callback.done(true);
190            return true;
191        }
192
193        // which consumer to use
194        PollingConsumer consumer;
195        Endpoint endpoint;
196
197        // use dynamic endpoint so calculate the endpoint to use
198        Object recipient = null;
199        try {
200            recipient = expression.evaluate(exchange, Object.class);
201            endpoint = resolveEndpoint(exchange, recipient);
202            // acquire the consumer from the cache
203            consumer = consumerCache.acquirePollingConsumer(endpoint);
204        } catch (Throwable e) {
205            if (isIgnoreInvalidEndpoint()) {
206                if (LOG.isDebugEnabled()) {
207                    LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
208                }
209            } else {
210                exchange.setException(e);
211            }
212            callback.done(true);
213            return true;
214        }
215
216        // grab the real delegate consumer that performs the actual polling
217        Consumer delegate = consumer;
218        if (consumer instanceof EventDrivenPollingConsumer) {
219            delegate = ((EventDrivenPollingConsumer) consumer).getDelegateConsumer();
220        }
221
222        // is the consumer bridging the error handler?
223        boolean bridgeErrorHandler = false;
224        if (delegate instanceof DefaultConsumer) {
225            ExceptionHandler handler = ((DefaultConsumer) delegate).getExceptionHandler();
226            if (handler != null && handler instanceof BridgeExceptionHandlerToErrorHandler) {
227                bridgeErrorHandler = true;
228            }
229        }
230
231        Exchange resourceExchange;
232        try {
233            if (timeout < 0) {
234                LOG.debug("Consumer receive: {}", consumer);
235                resourceExchange = consumer.receive();
236            } else if (timeout == 0) {
237                LOG.debug("Consumer receiveNoWait: {}", consumer);
238                resourceExchange = consumer.receiveNoWait();
239            } else {
240                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
241                resourceExchange = consumer.receive(timeout);
242            }
243
244            if (resourceExchange == null) {
245                LOG.debug("Consumer received no exchange");
246            } else {
247                LOG.debug("Consumer received: {}", resourceExchange);
248            }
249        } catch (Exception e) {
250            exchange.setException(new CamelExchangeException("Error during poll", exchange, e));
251            callback.done(true);
252            return true;
253        } finally {
254            // return the consumer back to the cache
255            consumerCache.releasePollingConsumer(endpoint, consumer);
256        }
257
258        // remember current redelivery stats
259        Object redeliveried = exchange.getIn().getHeader(Exchange.REDELIVERED);
260        Object redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER);
261        Object redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER);
262
263        // if we are bridging error handler and failed then remember the caused exception
264        Throwable cause = null;
265        if (resourceExchange != null && bridgeErrorHandler) {
266            cause = resourceExchange.getException();
267        }
268
269        try {
270            if (!isAggregateOnException() && (resourceExchange != null && resourceExchange.isFailed())) {
271                // copy resource exchange onto original exchange (preserving pattern)
272                // and preserve redelivery headers
273                copyResultsPreservePattern(exchange, resourceExchange);
274            } else {
275                prepareResult(exchange);
276
277                // prepare the exchanges for aggregation
278                ExchangeHelper.prepareAggregation(exchange, resourceExchange);
279                // must catch any exception from aggregation
280                Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
281                if (aggregatedExchange != null) {
282                    // copy aggregation result onto original exchange (preserving pattern)
283                    copyResultsPreservePattern(exchange, aggregatedExchange);
284                    // handover any synchronization
285                    if (resourceExchange != null) {
286                        resourceExchange.handoverCompletions(exchange);
287                    }
288                }
289            }
290
291            // if we failed then restore caused exception
292            if (cause != null) {
293                // restore caused exception
294                exchange.setException(cause);
295                // remove the exhausted marker as we want to be able to perform redeliveries with the error handler
296                exchange.removeProperties(Exchange.REDELIVERY_EXHAUSTED);
297
298                // preserve the redelivery stats
299                if (redeliveried != null) {
300                    if (exchange.hasOut()) {
301                        exchange.getOut().setHeader(Exchange.REDELIVERED, redeliveried);
302                    } else {
303                        exchange.getIn().setHeader(Exchange.REDELIVERED, redeliveried);
304                    }
305                }
306                if (redeliveryCounter != null) {
307                    if (exchange.hasOut()) {
308                        exchange.getOut().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
309                    } else {
310                        exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
311                    }
312                }
313                if (redeliveryMaxCounter != null) {
314                    if (exchange.hasOut()) {
315                        exchange.getOut().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
316                    } else {
317                        exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
318                    }
319                }
320            }
321
322            // set header with the uri of the endpoint enriched so we can use that for tracing etc
323            if (exchange.hasOut()) {
324                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
325            } else {
326                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
327            }
328        } catch (Throwable e) {
329            exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
330            callback.done(true);
331            return true;
332        }
333
334        callback.done(true);
335        return true;
336    }
337
338    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
339        // trim strings as end users might have added spaces between separators
340        if (recipient instanceof String) {
341            recipient = ((String)recipient).trim();
342        }
343        return ExchangeHelper.resolveEndpoint(exchange, recipient);
344    }
345
346    /**
347     * Strategy to pre check polling.
348     * <p/>
349     * Is currently used to prevent doing poll enrich from a file based endpoint when the current route also
350     * started from a file based endpoint as that is not currently supported.
351     *
352     * @param exchange the current exchange
353     */
354    protected void preCheckPoll(Exchange exchange) throws Exception {
355        // noop
356    }
357
358    private static void prepareResult(Exchange exchange) {
359        if (exchange.getPattern().isOutCapable()) {
360            exchange.getOut().copyFrom(exchange.getIn());
361        }
362    }
363
364    private static AggregationStrategy defaultAggregationStrategy() {
365        return new CopyAggregationStrategy();
366    }
367
368    @Override
369    public String toString() {
370        return "PollEnrich[" + expression + "]";
371    }
372
373    protected void doStart() throws Exception {
374        if (consumerCache == null) {
375            // create consumer cache if we use dynamic expressions for computing the endpoints to poll
376            if (cacheSize < 0) {
377                consumerCache = new EmptyConsumerCache(this, camelContext);
378                LOG.debug("PollEnrich {} is not using ConsumerCache", this);
379            } else if (cacheSize == 0) {
380                consumerCache = new ConsumerCache(this, camelContext);
381                LOG.debug("PollEnrich {} using ConsumerCache with default cache size", this);
382            } else {
383                consumerCache = new ConsumerCache(this, camelContext, cacheSize);
384                LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize);
385            }
386        }
387        ServiceHelper.startServices(consumerCache, aggregationStrategy);
388    }
389
390    protected void doStop() throws Exception {
391        ServiceHelper.stopServices(aggregationStrategy, consumerCache);
392    }
393
394    protected void doShutdown() throws Exception {
395        ServiceHelper.stopAndShutdownServices(aggregationStrategy, consumerCache);
396    }
397
398    private static class CopyAggregationStrategy implements AggregationStrategy {
399
400        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
401            if (newExchange != null) {
402                copyResultsPreservePattern(oldExchange, newExchange);
403            } else {
404                // if no newExchange then there was no message from the external resource
405                // and therefore we should set an empty body to indicate this fact
406                // but keep headers/attachments as we want to propagate those
407                oldExchange.getIn().setBody(null);
408                oldExchange.setOut(null);
409            }
410            return oldExchange;
411        }
412
413    }
414
415}