001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.io.UnsupportedEncodingException;
020import java.net.MalformedURLException;
021import java.net.URISyntaxException;
022import java.util.ArrayList;
023import java.util.Iterator;
024import java.util.List;
025import java.util.concurrent.ExecutorService;
026
027import org.apache.camel.CamelContext;
028import org.apache.camel.Endpoint;
029import org.apache.camel.Exchange;
030import org.apache.camel.ExchangePattern;
031import org.apache.camel.Processor;
032import org.apache.camel.Producer;
033import org.apache.camel.impl.ProducerCache;
034import org.apache.camel.processor.aggregate.AggregationStrategy;
035import org.apache.camel.spi.RouteContext;
036import org.apache.camel.util.EndpointHelper;
037import org.apache.camel.util.ExchangeHelper;
038import org.apache.camel.util.MessageHelper;
039import org.apache.camel.util.ServiceHelper;
040import org.apache.camel.util.URISupport;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * Implements a dynamic <a
046 * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
047 * pattern where the list of actual endpoints to send a message exchange to are
048 * dependent on some dynamic expression.
049 * <p/>
050 * This implementation is a specialized {@link org.apache.camel.processor.MulticastProcessor} which is based
051 * on recipient lists. This implementation have to handle the fact the processors is not known at design time
052 * but evaluated at runtime from the dynamic recipient list. Therefore this implementation have to at runtime
053 * lookup endpoints and create producers which should act as the processors for the multicast processors which
054 * runs under the hood. Also this implementation supports the asynchronous routing engine which makes the code
055 * more trickier.
056 *
057 * @version 
058 */
059public class RecipientListProcessor extends MulticastProcessor {
060
061    private static final Logger LOG = LoggerFactory.getLogger(RecipientListProcessor.class);
062    private final Iterator<Object> iter;
063    private boolean ignoreInvalidEndpoints;
064    private ProducerCache producerCache;
065
066    /**
067     * Class that represent each step in the recipient list to do
068     * <p/>
069     * This implementation ensures the provided producer is being released back in the producer cache when
070     * its done using it.
071     */
072    static final class RecipientProcessorExchangePair implements ProcessorExchangePair {
073        private final int index;
074        private final Endpoint endpoint;
075        private final Producer producer;
076        private Processor prepared;
077        private final Exchange exchange;
078        private final ProducerCache producerCache;
079        private final ExchangePattern pattern;
080        private volatile ExchangePattern originalPattern;
081
082        private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint endpoint, Producer producer,
083                                               Processor prepared, Exchange exchange, ExchangePattern pattern) {
084            this.index = index;
085            this.producerCache = producerCache;
086            this.endpoint = endpoint;
087            this.producer = producer;
088            this.prepared = prepared;
089            this.exchange = exchange;
090            this.pattern = pattern;
091        }
092
093        public int getIndex() {
094            return index;
095        }
096
097        public Exchange getExchange() {
098            return exchange;
099        }
100
101        public Producer getProducer() {
102            return producer;
103        }
104
105        public Processor getProcessor() {
106            return prepared;
107        }
108
109        public void begin() {
110            // we have already acquired and prepare the producer
111            LOG.trace("RecipientProcessorExchangePair #{} begin: {}", index, exchange);
112            exchange.setProperty(Exchange.RECIPIENT_LIST_ENDPOINT, endpoint.getEndpointUri());
113            // ensure stream caching is reset
114            MessageHelper.resetStreamCache(exchange.getIn());
115            // if the MEP on the endpoint is different then
116            if (pattern != null) {
117                originalPattern = exchange.getPattern();
118                LOG.trace("Using exchangePattern: {} on exchange: {}", pattern, exchange);
119                exchange.setPattern(pattern);
120            }
121        }
122
123        public void done() {
124            LOG.trace("RecipientProcessorExchangePair #{} done: {}", index, exchange);
125            try {
126                // preserve original MEP
127                if (originalPattern != null) {
128                    exchange.setPattern(originalPattern);
129                }
130                // when we are done we should release back in pool
131                producerCache.releaseProducer(endpoint, producer);
132            } catch (Exception e) {
133                if (LOG.isDebugEnabled()) {
134                    LOG.debug("Error releasing producer: " + producer + ". This exception will be ignored.", e);
135                }
136            }
137        }
138
139    }
140
141    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter) {
142        super(camelContext, null);
143        this.producerCache = producerCache;
144        this.iter = iter;
145    }
146
147    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy) {
148        super(camelContext, null, aggregationStrategy);
149        this.producerCache = producerCache;
150        this.iter = iter;
151    }
152
153    @Deprecated
154    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
155                                  boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
156                                  boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
157        super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
158                streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false);
159        this.producerCache = producerCache;
160        this.iter = iter;
161    }
162
163    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
164                                  boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
165                                  boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean parallelAggregate) {
166        super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
167                streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, parallelAggregate);
168        this.producerCache = producerCache;
169        this.iter = iter;
170    }
171
172    public boolean isIgnoreInvalidEndpoints() {
173        return ignoreInvalidEndpoints;
174    }
175
176    public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
177        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
178    }
179
180    @Override
181    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
182        // here we iterate the recipient lists and create the exchange pair for each of those
183        List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>();
184
185        // at first we must lookup the endpoint and acquire the producer which can send to the endpoint
186        int index = 0;
187        while (iter.hasNext()) {
188            Object recipient = iter.next();
189            Endpoint endpoint;
190            Producer producer;
191            ExchangePattern pattern;
192            try {
193                endpoint = resolveEndpoint(exchange, recipient);
194                pattern = resolveExchangePattern(recipient);
195                producer = producerCache.acquireProducer(endpoint);
196            } catch (Exception e) {
197                if (isIgnoreInvalidEndpoints()) {
198                    if (LOG.isDebugEnabled()) {
199                        LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
200                    }
201                    continue;
202                } else {
203                    // failure so break out
204                    throw e;
205                }
206            }
207
208            // then create the exchange pair
209            result.add(createProcessorExchangePair(index++, endpoint, producer, exchange, pattern));
210        }
211
212        return result;
213    }
214
215    /**
216     * This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair instead
217     */
218    protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange, ExchangePattern pattern) {
219        Processor prepared = producer;
220
221        // copy exchange, and do not share the unit of work
222        Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
223
224        // if we share unit of work, we need to prepare the child exchange
225        if (isShareUnitOfWork()) {
226            prepareSharedUnitOfWork(copy, exchange);
227        }
228
229        // set property which endpoint we send to
230        setToEndpoint(copy, prepared);
231
232        // rework error handling to support fine grained error handling
233        RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
234        prepared = createErrorHandler(routeContext, copy, prepared);
235
236        // invoke on prepare on the exchange if specified
237        if (onPrepare != null) {
238            try {
239                onPrepare.process(copy);
240            } catch (Exception e) {
241                copy.setException(e);
242            }
243        }
244
245        // and create the pair
246        return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy, pattern);
247    }
248
249    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
250        // trim strings as end users might have added spaces between separators
251        if (recipient instanceof String) {
252            recipient = ((String) recipient).trim();
253        }
254        return ExchangeHelper.resolveEndpoint(exchange, recipient);
255    }
256
257    protected ExchangePattern resolveExchangePattern(Object recipient) throws UnsupportedEncodingException, URISyntaxException, MalformedURLException {
258        // trim strings as end users might have added spaces between separators
259        if (recipient instanceof String) {
260            String s = ((String) recipient).trim();
261            // see if exchangePattern is a parameter in the url
262            s = URISupport.normalizeUri(s);
263            return EndpointHelper.resolveExchangePatternFromUrl(s);
264        }
265        return null;
266    }
267
268    protected void doStart() throws Exception {
269        super.doStart();
270        if (producerCache == null) {
271            producerCache = new ProducerCache(this, getCamelContext());
272        }
273        ServiceHelper.startService(producerCache);
274    }
275
276    protected void doStop() throws Exception {
277        ServiceHelper.stopService(producerCache);
278        super.doStop();
279    }
280
281    protected void doShutdown() throws Exception {
282        ServiceHelper.stopAndShutdownService(producerCache);
283        super.doShutdown();
284    }
285
286    @Override
287    public String toString() {
288        return "RecipientList";
289    }
290
291    @Override
292    public String getTraceLabel() {
293        return "recipientList";
294    }
295}