001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.Iterator;
020import java.util.concurrent.ConcurrentHashMap;
021import java.util.concurrent.ConcurrentMap;
022
023import org.apache.camel.AsyncCallback;
024import org.apache.camel.AsyncProcessor;
025import org.apache.camel.AsyncProducerCallback;
026import org.apache.camel.CamelContext;
027import org.apache.camel.Endpoint;
028import org.apache.camel.ErrorHandlerFactory;
029import org.apache.camel.Exchange;
030import org.apache.camel.ExchangePattern;
031import org.apache.camel.Expression;
032import org.apache.camel.FailedToCreateProducerException;
033import org.apache.camel.Message;
034import org.apache.camel.Processor;
035import org.apache.camel.Producer;
036import org.apache.camel.Traceable;
037import org.apache.camel.builder.ExpressionBuilder;
038import org.apache.camel.impl.DefaultExchange;
039import org.apache.camel.impl.EmptyProducerCache;
040import org.apache.camel.impl.ProducerCache;
041import org.apache.camel.spi.EndpointUtilizationStatistics;
042import org.apache.camel.spi.IdAware;
043import org.apache.camel.spi.RouteContext;
044import org.apache.camel.support.ServiceSupport;
045import org.apache.camel.util.AsyncProcessorHelper;
046import org.apache.camel.util.ExchangeHelper;
047import org.apache.camel.util.KeyValueHolder;
048import org.apache.camel.util.MessageHelper;
049import org.apache.camel.util.ObjectHelper;
050import org.apache.camel.util.ServiceHelper;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054import static org.apache.camel.processor.PipelineHelper.continueProcessing;
055import static org.apache.camel.util.ObjectHelper.notNull;
056
057/**
058 * Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a>
059 * pattern where the list of actual endpoints to send a message exchange to are
060 * dependent on the value of a message header.
061 * <p/>
062 * This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation
063 * as the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the
064 * pipeline to ensure it works the same and the async routing engine is flawless.
065 */
066public class RoutingSlip extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
067    protected final Logger log = LoggerFactory.getLogger(getClass());
068    protected String id;
069    protected ProducerCache producerCache;
070    protected int cacheSize;
071    protected boolean ignoreInvalidEndpoints;
072    protected String header;
073    protected Expression expression;
074    protected String uriDelimiter;
075    protected final CamelContext camelContext;
076    private final ConcurrentMap<PreparedErrorHandler, AsyncProcessor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, AsyncProcessor>();
077
078    /**
079     * Class that represents prepared fine grained error handlers when processing routingslip/dynamic-router exchanges
080     * <p/>
081     * This is similar to how multicast processor does.
082     */
083    static final class PreparedErrorHandler extends KeyValueHolder<String, Processor> {
084
085        PreparedErrorHandler(String key, Processor value) {
086            super(key, value);
087        }
088
089    }
090
091    /**
092     * The iterator to be used for retrieving the next routing slip(s) to be used.
093     */
094    protected interface RoutingSlipIterator {
095
096        /**
097         * Are the more routing slip(s)?
098         *
099         * @param exchange the current exchange
100         * @return <tt>true</tt> if more slips, <tt>false</tt> otherwise.
101         */
102        boolean hasNext(Exchange exchange);
103
104        /**
105         * Returns the next routing slip(s).
106         *
107         * @param exchange the current exchange
108         * @return the slip(s).
109         */
110        Object next(Exchange exchange);
111
112    }
113
114    public RoutingSlip(CamelContext camelContext) {
115        notNull(camelContext, "camelContext");
116        this.camelContext = camelContext;
117    }
118
119    public RoutingSlip(CamelContext camelContext, Expression expression, String uriDelimiter) {
120        notNull(camelContext, "camelContext");
121        notNull(expression, "expression");
122        
123        this.camelContext = camelContext;
124        this.expression = expression;
125        this.uriDelimiter = uriDelimiter;
126        this.header = null;
127    }
128
129    public String getId() {
130        return id;
131    }
132
133    public void setId(String id) {
134        this.id = id;
135    }
136
137    public Expression getExpression() {
138        return expression;
139    }
140
141    public String getUriDelimiter() {
142        return uriDelimiter;
143    }
144
145    public void setDelimiter(String delimiter) {
146        this.uriDelimiter = delimiter;
147    }
148    
149    public boolean isIgnoreInvalidEndpoints() {
150        return ignoreInvalidEndpoints;
151    }
152    
153    public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
154        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
155    }
156
157    public int getCacheSize() {
158        return cacheSize;
159    }
160
161    public void setCacheSize(int cacheSize) {
162        this.cacheSize = cacheSize;
163    }
164
165    @Override
166    public String toString() {
167        return "RoutingSlip[expression=" + expression + " uriDelimiter=" + uriDelimiter + "]";
168    }
169
170    public String getTraceLabel() {
171        return "routingSlip[" + expression + "]";
172    }
173
174    public void process(Exchange exchange) throws Exception {
175        AsyncProcessorHelper.process(this, exchange);
176    }
177
178    public boolean process(Exchange exchange, AsyncCallback callback) {
179        if (!isStarted()) {
180            exchange.setException(new IllegalStateException("RoutingSlip has not been started: " + this));
181            callback.done(true);
182            return true;
183        }
184
185        return doRoutingSlipWithExpression(exchange, this.expression, callback);
186    }
187
188    public boolean doRoutingSlip(Exchange exchange, Object routingSlip, AsyncCallback callback) {
189        if (routingSlip instanceof Expression) {
190            return doRoutingSlipWithExpression(exchange, (Expression) routingSlip, callback);
191        } else {
192            return doRoutingSlipWithExpression(exchange, ExpressionBuilder.constantExpression(routingSlip), callback);
193        }
194    }
195
196    /**
197     * Creates the route slip iterator to be used.
198     *
199     * @param exchange the exchange
200     * @param expression the expression
201     * @return the iterator, should never be <tt>null</tt>
202     */
203    protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange, final Expression expression) throws Exception {
204        Object slip = expression.evaluate(exchange, Object.class);
205        if (exchange.getException() != null) {
206            // force any exceptions occurred during evaluation to be thrown
207            throw exchange.getException();
208        }
209
210        final Iterator<Object> delegate = ObjectHelper.createIterator(slip, uriDelimiter);
211
212        return new RoutingSlipIterator() {
213            public boolean hasNext(Exchange exchange) {
214                return delegate.hasNext();
215            }
216
217            public Object next(Exchange exchange) {
218                return delegate.next();
219            }
220        };
221    }
222
223    private boolean doRoutingSlipWithExpression(final Exchange exchange, final Expression expression, final AsyncCallback callback) {
224        Exchange current = exchange;
225        RoutingSlipIterator iter;
226        try {
227            iter = createRoutingSlipIterator(exchange, expression);
228        } catch (Exception e) {
229            exchange.setException(e);
230            callback.done(true);
231            return true;
232        }
233
234        // ensure the slip is empty when we start
235        if (current.hasProperties()) {
236            current.setProperty(Exchange.SLIP_ENDPOINT, null);
237        }
238
239        while (iter.hasNext(current)) {
240            Endpoint endpoint;
241            try {
242                endpoint = resolveEndpoint(iter, exchange);
243                // if no endpoint was resolved then try the next
244                if (endpoint == null) {
245                    continue;
246                }
247            } catch (Exception e) {
248                // error resolving endpoint so we should break out
249                current.setException(e);
250                break;
251            }
252
253            //process and prepare the routing slip
254            boolean sync = processExchange(endpoint, current, exchange, callback, iter);
255            current = prepareExchangeForRoutingSlip(current, endpoint);
256            
257            if (!sync) {
258                log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
259                // the remainder of the routing slip will be completed async
260                // so we break out now, then the callback will be invoked which then continue routing from where we left here
261                return false;
262            }
263
264            log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
265
266            // we ignore some kind of exceptions and allow us to continue
267            if (isIgnoreInvalidEndpoints()) {
268                FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class);
269                if (e != null) {
270                    if (log.isDebugEnabled()) {
271                        log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e);
272                    }
273                    current.setException(null);
274                }
275            }
276
277            // Decide whether to continue with the recipients or not; similar logic to the Pipeline
278            // check for error if so we should break out
279            if (!continueProcessing(current, "so breaking out of the routing slip", log)) {
280                break;
281            }
282        }
283
284        // logging nextExchange as it contains the exchange that might have altered the payload and since
285        // we are logging the completion if will be confusing if we log the original instead
286        // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
287        log.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current);
288
289        // copy results back to the original exchange
290        ExchangeHelper.copyResults(exchange, current);
291
292        callback.done(true);
293        return true;
294    }
295
296    protected Endpoint resolveEndpoint(RoutingSlipIterator iter, Exchange exchange) throws Exception {
297        Object nextRecipient = iter.next(exchange);
298        Endpoint endpoint = null;
299        try {
300            endpoint = ExchangeHelper.resolveEndpoint(exchange, nextRecipient);
301        } catch (Exception e) {
302            if (isIgnoreInvalidEndpoints()) {
303                log.info("Endpoint uri is invalid: " + nextRecipient + ". This exception will be ignored.", e);
304            } else {
305                throw e;
306            }
307        }
308        return endpoint;
309    }
310
311    protected Exchange prepareExchangeForRoutingSlip(Exchange current, Endpoint endpoint) {
312        Exchange copy = new DefaultExchange(current);
313        // we must use the same id as this is a snapshot strategy where Camel copies a snapshot
314        // before processing the next step in the pipeline, so we have a snapshot of the exchange
315        // just before. This snapshot is used if Camel should do redeliveries (re try) using
316        // DeadLetterChannel. That is why it's important the id is the same, as it is the *same*
317        // exchange being routed.
318        copy.setExchangeId(current.getExchangeId());
319        copyOutToIn(copy, current);
320
321        // ensure stream caching is reset
322        MessageHelper.resetStreamCache(copy.getIn());
323
324        return copy;
325    }
326
327    protected AsyncProcessor createErrorHandler(RouteContext routeContext, Exchange exchange, AsyncProcessor processor, Endpoint endpoint) {
328        AsyncProcessor answer = processor;
329
330        boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class);
331
332        // do not wrap in error handler if we are inside a try block
333        if (!tryBlock && routeContext != null) {
334            // wrap the producer in error handler so we have fine grained error handling on
335            // the output side instead of the input side
336            // this is needed to support redelivery on that output alone and not doing redelivery
337            // for the entire routingslip/dynamic-router block again which will start from scratch again
338
339            // create key for cache
340            final PreparedErrorHandler key = new PreparedErrorHandler(endpoint.getEndpointUri(), processor);
341
342            // lookup cached first to reuse and preserve memory
343            answer = errorHandlers.get(key);
344            if (answer != null) {
345                log.trace("Using existing error handler for: {}", processor);
346                return answer;
347            }
348
349            log.trace("Creating error handler for: {}", processor);
350            ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder();
351            // create error handler (create error handler directly to keep it light weight,
352            // instead of using ProcessorDefinition.wrapInErrorHandler)
353            try {
354                answer = (AsyncProcessor) builder.createErrorHandler(routeContext, processor);
355
356                // must start the error handler
357                ServiceHelper.startServices(answer);
358
359                // add to cache
360                errorHandlers.putIfAbsent(key, answer);
361
362            } catch (Exception e) {
363                throw ObjectHelper.wrapRuntimeCamelException(e);
364            }
365        }
366
367        return answer;
368    }
369
370    protected boolean processExchange(final Endpoint endpoint, final Exchange exchange, final Exchange original,
371                                      final AsyncCallback callback, final RoutingSlipIterator iter) {
372
373        // this does the actual processing so log at trace level
374        log.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
375
376        boolean sync = producerCache.doInAsyncProducer(endpoint, exchange, null, callback, new AsyncProducerCallback() {
377            public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange,
378                                             ExchangePattern exchangePattern, final AsyncCallback callback) {
379
380                // rework error handling to support fine grained error handling
381                RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
382                asyncProducer = createErrorHandler(routeContext, exchange, asyncProducer, endpoint);
383
384                // set property which endpoint we send to
385                exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
386                exchange.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri());
387
388                return asyncProducer.process(exchange, new AsyncCallback() {
389                    public void done(boolean doneSync) {
390                        // we only have to handle async completion of the routing slip
391                        if (doneSync) {
392                            callback.done(doneSync);
393                            return;
394                        }
395
396                        // continue processing the routing slip asynchronously
397                        Exchange current = prepareExchangeForRoutingSlip(exchange, endpoint);
398
399                        while (iter.hasNext(current)) {
400
401                            // we ignore some kind of exceptions and allow us to continue
402                            if (isIgnoreInvalidEndpoints()) {
403                                FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class);
404                                if (e != null) {
405                                    if (log.isDebugEnabled()) {
406                                        log.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e);
407                                    }
408                                    current.setException(null);
409                                }
410                            }
411
412                            // Decide whether to continue with the recipients or not; similar logic to the Pipeline
413                            // check for error if so we should break out
414                            if (!continueProcessing(current, "so breaking out of the routing slip", log)) {
415                                break;
416                            }
417
418                            Endpoint endpoint;
419                            try {
420                                endpoint = resolveEndpoint(iter, exchange);
421                                // if no endpoint was resolved then try the next
422                                if (endpoint == null) {
423                                    continue;
424                                }
425                            } catch (Exception e) {
426                                // error resolving endpoint so we should break out
427                                exchange.setException(e);
428                                break;
429                            }
430
431                            // prepare and process the routing slip
432                            boolean sync = processExchange(endpoint, current, original, callback, iter);
433                            current = prepareExchangeForRoutingSlip(current, endpoint);
434
435                            if (!sync) {
436                                log.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId());
437                                return;
438                            }
439                        }
440
441                        // logging nextExchange as it contains the exchange that might have altered the payload and since
442                        // we are logging the completion if will be confusing if we log the original instead
443                        // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
444                        log.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current);
445
446                        // copy results back to the original exchange
447                        ExchangeHelper.copyResults(original, current);
448                        callback.done(false);
449                    }
450                });
451            }
452        });
453
454        return sync;
455    }
456
457    protected void doStart() throws Exception {
458        if (producerCache == null) {
459            if (cacheSize < 0) {
460                producerCache = new EmptyProducerCache(this, camelContext);
461                log.debug("RoutingSlip {} is not using ProducerCache", this);
462            } else if (cacheSize == 0) {
463                producerCache = new ProducerCache(this, camelContext);
464                log.debug("RoutingSlip {} using ProducerCache with default cache size", this);
465            } else {
466                producerCache = new ProducerCache(this, camelContext, cacheSize);
467                log.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize);
468            }
469        }
470        ServiceHelper.startService(producerCache);
471    }
472
473    protected void doStop() throws Exception {
474        ServiceHelper.stopServices(producerCache, errorHandlers);
475    }
476
477    protected void doShutdown() throws Exception {
478        ServiceHelper.stopAndShutdownServices(producerCache, errorHandlers);
479
480        // only clear error handlers when shutting down
481        errorHandlers.clear();
482    }
483
484    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
485        return producerCache.getEndpointUtilizationStatistics();
486    }
487
488    /**
489     * Returns the outbound message if available. Otherwise return the inbound message.
490     */
491    private Message getResultMessage(Exchange exchange) {
492        if (exchange.hasOut()) {
493            return exchange.getOut();
494        } else {
495            // if this endpoint had no out (like a mock endpoint) just take the in
496            return exchange.getIn();
497        }
498    }
499
500    /**
501     * Copy the outbound data in 'source' to the inbound data in 'result'.
502     */
503    private void copyOutToIn(Exchange result, Exchange source) {
504        result.setException(source.getException());
505        result.setIn(getResultMessage(source));
506
507        result.getProperties().clear();
508        result.getProperties().putAll(source.getProperties());
509    }
510}