001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.ExecutorService;
022import javax.xml.bind.annotation.XmlAccessType;
023import javax.xml.bind.annotation.XmlAccessorType;
024import javax.xml.bind.annotation.XmlAttribute;
025import javax.xml.bind.annotation.XmlRootElement;
026import javax.xml.bind.annotation.XmlTransient;
027
028import org.apache.camel.CamelContextAware;
029import org.apache.camel.Expression;
030import org.apache.camel.Processor;
031import org.apache.camel.model.language.ExpressionDefinition;
032import org.apache.camel.processor.EvaluateExpressionProcessor;
033import org.apache.camel.processor.Pipeline;
034import org.apache.camel.processor.RecipientList;
035import org.apache.camel.processor.aggregate.AggregationStrategy;
036import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
037import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
038import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
039import org.apache.camel.spi.Metadata;
040import org.apache.camel.spi.RouteContext;
041import org.apache.camel.util.CamelContextHelper;
042
043/**
044 * Routes messages to a number of dynamically specified recipients (dynamic to)
045 *
046 * @version 
047 */
048@Metadata(label = "eip,endpoint,routing")
049@XmlRootElement(name = "recipientList")
050@XmlAccessorType(XmlAccessType.FIELD)
051public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> extends NoOutputExpressionNode implements ExecutorServiceAwareDefinition<RecipientListDefinition<Type>> {
052    @XmlTransient
053    private AggregationStrategy aggregationStrategy;
054    @XmlTransient
055    private ExecutorService executorService;
056    @XmlAttribute @Metadata(defaultValue = ",")
057    private String delimiter;
058    @XmlAttribute
059    private Boolean parallelProcessing;
060    @XmlAttribute
061    private String strategyRef;
062    @XmlAttribute
063    private String strategyMethodName;
064    @XmlAttribute
065    private Boolean strategyMethodAllowNull;
066    @XmlAttribute
067    private String executorServiceRef;
068    @XmlAttribute
069    private Boolean stopOnException;
070    @XmlAttribute
071    private Boolean ignoreInvalidEndpoints;
072    @XmlAttribute
073    private Boolean streaming;
074    @XmlAttribute @Metadata(defaultValue = "0")
075    private Long timeout;
076    @XmlAttribute
077    private String onPrepareRef;
078    @XmlTransient
079    private Processor onPrepare;
080    @XmlAttribute
081    private Boolean shareUnitOfWork;
082    @XmlAttribute
083    private Integer cacheSize;
084    @XmlAttribute
085    private Boolean parallelAggregate;
086
087    public RecipientListDefinition() {
088    }
089
090    public RecipientListDefinition(ExpressionDefinition expression) {
091        super(expression);
092    }
093
094    public RecipientListDefinition(Expression expression) {
095        super(expression);
096    }
097
098    @Override
099    public String toString() {
100        return "RecipientList[" + getExpression() + "]";
101    }
102
103    @Override
104    public String getLabel() {
105        return "recipientList[" + getExpression() + "]";
106    }
107
108    @Override
109    public Processor createProcessor(RouteContext routeContext) throws Exception {
110        final Expression expression = getExpression().createExpression(routeContext);
111
112        boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing();
113        boolean isStreaming = getStreaming() != null && getStreaming();
114        boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate();
115        boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
116        boolean isStopOnException = getStopOnException() != null && getStopOnException();
117        boolean isIgnoreInvalidEndpoints = getIgnoreInvalidEndpoints() != null && getIgnoreInvalidEndpoints();
118
119        RecipientList answer;
120        if (delimiter != null) {
121            answer = new RecipientList(routeContext.getCamelContext(), expression, delimiter);
122        } else {
123            answer = new RecipientList(routeContext.getCamelContext(), expression);
124        }
125        answer.setAggregationStrategy(createAggregationStrategy(routeContext));
126        answer.setParallelProcessing(isParallelProcessing);
127        answer.setParallelAggregate(isParallelAggregate);
128        answer.setStreaming(isStreaming);
129        answer.setShareUnitOfWork(isShareUnitOfWork);
130        answer.setStopOnException(isStopOnException);
131        answer.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints);
132        if (getCacheSize() != null) {
133            answer.setCacheSize(getCacheSize());
134        }
135        if (onPrepareRef != null) {
136            onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
137        }
138        if (onPrepare != null) {
139            answer.setOnPrepare(onPrepare);
140        }
141        if (getTimeout() != null) {
142            answer.setTimeout(getTimeout());
143        }
144
145        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing);
146        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing);
147        answer.setExecutorService(threadPool);
148        answer.setShutdownExecutorService(shutdownThreadPool);
149        long timeout = getTimeout() != null ? getTimeout() : 0;
150        if (timeout > 0 && !isParallelProcessing) {
151            throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
152        }
153
154        // create a pipeline with two processors
155        // the first is the eval processor which evaluates the expression to use
156        // the second is the recipient list
157        List<Processor> pipe = new ArrayList<Processor>(2);
158
159        // the eval processor must be wrapped in error handler, so in case there was an
160        // error during evaluation, the error handler can deal with it
161        // the recipient list is not in error handler, as its has its own special error handling
162        // when sending to the recipients individually
163        Processor evalProcessor = new EvaluateExpressionProcessor(expression);
164        evalProcessor = super.wrapInErrorHandler(routeContext, evalProcessor);
165
166        pipe.add(evalProcessor);
167        pipe.add(answer);
168
169        // wrap in nested pipeline so this appears as one processor
170        // (threads definition does this as well)
171        return new Pipeline(routeContext.getCamelContext(), pipe) {
172            @Override
173            public String toString() {
174                return "RecipientList[" + expression + "]";
175            }
176        };
177    }
178
179    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
180        AggregationStrategy strategy = getAggregationStrategy();
181        if (strategy == null && strategyRef != null) {
182            Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
183            if (aggStrategy instanceof AggregationStrategy) {
184                strategy = (AggregationStrategy) aggStrategy;
185            } else if (aggStrategy != null) {
186                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName());
187                if (getStrategyMethodAllowNull() != null) {
188                    adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
189                    adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
190                }
191                strategy = adapter;
192            } else {
193                throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
194            }
195        }
196
197        if (strategy == null) {
198            // default to use latest aggregation strategy
199            strategy = new UseLatestAggregationStrategy();
200        }
201
202        if (strategy instanceof CamelContextAware) {
203            ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
204        }
205
206        if (shareUnitOfWork != null && shareUnitOfWork) {
207            // wrap strategy in share unit of work
208            strategy = new ShareUnitOfWorkAggregationStrategy(strategy);
209        }
210
211        return strategy;
212    }
213
214    // Fluent API
215    // -------------------------------------------------------------------------
216
217    @Override
218    @SuppressWarnings("unchecked")
219    public Type end() {
220        // allow end() to return to previous type so you can continue in the DSL
221        return (Type) super.end();
222    }
223
224    /**
225     * Delimiter used if the Expression returned multiple endpoints. Can be turned off using the value <tt>false</tt>.
226     * <p/>
227     * The default value is ,
228     *
229     * @param delimiter the delimiter
230     * @return the builder
231     */
232    public RecipientListDefinition<Type> delimiter(String delimiter) {
233        setDelimiter(delimiter);
234        return this;
235    }
236
237    /**
238     * Sets the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList.
239     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
240     */
241    public RecipientListDefinition<Type> aggregationStrategy(AggregationStrategy aggregationStrategy) {
242        setAggregationStrategy(aggregationStrategy);
243        return this;
244    }
245
246    /**
247     * Sets a reference to the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList.
248     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
249     */
250    public RecipientListDefinition<Type> aggregationStrategyRef(String aggregationStrategyRef) {
251        setStrategyRef(aggregationStrategyRef);
252        return this;
253    }
254
255    /**
256     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
257     *
258     * @param  methodName the method name to call
259     * @return the builder
260     */
261    public RecipientListDefinition<Type> aggregationStrategyMethodName(String methodName) {
262        setStrategyMethodName(methodName);
263        return this;
264    }
265
266    /**
267     * If this option is false then the aggregate method is not used if there was no data to enrich.
268     * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy
269     *
270     * @return the builder
271     */
272    public RecipientListDefinition<Type> aggregationStrategyMethodAllowNull() {
273        setStrategyMethodAllowNull(true);
274        return this;
275    }
276
277    /**
278     * Ignore the invalidate endpoint exception when try to create a producer with that endpoint
279     *
280     * @return the builder
281     */
282    public RecipientListDefinition<Type> ignoreInvalidEndpoints() {
283        setIgnoreInvalidEndpoints(true);
284        return this;
285    }
286
287    /**
288     * If enabled then sending messages to the recipients occurs concurrently.
289     * Note the caller thread will still wait until all messages has been fully processed, before it continues.
290     * Its only the sending and processing the replies from the recipients which happens concurrently.
291     *
292     * @return the builder
293     */
294    public RecipientListDefinition<Type> parallelProcessing() {
295        setParallelProcessing(true);
296        return this;
297    }
298
299    /**
300     * If enabled then sending messages to the recipients occurs concurrently.
301     * Note the caller thread will still wait until all messages has been fully processed, before it continues.
302     * Its only the sending and processing the replies from the recipients which happens concurrently.
303     *
304     * @return the builder
305     */
306    public RecipientListDefinition<Type> parallelProcessing(boolean parallelProcessing) {
307        setParallelProcessing(parallelProcessing);
308        return this;
309    }
310
311    /**
312     * If enabled then the aggregate method on AggregationStrategy can be called concurrently.
313     * Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe.
314     * By default this is false meaning that Camel synchronizes the call to the aggregate method.
315     * Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe.
316     *
317     * @return the builder
318     */
319    public RecipientListDefinition<Type> parallelAggregate() {
320        setParallelAggregate(true);
321        return this;
322    }
323
324    /**
325     * If enabled then Camel will process replies out-of-order, eg in the order they come back.
326     * If disabled, Camel will process replies in the same order as defined by the recipient list.
327     *
328     * @return the builder
329     */
330    public RecipientListDefinition<Type> streaming() {
331        setStreaming(true);
332        return this;
333    }
334
335    /**
336     * Will now stop further processing if an exception or failure occurred during processing of an
337     * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
338     * <p/>
339     * Will also stop if processing the exchange failed (has a fault message) or an exception
340     * was thrown and handled by the error handler (such as using onException). In all situations
341     * the recipient list will stop further processing. This is the same behavior as in pipeline, which
342     * is used by the routing engine.
343     * <p/>
344     * The default behavior is to <b>not</b> stop but continue processing till the end
345     *
346     * @return the builder
347     */
348    public RecipientListDefinition<Type> stopOnException() {
349        setStopOnException(true);
350        return this;
351    }
352
353    /**
354     * To use a custom Thread Pool to be used for parallel processing.
355     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
356     */
357    public RecipientListDefinition<Type> executorService(ExecutorService executorService) {
358        setExecutorService(executorService);
359        return this;
360    }
361
362    /**
363     * Refers to a custom Thread Pool to be used for parallel processing.
364     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
365     */
366    public RecipientListDefinition<Type> executorServiceRef(String executorServiceRef) {
367        setExecutorServiceRef(executorServiceRef);
368        return this;
369    }
370
371    /**
372     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be used send.
373     * This can be used to deep-clone messages that should be send, or any custom logic needed before
374     * the exchange is send.
375     *
376     * @param onPrepare the processor
377     * @return the builder
378     */
379    public RecipientListDefinition<Type> onPrepare(Processor onPrepare) {
380        setOnPrepare(onPrepare);
381        return this;
382    }
383
384    /**
385     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
386     * This can be used to deep-clone messages that should be send, or any custom logic needed before
387     * the exchange is send.
388     *
389     * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
390     * @return the builder
391     */
392    public RecipientListDefinition<Type> onPrepareRef(String onPrepareRef) {
393        setOnPrepareRef(onPrepareRef);
394        return this;
395    }
396
397    /**
398     * Sets a total timeout specified in millis, when using parallel processing.
399     * If the Recipient List hasn't been able to send and process all replies within the given timeframe,
400     * then the timeout triggers and the Recipient List breaks out and continues.
401     * Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out.
402     * If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel
403     * to shut down in a graceful manner may continue to run. So use this option with a bit of care.
404     *
405     * @param timeout timeout in millis
406     * @return the builder
407     */
408    public RecipientListDefinition<Type> timeout(long timeout) {
409        setTimeout(timeout);
410        return this;
411    }
412
413    /**
414     * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
415     * Recipient List will by default not share unit of work between the parent exchange and each recipient exchange.
416     * This means each sub exchange has its own individual unit of work.
417     *
418     * @return the builder.
419     * @see org.apache.camel.spi.SubUnitOfWork
420     */
421    public RecipientListDefinition<Type> shareUnitOfWork() {
422        setShareUnitOfWork(true);
423        return this;
424    }
425
426    /**
427     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
428     * to cache and reuse producers when using this recipient list, when uris are reused.
429     *
430     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
431     * @return the builder
432     */
433    public RecipientListDefinition<Type> cacheSize(int cacheSize) {
434        setCacheSize(cacheSize);
435        return this;
436    }
437
438    // Properties
439    //-------------------------------------------------------------------------
440
441
442    /**
443     * Expression that returns which endpoints (url) to send the message to (the recipients).
444     * If the expression return an empty value then the message is not sent to any recipients.
445     */
446    @Override
447    public void setExpression(ExpressionDefinition expression) {
448        // override to include javadoc what the expression is used for
449        super.setExpression(expression);
450    }
451
452    public String getDelimiter() {
453        return delimiter;
454    }
455
456    public void setDelimiter(String delimiter) {
457        this.delimiter = delimiter;
458    }
459
460    public Boolean getParallelProcessing() {
461        return parallelProcessing;
462    }
463
464    public void setParallelProcessing(Boolean parallelProcessing) {
465        this.parallelProcessing = parallelProcessing;
466    }
467
468    public String getStrategyRef() {
469        return strategyRef;
470    }
471
472    /**
473     * Sets a reference to the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList.
474     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
475     */
476    public void setStrategyRef(String strategyRef) {
477        this.strategyRef = strategyRef;
478    }
479
480    public String getStrategyMethodName() {
481        return strategyMethodName;
482    }
483
484    /**
485     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
486     */
487    public void setStrategyMethodName(String strategyMethodName) {
488        this.strategyMethodName = strategyMethodName;
489    }
490
491    public Boolean getStrategyMethodAllowNull() {
492        return strategyMethodAllowNull;
493    }
494
495    /**
496     * If this option is false then the aggregate method is not used if there was no data to enrich.
497     * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy
498     */
499    public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
500        this.strategyMethodAllowNull = strategyMethodAllowNull;
501    }
502
503    public String getExecutorServiceRef() {
504        return executorServiceRef;
505    }
506
507    public void setExecutorServiceRef(String executorServiceRef) {
508        this.executorServiceRef = executorServiceRef;
509    }
510
511    public Boolean getIgnoreInvalidEndpoints() {
512        return ignoreInvalidEndpoints;
513    }
514
515    public void setIgnoreInvalidEndpoints(Boolean ignoreInvalidEndpoints) {
516        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
517    }
518
519    public Boolean getStopOnException() {
520        return stopOnException;
521    }
522
523    public void setStopOnException(Boolean stopOnException) {
524        this.stopOnException = stopOnException;
525    }
526
527    public AggregationStrategy getAggregationStrategy() {
528        return aggregationStrategy;
529    }
530
531    /**
532     * Sets the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList.
533     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
534     */
535    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
536        this.aggregationStrategy = aggregationStrategy;
537    }
538
539    public ExecutorService getExecutorService() {
540        return executorService;
541    }
542
543    public void setExecutorService(ExecutorService executorService) {
544        this.executorService = executorService;
545    }
546
547    public Boolean getStreaming() {
548        return streaming;
549    }
550
551    public void setStreaming(Boolean streaming) {
552        this.streaming = streaming;
553    }
554
555    public Long getTimeout() {
556        return timeout;
557    }
558
559    public void setTimeout(Long timeout) {
560        this.timeout = timeout;
561    }
562
563    public String getOnPrepareRef() {
564        return onPrepareRef;
565    }
566
567    public void setOnPrepareRef(String onPrepareRef) {
568        this.onPrepareRef = onPrepareRef;
569    }
570
571    public Processor getOnPrepare() {
572        return onPrepare;
573    }
574
575    public void setOnPrepare(Processor onPrepare) {
576        this.onPrepare = onPrepare;
577    }
578
579    public Boolean getShareUnitOfWork() {
580        return shareUnitOfWork;
581    }
582
583    public void setShareUnitOfWork(Boolean shareUnitOfWork) {
584        this.shareUnitOfWork = shareUnitOfWork;
585    }
586
587    public Integer getCacheSize() {
588        return cacheSize;
589    }
590
591    public void setCacheSize(Integer cacheSize) {
592        this.cacheSize = cacheSize;
593    }
594
595    public Boolean getParallelAggregate() {
596        return parallelAggregate;
597    }
598
599    public void setParallelAggregate(Boolean parallelAggregate) {
600        this.parallelAggregate = parallelAggregate;
601    }
602}