001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.Iterator;
021    import java.util.List;
022    import java.util.concurrent.ExecutorService;
023    
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.Processor;
028    import org.apache.camel.Producer;
029    import org.apache.camel.impl.ProducerCache;
030    import org.apache.camel.processor.aggregate.AggregationStrategy;
031    import org.apache.camel.spi.RouteContext;
032    import org.apache.camel.util.ExchangeHelper;
033    import org.apache.camel.util.ServiceHelper;
034    import org.slf4j.Logger;
035    import org.slf4j.LoggerFactory;
036    
037    /**
038     * Implements a dynamic <a
039     * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
040     * pattern where the list of actual endpoints to send a message exchange to are
041     * dependent on some dynamic expression.
042     * <p/>
043     * This implementation is a specialized {@link org.apache.camel.processor.MulticastProcessor} which is based
044     * on recipient lists. This implementation have to handle the fact the processors is not known at design time
045     * but evaluated at runtime from the dynamic recipient list. Therefore this implementation have to at runtime
046     * lookup endpoints and create producers which should act as the processors for the multicast processors which
047     * runs under the hood. Also this implementation supports the asynchronous routing engine which makes the code
048     * more trickier.
049     *
050     * @version 
051     */
052    public class RecipientListProcessor extends MulticastProcessor {
053    
054        private static final transient Logger LOG = LoggerFactory.getLogger(RecipientListProcessor.class);
055        private final Iterator<Object> iter;
056        private boolean ignoreInvalidEndpoints;
057        private ProducerCache producerCache;
058    
059        /**
060         * Class that represent each step in the recipient list to do
061         * <p/>
062         * This implementation ensures the provided producer is being released back in the producer cache when
063         * its done using it.
064         */
065        static final class RecipientProcessorExchangePair implements ProcessorExchangePair {
066            private final int index;
067            private final Endpoint endpoint;
068            private final Producer producer;
069            private Processor prepared;
070            private final Exchange exchange;
071            private final ProducerCache producerCache;
072    
073            private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint endpoint, Producer producer,
074                                                   Processor prepared, Exchange exchange) {
075                this.index = index;
076                this.producerCache = producerCache;
077                this.endpoint = endpoint;
078                this.producer = producer;
079                this.prepared = prepared;
080                this.exchange = exchange;
081            }
082    
083            public int getIndex() {
084                return index;
085            }
086    
087            public Exchange getExchange() {
088                return exchange;
089            }
090    
091            public Producer getProducer() {
092                return producer;
093            }
094    
095            public Processor getProcessor() {
096                return prepared;
097            }
098    
099            public void begin() {
100                // we have already acquired and prepare the producer so we
101                LOG.trace("RecipientProcessorExchangePair #{} begin: {}", index, exchange);
102            }
103    
104            public void done() {
105                LOG.trace("RecipientProcessorExchangePair #{} done: {}", index, exchange);
106                // when we are done we should release back in pool
107                try {
108                    producerCache.releaseProducer(endpoint, producer);
109                } catch (Exception e) {
110                    if (LOG.isDebugEnabled()) {
111                        LOG.debug("Error releasing producer: " + producer + ". This exception will be ignored.", e);
112                    }
113                }
114            }
115    
116        }
117    
118        public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter) {
119            super(camelContext, null);
120            this.producerCache = producerCache;
121            this.iter = iter;
122        }
123    
124        public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy) {
125            super(camelContext, null, aggregationStrategy);
126            this.producerCache = producerCache;
127            this.iter = iter;
128        }
129    
130        public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
131                                      boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
132                                      boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) {
133            super(camelContext, null, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
134                    streaming, stopOnException, timeout, onPrepare, shareUnitOfWork);
135            this.producerCache = producerCache;
136            this.iter = iter;
137        }
138    
139        public boolean isIgnoreInvalidEndpoints() {
140            return ignoreInvalidEndpoints;
141        }
142    
143        public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
144            this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
145        }
146    
147        @Override
148        protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
149            // here we iterate the recipient lists and create the exchange pair for each of those
150            List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>();
151    
152            // at first we must lookup the endpoint and acquire the producer which can send to the endpoint
153            int index = 0;
154            while (iter.hasNext()) {
155                Object recipient = iter.next();
156                Endpoint endpoint;
157                Producer producer;
158                try {
159                    endpoint = resolveEndpoint(exchange, recipient);
160                    producer = producerCache.acquireProducer(endpoint);
161                } catch (Exception e) {
162                    if (isIgnoreInvalidEndpoints()) {
163                        if (LOG.isDebugEnabled()) {
164                            LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
165                        }
166                        continue;
167                    } else {
168                        // failure so break out
169                        throw e;
170                    }
171                }
172    
173                // then create the exchange pair
174                result.add(createProcessorExchangePair(index++, endpoint, producer, exchange));
175            }
176    
177            return result;
178        }
179    
180        /**
181         * This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair instead
182         */
183        protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange) {
184            Processor prepared = producer;
185    
186            // copy exchange, and do not share the unit of work
187            Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
188    
189            // if we share unit of work, we need to prepare the child exchange
190            if (isShareUnitOfWork()) {
191                prepareSharedUnitOfWork(copy, exchange);
192            }
193    
194            // set property which endpoint we send to
195            setToEndpoint(copy, prepared);
196    
197            // rework error handling to support fine grained error handling
198            RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
199            prepared = createErrorHandler(routeContext, copy, prepared);
200    
201            // invoke on prepare on the exchange if specified
202            if (onPrepare != null) {
203                try {
204                    onPrepare.process(copy);
205                } catch (Exception e) {
206                    copy.setException(e);
207                }
208            }
209    
210            // and create the pair
211            return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer, prepared, copy);
212        }
213    
214        protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
215            // trim strings as end users might have added spaces between separators
216            if (recipient instanceof String) {
217                recipient = ((String) recipient).trim();
218            }
219            return ExchangeHelper.resolveEndpoint(exchange, recipient);
220        }
221    
222        protected void doStart() throws Exception {
223            super.doStart();
224            if (producerCache == null) {
225                producerCache = new ProducerCache(this, getCamelContext());
226            }
227            ServiceHelper.startService(producerCache);
228        }
229    
230        protected void doStop() throws Exception {
231            ServiceHelper.stopService(producerCache);
232            super.doStop();
233        }
234    
235        protected void doShutdown() throws Exception {
236            ServiceHelper.stopAndShutdownService(producerCache);
237            super.doShutdown();
238        }
239    
240        @Override
241        public String toString() {
242            return "RecipientList";
243        }
244    
245        @Override
246        public String getTraceLabel() {
247            return "recipientList";
248        }
249    }