001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import org.apache.camel.CamelExchangeException;
020    import org.apache.camel.Exchange;
021    import org.apache.camel.PollingConsumer;
022    import org.apache.camel.Processor;
023    import org.apache.camel.processor.aggregate.AggregationStrategy;
024    import org.apache.camel.support.ServiceSupport;
025    import org.apache.camel.util.ExchangeHelper;
026    import org.apache.camel.util.ServiceHelper;
027    import org.slf4j.Logger;
028    import org.slf4j.LoggerFactory;
029    
030    import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
031    
032    /**
033     * A content enricher that enriches input data by first obtaining additional
034     * data from a <i>resource</i> represented by an endpoint <code>producer</code>
035     * and second by aggregating input data and additional data. Aggregation of
036     * input data and additional data is delegated to an {@link org.apache.camel.processor.aggregate.AggregationStrategy}
037     * object.
038     * <p/>
039     * Uses a {@link org.apache.camel.PollingConsumer} to obtain the additional data as opposed to {@link Enricher}
040     * that uses a {@link org.apache.camel.Producer}.
041     *
042     * @see Enricher
043     */
044    public class PollEnricher extends ServiceSupport implements Processor {
045    
046        private static final transient Logger LOG = LoggerFactory.getLogger(PollEnricher.class);
047        private AggregationStrategy aggregationStrategy;
048        private PollingConsumer consumer;
049        private long timeout;
050    
051        /**
052         * Creates a new {@link PollEnricher}. The default aggregation strategy is to
053         * copy the additional data obtained from the enricher's resource over the
054         * input data. When using the copy aggregation strategy the enricher
055         * degenerates to a normal transformer.
056         *
057         * @param consumer consumer to resource endpoint.
058         */
059        public PollEnricher(PollingConsumer consumer) {
060            this(defaultAggregationStrategy(), consumer, 0);
061        }
062    
063        /**
064         * Creates a new {@link PollEnricher}.
065         *
066         * @param aggregationStrategy  aggregation strategy to aggregate input data and additional data.
067         * @param consumer consumer to resource endpoint.
068         * @param timeout timeout in millis
069         */
070        public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) {
071            this.aggregationStrategy = aggregationStrategy;
072            this.consumer = consumer;
073            this.timeout = timeout;
074        }
075    
076        /**
077         * Sets the aggregation strategy for this poll enricher.
078         *
079         * @param aggregationStrategy the aggregationStrategy to set
080         */
081        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
082            this.aggregationStrategy = aggregationStrategy;
083        }
084    
085        /**
086         * Sets the default aggregation strategy for this poll enricher.
087         */
088        public void setDefaultAggregationStrategy() {
089            this.aggregationStrategy = defaultAggregationStrategy();
090        }
091    
092        /**
093         * Sets the timeout to use when polling.
094         * <p/>
095         * Use 0 or negative to not use timeout and block until data is available.
096         *
097         * @param timeout timeout in millis.
098         */
099        public void setTimeout(long timeout) {
100            this.timeout = timeout;
101        }
102    
103        /**
104         * Enriches the input data (<code>exchange</code>) by first obtaining
105         * additional data from an endpoint represented by an endpoint
106         * <code>producer</code> and second by aggregating input data and additional
107         * data. Aggregation of input data and additional data is delegated to an
108         * {@link org.apache.camel.processor.aggregate.AggregationStrategy} object set at construction time. If the
109         * message exchange with the resource endpoint fails then no aggregation
110         * will be done and the failed exchange content is copied over to the
111         * original message exchange.
112         *
113         * @param exchange input data.
114         */
115        public void process(Exchange exchange) throws Exception {
116            preCheckPoll(exchange);
117    
118            Exchange resourceExchange;
119            if (timeout < 0) {
120                LOG.debug("Consumer receive: {}", consumer);
121                resourceExchange = consumer.receive();
122            } else if (timeout == 0) {
123                LOG.debug("Consumer receiveNoWait: {}", consumer);
124                resourceExchange = consumer.receiveNoWait();
125            } else {
126                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
127                resourceExchange = consumer.receive(timeout);
128            }
129    
130            if (resourceExchange == null) {
131                LOG.debug("Consumer received no exchange");
132            } else {
133                LOG.debug("Consumer received: {}", resourceExchange);
134            }
135    
136            if (resourceExchange != null && resourceExchange.isFailed()) {
137                // copy resource exchange onto original exchange (preserving pattern)
138                copyResultsPreservePattern(exchange, resourceExchange);
139            } else {
140                prepareResult(exchange);
141    
142                // prepare the exchanges for aggregation
143                ExchangeHelper.prepareAggregation(exchange, resourceExchange);
144                // must catch any exception from aggregation
145                Exchange aggregatedExchange;
146                try {
147                    aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
148                } catch (Throwable e) {
149                    throw new CamelExchangeException("Error occurred during aggregation", exchange, e);
150                }
151                if (aggregatedExchange != null) {
152                    // copy aggregation result onto original exchange (preserving pattern)
153                    copyResultsPreservePattern(exchange, aggregatedExchange);
154                    // handover any synchronization
155                    if (resourceExchange != null) {
156                        resourceExchange.handoverCompletions(exchange);
157                    }
158                }
159            }
160    
161            // set header with the uri of the endpoint enriched so we can use that for tracing etc
162            if (exchange.hasOut()) {
163                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
164            } else {
165                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
166            }
167        }
168    
169        /**
170         * Strategy to pre check polling.
171         * <p/>
172         * Is currently used to prevent doing poll enrich from a file based endpoint when the current route also
173         * started from a file based endpoint as that is not currently supported.
174         *
175         * @param exchange the current exchange
176         */
177        protected void preCheckPoll(Exchange exchange) throws Exception {
178            // noop
179        }
180    
181        private static void prepareResult(Exchange exchange) {
182            if (exchange.getPattern().isOutCapable()) {
183                exchange.getOut().copyFrom(exchange.getIn());
184            }
185        }
186    
187        private static AggregationStrategy defaultAggregationStrategy() {
188            return new CopyAggregationStrategy();
189        }
190    
191        @Override
192        public String toString() {
193            return "PollEnrich[" + consumer + "]";
194        }
195    
196        protected void doStart() throws Exception {
197            ServiceHelper.startService(consumer);
198        }
199    
200        protected void doStop() throws Exception {
201            ServiceHelper.stopService(consumer);
202        }
203    
204        private static class CopyAggregationStrategy implements AggregationStrategy {
205    
206            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
207                if (newExchange != null) {
208                    copyResultsPreservePattern(oldExchange, newExchange);
209                } else {
210                    // if no newExchange then there was no message from the external resource
211                    // and therefore we should set an empty body to indicate this fact
212                    // but keep headers/attachments as we want to propagate those
213                    oldExchange.getIn().setBody(null);
214                    oldExchange.setOut(null);
215                }
216                return oldExchange;
217            }
218    
219        }
220    
221    }