001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.List;
021import java.util.concurrent.ExecutorService;
022import javax.xml.bind.annotation.XmlAccessType;
023import javax.xml.bind.annotation.XmlAccessorType;
024import javax.xml.bind.annotation.XmlAttribute;
025import javax.xml.bind.annotation.XmlRootElement;
026import javax.xml.bind.annotation.XmlTransient;
027
028import org.apache.camel.CamelContextAware;
029import org.apache.camel.Processor;
030import org.apache.camel.processor.CamelInternalProcessor;
031import org.apache.camel.processor.MulticastProcessor;
032import org.apache.camel.processor.aggregate.AggregationStrategy;
033import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
034import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
035import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
036import org.apache.camel.spi.Metadata;
037import org.apache.camel.spi.RouteContext;
038import org.apache.camel.util.CamelContextHelper;
039
040/**
041 *  Routes the same message to multiple paths either sequentially or in parallel.
042 *
043 * @version 
044 */
045@Metadata(label = "eip,routing")
046@XmlRootElement(name = "multicast")
047@XmlAccessorType(XmlAccessType.FIELD)
048public class MulticastDefinition extends OutputDefinition<MulticastDefinition> implements ExecutorServiceAwareDefinition<MulticastDefinition> {
049    @XmlAttribute
050    private Boolean parallelProcessing;
051    @XmlAttribute
052    private String strategyRef;
053    @XmlAttribute
054    private String strategyMethodName;
055    @XmlAttribute
056    private Boolean strategyMethodAllowNull;
057    @XmlTransient
058    private ExecutorService executorService;
059    @XmlAttribute
060    private String executorServiceRef;
061    @XmlAttribute
062    private Boolean streaming;
063    @XmlAttribute
064    private Boolean stopOnException;
065    @XmlAttribute @Metadata(defaultValue = "0")
066    private Long timeout;
067    @XmlTransient
068    private AggregationStrategy aggregationStrategy;
069    @XmlAttribute
070    private String onPrepareRef;
071    @XmlTransient
072    private Processor onPrepare;
073    @XmlAttribute
074    private Boolean shareUnitOfWork;
075    @XmlAttribute
076    private Boolean parallelAggregate;
077
078    public MulticastDefinition() {
079    }
080
081    @Override
082    public String toString() {
083        return "Multicast[" + getOutputs() + "]";
084    }
085
086    @Override
087    public String getLabel() {
088        return "multicast";
089    }
090    
091    @Override
092    public Processor createProcessor(RouteContext routeContext) throws Exception {
093        Processor answer = this.createChildProcessor(routeContext, true);
094
095        // force the answer as a multicast processor even if there is only one child processor in the multicast
096        if (!(answer instanceof MulticastProcessor)) {
097            List<Processor> list = new ArrayList<Processor>(1);
098            list.add(answer);
099            answer = createCompositeProcessor(routeContext, list);
100        }
101        return answer;
102    }
103
104    // Fluent API
105    // -------------------------------------------------------------------------
106
107    /**
108     * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
109     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy.
110     * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
111     * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option.
112     */
113    public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
114        setAggregationStrategy(aggregationStrategy);
115        return this;
116    }
117
118    /**
119     * Sets a reference to the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
120     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
121     * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception
122     * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option.
123     */
124    public MulticastDefinition aggregationStrategyRef(String aggregationStrategyRef) {
125        setStrategyRef(aggregationStrategyRef);
126        return this;
127    }
128
129    /**
130     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
131     *
132     * @param  methodName the method name to call
133     * @return the builder
134     */
135    public MulticastDefinition aggregationStrategyMethodName(String methodName) {
136        setStrategyMethodName(methodName);
137        return this;
138    }
139
140    /**
141     * If this option is false then the aggregate method is not used if there was no data to enrich.
142     * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy
143     *
144     * @return the builder
145     */
146    public MulticastDefinition aggregationStrategyMethodAllowNull() {
147        setStrategyMethodAllowNull(true);
148        return this;
149    }
150
151    /**
152     * If enabled then sending messages to the multicasts occurs concurrently.
153     * Note the caller thread will still wait until all messages has been fully processed, before it continues.
154     * Its only the sending and processing the replies from the multicasts which happens concurrently.
155     *
156     * @return the builder
157     */
158    public MulticastDefinition parallelProcessing() {
159        setParallelProcessing(true);
160        return this;
161    }
162
163    /**
164     * If enabled then sending messages to the multicasts occurs concurrently.
165     * Note the caller thread will still wait until all messages has been fully processed, before it continues.
166     * Its only the sending and processing the replies from the multicasts which happens concurrently.
167     *
168     * @return the builder
169     */
170    public MulticastDefinition parallelProcessing(boolean parallelProcessing) {
171        setParallelProcessing(parallelProcessing);
172        return this;
173    }
174
175    /**
176     * If enabled then the aggregate method on AggregationStrategy can be called concurrently.
177     * Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe.
178     * By default this is false meaning that Camel synchronizes the call to the aggregate method.
179     * Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe.
180     *
181     * @return the builder
182     */
183    public MulticastDefinition parallelAggregate() {
184        setParallelAggregate(true);
185        return this;
186    }
187
188    /**
189     * If enabled then Camel will process replies out-of-order, eg in the order they come back.
190     * If disabled, Camel will process replies in the same order as defined by the multicast.
191     *
192     * @return the builder
193     */
194    public MulticastDefinition streaming() {
195        setStreaming(true);
196        return this;
197    }
198
199    /**
200     * Will now stop further processing if an exception or failure occurred during processing of an
201     * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
202     * <p/>
203     * Will also stop if processing the exchange failed (has a fault message) or an exception
204     * was thrown and handled by the error handler (such as using onException). In all situations
205     * the multicast will stop further processing. This is the same behavior as in pipeline, which
206     * is used by the routing engine.
207     * <p/>
208     * The default behavior is to <b>not</b> stop but continue processing till the end
209     *
210     * @return the builder
211     */
212    public MulticastDefinition stopOnException() {
213        setStopOnException(true);
214        return this;
215    }
216
217    /**
218     * To use a custom Thread Pool to be used for parallel processing.
219     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
220     */
221    public MulticastDefinition executorService(ExecutorService executorService) {
222        setExecutorService(executorService);
223        return this;
224    }
225
226    /**
227     * Refers to a custom Thread Pool to be used for parallel processing.
228     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
229     */
230    public MulticastDefinition executorServiceRef(String executorServiceRef) {
231        setExecutorServiceRef(executorServiceRef);
232        return this;
233    }
234
235    /**
236     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
237     * This can be used to deep-clone messages that should be send, or any custom logic needed before
238     * the exchange is send.
239     *
240     * @param onPrepare the processor
241     * @return the builder
242     */
243    public MulticastDefinition onPrepare(Processor onPrepare) {
244        setOnPrepare(onPrepare);
245        return this;
246    }
247
248    /**
249     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
250     * This can be used to deep-clone messages that should be send, or any custom logic needed before
251     * the exchange is send.
252     *
253     * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
254     * @return the builder
255     */
256    public MulticastDefinition onPrepareRef(String onPrepareRef) {
257        setOnPrepareRef(onPrepareRef);
258        return this;
259    }
260
261    /**
262     * Sets a total timeout specified in millis, when using parallel processing.
263     * If the Multicast hasn't been able to send and process all replies within the given timeframe,
264     * then the timeout triggers and the Multicast breaks out and continues.
265     * Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out.
266     * If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel
267     * to shut down in a graceful manner may continue to run. So use this option with a bit of care.
268     *
269     * @param timeout timeout in millis
270     * @return the builder
271     */
272    public MulticastDefinition timeout(long timeout) {
273        setTimeout(timeout);
274        return this;
275    }
276
277    /**
278     * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
279     * Multicast will by default not share unit of work between the parent exchange and each multicasted exchange.
280     * This means each sub exchange has its own individual unit of work.
281     *
282     * @return the builder.
283     * @see org.apache.camel.spi.SubUnitOfWork
284     */
285    public MulticastDefinition shareUnitOfWork() {
286        setShareUnitOfWork(true);
287        return this;
288    }
289
290    protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
291        final AggregationStrategy strategy = createAggregationStrategy(routeContext);
292
293        boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing();
294        boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
295        boolean isStreaming = getStreaming() != null && getStreaming();
296        boolean isStopOnException = getStopOnException() != null && getStopOnException();
297        boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate();
298
299        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing);
300        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing);
301
302        long timeout = getTimeout() != null ? getTimeout() : 0;
303        if (timeout > 0 && !isParallelProcessing) {
304            throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
305        }
306        if (onPrepareRef != null) {
307            onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
308        }
309
310        MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, strategy, isParallelProcessing,
311                                      threadPool, shutdownThreadPool, isStreaming, isStopOnException, timeout, onPrepare, isShareUnitOfWork, isParallelAggregate);
312        return answer;
313    }
314
315    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
316        AggregationStrategy strategy = getAggregationStrategy();
317        if (strategy == null && strategyRef != null) {
318            Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
319            if (aggStrategy instanceof AggregationStrategy) {
320                strategy = (AggregationStrategy) aggStrategy;
321            } else if (aggStrategy != null) {
322                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName());
323                if (getStrategyMethodAllowNull() != null) {
324                    adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
325                    adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
326                }
327                strategy = adapter;
328            } else {
329                throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
330            }
331        }
332
333        if (strategy == null) {
334            // default to use latest aggregation strategy
335            strategy = new UseLatestAggregationStrategy();
336        }
337
338        if (strategy instanceof CamelContextAware) {
339            ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
340        }
341
342        if (shareUnitOfWork != null && shareUnitOfWork) {
343            // wrap strategy in share unit of work
344            strategy = new ShareUnitOfWorkAggregationStrategy(strategy);
345        }
346
347        return strategy;
348    }
349
350    public AggregationStrategy getAggregationStrategy() {
351        return aggregationStrategy;
352    }
353
354    public MulticastDefinition setAggregationStrategy(AggregationStrategy aggregationStrategy) {
355        this.aggregationStrategy = aggregationStrategy;
356        return this;
357    }
358
359    public Boolean getParallelProcessing() {
360        return parallelProcessing;
361    }
362
363    public void setParallelProcessing(Boolean parallelProcessing) {
364        this.parallelProcessing = parallelProcessing;
365    }
366
367    public Boolean getStreaming() {
368        return streaming;
369    }
370
371    public void setStreaming(Boolean streaming) {
372        this.streaming = streaming;
373    }
374
375    public Boolean getStopOnException() {
376        return stopOnException;
377    }
378
379    public void setStopOnException(Boolean stopOnException) {
380        this.stopOnException = stopOnException;
381    }
382
383    public ExecutorService getExecutorService() {
384        return executorService;
385    }
386
387    public void setExecutorService(ExecutorService executorService) {
388        this.executorService = executorService;
389    }
390
391    public String getStrategyRef() {
392        return strategyRef;
393    }
394
395    /**
396     * Refers to an AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast.
397     * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy
398     */
399    public void setStrategyRef(String strategyRef) {
400        this.strategyRef = strategyRef;
401    }
402
403    public String getStrategyMethodName() {
404        return strategyMethodName;
405    }
406
407    /**
408     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
409     */
410    public void setStrategyMethodName(String strategyMethodName) {
411        this.strategyMethodName = strategyMethodName;
412    }
413
414    public Boolean getStrategyMethodAllowNull() {
415        return strategyMethodAllowNull;
416    }
417
418    /**
419     * If this option is false then the aggregate method is not used if there was no data to enrich.
420     * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy
421     */
422    public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
423        this.strategyMethodAllowNull = strategyMethodAllowNull;
424    }
425
426    public String getExecutorServiceRef() {
427        return executorServiceRef;
428    }
429
430    /**
431     * Refers to a custom Thread Pool to be used for parallel processing.
432     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
433     */
434    public void setExecutorServiceRef(String executorServiceRef) {
435        this.executorServiceRef = executorServiceRef;
436    }
437
438    public Long getTimeout() {
439        return timeout;
440    }
441
442    public void setTimeout(Long timeout) {
443        this.timeout = timeout;
444    }
445
446    public String getOnPrepareRef() {
447        return onPrepareRef;
448    }
449
450    public void setOnPrepareRef(String onPrepareRef) {
451        this.onPrepareRef = onPrepareRef;
452    }
453
454    public Processor getOnPrepare() {
455        return onPrepare;
456    }
457
458    public void setOnPrepare(Processor onPrepare) {
459        this.onPrepare = onPrepare;
460    }
461
462    public Boolean getShareUnitOfWork() {
463        return shareUnitOfWork;
464    }
465
466    public void setShareUnitOfWork(Boolean shareUnitOfWork) {
467        this.shareUnitOfWork = shareUnitOfWork;
468    }
469
470    public Boolean getParallelAggregate() {
471        return parallelAggregate;
472    }
473
474    public void setParallelAggregate(Boolean parallelAggregate) {
475        this.parallelAggregate = parallelAggregate;
476    }
477
478}