001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Iterator;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.AsyncProcessor;
023    import org.apache.camel.AsyncProducerCallback;
024    import org.apache.camel.CamelContext;
025    import org.apache.camel.Endpoint;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.ExchangePattern;
028    import org.apache.camel.Expression;
029    import org.apache.camel.FailedToCreateProducerException;
030    import org.apache.camel.Message;
031    import org.apache.camel.Producer;
032    import org.apache.camel.Traceable;
033    import org.apache.camel.builder.ExpressionBuilder;
034    import org.apache.camel.impl.DefaultExchange;
035    import org.apache.camel.impl.ProducerCache;
036    import org.apache.camel.support.ServiceSupport;
037    import org.apache.camel.util.AsyncProcessorHelper;
038    import org.apache.camel.util.ExchangeHelper;
039    import org.apache.camel.util.ObjectHelper;
040    import org.apache.camel.util.ServiceHelper;
041    import org.slf4j.Logger;
042    import org.slf4j.LoggerFactory;
043    
044    import static org.apache.camel.processor.PipelineHelper.continueProcessing;
045    import static org.apache.camel.util.ObjectHelper.notNull;
046    
047    /**
048     * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a>
049     * pattern where the list of actual endpoints to send a message exchange to are
050     * dependent on the value of a message header.
051     * <p/>
052     * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation
053     * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the
054     * pipeline to ensure it works the same and the async routing engine is flawless.
055     */
056    public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable {
057        protected final transient Logger log = LoggerFactory.getLogger(getClass());
058        protected ProducerCache producerCache;
059        protected boolean ignoreInvalidEndpoints;
060        protected String header;
061        protected Expression expression;
062        protected String uriDelimiter;
063        protected final CamelContext camelContext;
064    
065        /**
066         * The iterator to be used for retrieving the next routing slip(s) to be used.
067         */
068        protected interface RoutingSlipIterator {
069    
070            /**
071             * Are the more routing slip(s)?
072             *
073             * @param exchange the current exchange
074             * @return <tt>true</tt> if more slips, <tt>false</tt> otherwise.
075             */
076            boolean hasNext(Exchange exchange);
077    
078            /**
079             * Returns the next routing slip(s).
080             *
081             * @param exchange the current exchange
082             * @return the slip(s).
083             */
084            Object next(Exchange exchange);
085    
086        }
087    
088        public RoutingSlip(CamelContext camelContext) {
089            notNull(camelContext, "camelContext");
090            this.camelContext = camelContext;
091        }
092    
093        public RoutingSlip(CamelContext camelContext, Expression expression, String uriDelimiter) {
094            notNull(camelContext, "camelContext");
095            notNull(expression, "expression");
096            
097            this.camelContext = camelContext;
098            this.expression = expression;
099            this.uriDelimiter = uriDelimiter;
100            this.header = null;
101        }
102        
103        public void setDelimiter(String delimiter) {
104            this.uriDelimiter = delimiter;
105        }
106        
107        public boolean isIgnoreInvalidEndpoints() {
108            return ignoreInvalidEndpoints;
109        }
110        
111        public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
112            this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
113        }
114    
115        @Override
116        public String toString() {
117            return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]";
118        }
119    
120        public String getTraceLabel() {
121            return "routingSlip[" + expression + "]";
122        }
123    
124        public void process(Exchange exchange) throws Exception {
125            AsyncProcessorHelper.process(this, exchange);
126        }
127    
128        public boolean process(Exchange exchange, AsyncCallback callback) {
129            if (!isStarted()) {
130                exchange.setException(new IllegalStateException("RoutingSlip has not been started: " + this));
131                callback.done(true);
132                return true;
133            }
134    
135            return doRoutingSlip(exchange, callback);
136        }
137    
138        public boolean doRoutingSlip(Exchange exchange, Object routingSlip, AsyncCallback callback) {
139            if (routingSlip instanceof Expression) {
140                this.expression = (Expression) routingSlip;
141            } else {
142                this.expression = ExpressionBuilder.constantExpression(routingSlip);
143            }
144            return doRoutingSlip(exchange, callback);
145        }
146    
147        /**
148         * Creates the route slip iterator to be used.
149         *
150         * @param exchange the exchange
151         * @return the iterator, should never be <tt>null</tt>
152         */
153        protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange) throws Exception {
154            Object slip = expression.evaluate(exchange, Object.class);
155            if (exchange.getException() != null) {
156                // force any exceptions occurred during evaluation to be thrown
157                throw exchange.getException();
158            }
159    
160            final Iterator<Object> delegate = ObjectHelper.createIterator(slip, uriDelimiter);
161    
162            return new RoutingSlipIterator() {
163                public boolean hasNext(Exchange exchange) {
164                    return delegate.hasNext();
165                }
166    
167                public Object next(Exchange exchange) {
168                    return delegate.next();
169                }
170            };
171        }
172    
173        private boolean doRoutingSlip(final Exchange exchange, final AsyncCallback callback) {
174            Exchange current = exchange;
175            RoutingSlipIterator iter;
176            try {
177                iter = createRoutingSlipIterator(exchange);
178            } catch (Exception e) {
179                exchange.setException(e);
180                callback.done(true);
181                return true;
182            }
183    
184            // ensure the slip is empty when we start
185            if (current.hasProperties()) {
186                current.setProperty(Exchange.SLIP_ENDPOINT, null);
187            }
188    
189            while (iter.hasNext(current)) {
190                Endpoint endpoint;
191                try {
192                    endpoint = resolveEndpoint(iter, exchange);
193                    // if no endpoint was resolved then try the next
194                    if (endpoint == null) {
195                        continue;
196                    }
197                } catch (Exception e) {
198                    // error resolving endpoint so we should break out
199                    current.setException(e);
200                    break;
201                }
202    
203                // prepare and process the routing slip
204                Exchange copy = prepareExchangeForRoutingSlip(current, endpoint);
205                boolean sync = processExchange(endpoint, copy, exchange, callback, iter);
206                current = copy;
207    
208                if (!sync) {
209                    log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
210                    // the remainder of the routing slip will be completed async
211                    // so we break out now, then the callback will be invoked which then continue routing from where we left here
212                    return false;
213                }
214    
215                log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
216    
217                // we ignore some kind of exceptions and allow us to continue
218                if (isIgnoreInvalidEndpoints()) {
219                    FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class);
220                    if (e != null) {
221                        if (log.isDebugEnabled()) {
222                            log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e);
223                        }
224                        current.setException(null);
225                    }
226                }
227    
228                // Decide whether to continue with the recipients or not; similar logic to the Pipeline
229                // check for error if so we should break out
230                if (!continueProcessing(current, "so breaking out of the routing slip", log)) {
231                    break;
232                }
233            }
234    
235            // logging nextExchange as it contains the exchange that might have altered the payload and since
236            // we are logging the completion if will be confusing if we log the original instead
237            // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
238            log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current);
239    
240            // copy results back to the original exchange
241            ExchangeHelper.copyResults(exchange, current);
242    
243            callback.done(true);
244            return true;
245        }
246    
247        protected Endpoint resolveEndpoint(RoutingSlipIterator iter, Exchange exchange) throws Exception {
248            Object nextRecipient = iter.next(exchange);
249            Endpoint endpoint = null;
250            try {
251                endpoint = ExchangeHelper.resolveEndpoint(exchange, nextRecipient);
252            } catch (Exception e) {
253                if (isIgnoreInvalidEndpoints()) {
254                    log.info("Endpoint uri is invalid: " + nextRecipient + ". This exception will be ignored.", e);
255                } else {
256                    throw e;
257                }
258            }
259            return endpoint;
260        }
261    
262        protected Exchange prepareExchangeForRoutingSlip(Exchange current, Endpoint endpoint) {
263            Exchange copy = new DefaultExchange(current);
264            // we must use the same id as this is a snapshot strategy where Camel copies a snapshot
265            // before processing the next step in the pipeline, so we have a snapshot of the exchange
266            // just before. This snapshot is used if Camel should do redeliveries (re try) using
267            // DeadLetterChannel. That is why it's important the id is the same, as it is the *same*
268            // exchange being routed.
269            copy.setExchangeId(current.getExchangeId());
270            copyOutToIn(copy, current);
271            return copy;
272        }
273    
274        protected boolean processExchange(final Endpoint endpoint, final Exchange exchange, final Exchange original,
275                                          final AsyncCallback callback, final RoutingSlipIterator iter) {
276    
277            // this does the actual processing so log at trace level
278            log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
279    
280            boolean sync = producerCache.doInAsyncProducer(endpoint, exchange, null, callback, new AsyncProducerCallback() {
281                public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange,
282                                                 ExchangePattern exchangePattern, final AsyncCallback callback) {
283                    // set property which endpoint we send to
284                    exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
285                    exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri());
286    
287                    boolean sync = AsyncProcessorHelper.process(asyncProducer, exchange, new AsyncCallback() {
288                        public void done(boolean doneSync) {
289                            // we only have to handle async completion of the routing slip
290                            if (doneSync) {
291                                return;
292                            }
293    
294                            // continue processing the routing slip asynchronously
295                            Exchange current = exchange;
296    
297                            while (iter.hasNext(current)) {
298    
299                                // we ignore some kind of exceptions and allow us to continue
300                                if (isIgnoreInvalidEndpoints()) {
301                                    FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class);
302                                    if (e != null) {
303                                        if (log.isDebugEnabled()) {
304                                            log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e);
305                                        }
306                                        current.setException(null);
307                                    }
308                                }
309    
310                                // Decide whether to continue with the recipients or not; similar logic to the Pipeline
311                                // check for error if so we should break out
312                                if (!continueProcessing(current, "so breaking out of the routing slip", log)) {
313                                    break;
314                                }
315    
316                                Endpoint endpoint;
317                                try {
318                                    endpoint = resolveEndpoint(iter, exchange);
319                                    // if no endpoint was resolved then try the next
320                                    if (endpoint == null) {
321                                        continue;
322                                    }
323                                } catch (Exception e) {
324                                    // error resolving endpoint so we should break out
325                                    exchange.setException(e);
326                                    break;
327                                }
328    
329                                // prepare and process the routing slip
330                                Exchange copy = prepareExchangeForRoutingSlip(current, endpoint);
331                                boolean sync = processExchange(endpoint, copy, original, callback, iter);
332                                current = copy;
333    
334                                if (!sync) {
335                                    log.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId());
336                                    return;
337                                }
338                            }
339    
340                            // logging nextExchange as it contains the exchange that might have altered the payload and since
341                            // we are logging the completion if will be confusing if we log the original instead
342                            // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
343                            log.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current);
344    
345                            // copy results back to the original exchange
346                            ExchangeHelper.copyResults(original, current);
347                            callback.done(false);
348                        }
349                    });
350    
351                    return sync;
352                }
353            });
354    
355            return sync;
356        }
357    
358        protected void doStart() throws Exception {
359            if (producerCache == null) {
360                producerCache = new ProducerCache(this, camelContext);
361            }
362            ServiceHelper.startService(producerCache);
363        }
364    
365        protected void doStop() throws Exception {
366            ServiceHelper.stopService(producerCache);
367        }
368    
369        protected void doShutdown() throws Exception {
370            ServiceHelper.stopAndShutdownService(producerCache);
371        }
372    
373        /**
374         * Returns the outbound message if available. Otherwise return the inbound message.
375         */
376        private Message getResultMessage(Exchange exchange) {
377            if (exchange.hasOut()) {
378                return exchange.getOut();
379            } else {
380                // if this endpoint had no out (like a mock endpoint) just take the in
381                return exchange.getIn();
382            }
383        }
384    
385        /**
386         * Copy the outbound data in 'source' to the inbound data in 'result'.
387         */
388        private void copyOutToIn(Exchange result, Exchange source) {
389            result.setException(source.getException());
390    
391            if (source.hasOut() && source.getOut().isFault()) {
392                result.getOut().copyFrom(source.getOut());
393            }
394    
395            result.setIn(getResultMessage(source));
396    
397            result.getProperties().clear();
398            result.getProperties().putAll(source.getProperties());
399        }
400    }