001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.Iterator;
020
021import org.apache.camel.AsyncCallback;
022import org.apache.camel.AsyncProcessor;
023import org.apache.camel.AsyncProducerCallback;
024import org.apache.camel.CamelContext;
025import org.apache.camel.Endpoint;
026import org.apache.camel.Exchange;
027import org.apache.camel.ExchangePattern;
028import org.apache.camel.Expression;
029import org.apache.camel.FailedToCreateProducerException;
030import org.apache.camel.Message;
031import org.apache.camel.Producer;
032import org.apache.camel.Traceable;
033import org.apache.camel.builder.ExpressionBuilder;
034import org.apache.camel.impl.DefaultExchange;
035import org.apache.camel.impl.EmptyProducerCache;
036import org.apache.camel.impl.ProducerCache;
037import org.apache.camel.spi.EndpointUtilizationStatistics;
038import org.apache.camel.spi.IdAware;
039import org.apache.camel.spi.RouteContext;
040import org.apache.camel.support.ServiceSupport;
041import org.apache.camel.util.AsyncProcessorHelper;
042import org.apache.camel.util.ExchangeHelper;
043import org.apache.camel.util.MessageHelper;
044import org.apache.camel.util.ObjectHelper;
045import org.apache.camel.util.ServiceHelper;
046import org.slf4j.Logger;
047import org.slf4j.LoggerFactory;
048
049import static org.apache.camel.processor.PipelineHelper.continueProcessing;
050import static org.apache.camel.util.ObjectHelper.notNull;
051
052/**
053 * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a>
054 * pattern where the list of actual endpoints to send a message exchange to are
055 * dependent on the value of a message header.
056 * <p/>
057 * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation
058 * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the
059 * pipeline to ensure it works the same and the async routing engine is flawless.
060 */
061public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
062    protected final Logger log = LoggerFactory.getLogger(getClass());
063    protected String id;
064    protected ProducerCache producerCache;
065    protected int cacheSize;
066    protected boolean ignoreInvalidEndpoints;
067    protected String header;
068    protected Expression expression;
069    protected String uriDelimiter;
070    protected final CamelContext camelContext;
071    protected AsyncProcessor errorHandler;
072    protected SendDynamicProcessor sendDynamicProcessor;
073
074    /**
075     * The iterator to be used for retrieving the next routing slip(s) to be used.
076     */
077    protected interface RoutingSlipIterator {
078
079        /**
080         * Are the more routing slip(s)?
081         *
082         * @param exchange the current exchange
083         * @return <tt>true</tt> if more slips, <tt>false</tt> otherwise.
084         */
085        boolean hasNext(Exchange exchange);
086
087        /**
088         * Returns the next routing slip(s).
089         *
090         * @param exchange the current exchange
091         * @return the slip(s).
092         */
093        Object next(Exchange exchange);
094
095    }
096
097    public RoutingSlip(CamelContext camelContext) {
098        notNull(camelContext, "camelContext");
099        this.camelContext = camelContext;
100    }
101
102    public RoutingSlip(CamelContext camelContext, Expression expression, String uriDelimiter) {
103        notNull(camelContext, "camelContext");
104        notNull(expression, "expression");
105        
106        this.camelContext = camelContext;
107        this.expression = expression;
108        this.uriDelimiter = uriDelimiter;
109        this.header = null;
110    }
111
112    public String getId() {
113        return id;
114    }
115
116    public void setId(String id) {
117        this.id = id;
118    }
119
120    public Expression getExpression() {
121        return expression;
122    }
123
124    public String getUriDelimiter() {
125        return uriDelimiter;
126    }
127
128    public void setDelimiter(String delimiter) {
129        this.uriDelimiter = delimiter;
130    }
131    
132    public boolean isIgnoreInvalidEndpoints() {
133        return ignoreInvalidEndpoints;
134    }
135    
136    public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
137        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
138    }
139
140    public int getCacheSize() {
141        return cacheSize;
142    }
143
144    public void setCacheSize(int cacheSize) {
145        this.cacheSize = cacheSize;
146    }
147
148    public AsyncProcessor getErrorHandler() {
149        return errorHandler;
150    }
151
152    public void setErrorHandler(AsyncProcessor errorHandler) {
153        this.errorHandler = errorHandler;
154    }
155
156    @Override
157    public String toString() {
158        return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]";
159    }
160
161    public String getTraceLabel() {
162        return "routingSlip[" + expression + "]";
163    }
164
165    public void process(Exchange exchange) throws Exception {
166        AsyncProcessorHelper.process(this, exchange);
167    }
168
169    public boolean process(Exchange exchange, AsyncCallback callback) {
170        if (!isStarted()) {
171            exchange.setException(new IllegalStateException("RoutingSlip has not been started: " + this));
172            callback.done(true);
173            return true;
174        }
175
176        return doRoutingSlipWithExpression(exchange, this.expression, callback);
177    }
178
179    public boolean doRoutingSlip(Exchange exchange, Object routingSlip, AsyncCallback callback) {
180        if (routingSlip instanceof Expression) {
181            return doRoutingSlipWithExpression(exchange, (Expression) routingSlip, callback);
182        } else {
183            return doRoutingSlipWithExpression(exchange, ExpressionBuilder.constantExpression(routingSlip), callback);
184        }
185    }
186
187    /**
188     * Creates the route slip iterator to be used.
189     *
190     * @param exchange the exchange
191     * @param expression the expression
192     * @return the iterator, should never be <tt>null</tt>
193     */
194    protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange, final Expression expression) throws Exception {
195        Object slip = expression.evaluate(exchange, Object.class);
196        if (exchange.getException() != null) {
197            // force any exceptions occurred during evaluation to be thrown
198            throw exchange.getException();
199        }
200
201        final Iterator<Object> delegate = ObjectHelper.createIterator(slip, uriDelimiter);
202
203        return new RoutingSlipIterator() {
204            public boolean hasNext(Exchange exchange) {
205                return delegate.hasNext();
206            }
207
208            public Object next(Exchange exchange) {
209                return delegate.next();
210            }
211        };
212    }
213
214    private boolean doRoutingSlipWithExpression(final Exchange exchange, final Expression expression, final AsyncCallback originalCallback) {
215        Exchange current = exchange;
216        RoutingSlipIterator iter;
217        try {
218            iter = createRoutingSlipIterator(exchange, expression);
219        } catch (Exception e) {
220            exchange.setException(e);
221            originalCallback.done(true);
222            return true;
223        }
224
225        // ensure the slip is empty when we start
226        if (current.hasProperties()) {
227            current.setProperty(Exchange.SLIP_ENDPOINT, null);
228        }
229
230        while (iter.hasNext(current)) {
231            Endpoint endpoint;
232            try {
233                endpoint = resolveEndpoint(iter, exchange);
234                // if no endpoint was resolved then try the next
235                if (endpoint == null) {
236                    continue;
237                }
238            } catch (Exception e) {
239                // error resolving endpoint so we should break out
240                current.setException(e);
241                break;
242            }
243
244            //process and prepare the routing slip
245            boolean sync = processExchange(endpoint, current, exchange, originalCallback, iter);
246            current = prepareExchangeForRoutingSlip(current, endpoint);
247            
248            if (!sync) {
249                log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
250                // the remainder of the routing slip will be completed async
251                // so we break out now, then the callback will be invoked which then continue routing from where we left here
252                return false;
253            }
254
255            log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
256
257            // we ignore some kind of exceptions and allow us to continue
258            if (isIgnoreInvalidEndpoints()) {
259                FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class);
260                if (e != null) {
261                    if (log.isDebugEnabled()) {
262                        log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e);
263                    }
264                    current.setException(null);
265                }
266            }
267
268            // Decide whether to continue with the recipients or not; similar logic to the Pipeline
269            // check for error if so we should break out
270            if (!continueProcessing(current, "so breaking out of the routing slip", log)) {
271                break;
272            }
273        }
274
275        // logging nextExchange as it contains the exchange that might have altered the payload and since
276        // we are logging the completion if will be confusing if we log the original instead
277        // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
278        log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current);
279
280        // copy results back to the original exchange
281        ExchangeHelper.copyResults(exchange, current);
282
283        // okay we are completely done with the routing slip
284        // so we need to signal done on the original callback so it can continue
285        originalCallback.done(true);
286        return true;
287    }
288
289    protected Endpoint resolveEndpoint(RoutingSlipIterator iter, Exchange exchange) throws Exception {
290        Object nextRecipient = iter.next(exchange);
291        Endpoint endpoint = null;
292        try {
293            endpoint = ExchangeHelper.resolveEndpoint(exchange, nextRecipient);
294        } catch (Exception e) {
295            if (isIgnoreInvalidEndpoints()) {
296                log.info("Endpoint uri is invalid: " + nextRecipient + ". This exception will be ignored.", e);
297            } else {
298                throw e;
299            }
300        }
301        return endpoint;
302    }
303
304    protected Exchange prepareExchangeForRoutingSlip(Exchange current, Endpoint endpoint) {
305        Exchange copy = new DefaultExchange(current);
306        // we must use the same id as this is a snapshot strategy where Camel copies a snapshot
307        // before processing the next step in the pipeline, so we have a snapshot of the exchange
308        // just before. This snapshot is used if Camel should do redeliveries (re try) using
309        // DeadLetterChannel. That is why it's important the id is the same, as it is the *same*
310        // exchange being routed.
311        copy.setExchangeId(current.getExchangeId());
312        copyOutToIn(copy, current);
313
314        // ensure stream caching is reset
315        MessageHelper.resetStreamCache(copy.getIn());
316
317        return copy;
318    }
319
320    protected AsyncProcessor createErrorHandler(RouteContext routeContext, Exchange exchange, AsyncProcessor processor, Endpoint endpoint) {
321        AsyncProcessor answer = processor;
322
323        boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class);
324
325        // do not wrap in error handler if we are inside a try block
326        if (!tryBlock && routeContext != null && errorHandler != null) {
327            // wrap the producer in error handler so we have fine grained error handling on
328            // the output side instead of the input side
329            // this is needed to support redelivery on that output alone and not doing redelivery
330            // for the entire routingslip/dynamic-router block again which will start from scratch again
331            answer = errorHandler;
332        }
333
334        return answer;
335    }
336
337    protected boolean processExchange(final Endpoint endpoint, final Exchange exchange, final Exchange original,
338                                      final AsyncCallback originalCallback, final RoutingSlipIterator iter) {
339
340        // this does the actual processing so log at trace level
341        log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
342
343        // routing slip callback which are used when
344        // - routing slip was routed asynchronously
345        // - and we are completely done with the routing slip
346        // so we need to signal done on the original callback so it can continue
347        AsyncCallback callback = new AsyncCallback() {
348            @Override
349            public void done(boolean doneSync) {
350                if (!doneSync) {
351                    originalCallback.done(false);
352                }
353            }
354        };
355        boolean sync = producerCache.doInAsyncProducer(endpoint, exchange, null, callback, new AsyncProducerCallback() {
356            public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange,
357                                             ExchangePattern exchangePattern, final AsyncCallback callback) {
358
359                // rework error handling to support fine grained error handling
360                RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
361                AsyncProcessor target = createErrorHandler(routeContext, exchange, asyncProducer, endpoint);
362
363                // set property which endpoint we send to and the producer that can do it
364                exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
365                exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri());
366                exchange.setProperty(Exchange.SLIP_PRODUCER, asyncProducer);
367
368                boolean answer = target.process(exchange, new AsyncCallback() {
369                    public void done(boolean doneSync) {
370                        // cleanup producer after usage
371                        exchange.removeProperty(Exchange.SLIP_PRODUCER);
372
373                        // we only have to handle async completion of the routing slip
374                        if (doneSync) {
375                            callback.done(true);
376                            return;
377                        }
378
379                        try {
380                            // continue processing the routing slip asynchronously
381                            Exchange current = prepareExchangeForRoutingSlip(exchange, endpoint);
382
383                            while (iter.hasNext(current)) {
384
385                                // we ignore some kind of exceptions and allow us to continue
386                                if (isIgnoreInvalidEndpoints()) {
387                                    FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class);
388                                    if (e != null) {
389                                        if (log.isDebugEnabled()) {
390                                            log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e);
391                                        }
392                                        current.setException(null);
393                                    }
394                                }
395
396                                // Decide whether to continue with the recipients or not; similar logic to the Pipeline
397                                // check for error if so we should break out
398                                if (!continueProcessing(current, "so breaking out of the routing slip", log)) {
399                                    break;
400                                }
401
402                                Endpoint endpoint;
403                                try {
404                                    endpoint = resolveEndpoint(iter, exchange);
405                                    // if no endpoint was resolved then try the next
406                                    if (endpoint == null) {
407                                        continue;
408                                    }
409                                } catch (Exception e) {
410                                    // error resolving endpoint so we should break out
411                                    exchange.setException(e);
412                                    break;
413                                }
414
415                                // prepare and process the routing slip
416                                boolean sync = processExchange(endpoint, current, original, callback, iter);
417                                current = prepareExchangeForRoutingSlip(current, endpoint);
418
419                                if (!sync) {
420                                    log.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId());
421                                    return;
422                                }
423                            }
424
425                            // logging nextExchange as it contains the exchange that might have altered the payload and since
426                            // we are logging the completion if will be confusing if we log the original instead
427                            // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
428                            log.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current);
429
430                            // copy results back to the original exchange
431                            ExchangeHelper.copyResults(original, current);
432                        } catch (Throwable e) {
433                            exchange.setException(e);
434                        }
435
436                        // okay we are completely done with the routing slip
437                        // so we need to signal done on the original callback so it can continue
438                        originalCallback.done(false);
439                    }
440                });
441
442                return answer;
443            }
444        });
445
446        return sync;
447    }
448
449    protected void doStart() throws Exception {
450        if (producerCache == null) {
451            if (cacheSize < 0) {
452                producerCache = new EmptyProducerCache(this, camelContext);
453                log.debug("RoutingSlip {} is not using ProducerCache", this);
454            } else if (cacheSize == 0) {
455                producerCache = new ProducerCache(this, camelContext);
456                log.debug("RoutingSlip {} using ProducerCache with default cache size", this);
457            } else {
458                producerCache = new ProducerCache(this, camelContext, cacheSize);
459                log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize);
460            }
461        }
462
463        ServiceHelper.startServices(producerCache, errorHandler);
464    }
465
466    protected void doStop() throws Exception {
467        ServiceHelper.stopServices(producerCache, errorHandler);
468    }
469
470    protected void doShutdown() throws Exception {
471        ServiceHelper.stopAndShutdownServices(producerCache, errorHandler);
472    }
473
474    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
475        return producerCache.getEndpointUtilizationStatistics();
476    }
477
478    /**
479     * Returns the outbound message if available. Otherwise return the inbound message.
480     */
481    private Message getResultMessage(Exchange exchange) {
482        if (exchange.hasOut()) {
483            return exchange.getOut();
484        } else {
485            // if this endpoint had no out (like a mock endpoint) just take the in
486            return exchange.getIn();
487        }
488    }
489
490    /**
491     * Copy the outbound data in 'source' to the inbound data in 'result'.
492     */
493    private void copyOutToIn(Exchange result, Exchange source) {
494        result.setException(source.getException());
495        result.setIn(getResultMessage(source));
496
497        result.getProperties().clear();
498        result.getProperties().putAll(source.getProperties());
499    }
500
501    /**
502     * Creates the embedded processor to use when wrapping this routing slip in an error handler.
503     */
504    public AsyncProcessor newRoutingSlipProcessorForErrorHandler() {
505        return new RoutingSlipProcessor();
506    }
507
508    /**
509     * Embedded processor that routes to the routing slip that has been set via the
510     * exchange property {@link Exchange#SLIP_PRODUCER}.
511     */
512    private final class RoutingSlipProcessor implements AsyncProcessor {
513
514        @Override
515        public void process(Exchange exchange) throws Exception {
516            AsyncProcessorHelper.process(this, exchange);
517        }
518
519        @Override
520        public boolean process(Exchange exchange, AsyncCallback callback) {
521            AsyncProcessor producer = exchange.getProperty(Exchange.SLIP_PRODUCER, AsyncProcessor.class);
522            return producer.process(exchange, callback);
523        }
524
525        @Override
526        public String toString() {
527            return "RoutingSlipProcessor";
528        }
529    }
530}