001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Iterator;
020    import java.util.concurrent.ExecutorService;
021    
022    import org.apache.camel.AsyncCallback;
023    import org.apache.camel.AsyncProcessor;
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.Expression;
028    import org.apache.camel.Processor;
029    import org.apache.camel.impl.ProducerCache;
030    import org.apache.camel.processor.aggregate.AggregationStrategy;
031    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
032    import org.apache.camel.support.ServiceSupport;
033    import org.apache.camel.util.AsyncProcessorHelper;
034    import org.apache.camel.util.ExchangeHelper;
035    import org.apache.camel.util.ObjectHelper;
036    import org.apache.camel.util.ServiceHelper;
037    
038    import static org.apache.camel.util.ObjectHelper.notNull;
039    
040    /**
041     * Implements a dynamic <a
042     * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
043     * pattern where the list of actual endpoints to send a message exchange to are
044     * dependent on some dynamic expression.
045     *
046     * @version 
047     */
048    public class RecipientList extends ServiceSupport implements AsyncProcessor {
049        private final CamelContext camelContext;
050        private ProducerCache producerCache;
051        private Expression expression;
052        private final String delimiter;
053        private boolean parallelProcessing;
054        private boolean stopOnException;
055        private boolean ignoreInvalidEndpoints;
056        private boolean streaming;
057        private long timeout;
058        private Processor onPrepare;
059        private boolean shareUnitOfWork;
060        private ExecutorService executorService;
061        private boolean shutdownExecutorService;
062        private ExecutorService aggregateExecutorService;
063        private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
064    
065        public RecipientList(CamelContext camelContext) {
066            // use comma by default as delimiter
067            this(camelContext, ",");
068        }
069    
070        public RecipientList(CamelContext camelContext, String delimiter) {
071            notNull(camelContext, "camelContext");
072            ObjectHelper.notEmpty(delimiter, "delimiter");
073            this.camelContext = camelContext;
074            this.delimiter = delimiter;
075        }
076    
077        public RecipientList(CamelContext camelContext, Expression expression) {
078            // use comma by default as delimiter
079            this(camelContext, expression, ",");
080        }
081    
082        public RecipientList(CamelContext camelContext, Expression expression, String delimiter) {
083            notNull(camelContext, "camelContext");
084            ObjectHelper.notNull(expression, "expression");
085            ObjectHelper.notEmpty(delimiter, "delimiter");
086            this.camelContext = camelContext;
087            this.expression = expression;
088            this.delimiter = delimiter;
089        }
090    
091        @Override
092        public String toString() {
093            return "RecipientList[" + (expression != null ? expression : "") + "]";
094        }
095    
096        public void process(Exchange exchange) throws Exception {
097            AsyncProcessorHelper.process(this, exchange);
098        }
099    
100        public boolean process(Exchange exchange, AsyncCallback callback) {
101            if (!isStarted()) {
102                throw new IllegalStateException("RecipientList has not been started: " + this);
103            }
104    
105            // use the evaluate expression result if exists
106            Object recipientList = exchange.removeProperty(Exchange.EVALUATE_EXPRESSION_RESULT);
107            if (recipientList == null && expression != null) {
108                // fallback and evaluate the expression
109                recipientList = expression.evaluate(exchange, Object.class);
110            }
111    
112            return sendToRecipientList(exchange, recipientList, callback);
113        }
114    
115        /**
116         * Sends the given exchange to the recipient list
117         */
118        public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) {
119            Iterator<Object> iter = ObjectHelper.createIterator(recipientList, delimiter);
120    
121            RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
122                    isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
123                    isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork()) {
124                @Override
125                protected synchronized ExecutorService createAggregateExecutorService(String name) {
126                    // use a shared executor service to avoid creating new thread pools
127                    if (aggregateExecutorService == null) {
128                        aggregateExecutorService = super.createAggregateExecutorService("RecipientList-AggregateTask");
129                    }
130                    return aggregateExecutorService;
131                }
132            };
133            rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
134    
135            // start the service
136            try {
137                ServiceHelper.startService(rlp);
138            } catch (Exception e) {
139                exchange.setException(e);
140                callback.done(true);
141                return true;
142            }
143    
144            AsyncProcessor target = rlp;
145            if (isShareUnitOfWork()) {
146                // wrap answer in a sub unit of work, since we share the unit of work
147                target = new SubUnitOfWorkProcessor(rlp);
148            }
149    
150            // now let the multicast process the exchange
151            return AsyncProcessorHelper.process(target, exchange, callback);
152        }
153    
154        protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
155            // trim strings as end users might have added spaces between separators
156            if (recipient instanceof String) {
157                recipient = ((String)recipient).trim();
158            }
159            return ExchangeHelper.resolveEndpoint(exchange, recipient);
160        }
161    
162        protected void doStart() throws Exception {
163            if (producerCache == null) {
164                producerCache = new ProducerCache(this, camelContext);
165            }
166            ServiceHelper.startService(producerCache);
167        }
168    
169        protected void doStop() throws Exception {
170            ServiceHelper.stopService(producerCache);
171        }
172    
173        protected void doShutdown() throws Exception {
174            ServiceHelper.stopAndShutdownService(producerCache);
175    
176            if (shutdownExecutorService && executorService != null) {
177                camelContext.getExecutorServiceManager().shutdownNow(executorService);
178            }
179        }
180    
181        public boolean isStreaming() {
182            return streaming;
183        }
184        
185        public void setStreaming(boolean streaming) {
186            this.streaming = streaming;
187        }
188     
189        public boolean isIgnoreInvalidEndpoints() {
190            return ignoreInvalidEndpoints;
191        }
192        
193        public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
194            this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
195        }
196    
197        public boolean isParallelProcessing() {
198            return parallelProcessing;
199        }
200    
201        public void setParallelProcessing(boolean parallelProcessing) {
202            this.parallelProcessing = parallelProcessing;
203        }
204    
205        public boolean isStopOnException() {
206            return stopOnException;
207        }
208    
209        public void setStopOnException(boolean stopOnException) {
210            this.stopOnException = stopOnException;
211        }
212    
213        public ExecutorService getExecutorService() {
214            return executorService;
215        }
216    
217        public void setExecutorService(ExecutorService executorService) {
218            this.executorService = executorService;
219        }
220    
221        public boolean isShutdownExecutorService() {
222            return shutdownExecutorService;
223        }
224    
225        public void setShutdownExecutorService(boolean shutdownExecutorService) {
226            this.shutdownExecutorService = shutdownExecutorService;
227        }
228    
229        public AggregationStrategy getAggregationStrategy() {
230            return aggregationStrategy;
231        }
232    
233        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
234            this.aggregationStrategy = aggregationStrategy;
235        }
236    
237        public long getTimeout() {
238            return timeout;
239        }
240    
241        public void setTimeout(long timeout) {
242            this.timeout = timeout;
243        }
244    
245        public Processor getOnPrepare() {
246            return onPrepare;
247        }
248    
249        public void setOnPrepare(Processor onPrepare) {
250            this.onPrepare = onPrepare;
251        }
252    
253        public boolean isShareUnitOfWork() {
254            return shareUnitOfWork;
255        }
256    
257        public void setShareUnitOfWork(boolean shareUnitOfWork) {
258            this.shareUnitOfWork = shareUnitOfWork;
259        }
260    }