001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.concurrent.ExecutorService;
020    
021    import javax.xml.bind.annotation.XmlAccessType;
022    import javax.xml.bind.annotation.XmlAccessorType;
023    import javax.xml.bind.annotation.XmlAttribute;
024    import javax.xml.bind.annotation.XmlRootElement;
025    import javax.xml.bind.annotation.XmlTransient;
026    
027    import org.apache.camel.Expression;
028    import org.apache.camel.Processor;
029    import org.apache.camel.model.language.ExpressionDefinition;
030    import org.apache.camel.processor.Splitter;
031    import org.apache.camel.processor.SubUnitOfWorkProcessor;
032    import org.apache.camel.processor.aggregate.AggregationStrategy;
033    import org.apache.camel.spi.RouteContext;
034    import org.apache.camel.util.CamelContextHelper;
035    
036    /**
037     * Represents an XML <split/> element
038     *
039     * @version 
040     */
041    @XmlRootElement(name = "split")
042    @XmlAccessorType(XmlAccessType.FIELD)
043    public class SplitDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<SplitDefinition> {
044        @XmlTransient
045        private AggregationStrategy aggregationStrategy;
046        @XmlTransient
047        private ExecutorService executorService;
048        @XmlAttribute
049        private Boolean parallelProcessing;
050        @XmlAttribute
051        private String strategyRef;
052        @XmlAttribute
053        private String executorServiceRef;
054        @XmlAttribute
055        private Boolean streaming;
056        @XmlAttribute
057        private Boolean stopOnException;
058        @XmlAttribute
059        private Long timeout;
060        @XmlAttribute
061        private String onPrepareRef;
062        @XmlTransient
063        private Processor onPrepare;
064        @XmlAttribute
065        private Boolean shareUnitOfWork;
066    
067        public SplitDefinition() {
068        }
069    
070        public SplitDefinition(Expression expression) {
071            super(expression);
072        }
073    
074        public SplitDefinition(ExpressionDefinition expression) {
075            super(expression);
076        }
077    
078        @Override
079        public String toString() {
080            return "Split[" + getExpression() + " -> " + getOutputs() + "]";
081        }
082    
083        @Override
084        public String getShortName() {
085            return "split";
086        }
087    
088        @Override
089        public String getLabel() {
090            return "split[" + getExpression() + "]";
091        }
092    
093        @Override
094        public Processor createProcessor(RouteContext routeContext) throws Exception {
095            Processor childProcessor = this.createChildProcessor(routeContext, true);
096            aggregationStrategy = createAggregationStrategy(routeContext);
097    
098            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
099            ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing());
100    
101            long timeout = getTimeout() != null ? getTimeout() : 0;
102            if (timeout > 0 && !isParallelProcessing()) {
103                throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
104            }
105            if (onPrepareRef != null) {
106                onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
107            }
108    
109            Expression exp = getExpression().createExpression(routeContext);
110    
111            Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
112                                isParallelProcessing(), threadPool, shutdownThreadPool, isStreaming(), isStopOnException(),
113                                timeout, onPrepare, isShareUnitOfWork());
114            if (isShareUnitOfWork()) {
115                // wrap answer in a sub unit of work, since we share the unit of work
116                return new SubUnitOfWorkProcessor(answer);
117            }
118            return answer;
119        }
120    
121        private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
122            AggregationStrategy strategy = getAggregationStrategy();
123            if (strategy == null && strategyRef != null) {
124                strategy = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), strategyRef, AggregationStrategy.class);
125            }
126            return strategy;
127        }        
128    
129        // Fluent API
130        // -------------------------------------------------------------------------
131    
132        /**
133         * Set the aggregationStrategy
134         *
135         * @return the builder
136         */
137        public SplitDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
138            setAggregationStrategy(aggregationStrategy);
139            return this;
140        }
141        
142        /**
143         * Set the aggregationStrategy
144         *
145         * @param aggregationStrategyRef a reference to a strategy to lookup
146         * @return the builder
147         */
148        public SplitDefinition aggregationStrategyRef(String aggregationStrategyRef) {
149            setStrategyRef(aggregationStrategyRef);
150            return this;
151        }
152    
153        /**
154         * Doing the splitting work in parallel
155         *
156         * @return the builder
157         */
158        public SplitDefinition parallelProcessing() {
159            setParallelProcessing(true);
160            return this;
161        }
162        
163        /**
164         * Enables streaming. 
165         * See {@link org.apache.camel.model.SplitDefinition#isStreaming()} for more information
166         *
167         * @return the builder
168         */
169        public SplitDefinition streaming() {
170            setStreaming(true);
171            return this;
172        }
173        
174        /**
175         * Will now stop further processing if an exception or failure occurred during processing of an
176         * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
177         * <p/>
178         * Will also stop if processing the exchange failed (has a fault message) or an exception
179         * was thrown and handled by the error handler (such as using onException). In all situations
180         * the splitter will stop further processing. This is the same behavior as in pipeline, which
181         * is used by the routing engine.
182         * <p/>
183         * The default behavior is to <b>not</b> stop but continue processing till the end
184         *
185         * @return the builder
186         */
187        public SplitDefinition stopOnException() {
188            setStopOnException(true);
189            return this;
190        }
191       
192        public SplitDefinition executorService(ExecutorService executorService) {
193            setExecutorService(executorService);
194            return this;
195        }
196        
197        public SplitDefinition executorServiceRef(String executorServiceRef) {
198            setExecutorServiceRef(executorServiceRef);
199            return this;
200        }
201    
202        /**
203         * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
204         * This can be used to deep-clone messages that should be send, or any custom logic needed before
205         * the exchange is send.
206         *
207         * @param onPrepare the processor
208         * @return the builder
209         */
210        public SplitDefinition onPrepare(Processor onPrepare) {
211            setOnPrepare(onPrepare);
212            return this;
213        }
214    
215        /**
216         * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
217         * This can be used to deep-clone messages that should be send, or any custom logic needed before
218         * the exchange is send.
219         *
220         * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
221         * @return the builder
222         */
223        public SplitDefinition onPrepareRef(String onPrepareRef) {
224            setOnPrepareRef(onPrepareRef);
225            return this;
226        }
227    
228        /**
229         * Sets a timeout value in millis to use when using parallelProcessing.
230         *
231         * @param timeout timeout in millis
232         * @return the builder
233         */
234        public SplitDefinition timeout(long timeout) {
235            setTimeout(timeout);
236            return this;
237        }
238    
239        /**
240         * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
241         *
242         * @return the builder.
243         * @see org.apache.camel.spi.SubUnitOfWork
244         */
245        public SplitDefinition shareUnitOfWork() {
246            setShareUnitOfWork(true);
247            return this;
248        }
249    
250        // Properties
251        //-------------------------------------------------------------------------
252    
253        public AggregationStrategy getAggregationStrategy() {
254            return aggregationStrategy;
255        }
256    
257        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
258            this.aggregationStrategy = aggregationStrategy;
259        }
260    
261        public Boolean getParallelProcessing() {
262            return parallelProcessing;
263        }
264    
265        public void setParallelProcessing(Boolean parallelProcessing) {
266            this.parallelProcessing = parallelProcessing;
267        }
268    
269        public boolean isParallelProcessing() {
270            return parallelProcessing != null && parallelProcessing;
271        }
272    
273        public Boolean getStreaming() {
274            return streaming;
275        }
276    
277        public void setStreaming(Boolean streaming) {
278            this.streaming = streaming;
279        }
280    
281        /**
282         * The splitter should use streaming -- exchanges are being sent as the data for them becomes available.
283         * This improves throughput and memory usage, but it has a drawback:
284         * - the sent exchanges will no longer contain the {@link org.apache.camel.Exchange#SPLIT_SIZE} header property
285         *
286         * @return whether or not streaming should be used
287         */
288        public boolean isStreaming() {
289            return streaming != null && streaming;
290        }
291    
292        public Boolean getStopOnException() {
293            return stopOnException;
294        }
295    
296        public void setStopOnException(Boolean stopOnException) {
297            this.stopOnException = stopOnException;
298        }
299    
300        public Boolean isStopOnException() {
301            return stopOnException != null && stopOnException;
302        }
303    
304        public ExecutorService getExecutorService() {
305            return executorService;
306        }
307    
308        public void setExecutorService(ExecutorService executorService) {
309            this.executorService = executorService;
310        }
311    
312        public String getStrategyRef() {
313            return strategyRef;
314        }
315    
316        public void setStrategyRef(String strategyRef) {
317            this.strategyRef = strategyRef;
318        }
319    
320        public String getExecutorServiceRef() {
321            return executorServiceRef;
322        }
323    
324        public void setExecutorServiceRef(String executorServiceRef) {
325            this.executorServiceRef = executorServiceRef;
326        }
327    
328        public Long getTimeout() {
329            return timeout;
330        }
331    
332        public void setTimeout(Long timeout) {
333            this.timeout = timeout;
334        }
335    
336        public String getOnPrepareRef() {
337            return onPrepareRef;
338        }
339    
340        public void setOnPrepareRef(String onPrepareRef) {
341            this.onPrepareRef = onPrepareRef;
342        }
343    
344        public Processor getOnPrepare() {
345            return onPrepare;
346        }
347    
348        public void setOnPrepare(Processor onPrepare) {
349            this.onPrepare = onPrepare;
350        }
351    
352        public Boolean getShareUnitOfWork() {
353            return shareUnitOfWork;
354        }
355    
356        public void setShareUnitOfWork(Boolean shareUnitOfWork) {
357            this.shareUnitOfWork = shareUnitOfWork;
358        }
359    
360        public boolean isShareUnitOfWork() {
361            return shareUnitOfWork != null && shareUnitOfWork;
362        }
363    }