001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import javax.xml.bind.annotation.XmlAccessType;
020import javax.xml.bind.annotation.XmlAccessorType;
021import javax.xml.bind.annotation.XmlAttribute;
022import javax.xml.bind.annotation.XmlRootElement;
023import javax.xml.bind.annotation.XmlTransient;
024
025import org.apache.camel.CamelContextAware;
026import org.apache.camel.Expression;
027import org.apache.camel.Processor;
028import org.apache.camel.model.language.ExpressionDefinition;
029import org.apache.camel.processor.PollEnricher;
030import org.apache.camel.processor.aggregate.AggregationStrategy;
031import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
032import org.apache.camel.spi.Metadata;
033import org.apache.camel.spi.RouteContext;
034
035/**
036 * Enriches messages with data polled from a secondary resource
037 *
038 * @see org.apache.camel.processor.Enricher
039 */
040@Metadata(label = "eip,transformation")
041@XmlRootElement(name = "pollEnrich")
042@XmlAccessorType(XmlAccessType.FIELD)
043public class PollEnrichDefinition extends NoOutputExpressionNode {
044    @XmlAttribute @Metadata(defaultValue = "-1")
045    private Long timeout;
046    @XmlAttribute(name = "strategyRef")
047    private String aggregationStrategyRef;
048    @XmlAttribute(name = "strategyMethodName")
049    private String aggregationStrategyMethodName;
050    @XmlAttribute(name = "strategyMethodAllowNull")
051    private Boolean aggregationStrategyMethodAllowNull;
052    @XmlAttribute
053    private Boolean aggregateOnException;
054    @XmlTransient
055    private AggregationStrategy aggregationStrategy;
056    @XmlAttribute
057    private Integer cacheSize;
058    @XmlAttribute
059    private Boolean ignoreInvalidEndpoint;
060
061    public PollEnrichDefinition() {
062    }
063
064    public PollEnrichDefinition(AggregationStrategy aggregationStrategy, long timeout) {
065        this.aggregationStrategy = aggregationStrategy;
066        this.timeout = timeout;
067    }
068
069    @Override
070    public String toString() {
071        return "PollEnrich[" + getExpression() + "]";
072    }
073    
074    @Override
075    public String getLabel() {
076        return "pollEnrich[" + getExpression() + "]";
077    }
078
079    @Override
080    public Processor createProcessor(RouteContext routeContext) throws Exception {
081
082        // if no timeout then we should block, and there use a negative timeout
083        long time = timeout != null ? timeout : -1;
084        boolean isIgnoreInvalidEndpoint = getIgnoreInvalidEndpoint() != null && getIgnoreInvalidEndpoint();
085        Expression exp = getExpression().createExpression(routeContext);
086
087        PollEnricher enricher = new PollEnricher(exp, time);
088
089        AggregationStrategy strategy = createAggregationStrategy(routeContext);
090        if (strategy == null) {
091            enricher.setDefaultAggregationStrategy();
092        } else {
093            enricher.setAggregationStrategy(strategy);
094        }
095        if (getAggregateOnException() != null) {
096            enricher.setAggregateOnException(getAggregateOnException());
097        }
098        if (getCacheSize() != null) {
099            enricher.setCacheSize(getCacheSize());
100        }
101        enricher.setIgnoreInvalidEndpoint(isIgnoreInvalidEndpoint);
102
103        return enricher;
104    }
105
106    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
107        AggregationStrategy strategy = getAggregationStrategy();
108        if (strategy == null && aggregationStrategyRef != null) {
109            Object aggStrategy = routeContext.lookup(aggregationStrategyRef, Object.class);
110            if (aggStrategy instanceof AggregationStrategy) {
111                strategy = (AggregationStrategy) aggStrategy;
112            } else if (aggStrategy != null) {
113                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getAggregationStrategyMethodName());
114                if (getAggregationStrategyMethodAllowNull() != null) {
115                    adapter.setAllowNullNewExchange(getAggregationStrategyMethodAllowNull());
116                    adapter.setAllowNullOldExchange(getAggregationStrategyMethodAllowNull());
117                }
118                strategy = adapter;
119            } else {
120                throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + aggregationStrategyRef);
121            }
122        }
123
124        if (strategy instanceof CamelContextAware) {
125            ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
126        }
127
128        return strategy;
129    }
130
131    // Fluent API
132    // -------------------------------------------------------------------------
133
134    /**
135     * Timeout in millis when polling from the external service.
136     * <p/>
137     * The timeout has influence about the poll enrich behavior. It basically operations in three different modes:
138     * <ul>
139     *     <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li>
140     *     <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li>
141     *     <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li>
142     * </ul>
143     * The default value is -1 and therefore the method could block indefinitely, and therefore its recommended to use a timeout value
144     */
145    public PollEnrichDefinition timeout(long timeout) {
146        setTimeout(timeout);
147        return this;
148    }
149
150    /**
151     * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
152     * By default Camel will use the reply from the external service as outgoing message.
153     */
154    public PollEnrichDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
155        setAggregationStrategy(aggregationStrategy);
156        return this;
157    }
158
159    /**
160     * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message.
161     * By default Camel will use the reply from the external service as outgoing message.
162     */
163    public PollEnrichDefinition aggregationStrategyRef(String aggregationStrategyRef) {
164        setAggregationStrategyRef(aggregationStrategyRef);
165        return this;
166    }
167
168    /**
169     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
170     */
171    public PollEnrichDefinition aggregationStrategyMethodName(String aggregationStrategyMethodName) {
172        setAggregationStrategyMethodName(aggregationStrategyMethodName);
173        return this;
174    }
175
176    /**
177     * If this option is false then the aggregate method is not used if there was no data to enrich.
178     * If this option is true then null values is used as the oldExchange (when no data to enrich),
179     * when using POJOs as the AggregationStrategy.
180     */
181    public PollEnrichDefinition aggregationStrategyMethodAllowNull(boolean aggregationStrategyMethodAllowNull) {
182        setAggregationStrategyMethodAllowNull(aggregationStrategyMethodAllowNull);
183        return this;
184    }
185
186    /**
187     * If this option is false then the aggregate method is not used if there was an exception thrown while trying
188     * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what
189     * to do if there was an exception in the aggregate method. For example to suppress the exception
190     * or set a custom message body etc.
191     */
192    public PollEnrichDefinition aggregateOnException(boolean aggregateOnException) {
193        setAggregateOnException(aggregateOnException);
194        return this;
195    }
196
197    /**
198     * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used
199     * to cache and reuse consumers when uris are reused.
200     *
201     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
202     * @return the builder
203     */
204    public PollEnrichDefinition cacheSize(int cacheSize) {
205        setCacheSize(cacheSize);
206        return this;
207    }
208
209    /**
210     * Ignore the invalidate endpoint exception when try to create a producer with that endpoint
211     *
212     * @return the builder
213     */
214    public PollEnrichDefinition ignoreInvalidEndpoint() {
215        setIgnoreInvalidEndpoint(true);
216        return this;
217    }
218
219    // Properties
220    // -------------------------------------------------------------------------
221
222    /**
223     * Expression that computes the endpoint uri to use as the resource endpoint to enrich from
224     */
225    @Override
226    public void setExpression(ExpressionDefinition expression) {
227        // override to include javadoc what the expression is used for
228        super.setExpression(expression);
229    }
230
231    public Long getTimeout() {
232        return timeout;
233    }
234
235    public void setTimeout(Long timeout) {
236        this.timeout = timeout;
237    }
238
239    public String getAggregationStrategyRef() {
240        return aggregationStrategyRef;
241    }
242
243    public void setAggregationStrategyRef(String aggregationStrategyRef) {
244        this.aggregationStrategyRef = aggregationStrategyRef;
245    }
246
247    public String getAggregationStrategyMethodName() {
248        return aggregationStrategyMethodName;
249    }
250
251    public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) {
252        this.aggregationStrategyMethodName = aggregationStrategyMethodName;
253    }
254
255    public Boolean getAggregationStrategyMethodAllowNull() {
256        return aggregationStrategyMethodAllowNull;
257    }
258
259    public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) {
260        this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull;
261    }
262
263    public AggregationStrategy getAggregationStrategy() {
264        return aggregationStrategy;
265    }
266
267    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
268        this.aggregationStrategy = aggregationStrategy;
269    }
270
271    public Boolean getAggregateOnException() {
272        return aggregateOnException;
273    }
274
275    public void setAggregateOnException(Boolean aggregateOnException) {
276        this.aggregateOnException = aggregateOnException;
277    }
278
279    public Integer getCacheSize() {
280        return cacheSize;
281    }
282
283    public void setCacheSize(Integer cacheSize) {
284        this.cacheSize = cacheSize;
285    }
286
287    public Boolean getIgnoreInvalidEndpoint() {
288        return ignoreInvalidEndpoint;
289    }
290
291    public void setIgnoreInvalidEndpoint(Boolean ignoreInvalidEndpoint) {
292        this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
293    }
294}