001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.List;
020    import java.util.concurrent.ExecutorService;
021    
022    import javax.xml.bind.annotation.XmlAccessType;
023    import javax.xml.bind.annotation.XmlAccessorType;
024    import javax.xml.bind.annotation.XmlAttribute;
025    import javax.xml.bind.annotation.XmlRootElement;
026    import javax.xml.bind.annotation.XmlTransient;
027    
028    import org.apache.camel.Processor;
029    import org.apache.camel.processor.MulticastProcessor;
030    import org.apache.camel.processor.SubUnitOfWorkProcessor;
031    import org.apache.camel.processor.aggregate.AggregationStrategy;
032    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
033    import org.apache.camel.spi.RouteContext;
034    import org.apache.camel.util.CamelContextHelper;
035    
036    /**
037     * Represents an XML <multicast/> element
038     *
039     * @version 
040     */
041    @XmlRootElement(name = "multicast")
042    @XmlAccessorType(XmlAccessType.FIELD)
043    public class MulticastDefinition extends OutputDefinition<MulticastDefinition> implements ExecutorServiceAwareDefinition<MulticastDefinition> {
044        @XmlAttribute
045        private Boolean parallelProcessing;
046        @XmlAttribute
047        private String strategyRef;
048        @XmlTransient
049        private ExecutorService executorService;
050        @XmlAttribute
051        private String executorServiceRef;
052        @XmlAttribute
053        private Boolean streaming;
054        @XmlAttribute
055        private Boolean stopOnException;
056        @XmlAttribute
057        private Long timeout;
058        @XmlTransient
059        private AggregationStrategy aggregationStrategy;
060        @XmlAttribute
061        private String onPrepareRef;
062        @XmlTransient
063        private Processor onPrepare;
064        @XmlAttribute
065        private Boolean shareUnitOfWork;
066    
067        public MulticastDefinition() {
068        }
069    
070        @Override
071        public String toString() {
072            return "Multicast[" + getOutputs() + "]";
073        }
074    
075        @Override
076        public String getLabel() {
077            return "multicast";
078        }
079        
080        @Override
081        public String getShortName() {
082            return "multicast";
083        }
084    
085        @Override
086        public Processor createProcessor(RouteContext routeContext) throws Exception {
087            return this.createChildProcessor(routeContext, true);
088        }
089    
090        // Fluent API
091        // -------------------------------------------------------------------------
092    
093        /**
094         * Set the multicasting aggregationStrategy
095         *
096         * @return the builder
097         */
098        public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
099            setAggregationStrategy(aggregationStrategy);
100            return this;
101        }
102        
103        /**
104         * Set the aggregationStrategy
105         *
106         * @param aggregationStrategyRef a reference to a strategy to lookup
107         * @return the builder
108         */
109        public MulticastDefinition aggregationStrategyRef(String aggregationStrategyRef) {
110            setStrategyRef(aggregationStrategyRef);
111            return this;
112        }
113    
114        /**
115         * Uses the {@link java.util.concurrent.ExecutorService} to do the multicasting work
116         *     
117         * @return the builder
118         */
119        public MulticastDefinition parallelProcessing() {
120            setParallelProcessing(true);
121            return this;
122        }
123        
124        /**
125         * Aggregates the responses as the are done (e.g. out of order sequence)
126         *
127         * @return the builder
128         */
129        public MulticastDefinition streaming() {
130            setStreaming(true);
131            return this;
132        }
133    
134        /**
135         * Will now stop further processing if an exception or failure occurred during processing of an
136         * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
137         * <p/>
138         * Will also stop if processing the exchange failed (has a fault message) or an exception
139         * was thrown and handled by the error handler (such as using onException). In all situations
140         * the multicast will stop further processing. This is the same behavior as in pipeline, which
141         * is used by the routing engine.
142         * <p/>
143         * The default behavior is to <b>not</b> stop but continue processing till the end
144         *
145         * @return the builder
146         */
147        public MulticastDefinition stopOnException() {
148            setStopOnException(true);
149            return this;
150        }
151        
152        public MulticastDefinition executorService(ExecutorService executorService) {
153            setExecutorService(executorService);
154            return this;
155        }
156        
157        public MulticastDefinition executorServiceRef(String executorServiceRef) {
158            setExecutorServiceRef(executorServiceRef);
159            return this;
160        }
161    
162        /**
163         * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
164         * This can be used to deep-clone messages that should be send, or any custom logic needed before
165         * the exchange is send.
166         *
167         * @param onPrepare the processor
168         * @return the builder
169         */
170        public MulticastDefinition onPrepare(Processor onPrepare) {
171            setOnPrepare(onPrepare);
172            return this;
173        }
174    
175        /**
176         * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
177         * This can be used to deep-clone messages that should be send, or any custom logic needed before
178         * the exchange is send.
179         *
180         * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
181         * @return the builder
182         */
183        public MulticastDefinition onPrepareRef(String onPrepareRef) {
184            setOnPrepareRef(onPrepareRef);
185            return this;
186        }
187    
188        /**
189         * Sets a timeout value in millis to use when using parallelProcessing.
190         *
191         * @param timeout timeout in millis
192         * @return the builder
193         */
194        public MulticastDefinition timeout(long timeout) {
195            setTimeout(timeout);
196            return this;
197        }
198    
199        /**
200         * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
201         *
202         * @return the builder.
203         * @see org.apache.camel.spi.SubUnitOfWork
204         */
205        public MulticastDefinition shareUnitOfWork() {
206            setShareUnitOfWork(true);
207            return this;
208        }
209    
210        protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
211            if (strategyRef != null) {
212                aggregationStrategy = routeContext.mandatoryLookup(strategyRef, AggregationStrategy.class);
213            }
214            if (aggregationStrategy == null) {
215                // default to use latest aggregation strategy
216                aggregationStrategy = new UseLatestAggregationStrategy();
217            }
218    
219            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
220            ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing());
221    
222            long timeout = getTimeout() != null ? getTimeout() : 0;
223            if (timeout > 0 && !isParallelProcessing()) {
224                throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
225            }
226            if (onPrepareRef != null) {
227                onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
228            }
229    
230            MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, aggregationStrategy, isParallelProcessing(),
231                                          threadPool, shutdownThreadPool, isStreaming(), isStopOnException(), timeout, onPrepare, isShareUnitOfWork());
232            if (isShareUnitOfWork()) {
233                // wrap answer in a sub unit of work, since we share the unit of work
234                return new SubUnitOfWorkProcessor(answer);
235            }
236            return answer;
237        }
238    
239        public AggregationStrategy getAggregationStrategy() {
240            return aggregationStrategy;
241        }
242    
243        public MulticastDefinition setAggregationStrategy(AggregationStrategy aggregationStrategy) {
244            this.aggregationStrategy = aggregationStrategy;
245            return this;
246        }
247    
248        public Boolean getParallelProcessing() {
249            return parallelProcessing;
250        }
251    
252        public void setParallelProcessing(Boolean parallelProcessing) {
253            this.parallelProcessing = parallelProcessing;
254        }
255    
256        public boolean isParallelProcessing() {
257            return parallelProcessing != null && parallelProcessing;
258        }
259    
260        public Boolean getStreaming() {
261            return streaming;
262        }
263    
264        public void setStreaming(Boolean streaming) {
265            this.streaming = streaming;
266        }
267    
268        public boolean isStreaming() {
269            return streaming != null && streaming;
270        }
271    
272        public Boolean getStopOnException() {
273            return stopOnException;
274        }
275    
276        public void setStopOnException(Boolean stopOnException) {
277            this.stopOnException = stopOnException;
278        }
279    
280        public Boolean isStopOnException() {
281            return stopOnException != null && stopOnException;
282        }
283    
284        public ExecutorService getExecutorService() {
285            return executorService;
286        }
287    
288        public void setExecutorService(ExecutorService executorService) {
289            this.executorService = executorService;
290        }
291    
292        public String getStrategyRef() {
293            return strategyRef;
294        }
295    
296        public void setStrategyRef(String strategyRef) {
297            this.strategyRef = strategyRef;
298        }
299    
300        public String getExecutorServiceRef() {
301            return executorServiceRef;
302        }
303    
304        public void setExecutorServiceRef(String executorServiceRef) {
305            this.executorServiceRef = executorServiceRef;
306        }
307    
308        public Long getTimeout() {
309            return timeout;
310        }
311    
312        public void setTimeout(Long timeout) {
313            this.timeout = timeout;
314        }
315    
316        public String getOnPrepareRef() {
317            return onPrepareRef;
318        }
319    
320        public void setOnPrepareRef(String onPrepareRef) {
321            this.onPrepareRef = onPrepareRef;
322        }
323    
324        public Processor getOnPrepare() {
325            return onPrepare;
326        }
327    
328        public void setOnPrepare(Processor onPrepare) {
329            this.onPrepare = onPrepare;
330        }
331    
332        public Boolean getShareUnitOfWork() {
333            return shareUnitOfWork;
334        }
335    
336        public void setShareUnitOfWork(Boolean shareUnitOfWork) {
337            this.shareUnitOfWork = shareUnitOfWork;
338        }
339    
340        public boolean isShareUnitOfWork() {
341            return shareUnitOfWork != null && shareUnitOfWork;
342        }
343    
344    }