/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.ExecutorService;
022
023import javax.xml.bind.annotation.XmlAccessType;
024import javax.xml.bind.annotation.XmlAccessorType;
025import javax.xml.bind.annotation.XmlAttribute;
026import javax.xml.bind.annotation.XmlRootElement;
027import javax.xml.bind.annotation.XmlTransient;
028
029import org.apache.camel.CamelContextAware;
030import org.apache.camel.Expression;
031import org.apache.camel.Processor;
032import org.apache.camel.builder.ProcessClause;
033import org.apache.camel.model.language.ExpressionDefinition;
034import org.apache.camel.processor.EvaluateExpressionProcessor;
035import org.apache.camel.processor.Pipeline;
036import org.apache.camel.processor.RecipientList;
037import org.apache.camel.processor.aggregate.AggregationStrategy;
038import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
039import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
040import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
041import org.apache.camel.spi.Metadata;
042import org.apache.camel.spi.RouteContext;
043import org.apache.camel.util.CamelContextHelper;
044
045/**
046 * Routes messages to a number of dynamically specified recipients (dynamic to)
047 *
048 * @version 
049 */
050@Metadata(label = "eip,endpoint,routing")
051@XmlRootElement(name = "recipientList")
052@XmlAccessorType(XmlAccessType.FIELD)
053public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> extends NoOutputExpressionNode implements ExecutorServiceAwareDefinition<RecipientListDefinition<Type>> {
054    @XmlTransient
055    private AggregationStrategy aggregationStrategy;
056    @XmlTransient
057    private ExecutorService executorService;
058    @XmlAttribute @Metadata(defaultValue = ",")
059    private String delimiter;
060    @XmlAttribute
061    private Boolean parallelProcessing;
062    @XmlAttribute
063    private String strategyRef;
064    @XmlAttribute
065    private String strategyMethodName;
066    @XmlAttribute
067    private Boolean strategyMethodAllowNull;
068    @XmlAttribute
069    private String executorServiceRef;
070    @XmlAttribute
071    private Boolean stopOnException;
072    @XmlAttribute
073    private Boolean ignoreInvalidEndpoints;
074    @XmlAttribute
075    private Boolean streaming;
076    @XmlAttribute @Metadata(defaultValue = "0")
077    private Long timeout;
078    @XmlAttribute
079    private String onPrepareRef;
080    @XmlTransient
081    private Processor onPrepare;
082    @XmlAttribute
083    private Boolean shareUnitOfWork;
084    @XmlAttribute
085    private Integer cacheSize;
086    @XmlAttribute
087    private Boolean parallelAggregate;
088    @XmlAttribute
089    private Boolean stopOnAggregateException;
090
091    public RecipientListDefinition() {
092    }
093
094    public RecipientListDefinition(ExpressionDefinition expression) {
095        super(expression);
096    }
097
098    public RecipientListDefinition(Expression expression) {
099        super(expression);
100    }
101
102    @Override
103    public String toString() {
104        return "RecipientList[" + getExpression() + "]";
105    }
106
107    @Override
108    public String getShortName() {
109        return "recipientList";
110    }
111
112    @Override
113    public String getLabel() {
114        return "recipientList[" + getExpression() + "]";
115    }
116
117    @Override
118    public Processor createProcessor(RouteContext routeContext) throws Exception {
119        final Expression expression = getExpression().createExpression(routeContext);
120
121        boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing();
122        boolean isStreaming = getStreaming() != null && getStreaming();
123        boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate();
124        boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
125        boolean isStopOnException = getStopOnException() != null && getStopOnException();
126        boolean isIgnoreInvalidEndpoints = getIgnoreInvalidEndpoints() != null && getIgnoreInvalidEndpoints();
127        boolean isStopOnAggregateException = getStopOnAggregateException() != null && getStopOnAggregateException();
128
129        RecipientList answer;
130        if (delimiter != null) {
131            answer = new RecipientList(routeContext.getCamelContext(), expression, delimiter);
132        } else {
133            answer = new RecipientList(routeContext.getCamelContext(), expression);
134        }
135        answer.setAggregationStrategy(createAggregationStrategy(routeContext));
136        answer.setParallelProcessing(isParallelProcessing);
137        answer.setParallelAggregate(isParallelAggregate);
138        answer.setStreaming(isStreaming);
139        answer.setShareUnitOfWork(isShareUnitOfWork);
140        answer.setStopOnException(isStopOnException);
141        answer.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints);
142        answer.setStopOnAggregateException(isStopOnAggregateException);
143        if (getCacheSize() != null) {
144            answer.setCacheSize(getCacheSize());
145        }
146        if (onPrepareRef != null) {
147            onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
148        }
149        if (onPrepare != null) {
150            answer.setOnPrepare(onPrepare);
151        }
152        if (getTimeout() != null) {
153            answer.setTimeout(getTimeout());
154        }
155
156        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing);
157        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing);
158        answer.setExecutorService(threadPool);
159        answer.setShutdownExecutorService(shutdownThreadPool);
160        long timeout = getTimeout() != null ? getTimeout() : 0;
161        if (timeout > 0 && !isParallelProcessing) {
162            throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
163        }
164
165        // create a pipeline with two processors
166        // the first is the eval processor which evaluates the expression to use
167        // the second is the recipient list
168        List<Processor> pipe = new ArrayList<>(2);
169
170        // the eval processor must be wrapped in error handler, so in case there was an
171        // error during evaluation, the error handler can deal with it
172        // the recipient list is not in error handler, as its has its own special error handling
173        // when sending to the recipients individually
174        Processor evalProcessor = new EvaluateExpressionProcessor(expression);
175        evalProcessor = super.wrapInErrorHandler(routeContext, evalProcessor);
176
177        pipe.add(evalProcessor);
178        pipe.add(answer);
179
180        // wrap in nested pipeline so this appears as one processor
181        // (threads definition does this as well)
182        return new Pipeline(routeContext.getCamelContext(), pipe) {
183            @Override
184            public String toString() {
185                return "RecipientList[" + expression + "]";
186            }
187        };
188    }
189
190    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
191        AggregationStrategy strategy = getAggregationStrategy();
192        if (strategy == null && strategyRef != null) {
193            Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
194            if (aggStrategy instanceof AggregationStrategy) {
195                strategy = (AggregationStrategy) aggStrategy;
196            } else if (aggStrategy != null) {
197                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName());
198                if (getStrategyMethodAllowNull() != null) {
199                    adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
200                    adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
201                }
202                strategy = adapter;
203            } else {
204                throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
205            }
206        }
207
208        if (strategy == null) {
209            // default to use latest aggregation strategy
210            strategy = new UseLatestAggregationStrategy();
211        }
212
213        if (strategy instanceof CamelContextAware) {
214            ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
215        }
216
217        if (shareUnitOfWork != null && shareUnitOfWork) {
218            // wrap strategy in share unit of work
219            strategy = new ShareUnitOfWorkAggregationStrategy(strategy);
220        }
221
222        return strategy;
223    }
224
225    // Fluent API
226    // -------------------------------------------------------------------------
227
228    @Override
229    @SuppressWarnings("unchecked")
230    public Type end() {
231        // allow end() to return to previous type so you can continue in the DSL
232        return (Type) super.end();
233    }
234
235    /**
236     * Delimiter used if the Expression returned multiple endpoints. Can be turned off using the value <tt>false</tt>.
237     * <p/>
238     * The default value is ,
239     *
240     * @param delimiter the delimiter
241     * @return the builder
242     */
243    public RecipientListDefinition<Type> delimiter(String delimiter) {
244        setDelimiter(delimiter);
245        return this;
246    }
247
248    /**
249     * Sets the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList.
250     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
251     */
252    public RecipientListDefinition<Type> aggregationStrategy(AggregationStrategy aggregationStrategy) {
253        setAggregationStrategy(aggregationStrategy);
254        return this;
255    }
256
257    /**
258     * Sets a reference to the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList.
259     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
260     */
261    public RecipientListDefinition<Type> aggregationStrategyRef(String aggregationStrategyRef) {
262        setStrategyRef(aggregationStrategyRef);
263        return this;
264    }
265
266    /**
267     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
268     *
269     * @param  methodName the method name to call
270     * @return the builder
271     */
272    public RecipientListDefinition<Type> aggregationStrategyMethodName(String methodName) {
273        setStrategyMethodName(methodName);
274        return this;
275    }
276
277    /**
278     * If this option is false then the aggregate method is not used if there was no data to enrich.
279     * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy
280     *
281     * @return the builder
282     */
283    public RecipientListDefinition<Type> aggregationStrategyMethodAllowNull() {
284        setStrategyMethodAllowNull(true);
285        return this;
286    }
287
288    /**
289     * Ignore the invalidate endpoint exception when try to create a producer with that endpoint
290     *
291     * @return the builder
292     */
293    public RecipientListDefinition<Type> ignoreInvalidEndpoints() {
294        setIgnoreInvalidEndpoints(true);
295        return this;
296    }
297
298    /**
299     * If enabled then sending messages to the recipients occurs concurrently.
300     * Note the caller thread will still wait until all messages has been fully processed, before it continues.
301     * Its only the sending and processing the replies from the recipients which happens concurrently.
302     *
303     * @return the builder
304     */
305    public RecipientListDefinition<Type> parallelProcessing() {
306        setParallelProcessing(true);
307        return this;
308    }
309
310    /**
311     * If enabled then sending messages to the recipients occurs concurrently.
312     * Note the caller thread will still wait until all messages has been fully processed, before it continues.
313     * Its only the sending and processing the replies from the recipients which happens concurrently.
314     *
315     * @return the builder
316     */
317    public RecipientListDefinition<Type> parallelProcessing(boolean parallelProcessing) {
318        setParallelProcessing(parallelProcessing);
319        return this;
320    }
321
322    /**
323     * If enabled then the aggregate method on AggregationStrategy can be called concurrently.
324     * Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe.
325     * By default this is false meaning that Camel synchronizes the call to the aggregate method.
326     * Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe.
327     *
328     * @return the builder
329     */
330    public RecipientListDefinition<Type> parallelAggregate() {
331        setParallelAggregate(true);
332        return this;
333    }
334
335    /**
336     * If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used.
337     * Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used.
338     * Enabling this option allows to work around this behavior.
339     *
340     * The default value is <code>false</code> for the sake of backward compatibility.
341     *
342     * @return the builder
343     */
344    public RecipientListDefinition<Type> stopOnAggregateException() {
345        setStopOnAggregateException(true);
346        return this;
347    }
348
349    /**
350     * If enabled then Camel will process replies out-of-order, eg in the order they come back.
351     * If disabled, Camel will process replies in the same order as defined by the recipient list.
352     *
353     * @return the builder
354     */
355    public RecipientListDefinition<Type> streaming() {
356        setStreaming(true);
357        return this;
358    }
359
360    /**
361     * Will now stop further processing if an exception or failure occurred during processing of an
362     * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
363     * <p/>
364     * Will also stop if processing the exchange failed (has a fault message) or an exception
365     * was thrown and handled by the error handler (such as using onException). In all situations
366     * the recipient list will stop further processing. This is the same behavior as in pipeline, which
367     * is used by the routing engine.
368     * <p/>
369     * The default behavior is to <b>not</b> stop but continue processing till the end
370     *
371     * @return the builder
372     */
373    public RecipientListDefinition<Type> stopOnException() {
374        setStopOnException(true);
375        return this;
376    }
377
378    /**
379     * To use a custom Thread Pool to be used for parallel processing.
380     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
381     */
382    public RecipientListDefinition<Type> executorService(ExecutorService executorService) {
383        setExecutorService(executorService);
384        return this;
385    }
386
387    /**
388     * Refers to a custom Thread Pool to be used for parallel processing.
389     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
390     */
391    public RecipientListDefinition<Type> executorServiceRef(String executorServiceRef) {
392        setExecutorServiceRef(executorServiceRef);
393        return this;
394    }
395
396    /**
397     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be used send.
398     * This can be used to deep-clone messages that should be send, or any custom logic needed before
399     * the exchange is send.
400     *
401     * @param onPrepare the processor
402     * @return the builder
403     */
404    public RecipientListDefinition<Type> onPrepare(Processor onPrepare) {
405        setOnPrepare(onPrepare);
406        return this;
407    }
408
409    /**
410     * Sets the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be used send using a fluent buidler.
411     */
412    public ProcessClause<RecipientListDefinition<Type>> onPrepare() {
413        ProcessClause<RecipientListDefinition<Type>> clause = new ProcessClause<>(this);
414        setOnPrepare(clause);
415        return clause;
416    }
417
418    /**
419     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
420     * This can be used to deep-clone messages that should be send, or any custom logic needed before
421     * the exchange is send.
422     *
423     * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
424     * @return the builder
425     */
426    public RecipientListDefinition<Type> onPrepareRef(String onPrepareRef) {
427        setOnPrepareRef(onPrepareRef);
428        return this;
429    }
430
431    /**
432     * Sets a total timeout specified in millis, when using parallel processing.
433     * If the Recipient List hasn't been able to send and process all replies within the given timeframe,
434     * then the timeout triggers and the Recipient List breaks out and continues.
435     * Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out.
436     * If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel
437     * to shut down in a graceful manner may continue to run. So use this option with a bit of care.
438     *
439     * @param timeout timeout in millis
440     * @return the builder
441     */
442    public RecipientListDefinition<Type> timeout(long timeout) {
443        setTimeout(timeout);
444        return this;
445    }
446
447    /**
448     * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
449     * Recipient List will by default not share unit of work between the parent exchange and each recipient exchange.
450     * This means each sub exchange has its own individual unit of work.
451     *
452     * @return the builder.
453     * @see org.apache.camel.spi.SubUnitOfWork
454     */
455    public RecipientListDefinition<Type> shareUnitOfWork() {
456        setShareUnitOfWork(true);
457        return this;
458    }
459
460    /**
461     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
462     * to cache and reuse producers when using this recipient list, when uris are reused.
463     *
464     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
465     * @return the builder
466     */
467    public RecipientListDefinition<Type> cacheSize(int cacheSize) {
468        setCacheSize(cacheSize);
469        return this;
470    }
471
472    // Properties
473    //-------------------------------------------------------------------------
474
475
476    /**
477     * Expression that returns which endpoints (url) to send the message to (the recipients).
478     * If the expression return an empty value then the message is not sent to any recipients.
479     */
480    @Override
481    public void setExpression(ExpressionDefinition expression) {
482        // override to include javadoc what the expression is used for
483        super.setExpression(expression);
484    }
485
486    public String getDelimiter() {
487        return delimiter;
488    }
489
490    public void setDelimiter(String delimiter) {
491        this.delimiter = delimiter;
492    }
493
494    public Boolean getParallelProcessing() {
495        return parallelProcessing;
496    }
497
498    public void setParallelProcessing(Boolean parallelProcessing) {
499        this.parallelProcessing = parallelProcessing;
500    }
501
502    public String getStrategyRef() {
503        return strategyRef;
504    }
505
506    /**
507     * Sets a reference to the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList.
508     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
509     */
510    public void setStrategyRef(String strategyRef) {
511        this.strategyRef = strategyRef;
512    }
513
514    public String getStrategyMethodName() {
515        return strategyMethodName;
516    }
517
518    /**
519     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
520     */
521    public void setStrategyMethodName(String strategyMethodName) {
522        this.strategyMethodName = strategyMethodName;
523    }
524
525    public Boolean getStrategyMethodAllowNull() {
526        return strategyMethodAllowNull;
527    }
528
529    /**
530     * If this option is false then the aggregate method is not used if there was no data to enrich.
531     * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy
532     */
533    public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
534        this.strategyMethodAllowNull = strategyMethodAllowNull;
535    }
536
537    public String getExecutorServiceRef() {
538        return executorServiceRef;
539    }
540
541    public void setExecutorServiceRef(String executorServiceRef) {
542        this.executorServiceRef = executorServiceRef;
543    }
544
545    public Boolean getIgnoreInvalidEndpoints() {
546        return ignoreInvalidEndpoints;
547    }
548
549    public void setIgnoreInvalidEndpoints(Boolean ignoreInvalidEndpoints) {
550        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
551    }
552
553    public Boolean getStopOnException() {
554        return stopOnException;
555    }
556
557    public void setStopOnException(Boolean stopOnException) {
558        this.stopOnException = stopOnException;
559    }
560
561    public AggregationStrategy getAggregationStrategy() {
562        return aggregationStrategy;
563    }
564
565    /**
566     * Sets the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList.
567     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
568     */
569    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
570        this.aggregationStrategy = aggregationStrategy;
571    }
572
573    public ExecutorService getExecutorService() {
574        return executorService;
575    }
576
577    public void setExecutorService(ExecutorService executorService) {
578        this.executorService = executorService;
579    }
580
581    public Boolean getStreaming() {
582        return streaming;
583    }
584
585    public void setStreaming(Boolean streaming) {
586        this.streaming = streaming;
587    }
588
589    public Long getTimeout() {
590        return timeout;
591    }
592
593    public void setTimeout(Long timeout) {
594        this.timeout = timeout;
595    }
596
597    public String getOnPrepareRef() {
598        return onPrepareRef;
599    }
600
601    public void setOnPrepareRef(String onPrepareRef) {
602        this.onPrepareRef = onPrepareRef;
603    }
604
605    public Processor getOnPrepare() {
606        return onPrepare;
607    }
608
609    public void setOnPrepare(Processor onPrepare) {
610        this.onPrepare = onPrepare;
611    }
612
613    public Boolean getShareUnitOfWork() {
614        return shareUnitOfWork;
615    }
616
617    public void setShareUnitOfWork(Boolean shareUnitOfWork) {
618        this.shareUnitOfWork = shareUnitOfWork;
619    }
620
621    public Integer getCacheSize() {
622        return cacheSize;
623    }
624
625    public void setCacheSize(Integer cacheSize) {
626        this.cacheSize = cacheSize;
627    }
628
629    public Boolean getParallelAggregate() {
630        return parallelAggregate;
631    }
632
633    public void setParallelAggregate(Boolean parallelAggregate) {
634        this.parallelAggregate = parallelAggregate;
635    }
636
637    public Boolean getStopOnAggregateException() {
638        return stopOnAggregateException;
639    }
640
641    public void setStopOnAggregateException(Boolean stopOnAggregateException) {
642        this.stopOnAggregateException = stopOnAggregateException;
643    }
644}