001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.ExecutorService;
022
023import javax.xml.bind.annotation.XmlAccessType;
024import javax.xml.bind.annotation.XmlAccessorType;
025import javax.xml.bind.annotation.XmlAttribute;
026import javax.xml.bind.annotation.XmlRootElement;
027import javax.xml.bind.annotation.XmlTransient;
028
029import org.apache.camel.CamelContextAware;
030import org.apache.camel.Processor;
031import org.apache.camel.builder.AggregationStrategyClause;
032import org.apache.camel.builder.ProcessClause;
033import org.apache.camel.processor.MulticastProcessor;
034import org.apache.camel.processor.aggregate.AggregationStrategy;
035import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
036import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
037import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
038import org.apache.camel.spi.Metadata;
039import org.apache.camel.spi.RouteContext;
040import org.apache.camel.util.CamelContextHelper;
041
042/**
043 *  Routes the same message to multiple paths either sequentially or in parallel.
044 *
045 * @version 
046 */
047@Metadata(label = "eip,routing")
048@XmlRootElement(name = "multicast")
049@XmlAccessorType(XmlAccessType.FIELD)
050public class MulticastDefinition extends OutputDefinition<MulticastDefinition> implements ExecutorServiceAwareDefinition<MulticastDefinition> {
051    @XmlAttribute
052    private Boolean parallelProcessing;
053    @XmlAttribute
054    private String strategyRef;
055    @XmlAttribute
056    private String strategyMethodName;
057    @XmlAttribute
058    private Boolean strategyMethodAllowNull;
059    @XmlTransient
060    private ExecutorService executorService;
061    @XmlAttribute
062    private String executorServiceRef;
063    @XmlAttribute
064    private Boolean streaming;
065    @XmlAttribute
066    private Boolean stopOnException;
067    @XmlAttribute @Metadata(defaultValue = "0")
068    private Long timeout;
069    @XmlTransient
070    private AggregationStrategy aggregationStrategy;
071    @XmlAttribute
072    private String onPrepareRef;
073    @XmlTransient
074    private Processor onPrepare;
075    @XmlAttribute
076    private Boolean shareUnitOfWork;
077    @XmlAttribute
078    private Boolean parallelAggregate;
079    @XmlAttribute
080    private Boolean stopOnAggregateException;
081
082    public MulticastDefinition() {
083    }
084
085    @Override
086    public String toString() {
087        return "Multicast[" + getOutputs() + "]";
088    }
089
090    @Override
091    public String getShortName() {
092        return "multicast";
093    }
094
095    @Override
096    public String getLabel() {
097        return "multicast";
098    }
099    
100    @Override
101    public Processor createProcessor(RouteContext routeContext) throws Exception {
102        Processor answer = this.createChildProcessor(routeContext, true);
103
104        // force the answer as a multicast processor even if there is only one child processor in the multicast
105        if (!(answer instanceof MulticastProcessor)) {
106            List<Processor> list = new ArrayList<>(1);
107            list.add(answer);
108            answer = createCompositeProcessor(routeContext, list);
109        }
110        return answer;
111    }
112
113    // Fluent API
114    // -------------------------------------------------------------------------
115
116    /**
117     * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast using a fluent builder.
118     */
119    public AggregationStrategyClause<MulticastDefinition> aggregationStrategy() {
120        AggregationStrategyClause<MulticastDefinition> clause = new AggregationStrategyClause<>(this);
121        setAggregationStrategy(clause);
122        return clause;
123    }
124
125    /**
126     * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
127     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
128     * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
129     * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option.
130     */
131    public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
132        setAggregationStrategy(aggregationStrategy);
133        return this;
134    }
135
136    /**
137     * Sets a reference to the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
138     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
139     * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
140     * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option.
141     */
142    public MulticastDefinition aggregationStrategyRef(String aggregationStrategyRef) {
143        setStrategyRef(aggregationStrategyRef);
144        return this;
145    }
146
147    /**
148     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
149     *
150     * @param  methodName the method name to call
151     * @return the builder
152     */
153    public MulticastDefinition aggregationStrategyMethodName(String methodName) {
154        setStrategyMethodName(methodName);
155        return this;
156    }
157
158    /**
159     * If this option is false then the aggregate method is not used if there was no data to enrich.
160     * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy
161     *
162     * @return the builder
163     */
164    public MulticastDefinition aggregationStrategyMethodAllowNull() {
165        setStrategyMethodAllowNull(true);
166        return this;
167    }
168
169    /**
170     * If enabled then sending messages to the multicasts occurs concurrently.
171     * Note the caller thread will still wait until all messages has been fully processed, before it continues.
172     * Its only the sending and processing the replies from the multicasts which happens concurrently.
173     *
174     * @return the builder
175     */
176    public MulticastDefinition parallelProcessing() {
177        setParallelProcessing(true);
178        return this;
179    }
180
181    /**
182     * If enabled then sending messages to the multicasts occurs concurrently.
183     * Note the caller thread will still wait until all messages has been fully processed, before it continues.
184     * Its only the sending and processing the replies from the multicasts which happens concurrently.
185     *
186     * @return the builder
187     */
188    public MulticastDefinition parallelProcessing(boolean parallelProcessing) {
189        setParallelProcessing(parallelProcessing);
190        return this;
191    }
192
193    /**
194     * If enabled then the aggregate method on AggregationStrategy can be called concurrently.
195     * Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe.
196     * By default this is false meaning that Camel synchronizes the call to the aggregate method.
197     * Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe.
198     *
199     * @return the builder
200     */
201    public MulticastDefinition parallelAggregate() {
202        setParallelAggregate(true);
203        return this;
204    }
205    
206    /**
207     * If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used.
208     * Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used.
209     * Enabling this option allows to work around this behavior.
210     *
211     * The default value is <code>false</code> for the sake of backward compatibility.
212     *
213     * @return the builder
214     */
215    public MulticastDefinition stopOnAggregateException() {
216        setStopOnAggregateException(true);
217        return this;
218    }
219
220    /**
221     * If enabled then Camel will process replies out-of-order, eg in the order they come back.
222     * If disabled, Camel will process replies in the same order as defined by the multicast.
223     *
224     * @return the builder
225     */
226    public MulticastDefinition streaming() {
227        setStreaming(true);
228        return this;
229    }
230
231    /**
232     * Will now stop further processing if an exception or failure occurred during processing of an
233     * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
234     * <p/>
235     * Will also stop if processing the exchange failed (has a fault message) or an exception
236     * was thrown and handled by the error handler (such as using onException). In all situations
237     * the multicast will stop further processing. This is the same behavior as in pipeline, which
238     * is used by the routing engine.
239     * <p/>
240     * The default behavior is to <b>not</b> stop but continue processing till the end
241     *
242     * @return the builder
243     */
244    public MulticastDefinition stopOnException() {
245        setStopOnException(true);
246        return this;
247    }
248
249    /**
250     * To use a custom Thread Pool to be used for parallel processing.
251     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
252     */
253    public MulticastDefinition executorService(ExecutorService executorService) {
254        setExecutorService(executorService);
255        return this;
256    }
257
258    /**
259     * Refers to a custom Thread Pool to be used for parallel processing.
260     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
261     */
262    public MulticastDefinition executorServiceRef(String executorServiceRef) {
263        setExecutorServiceRef(executorServiceRef);
264        return this;
265    }
266
267    /**
268     * Set the {@link Processor} to use when preparing the {@link org.apache.camel.Exchange} to be send using a fluent builder.
269     */
270    public ProcessClause<MulticastDefinition> onPrepare() {
271        ProcessClause<MulticastDefinition> clause = new ProcessClause<>(this);
272        setOnPrepare(clause);
273        return clause;
274    }
275
276    /**
277     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
278     * This can be used to deep-clone messages that should be send, or any custom logic needed before
279     * the exchange is send.
280     *
281     * @param onPrepare the processor
282     * @return the builder
283     */
284    public MulticastDefinition onPrepare(Processor onPrepare) {
285        setOnPrepare(onPrepare);
286        return this;
287    }
288
289    /**
290     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
291     * This can be used to deep-clone messages that should be send, or any custom logic needed before
292     * the exchange is send.
293     *
294     * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
295     * @return the builder
296     */
297    public MulticastDefinition onPrepareRef(String onPrepareRef) {
298        setOnPrepareRef(onPrepareRef);
299        return this;
300    }
301
302    /**
303     * Sets a total timeout specified in millis, when using parallel processing.
304     * If the Multicast hasn't been able to send and process all replies within the given timeframe,
305     * then the timeout triggers and the Multicast breaks out and continues.
306     * Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out.
307     * If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel
308     * to shut down in a graceful manner may continue to run. So use this option with a bit of care.
309     *
310     * @param timeout timeout in millis
311     * @return the builder
312     */
313    public MulticastDefinition timeout(long timeout) {
314        setTimeout(timeout);
315        return this;
316    }
317
318    /**
319     * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
320     * Multicast will by default not share unit of work between the parent exchange and each multicasted exchange.
321     * This means each sub exchange has its own individual unit of work.
322     *
323     * @return the builder.
324     * @see org.apache.camel.spi.SubUnitOfWork
325     */
326    public MulticastDefinition shareUnitOfWork() {
327        setShareUnitOfWork(true);
328        return this;
329    }
330
331    protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
332        final AggregationStrategy strategy = createAggregationStrategy(routeContext);
333
334        boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing();
335        boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
336        boolean isStreaming = getStreaming() != null && getStreaming();
337        boolean isStopOnException = getStopOnException() != null && getStopOnException();
338        boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate();
339        boolean isStopOnAggregateException = getStopOnAggregateException() != null && getStopOnAggregateException();
340
341        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing);
342        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing);
343
344        long timeout = getTimeout() != null ? getTimeout() : 0;
345        if (timeout > 0 && !isParallelProcessing) {
346            throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
347        }
348        if (onPrepareRef != null) {
349            onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
350        }
351
352        MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, strategy, isParallelProcessing,
353                                      threadPool, shutdownThreadPool, isStreaming, isStopOnException, timeout, onPrepare, isShareUnitOfWork, isParallelAggregate, isStopOnAggregateException);
354        return answer;
355    }
356
357    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
358        AggregationStrategy strategy = getAggregationStrategy();
359        if (strategy == null && strategyRef != null) {
360            Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
361            if (aggStrategy instanceof AggregationStrategy) {
362                strategy = (AggregationStrategy) aggStrategy;
363            } else if (aggStrategy != null) {
364                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName());
365                if (getStrategyMethodAllowNull() != null) {
366                    adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
367                    adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
368                }
369                strategy = adapter;
370            } else {
371                throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
372            }
373        }
374
375        if (strategy == null) {
376            // default to use latest aggregation strategy
377            strategy = new UseLatestAggregationStrategy();
378        }
379
380        if (strategy instanceof CamelContextAware) {
381            ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
382        }
383
384        if (shareUnitOfWork != null && shareUnitOfWork) {
385            // wrap strategy in share unit of work
386            strategy = new ShareUnitOfWorkAggregationStrategy(strategy);
387        }
388
389        return strategy;
390    }
391
392    public AggregationStrategy getAggregationStrategy() {
393        return aggregationStrategy;
394    }
395
396    public MulticastDefinition setAggregationStrategy(AggregationStrategy aggregationStrategy) {
397        this.aggregationStrategy = aggregationStrategy;
398        return this;
399    }
400
401    public Boolean getParallelProcessing() {
402        return parallelProcessing;
403    }
404
405    public void setParallelProcessing(Boolean parallelProcessing) {
406        this.parallelProcessing = parallelProcessing;
407    }
408
409    public Boolean getStreaming() {
410        return streaming;
411    }
412
413    public void setStreaming(Boolean streaming) {
414        this.streaming = streaming;
415    }
416
417    public Boolean getStopOnException() {
418        return stopOnException;
419    }
420
421    public void setStopOnException(Boolean stopOnException) {
422        this.stopOnException = stopOnException;
423    }
424
425    public ExecutorService getExecutorService() {
426        return executorService;
427    }
428
429    public void setExecutorService(ExecutorService executorService) {
430        this.executorService = executorService;
431    }
432
433    public String getStrategyRef() {
434        return strategyRef;
435    }
436
437    /**
438     * Refers to an AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
439     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
440     */
441    public void setStrategyRef(String strategyRef) {
442        this.strategyRef = strategyRef;
443    }
444
445    public String getStrategyMethodName() {
446        return strategyMethodName;
447    }
448
449    /**
450     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
451     */
452    public void setStrategyMethodName(String strategyMethodName) {
453        this.strategyMethodName = strategyMethodName;
454    }
455
456    public Boolean getStrategyMethodAllowNull() {
457        return strategyMethodAllowNull;
458    }
459
460    /**
461     * If this option is false then the aggregate method is not used if there was no data to enrich.
462     * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy
463     */
464    public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
465        this.strategyMethodAllowNull = strategyMethodAllowNull;
466    }
467
468    public String getExecutorServiceRef() {
469        return executorServiceRef;
470    }
471
472    /**
473     * Refers to a custom Thread Pool to be used for parallel processing.
474     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
475     */
476    public void setExecutorServiceRef(String executorServiceRef) {
477        this.executorServiceRef = executorServiceRef;
478    }
479
480    public Long getTimeout() {
481        return timeout;
482    }
483
484    public void setTimeout(Long timeout) {
485        this.timeout = timeout;
486    }
487
488    public String getOnPrepareRef() {
489        return onPrepareRef;
490    }
491
492    public void setOnPrepareRef(String onPrepareRef) {
493        this.onPrepareRef = onPrepareRef;
494    }
495
496    public Processor getOnPrepare() {
497        return onPrepare;
498    }
499
500    public void setOnPrepare(Processor onPrepare) {
501        this.onPrepare = onPrepare;
502    }
503
504    public Boolean getShareUnitOfWork() {
505        return shareUnitOfWork;
506    }
507
508    public void setShareUnitOfWork(Boolean shareUnitOfWork) {
509        this.shareUnitOfWork = shareUnitOfWork;
510    }
511
512    public Boolean getParallelAggregate() {
513        return parallelAggregate;
514    }
515
516    public void setParallelAggregate(Boolean parallelAggregate) {
517        this.parallelAggregate = parallelAggregate;
518    }
519
520    public Boolean getStopOnAggregateException() {
521        return stopOnAggregateException;
522    }
523
524    public void setStopOnAggregateException(Boolean stopOnAggregateException) {
525        this.stopOnAggregateException = stopOnAggregateException;
526    }
527
528}