001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.concurrent.ExecutorService;
020import javax.xml.bind.annotation.XmlAccessType;
021import javax.xml.bind.annotation.XmlAccessorType;
022import javax.xml.bind.annotation.XmlAttribute;
023import javax.xml.bind.annotation.XmlRootElement;
024import javax.xml.bind.annotation.XmlTransient;
025
026import org.apache.camel.CamelContextAware;
027import org.apache.camel.Expression;
028import org.apache.camel.Processor;
029import org.apache.camel.model.language.ExpressionDefinition;
030import org.apache.camel.processor.CamelInternalProcessor;
031import org.apache.camel.processor.Splitter;
032import org.apache.camel.processor.aggregate.AggregationStrategy;
033import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
034import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
035import org.apache.camel.spi.Metadata;
036import org.apache.camel.spi.RouteContext;
037import org.apache.camel.util.CamelContextHelper;
038
039/**
040 * Splits a single message into many sub-messages.
041 *
042 * @version 
043 */
044@Metadata(label = "eip,routing")
045@XmlRootElement(name = "split")
046@XmlAccessorType(XmlAccessType.FIELD)
047public class SplitDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<SplitDefinition> {
048    @XmlTransient
049    private AggregationStrategy aggregationStrategy;
050    @XmlTransient
051    private ExecutorService executorService;
052    @XmlAttribute
053    private Boolean parallelProcessing;
054    @XmlAttribute
055    private String strategyRef;
056    @XmlAttribute
057    private String strategyMethodName;
058    @XmlAttribute
059    private Boolean strategyMethodAllowNull;
060    @XmlAttribute
061    private String executorServiceRef;
062    @XmlAttribute
063    private Boolean streaming;
064    @XmlAttribute
065    private Boolean stopOnException;
066    @XmlAttribute @Metadata(defaultValue = "0")
067    private Long timeout;
068    @XmlAttribute
069    private String onPrepareRef;
070    @XmlTransient
071    private Processor onPrepare;
072    @XmlAttribute
073    private Boolean shareUnitOfWork;
074    @XmlAttribute
075    private Boolean parallelAggregate;
076
077    public SplitDefinition() {
078    }
079
080    public SplitDefinition(Expression expression) {
081        super(expression);
082    }
083
084    public SplitDefinition(ExpressionDefinition expression) {
085        super(expression);
086    }
087
088    @Override
089    public String toString() {
090        return "Split[" + getExpression() + " -> " + getOutputs() + "]";
091    }
092
093    @Override
094    public String getLabel() {
095        return "split[" + getExpression() + "]";
096    }
097
098    @Override
099    public Processor createProcessor(RouteContext routeContext) throws Exception {
100        Processor childProcessor = this.createChildProcessor(routeContext, true);
101        aggregationStrategy = createAggregationStrategy(routeContext);
102
103        boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing();
104        boolean isStreaming = getStreaming() != null && getStreaming();
105        boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork();
106        boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate();
107        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing);
108        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing);
109
110        long timeout = getTimeout() != null ? getTimeout() : 0;
111        if (timeout > 0 && !isParallelProcessing) {
112            throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
113        }
114        if (onPrepareRef != null) {
115            onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
116        }
117
118        Expression exp = getExpression().createExpression(routeContext);
119
120        Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
121                            isParallelProcessing, threadPool, shutdownThreadPool, isStreaming, isStopOnException(),
122                            timeout, onPrepare, isShareUnitOfWork, isParallelAggregate);
123        return answer;
124    }
125
126    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
127        AggregationStrategy strategy = getAggregationStrategy();
128        if (strategy == null && strategyRef != null) {
129            Object aggStrategy = routeContext.lookup(strategyRef, Object.class);
130            if (aggStrategy instanceof AggregationStrategy) {
131                strategy = (AggregationStrategy) aggStrategy;
132            } else if (aggStrategy != null) {
133                AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName());
134                if (getStrategyMethodAllowNull() != null) {
135                    adapter.setAllowNullNewExchange(getStrategyMethodAllowNull());
136                    adapter.setAllowNullOldExchange(getStrategyMethodAllowNull());
137                }
138                strategy = adapter;
139            } else {
140                throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef);
141            }
142        }
143
144        if (strategy != null && strategy instanceof CamelContextAware) {
145            ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext());
146        }
147
148        if (strategy != null && shareUnitOfWork != null && shareUnitOfWork) {
149            // wrap strategy in share unit of work
150            strategy = new ShareUnitOfWorkAggregationStrategy(strategy);
151        }
152
153        return strategy;
154    }
155
156    // Fluent API
157    // -------------------------------------------------------------------------
158
159    /**
160     * Sets the AggregationStrategy to be used to assemble the replies from the splitted messages, into a single outgoing message from the Splitter.
161     * By default Camel will use the original incoming message to the splitter (leave it unchanged). You can also use a POJO as the AggregationStrategy
162     */
163    public SplitDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
164        setAggregationStrategy(aggregationStrategy);
165        return this;
166    }
167
168    /**
169     * Sets a reference to the AggregationStrategy to be used to assemble the replies from the splitted messages, into a single outgoing message from the Splitter.
170     * By default Camel will use the original incoming message to the splitter (leave it unchanged). You can also use a POJO as the AggregationStrategy
171     */
172    public SplitDefinition aggregationStrategyRef(String aggregationStrategyRef) {
173        setStrategyRef(aggregationStrategyRef);
174        return this;
175    }
176
177    /**
178     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
179     *
180     * @param  methodName the method name to call
181     * @return the builder
182     */
183    public SplitDefinition aggregationStrategyMethodName(String methodName) {
184        setStrategyMethodName(methodName);
185        return this;
186    }
187
188    /**
189     * If this option is false then the aggregate method is not used if there was no data to enrich.
190     * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy
191     *
192     * @return the builder
193     */
194    public SplitDefinition aggregationStrategyMethodAllowNull() {
195        setStrategyMethodAllowNull(true);
196        return this;
197    }
198
199    /**
200     * If enabled then processing each splitted messages occurs concurrently.
201     * Note the caller thread will still wait until all messages has been fully processed, before it continues.
202     * Its only processing the sub messages from the splitter which happens concurrently.
203     *
204     * @return the builder
205     */
206    public SplitDefinition parallelProcessing() {
207        setParallelProcessing(true);
208        return this;
209    }
210
211    /**
212     * If enabled then processing each splitted messages occurs concurrently.
213     * Note the caller thread will still wait until all messages has been fully processed, before it continues.
214     * Its only processing the sub messages from the splitter which happens concurrently.
215     *
216     * @return the builder
217     */
218    public SplitDefinition parallelProcessing(boolean parallelProcessing) {
219        setParallelProcessing(parallelProcessing);
220        return this;
221    }
222
223    /**
224     * If enabled then the aggregate method on AggregationStrategy can be called concurrently.
225     * Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe.
226     * By default this is false meaning that Camel synchronizes the call to the aggregate method.
227     * Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe.
228     *
229     * @return the builder
230     */
231    public SplitDefinition parallelAggregate() {
232        setParallelAggregate(true);
233        return this;
234    }
235
236    /**
237     * When in streaming mode, then the splitter splits the original message on-demand, and each splitted
238     * message is processed one by one. This reduces memory usage as the splitter do not split all the messages first,
239     * but then we do not know the total size, and therefore the {@link org.apache.camel.Exchange#SPLIT_SIZE} is empty.
240     * <p/>
241     * In non-streaming mode (default) the splitter will split each message first, to know the total size, and then
242     * process each message one by one. This requires to keep all the splitted messages in memory and therefore requires
243     * more memory. The total size is provided in the {@link org.apache.camel.Exchange#SPLIT_SIZE} header.
244     * <p/>
245     * The streaming mode also affects the aggregation behavior.
246     * If enabled then Camel will process replies out-of-order, eg in the order they come back.
247     * If disabled, Camel will process replies in the same order as the messages was splitted.
248     *
249     * @return the builder
250     */
251    public SplitDefinition streaming() {
252        setStreaming(true);
253        return this;
254    }
255    
256    /**
257     * Will now stop further processing if an exception or failure occurred during processing of an
258     * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
259     * <p/>
260     * Will also stop if processing the exchange failed (has a fault message) or an exception
261     * was thrown and handled by the error handler (such as using onException). In all situations
262     * the splitter will stop further processing. This is the same behavior as in pipeline, which
263     * is used by the routing engine.
264     * <p/>
265     * The default behavior is to <b>not</b> stop but continue processing till the end
266     *
267     * @return the builder
268     */
269    public SplitDefinition stopOnException() {
270        setStopOnException(true);
271        return this;
272    }
273
274    /**
275     * To use a custom Thread Pool to be used for parallel processing.
276     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
277     */
278    public SplitDefinition executorService(ExecutorService executorService) {
279        setExecutorService(executorService);
280        return this;
281    }
282
283    /**
284     * Refers to a custom Thread Pool to be used for parallel processing.
285     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
286     */
287    public SplitDefinition executorServiceRef(String executorServiceRef) {
288        setExecutorServiceRef(executorServiceRef);
289        return this;
290    }
291
292    /**
293     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
294     * This can be used to deep-clone messages that should be send, or any custom logic needed before
295     * the exchange is send.
296     *
297     * @param onPrepare the processor
298     * @return the builder
299     */
300    public SplitDefinition onPrepare(Processor onPrepare) {
301        setOnPrepare(onPrepare);
302        return this;
303    }
304
305    /**
306     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
307     * This can be used to deep-clone messages that should be send, or any custom logic needed before
308     * the exchange is send.
309     *
310     * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
311     * @return the builder
312     */
313    public SplitDefinition onPrepareRef(String onPrepareRef) {
314        setOnPrepareRef(onPrepareRef);
315        return this;
316    }
317
318    /**
319     * Sets a total timeout specified in millis, when using parallel processing.
320     * If the Splitter hasn't been able to split and process all the sub messages within the given timeframe,
321     * then the timeout triggers and the Splitter breaks out and continues.
322     * Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out.
323     * If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel
324     * to shut down in a graceful manner may continue to run. So use this option with a bit of care.
325     *
326     * @param timeout timeout in millis
327     * @return the builder
328     */
329    public SplitDefinition timeout(long timeout) {
330        setTimeout(timeout);
331        return this;
332    }
333
334    /**
335     * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
336     * Splitter will by default not share unit of work between the parent exchange and each splitted exchange.
337     * This means each splitted exchange has its own individual unit of work.
338     *
339     * @return the builder.
340     * @see org.apache.camel.spi.SubUnitOfWork
341     */
342    public SplitDefinition shareUnitOfWork() {
343        setShareUnitOfWork(true);
344        return this;
345    }
346
347    // Properties
348    //-------------------------------------------------------------------------
349
350    /**
351     * Expression of how to split the message body, such as as-is, using a tokenizer, or using an xpath.
352     */
353    @Override
354    public void setExpression(ExpressionDefinition expression) {
355        // override to include javadoc what the expression is used for
356        super.setExpression(expression);
357    }
358
359    public AggregationStrategy getAggregationStrategy() {
360        return aggregationStrategy;
361    }
362
363    /**
364     * Sets the AggregationStrategy to be used to assemble the replies from the splitted messages, into a single outgoing message from the Splitter.
365     * By default Camel will use the original incoming message to the splitter (leave it unchanged). You can also use a POJO as the AggregationStrategy
366     */
367    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
368        this.aggregationStrategy = aggregationStrategy;
369    }
370
371    public Boolean getParallelProcessing() {
372        return parallelProcessing;
373    }
374
375    public void setParallelProcessing(Boolean parallelProcessing) {
376        this.parallelProcessing = parallelProcessing;
377    }
378
379    public Boolean getStreaming() {
380        return streaming;
381    }
382
383    public void setStreaming(Boolean streaming) {
384        this.streaming = streaming;
385    }
386
387    public Boolean getParallelAggregate() {
388        return parallelAggregate;
389    }
390
391    public void setParallelAggregate(Boolean parallelAggregate) {
392        this.parallelAggregate = parallelAggregate;
393    }
394
395    public Boolean getStopOnException() {
396        return stopOnException;
397    }
398
399    public void setStopOnException(Boolean stopOnException) {
400        this.stopOnException = stopOnException;
401    }
402
403    public Boolean isStopOnException() {
404        return stopOnException != null && stopOnException;
405    }
406
407    public ExecutorService getExecutorService() {
408        return executorService;
409    }
410
411    public void setExecutorService(ExecutorService executorService) {
412        this.executorService = executorService;
413    }
414
415    public String getStrategyRef() {
416        return strategyRef;
417    }
418
419    /**
420     * Sets a reference to the AggregationStrategy to be used to assemble the replies from the splitted messages, into a single outgoing message from the Splitter.
421     * By default Camel will use the original incoming message to the splitter (leave it unchanged). You can also use a POJO as the AggregationStrategy
422     */
423    public void setStrategyRef(String strategyRef) {
424        this.strategyRef = strategyRef;
425    }
426
427    public String getStrategyMethodName() {
428        return strategyMethodName;
429    }
430
431    /**
432     * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy.
433     */
434    public void setStrategyMethodName(String strategyMethodName) {
435        this.strategyMethodName = strategyMethodName;
436    }
437
438    public Boolean getStrategyMethodAllowNull() {
439        return strategyMethodAllowNull;
440    }
441
442    /**
443     * If this option is false then the aggregate method is not used if there was no data to enrich.
444     * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy
445     */
446    public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) {
447        this.strategyMethodAllowNull = strategyMethodAllowNull;
448    }
449
450    public String getExecutorServiceRef() {
451        return executorServiceRef;
452    }
453
454    public void setExecutorServiceRef(String executorServiceRef) {
455        this.executorServiceRef = executorServiceRef;
456    }
457
458    public Long getTimeout() {
459        return timeout;
460    }
461
462    public void setTimeout(Long timeout) {
463        this.timeout = timeout;
464    }
465
466    public String getOnPrepareRef() {
467        return onPrepareRef;
468    }
469
470    public void setOnPrepareRef(String onPrepareRef) {
471        this.onPrepareRef = onPrepareRef;
472    }
473
474    public Processor getOnPrepare() {
475        return onPrepare;
476    }
477
478    public void setOnPrepare(Processor onPrepare) {
479        this.onPrepare = onPrepare;
480    }
481
482    public Boolean getShareUnitOfWork() {
483        return shareUnitOfWork;
484    }
485
486    public void setShareUnitOfWork(Boolean shareUnitOfWork) {
487        this.shareUnitOfWork = shareUnitOfWork;
488    }
489
490}