001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import javax.xml.bind.annotation.XmlAccessType;
020import javax.xml.bind.annotation.XmlAccessorType;
021import javax.xml.bind.annotation.XmlAttribute;
022import javax.xml.bind.annotation.XmlRootElement;
023import javax.xml.bind.annotation.XmlTransient;
024
025import org.apache.camel.CamelContextAware;
026import org.apache.camel.Expression;
027import org.apache.camel.Processor;
028import org.apache.camel.model.language.ExpressionDefinition;
029import org.apache.camel.processor.PollEnricher;
030import org.apache.camel.processor.aggregate.AggregationStrategy;
031import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
032import org.apache.camel.spi.Metadata;
033import org.apache.camel.spi.RouteContext;
034
035/**
036 * Enriches messages with data polled from a secondary resource
037 *
038 * @see org.apache.camel.processor.Enricher
039 */
040@Metadata(label = "eip,transformation")
041@XmlRootElement(name = "pollEnrich")
042@XmlAccessorType(XmlAccessType.FIELD)
043public class PollEnrichDefinition extends NoOutputExpressionNode {
044    @XmlAttribute @Metadata(defaultValue = "-1")
045    private Long timeout;
046    @XmlAttribute(name = "strategyRef")
047    private String aggregationStrategyRef;
048    @XmlAttribute(name = "strategyMethodName")
049    private String aggregationStrategyMethodName;
050    @XmlAttribute(name = "strategyMethodAllowNull")
051    private Boolean aggregationStrategyMethodAllowNull;
052    @XmlAttribute
053    private Boolean aggregateOnException;
054    @XmlTransient
055    private AggregationStrategy aggregationStrategy;
056    @XmlAttribute
057    private Integer cacheSize;
058    @XmlAttribute
059    private Boolean ignoreInvalidEndpoint;
060
061    public PollEnrichDefinition() {
062    }
063
064    public PollEnrichDefinition(AggregationStrategy aggregationStrategy, long timeout) {
065        this.aggregationStrategy = aggregationStrategy;
066        this.timeout = timeout;
067    }
068
069    @Override
070    public String toString() {
071        return "PollEnrich[" + getExpression() + "]";
072    }
073    
074    @Override
075    public String getShortName() {
076        return "pollEnrich";
077    }
078
079    @Override
080    public String getLabel() {
081        return "pollEnrich[" + getExpression() + "]";
082    }
083
084    @Override
085    public Processor createProcessor(RouteContext routeContext) throws Exception {
086
087        // if no timeout then we should block, and there use a negative timeout
088        long time = timeout != null ? timeout : -1;
089        boolean isIgnoreInvalidEndpoint = getIgnoreInvalidEndpoint() != null && getIgnoreInvalidEndpoint();
090        Expression exp = getExpression().createExpression(routeContext);
091
092        PollEnricher enricher = new PollEnricher(exp, time);
093
094        AggregationStrategy strategy = createAggregationStrategy(routeContext);
095        if (strategy == null) {
096            enricher.setDefaultAggregationStrategy();
097        } else {
098            enricher.setAggregationStrategy(strategy);
099        }
100        if (getAggregateOnException() != null) {
101            enricher.setAggregateOnException(getAggregateOnException());
102        }
103        if (getCacheSize() != null) {
104            enricher.setCacheSize(getCacheSize());
105        }
106        enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);
107
108        return enricher;
109    }
110
111    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
112        AggregationStrategy strategy = getAggregationStrategy();
113        if (strategy == null && aggregationStrategyRef != null) {
114            Object aggStrategy = routeContext.lookup(aggregationStrategyRef, Object.class);
115            if (aggStrategy instanceof AggregationStrategy) {
116                strategy = (AggregationStrategy) aggStrategy;
117            } else if (aggStrategy != null) {
118                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName());
119                if (getAggregationStrategyMethodAllowNull() != null) {
120                    adapter.setAllowNullNewExchange(getAggregationStrategyMethodAllowNull());
121                    adapter.setAllowNullOldExchange(getAggregationStrategyMethodAllowNull());
122                }
123                strategy = adapter;
124            } else {
125                throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + aggregationStrategyRef);
126            }
127        }
128
129        if (strategy instanceof CamelContextAware) {
130            ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
131        }
132
133        return strategy;
134    }
135
136    // Fluent API
137    // -------------------------------------------------------------------------
138
139    /**
140     * Timeout in millis when polling from the external service.
141     * <p/>
142     * The timeout has influence about the poll enrich behavior. It basically operations in three different modes:
143     * <ul>
144     *     <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li>
145     *     <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li>
146     *     <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li>
147     * </ul>
148     * The default value is -1 and therefore the method could block indefinitely, and therefore its recommended to use a timeout value
149     */
150    public PollEnrichDefinition timeout(long timeout) {
151        setTimeout(timeout);
152        return this;
153    }
154
155    /**
156     * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
157     * By default Camel will use the reply from the external service as outgoing message.
158     */
159    public PollEnrichDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
160        setAggregationStrategy(aggregationStrategy);
161        return this;
162    }
163
164    /**
165     * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
166     * By default Camel will use the reply from the external service as outgoing message.
167     */
168    public PollEnrichDefinition aggregationStrategyRef(String aggregationStrategyRef) {
169        setAggregationStrategyRef(aggregationStrategyRef);
170        return this;
171    }
172
173    /**
174     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
175     */
176    public PollEnrichDefinition aggregationStrategyMethodName(String aggregationStrategyMethodName) {
177        setAggregationStrategyMethodName(aggregationStrategyMethodName);
178        return this;
179    }
180
181    /**
182     * If this option is false then the aggregate method is not used if there was no data to enrich.
183     * If this option is true then null values is used as the oldExchange (when no data to enrich),
184     * when using POJOs as the AggregationStrategy.
185     */
186    public PollEnrichDefinition aggregationStrategyMethodAllowNull(boolean aggregationStrategyMethodAllowNull) {
187        setAggregationStrategyMethodAllowNull(aggregationStrategyMethodAllowNull);
188        return this;
189    }
190
191    /**
192     * If this option is false then the aggregate method is not used if there was an exception thrown while trying
193     * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
194     * to do if there was an exception in the aggregate method. For example to suppress the exception
195     * or set a custom message body etc.
196     */
197    public PollEnrichDefinition aggregateOnException(boolean aggregateOnException) {
198        setAggregateOnException(aggregateOnException);
199        return this;
200    }
201
202    /**
203     * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used
204     * to cache and reuse consumers when uris are reused.
205     *
206     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
207     * @return the builder
208     */
209    public PollEnrichDefinition cacheSize(int cacheSize) {
210        setCacheSize(cacheSize);
211        return this;
212    }
213
214    /**
215     * Ignore the invalidate endpoint exception when try to create a producer with that endpoint
216     *
217     * @return the builder
218     */
219    public PollEnrichDefinition ignoreInvalidEndpoint() {
220        setIgnoreInvalidEndpoint(true);
221        return this;
222    }
223
224    // Properties
225    // -------------------------------------------------------------------------
226
227    /**
228     * Expression that computes the endpoint uri to use as the resource endpoint to enrich from
229     */
230    @Override
231    public void setExpression(ExpressionDefinition expression) {
232        // override to include javadoc what the expression is used for
233        super.setExpression(expression);
234    }
235
236    public Long getTimeout() {
237        return timeout;
238    }
239
240    public void setTimeout(Long timeout) {
241        this.timeout = timeout;
242    }
243
244    public String getAggregationStrategyRef() {
245        return aggregationStrategyRef;
246    }
247
248    public void setAggregationStrategyRef(String aggregationStrategyRef) {
249        this.aggregationStrategyRef = aggregationStrategyRef;
250    }
251
252    public String getAggregationStrategyMethodName() {
253        return aggregationStrategyMethodName;
254    }
255
256    public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
257        this.aggregationStrategyMethodName = aggregationStrategyMethodName;
258    }
259
260    public Boolean getAggregationStrategyMethodAllowNull() {
261        return aggregationStrategyMethodAllowNull;
262    }
263
264    public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) {
265        this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull;
266    }
267
268    public AggregationStrategy getAggregationStrategy() {
269        return aggregationStrategy;
270    }
271
272    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
273        this.aggregationStrategy = aggregationStrategy;
274    }
275
276    public Boolean getAggregateOnException() {
277        return aggregateOnException;
278    }
279
280    public void setAggregateOnException(Boolean aggregateOnException) {
281        this.aggregateOnException = aggregateOnException;
282    }
283
284    public Integer getCacheSize() {
285        return cacheSize;
286    }
287
288    public void setCacheSize(Integer cacheSize) {
289        this.cacheSize = cacheSize;
290    }
291
292    public Boolean getIgnoreInvalidEndpoint() {
293        return ignoreInvalidEndpoint;
294    }
295
296    public void setIgnoreInvalidEndpoint(Boolean ignoreInvalidEndpoint) {
297        this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
298    }
299}