001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.Iterator;
020import java.util.concurrent.ExecutorService;
021
022import org.apache.camel.AsyncCallback;
023import org.apache.camel.AsyncProcessor;
024import org.apache.camel.CamelContext;
025import org.apache.camel.Endpoint;
026import org.apache.camel.Exchange;
027import org.apache.camel.Expression;
028import org.apache.camel.Processor;
029import org.apache.camel.impl.EmptyProducerCache;
030import org.apache.camel.impl.ProducerCache;
031import org.apache.camel.processor.aggregate.AggregationStrategy;
032import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
033import org.apache.camel.spi.EndpointUtilizationStatistics;
034import org.apache.camel.spi.IdAware;
035import org.apache.camel.support.ServiceSupport;
036import org.apache.camel.util.AsyncProcessorHelper;
037import org.apache.camel.util.ExchangeHelper;
038import org.apache.camel.util.ObjectHelper;
039import org.apache.camel.util.ServiceHelper;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043import static org.apache.camel.util.ObjectHelper.notNull;
044
045/**
046 * Implements a dynamic <a
047 * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
048 * pattern where the list of actual endpoints to send a message exchange to are
049 * dependent on some dynamic expression.
050 *
051 * @version 
052 */
053public class RecipientList extends ServiceSupport implements AsyncProcessor, IdAware {
054
055    private static final Logger LOG = LoggerFactory.getLogger(RecipientList.class);
056    private static final String IGNORE_DELIMITER_MARKER = "false";
057    private final CamelContext camelContext;
058    private String id;
059    private ProducerCache producerCache;
060    private Expression expression;
061    private final String delimiter;
062    private boolean parallelProcessing;
063    private boolean parallelAggregate;
064    private boolean stopOnException;
065    private boolean ignoreInvalidEndpoints;
066    private boolean streaming;
067    private long timeout;
068    private int cacheSize;
069    private Processor onPrepare;
070    private boolean shareUnitOfWork;
071    private ExecutorService executorService;
072    private boolean shutdownExecutorService;
073    private ExecutorService aggregateExecutorService;
074    private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
075
076    public RecipientList(CamelContext camelContext) {
077        // use comma by default as delimiter
078        this(camelContext, ",");
079    }
080
081    public RecipientList(CamelContext camelContext, String delimiter) {
082        notNull(camelContext, "camelContext");
083        ObjectHelper.notEmpty(delimiter, "delimiter");
084        this.camelContext = camelContext;
085        this.delimiter = delimiter;
086    }
087
088    public RecipientList(CamelContext camelContext, Expression expression) {
089        // use comma by default as delimiter
090        this(camelContext, expression, ",");
091    }
092
093    public RecipientList(CamelContext camelContext, Expression expression, String delimiter) {
094        notNull(camelContext, "camelContext");
095        ObjectHelper.notNull(expression, "expression");
096        ObjectHelper.notEmpty(delimiter, "delimiter");
097        this.camelContext = camelContext;
098        this.expression = expression;
099        this.delimiter = delimiter;
100    }
101
102    @Override
103    public String toString() {
104        return "RecipientList[" + (expression != null ? expression : "") + "]";
105    }
106
107    public String getId() {
108        return id;
109    }
110
111    public void setId(String id) {
112        this.id = id;
113    }
114
115    public void process(Exchange exchange) throws Exception {
116        AsyncProcessorHelper.process(this, exchange);
117    }
118
119    public boolean process(Exchange exchange, AsyncCallback callback) {
120        if (!isStarted()) {
121            throw new IllegalStateException("RecipientList has not been started: " + this);
122        }
123
124        // use the evaluate expression result if exists
125        Object recipientList = exchange.removeProperty(Exchange.EVALUATE_EXPRESSION_RESULT);
126        if (recipientList == null && expression != null) {
127            // fallback and evaluate the expression
128            recipientList = expression.evaluate(exchange, Object.class);
129        }
130
131        return sendToRecipientList(exchange, recipientList, callback);
132    }
133
134    /**
135     * Sends the given exchange to the recipient list
136     */
137    public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) {
138        Iterator<Object> iter;
139
140        if (delimiter != null && delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
141            iter = ObjectHelper.createIterator(recipientList, null);
142        } else {
143            iter = ObjectHelper.createIterator(recipientList, delimiter);
144        }
145
146        RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
147                isParallelProcessing(), getExecutorService(), isShutdownExecutorService(),
148                isStreaming(), isStopOnException(), getTimeout(), getOnPrepare(), isShareUnitOfWork(), isParallelAggregate()) {
149            @Override
150            protected synchronized ExecutorService createAggregateExecutorService(String name) {
151                // use a shared executor service to avoid creating new thread pools
152                if (aggregateExecutorService == null) {
153                    aggregateExecutorService = super.createAggregateExecutorService("RecipientList-AggregateTask");
154                }
155                return aggregateExecutorService;
156            }
157        };
158        rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
159
160        // start the service
161        try {
162            ServiceHelper.startService(rlp);
163        } catch (Exception e) {
164            exchange.setException(e);
165            callback.done(true);
166            return true;
167        }
168
169        // now let the multicast process the exchange
170        return rlp.process(exchange, callback);
171    }
172
173    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
174        // trim strings as end users might have added spaces between separators
175        if (recipient instanceof String) {
176            recipient = ((String)recipient).trim();
177        }
178        return ExchangeHelper.resolveEndpoint(exchange, recipient);
179    }
180
181    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
182        return producerCache.getEndpointUtilizationStatistics();
183    }
184
185    protected void doStart() throws Exception {
186        if (producerCache == null) {
187            if (cacheSize < 0) {
188                producerCache = new EmptyProducerCache(this, camelContext);
189                LOG.debug("RecipientList {} is not using ProducerCache", this);
190            } else if (cacheSize == 0) {
191                producerCache = new ProducerCache(this, camelContext);
192                LOG.debug("RecipientList {} using ProducerCache with default cache size", this);
193            } else {
194                producerCache = new ProducerCache(this, camelContext, cacheSize);
195                LOG.debug("RecipientList {} using ProducerCache with cacheSize={}", this, cacheSize);
196            }
197        }
198        ServiceHelper.startServices(aggregationStrategy, producerCache);
199    }
200
201    protected void doStop() throws Exception {
202        ServiceHelper.stopServices(producerCache, aggregationStrategy);
203    }
204
205    protected void doShutdown() throws Exception {
206        ServiceHelper.stopAndShutdownServices(producerCache, aggregationStrategy);
207
208        if (shutdownExecutorService && executorService != null) {
209            camelContext.getExecutorServiceManager().shutdownNow(executorService);
210        }
211    }
212
213    public Expression getExpression() {
214        return expression;
215    }
216
217    public String getDelimiter() {
218        return delimiter;
219    }
220
221    public boolean isStreaming() {
222        return streaming;
223    }
224    
225    public void setStreaming(boolean streaming) {
226        this.streaming = streaming;
227    }
228 
229    public boolean isIgnoreInvalidEndpoints() {
230        return ignoreInvalidEndpoints;
231    }
232    
233    public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
234        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
235    }
236
237    public boolean isParallelProcessing() {
238        return parallelProcessing;
239    }
240
241    public void setParallelProcessing(boolean parallelProcessing) {
242        this.parallelProcessing = parallelProcessing;
243    }
244
245    public boolean isParallelAggregate() {
246        return parallelAggregate;
247    }
248
249    public void setParallelAggregate(boolean parallelAggregate) {
250        this.parallelAggregate = parallelAggregate;
251    }
252
253    public boolean isStopOnException() {
254        return stopOnException;
255    }
256
257    public void setStopOnException(boolean stopOnException) {
258        this.stopOnException = stopOnException;
259    }
260
261    public ExecutorService getExecutorService() {
262        return executorService;
263    }
264
265    public void setExecutorService(ExecutorService executorService) {
266        this.executorService = executorService;
267    }
268
269    public boolean isShutdownExecutorService() {
270        return shutdownExecutorService;
271    }
272
273    public void setShutdownExecutorService(boolean shutdownExecutorService) {
274        this.shutdownExecutorService = shutdownExecutorService;
275    }
276
277    public AggregationStrategy getAggregationStrategy() {
278        return aggregationStrategy;
279    }
280
281    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
282        this.aggregationStrategy = aggregationStrategy;
283    }
284
285    public long getTimeout() {
286        return timeout;
287    }
288
289    public void setTimeout(long timeout) {
290        this.timeout = timeout;
291    }
292
293    public Processor getOnPrepare() {
294        return onPrepare;
295    }
296
297    public void setOnPrepare(Processor onPrepare) {
298        this.onPrepare = onPrepare;
299    }
300
301    public boolean isShareUnitOfWork() {
302        return shareUnitOfWork;
303    }
304
305    public void setShareUnitOfWork(boolean shareUnitOfWork) {
306        this.shareUnitOfWork = shareUnitOfWork;
307    }
308
309    public int getCacheSize() {
310        return cacheSize;
311    }
312
313    public void setCacheSize(int cacheSize) {
314        this.cacheSize = cacheSize;
315    }
316}