001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.List;
021    import java.util.concurrent.ExecutorService;
022    
023    import javax.xml.bind.annotation.XmlAccessType;
024    import javax.xml.bind.annotation.XmlAccessorType;
025    import javax.xml.bind.annotation.XmlAttribute;
026    import javax.xml.bind.annotation.XmlRootElement;
027    import javax.xml.bind.annotation.XmlTransient;
028    
029    import org.apache.camel.Expression;
030    import org.apache.camel.Processor;
031    import org.apache.camel.model.language.ExpressionDefinition;
032    import org.apache.camel.processor.EvaluateExpressionProcessor;
033    import org.apache.camel.processor.Pipeline;
034    import org.apache.camel.processor.RecipientList;
035    import org.apache.camel.processor.aggregate.AggregationStrategy;
036    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
037    import org.apache.camel.spi.RouteContext;
038    import org.apache.camel.util.CamelContextHelper;
039    
040    /**
041     * Represents an XML <recipientList/> element
042     *
043     * @version 
044     */
045    @XmlRootElement(name = "recipientList")
046    @XmlAccessorType(XmlAccessType.FIELD)
047    public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> extends NoOutputExpressionNode implements ExecutorServiceAwareDefinition<RecipientListDefinition<Type>> {
048        @XmlTransient
049        private AggregationStrategy aggregationStrategy;
050        @XmlTransient
051        private ExecutorService executorService;
052        @XmlAttribute
053        private String delimiter;
054        @XmlAttribute
055        private Boolean parallelProcessing;
056        @XmlAttribute
057        private String strategyRef;
058        @XmlAttribute
059        private String executorServiceRef;
060        @XmlAttribute
061        private Boolean stopOnException;
062        @XmlAttribute
063        private Boolean ignoreInvalidEndpoints;
064        @XmlAttribute
065        private Boolean streaming;
066        @XmlAttribute
067        private Long timeout;
068        @XmlAttribute
069        private String onPrepareRef;
070        @XmlTransient
071        private Processor onPrepare;
072        @XmlAttribute
073        private Boolean shareUnitOfWork;
074    
075        public RecipientListDefinition() {
076        }
077    
078        public RecipientListDefinition(ExpressionDefinition expression) {
079            super(expression);
080        }
081    
082        public RecipientListDefinition(Expression expression) {
083            super(expression);
084        }
085    
086        @Override
087        public String toString() {
088            return "RecipientList[" + getExpression() + "]";
089        }
090    
091        @Override
092        public String getShortName() {
093            return "recipientList";
094        }
095        
096        @Override
097        public String getLabel() {
098            return "recipientList[" + getExpression() + "]";
099        }
100    
101        @Override
102        public Processor createProcessor(RouteContext routeContext) throws Exception {
103            final Expression expression = getExpression().createExpression(routeContext);
104    
105            RecipientList answer;
106            if (delimiter != null) {
107                answer = new RecipientList(routeContext.getCamelContext(), expression, delimiter);
108            } else {
109                answer = new RecipientList(routeContext.getCamelContext(), expression);
110            }
111            answer.setAggregationStrategy(createAggregationStrategy(routeContext));
112            answer.setParallelProcessing(isParallelProcessing());
113            answer.setStreaming(isStreaming());   
114            answer.setShareUnitOfWork(isShareUnitOfWork());
115            if (onPrepareRef != null) {
116                onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
117            }
118            if (onPrepare != null) {
119                answer.setOnPrepare(onPrepare);
120            }
121            if (stopOnException != null) {
122                answer.setStopOnException(isStopOnException());
123            }
124            if (ignoreInvalidEndpoints != null) {
125                answer.setIgnoreInvalidEndpoints(ignoreInvalidEndpoints);
126            }
127            if (getTimeout() != null) {
128                answer.setTimeout(getTimeout());
129            }
130    
131            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
132            ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing());
133            answer.setExecutorService(threadPool);
134            answer.setShutdownExecutorService(shutdownThreadPool);
135            long timeout = getTimeout() != null ? getTimeout() : 0;
136            if (timeout > 0 && !isParallelProcessing()) {
137                throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
138            }
139    
140            // create a pipeline with two processors
141            // the first is the eval processor which evaluates the expression to use
142            // the second is the recipient list
143            List<Processor> pipe = new ArrayList<Processor>(2);
144    
145            // the eval processor must be wrapped in error handler, so in case there was an
146            // error during evaluation, the error handler can deal with it
147            // the recipient list is not in error handler, as its has its own special error handling
148            // when sending to the recipients individually
149            Processor evalProcessor = new EvaluateExpressionProcessor(expression);
150            evalProcessor = super.wrapInErrorHandler(routeContext, evalProcessor);
151    
152            pipe.add(evalProcessor);
153            pipe.add(answer);
154    
155            // wrap in nested pipeline so this appears as one processor
156            // (threads definition does this as well)
157            return new Pipeline(routeContext.getCamelContext(), pipe) {
158                @Override
159                public String toString() {
160                    return "RecipientList[" + expression + "]";
161                }
162            };
163        }
164        
165        private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
166            if (aggregationStrategy == null && strategyRef != null) {
167                aggregationStrategy = routeContext.mandatoryLookup(strategyRef, AggregationStrategy.class);
168            }
169            if (aggregationStrategy == null) {
170                // fallback to use latest
171                aggregationStrategy = new UseLatestAggregationStrategy();
172            }
173            return aggregationStrategy;
174        }
175    
176        // Fluent API
177        // -------------------------------------------------------------------------
178    
179        @Override
180        @SuppressWarnings("unchecked")
181        public Type end() {
182            // allow end() to return to previous type so you can continue in the DSL
183            return (Type) super.end();
184        }
185    
186        /**
187         * Set the delimiter
188         *
189         * @param delimiter the delimiter
190         * @return the builder
191         */
192        public RecipientListDefinition<Type> delimiter(String delimiter) {
193            setDelimiter(delimiter);
194            return this;
195        }
196    
197        /**
198         * Set the aggregationStrategy
199         *
200         * @param aggregationStrategy the strategy
201         * @return the builder
202         */
203        public RecipientListDefinition<Type> aggregationStrategy(AggregationStrategy aggregationStrategy) {
204            setAggregationStrategy(aggregationStrategy);
205            return this;
206        }
207    
208        /**
209         * Set the aggregationStrategy
210         *
211         * @param aggregationStrategyRef a reference to a strategy to lookup
212         * @return the builder
213         */
214        public RecipientListDefinition<Type> aggregationStrategyRef(String aggregationStrategyRef) {
215            setStrategyRef(aggregationStrategyRef);
216            return this;
217        }
218        
219        /**
220         * Ignore the invalidate endpoint exception when try to create a producer with that endpoint
221         *
222         * @return the builder
223         */
224        public RecipientListDefinition<Type> ignoreInvalidEndpoints() {
225            setIgnoreInvalidEndpoints(true);
226            return this;
227        }
228    
229        /**
230         * Doing the recipient list work in parallel
231         *
232         * @return the builder
233         */
234        public RecipientListDefinition<Type> parallelProcessing() {
235            setParallelProcessing(true);
236            return this;
237        }
238        
239        /**
240         * Doing the recipient list work in streaming model
241         *
242         * @return the builder
243         */
244        public RecipientListDefinition<Type> streaming() {
245            setStreaming(true);
246            return this;
247        }
248    
249        /**
250         * Will now stop further processing if an exception or failure occurred during processing of an
251         * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
252         * <p/>
253         * Will also stop if processing the exchange failed (has a fault message) or an exception
254         * was thrown and handled by the error handler (such as using onException). In all situations
255         * the recipient list will stop further processing. This is the same behavior as in pipeline, which
256         * is used by the routing engine.
257         * <p/>
258         * The default behavior is to <b>not</b> stop but continue processing till the end
259         *
260         * @return the builder
261         */
262        public RecipientListDefinition<Type> stopOnException() {
263            setStopOnException(true);
264            return this;
265        }
266    
267        public RecipientListDefinition<Type> executorService(ExecutorService executorService) {
268            setExecutorService(executorService);
269            return this;
270        }
271    
272        public RecipientListDefinition<Type> executorServiceRef(String executorServiceRef) {
273            setExecutorServiceRef(executorServiceRef);
274            return this;
275        }
276    
277        /**
278         * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be used send.
279         * This can be used to deep-clone messages that should be send, or any custom logic needed before
280         * the exchange is send.
281         *
282         * @param onPrepare the processor
283         * @return the builder
284         */
285        public RecipientListDefinition<Type> onPrepare(Processor onPrepare) {
286            setOnPrepare(onPrepare);
287            return this;
288        }
289    
290        /**
291         * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send.
292         * This can be used to deep-clone messages that should be send, or any custom logic needed before
293         * the exchange is send.
294         *
295         * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
296         * @return the builder
297         */
298        public RecipientListDefinition<Type> onPrepareRef(String onPrepareRef) {
299            setOnPrepareRef(onPrepareRef);
300            return this;
301        }
302    
303        /**
304         * Sets a timeout value in millis to use when using parallelProcessing.
305         *
306         * @param timeout timeout in millis
307         * @return the builder
308         */
309        public RecipientListDefinition<Type> timeout(long timeout) {
310            setTimeout(timeout);
311            return this;
312        }
313    
314        /**
315         * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
316         *
317         * @return the builder.
318         * @see org.apache.camel.spi.SubUnitOfWork
319         */
320        public RecipientListDefinition<Type> shareUnitOfWork() {
321            setShareUnitOfWork(true);
322            return this;
323        }
324    
325        // Properties
326        //-------------------------------------------------------------------------
327    
328        public String getDelimiter() {
329            return delimiter;
330        }
331    
332        public void setDelimiter(String delimiter) {
333            this.delimiter = delimiter;
334        }
335    
336        public Boolean getParallelProcessing() {
337            return parallelProcessing;
338        }
339    
340        public void setParallelProcessing(Boolean parallelProcessing) {
341            this.parallelProcessing = parallelProcessing;
342        }
343    
344        public boolean isParallelProcessing() {
345            return parallelProcessing != null && parallelProcessing;
346        }
347    
348        public String getStrategyRef() {
349            return strategyRef;
350        }
351    
352        public void setStrategyRef(String strategyRef) {
353            this.strategyRef = strategyRef;
354        }
355    
356        public String getExecutorServiceRef() {
357            return executorServiceRef;
358        }
359    
360        public void setExecutorServiceRef(String executorServiceRef) {
361            this.executorServiceRef = executorServiceRef;
362        }
363    
364        public Boolean getIgnoreInvalidEndpoints() {
365            return ignoreInvalidEndpoints;
366        }
367    
368        public void setIgnoreInvalidEndpoints(Boolean ignoreInvalidEndpoints) {
369            this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
370        }
371    
372        public boolean isIgnoreInvalidEndpoints() {
373            return ignoreInvalidEndpoints != null && ignoreInvalidEndpoints;
374        }
375    
376        public Boolean getStopOnException() {
377            return stopOnException;
378        }
379    
380        public void setStopOnException(Boolean stopOnException) {
381            this.stopOnException = stopOnException;
382        }
383    
384        public boolean isStopOnException() {
385            return stopOnException != null && stopOnException;
386        }
387    
388        public AggregationStrategy getAggregationStrategy() {
389            return aggregationStrategy;
390        }
391    
392        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
393            this.aggregationStrategy = aggregationStrategy;
394        }
395    
396        public ExecutorService getExecutorService() {
397            return executorService;
398        }
399    
400        public void setExecutorService(ExecutorService executorService) {
401            this.executorService = executorService;
402        }
403    
404        public Boolean getStreaming() {
405            return streaming;
406        }
407    
408        public void setStreaming(Boolean streaming) {
409            this.streaming = streaming;
410        }
411    
412        public boolean isStreaming() {
413            return streaming != null && streaming;
414        }
415    
416        public Long getTimeout() {
417            return timeout;
418        }
419    
420        public void setTimeout(Long timeout) {
421            this.timeout = timeout;
422        }
423    
424        public String getOnPrepareRef() {
425            return onPrepareRef;
426        }
427    
428        public void setOnPrepareRef(String onPrepareRef) {
429            this.onPrepareRef = onPrepareRef;
430        }
431    
432        public Processor getOnPrepare() {
433            return onPrepare;
434        }
435    
436        public void setOnPrepare(Processor onPrepare) {
437            this.onPrepare = onPrepare;
438        }
439    
440        public Boolean getShareUnitOfWork() {
441            return shareUnitOfWork;
442        }
443    
444        public void setShareUnitOfWork(Boolean shareUnitOfWork) {
445            this.shareUnitOfWork = shareUnitOfWork;
446        }
447    
448        public boolean isShareUnitOfWork() {
449            return shareUnitOfWork != null && shareUnitOfWork;
450        }
451    
452    }