/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.camel.processor;
018
019import java.util.Iterator;
020import java.util.concurrent.ExecutorService;
021
022import org.apache.camel.AsyncCallback;
023import org.apache.camel.AsyncProcessor;
024import org.apache.camel.CamelContext;
025import org.apache.camel.Endpoint;
026import org.apache.camel.Exchange;
027import org.apache.camel.Expression;
028import org.apache.camel.Processor;
029import org.apache.camel.impl.EmptyProducerCache;
030import org.apache.camel.impl.ProducerCache;
031import org.apache.camel.processor.aggregate.AggregationStrategy;
032import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
033import org.apache.camel.spi.EndpointUtilizationStatistics;
034import org.apache.camel.spi.IdAware;
035import org.apache.camel.support.ServiceSupport;
036import org.apache.camel.util.AsyncProcessorHelper;
037import org.apache.camel.util.ExchangeHelper;
038import org.apache.camel.util.ObjectHelper;
039import org.apache.camel.util.ServiceHelper;
040import org.apache.camel.util.StringHelper;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044import static org.apache.camel.util.ObjectHelper.notNull;
045
046/**
047 * Implements a dynamic <a
048 * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
049 * pattern where the list of actual endpoints to send a message exchange to are
050 * dependent on some dynamic expression.
051 *
052 * @version 
053 */
054public class RecipientList extends ServiceSupport implements AsyncProcessor, IdAware {
055
056    private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class);
057    private static final String IGNORE_DELIMITER_MARKER = "false";
058    private final CamelContext camelContext;
059    private String id;
060    private ProducerCache producerCache;
061    private Expression expression;
062    private final String delimiter;
063    private boolean parallelProcessing;
064    private boolean parallelAggregate;
065    private boolean stopOnAggregateException;
066    private boolean stopOnException;
067    private boolean ignoreInvalidEndpoints;
068    private boolean streaming;
069    private long timeout;
070    private int cacheSize;
071    private Processor onPrepare;
072    private boolean shareUnitOfWork;
073    private ExecutorService executorService;
074    private boolean shutdownExecutorService;
075    private ExecutorService aggregateExecutorService;
076    private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
077
078    public RecipientList(CamelContext camelContext) {
079        // use comma by default as delimiter
080        this(camelContext, ",");
081    }
082
083    public RecipientList(CamelContext camelContext, String delimiter) {
084        notNull(camelContext, "camelContext");
085        StringHelper.notEmpty(delimiter, "delimiter");
086        this.camelContext = camelContext;
087        this.delimiter = delimiter;
088    }
089
090    public RecipientList(CamelContext camelContext, Expression expression) {
091        // use comma by default as delimiter
092        this(camelContext, expression, ",");
093    }
094
095    public RecipientList(CamelContext camelContext, Expression expression, String delimiter) {
096        notNull(camelContext, "camelContext");
097        ObjectHelper.notNull(expression, "expression");
098        StringHelper.notEmpty(delimiter, "delimiter");
099        this.camelContext = camelContext;
100        this.expression = expression;
101        this.delimiter = delimiter;
102    }
103
104    @Override
105    public String toString() {
106        return "RecipientList[" + (expression != null ? expression : "") + "]";
107    }
108
109    public String getId() {
110        return id;
111    }
112
113    public void setId(String id) {
114        this.id = id;
115    }
116
117    public void process(Exchange exchange) throws Exception {
118        AsyncProcessorHelper.process(this, exchange);
119    }
120
121    public boolean process(Exchange exchange, AsyncCallback callback) {
122        if (!isStarted()) {
123            throw new IllegalStateException("RecipientList has not been started: " + this);
124        }
125
126        // use the evaluate expression result if exists
127        Object recipientList = exchange.removeProperty(Exchange.EVALUATE_EXPRESSION_RESULT);
128        if (recipientList == null && expression != null) {
129            // fallback and evaluate the expression
130            recipientList = expression.evaluate(exchange, Object.class);
131        }
132
133        return sendToRecipientList(exchange, recipientList, callback);
134    }
135
136    /**
137     * Sends the given exchange to the recipient list
138     */
139    public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) {
140        Iterator<Object> iter;
141
142        if (delimiter != null && delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
143            iter = ObjectHelper.createIterator(recipientList, null);
144        } else {
145            iter = ObjectHelper.createIterator(recipientList, delimiter);
146        }
147
148        RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
149                isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
150                isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate(),
151                isStopOnAggregateException()) {
152            @Override
153            protected synchronized ExecutorService createAggregateExecutorService(String name) {
154                // use a shared executor service to avoid creating new thread pools
155                if (aggregateExecutorService == null) {
156                    aggregateExecutorService = super.createAggregateExecutorService("RecipientList-AggregateTask");
157                }
158                return aggregateExecutorService;
159            }
160        };
161        rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
162
163        // start the service
164        try {
165            ServiceHelper.startService(rlp);
166        } catch (Exception e) {
167            exchange.setException(e);
168            callback.done(true);
169            return true;
170        }
171
172        // now let the multicast process the exchange
173        return rlp.process(exchange, callback);
174    }
175
176    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
177        // trim strings as end users might have added spaces between separators
178        if (recipient instanceof String) {
179            recipient = ((String)recipient).trim();
180        }
181        return ExchangeHelper.resolveEndpoint(exchange, recipient);
182    }
183
184    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
185        return producerCache.getEndpointUtilizationStatistics();
186    }
187
188    protected void doStart() throws Exception {
189        if (producerCache == null) {
190            if (cacheSize < 0) {
191                producerCache = new EmptyProducerCache(this, camelContext);
192                LOG.debug("RecipientList {} is not using ProducerCache", this);
193            } else if (cacheSize == 0) {
194                producerCache = new ProducerCache(this, camelContext);
195                LOG.debug("RecipientList {} using ProducerCache with default cache size", this);
196            } else {
197                producerCache = new ProducerCache(this, camelContext, cacheSize);
198                LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize);
199            }
200        }
201        ServiceHelper.startServices(aggregationStrategy, producerCache);
202    }
203
204    protected void doStop() throws Exception {
205        ServiceHelper.stopServices(producerCache, aggregationStrategy);
206    }
207
208    protected void doShutdown() throws Exception {
209        ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy);
210
211        if (shutdownExecutorService && executorService != null) {
212            camelContext.getExecutorServiceManager().shutdownNow(executorService);
213        }
214    }
215
216    public Expression getExpression() {
217        return expression;
218    }
219
220    public String getDelimiter() {
221        return delimiter;
222    }
223
224    public boolean isStreaming() {
225        return streaming;
226    }
227    
228    public void setStreaming(boolean streaming) {
229        this.streaming = streaming;
230    }
231 
232    public boolean isIgnoreInvalidEndpoints() {
233        return ignoreInvalidEndpoints;
234    }
235    
236    public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
237        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
238    }
239
240    public boolean isParallelProcessing() {
241        return parallelProcessing;
242    }
243
244    public void setParallelProcessing(boolean parallelProcessing) {
245        this.parallelProcessing = parallelProcessing;
246    }
247
248    public boolean isParallelAggregate() {
249        return parallelAggregate;
250    }
251
252    public void setParallelAggregate(boolean parallelAggregate) {
253        this.parallelAggregate = parallelAggregate;
254    }
255
256    public boolean isStopOnAggregateException() {
257        return stopOnAggregateException;
258    }
259
260    public void setStopOnAggregateException(boolean stopOnAggregateException) {
261        this.stopOnAggregateException = stopOnAggregateException;
262    }
263
264    public boolean isStopOnException() {
265        return stopOnException;
266    }
267
268    public void setStopOnException(boolean stopOnException) {
269        this.stopOnException = stopOnException;
270    }
271
272    public ExecutorService getExecutorService() {
273        return executorService;
274    }
275
276    public void setExecutorService(ExecutorService executorService) {
277        this.executorService = executorService;
278    }
279
280    public boolean isShutdownExecutorService() {
281        return shutdownExecutorService;
282    }
283
284    public void setShutdownExecutorService(boolean shutdownExecutorService) {
285        this.shutdownExecutorService = shutdownExecutorService;
286    }
287
288    public AggregationStrategy getAggregationStrategy() {
289        return aggregationStrategy;
290    }
291
292    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
293        this.aggregationStrategy = aggregationStrategy;
294    }
295
296    public long getTimeout() {
297        return timeout;
298    }
299
300    public void setTimeout(long timeout) {
301        this.timeout = timeout;
302    }
303
304    public Processor getOnPrepare() {
305        return onPrepare;
306    }
307
308    public void setOnPrepare(Processor onPrepare) {
309        this.onPrepare = onPrepare;
310    }
311
312    public boolean isShareUnitOfWork() {
313        return shareUnitOfWork;
314    }
315
316    public void setShareUnitOfWork(boolean shareUnitOfWork) {
317        this.shareUnitOfWork = shareUnitOfWork;
318    }
319
320    public int getCacheSize() {
321        return cacheSize;
322    }
323
324    public void setCacheSize(int cacheSize) {
325        this.cacheSize = cacheSize;
326    }
327}