001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Collection;
020    import java.util.Iterator;
021    import java.util.List;
022    
023    import org.apache.camel.AsyncCallback;
024    import org.apache.camel.AsyncProcessor;
025    import org.apache.camel.CamelContext;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.Processor;
028    import org.apache.camel.Traceable;
029    import org.apache.camel.util.AsyncProcessorConverterHelper;
030    import org.apache.camel.util.AsyncProcessorHelper;
031    import org.apache.camel.util.ExchangeHelper;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    import static org.apache.camel.processor.PipelineHelper.continueProcessing;
036    
037    /**
038     * Creates a Pipeline pattern where the output of the previous step is sent as
039     * input to the next step, reusing the same message exchanges
040     *
041     * @version 
042     */
043    public class Pipeline extends MulticastProcessor implements AsyncProcessor, Traceable {
044        private static final transient Logger LOG = LoggerFactory.getLogger(Pipeline.class);
045    
046        public Pipeline(CamelContext camelContext, Collection<Processor> processors) {
047            super(camelContext, processors);
048        }
049    
050        public static Processor newInstance(CamelContext camelContext, List<Processor> processors) {
051            if (processors.isEmpty()) {
052                return null;
053            } else if (processors.size() == 1) {
054                return processors.get(0);
055            }
056            return new Pipeline(camelContext, processors);
057        }
058    
059        public void process(Exchange exchange) throws Exception {
060            AsyncProcessorHelper.process(this, exchange);
061        }
062    
063        public boolean process(Exchange exchange, AsyncCallback callback) {
064            Iterator<Processor> processors = getProcessors().iterator();
065            Exchange nextExchange = exchange;
066            boolean first = true;
067    
068            while (continueRouting(processors, nextExchange)) {
069                if (first) {
070                    first = false;
071                } else {
072                    // prepare for next run
073                    nextExchange = createNextExchange(nextExchange);
074                }
075    
076                // get the next processor
077                Processor processor = processors.next();
078    
079                AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
080                boolean sync = process(exchange, nextExchange, callback, processors, async);
081    
082                // continue as long its being processed synchronously
083                if (!sync) {
084                    LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
085                    // the remainder of the pipeline will be completed async
086                    // so we break out now, then the callback will be invoked which then continue routing from where we left here
087                    return false;
088                }
089    
090                LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
091    
092                // check for error if so we should break out
093                if (!continueProcessing(nextExchange, "so breaking out of pipeline", LOG)) {
094                    break;
095                }
096            }
097    
098            // logging nextExchange as it contains the exchange that might have altered the payload and since
099            // we are logging the completion if will be confusing if we log the original instead
100            // we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
101            LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), nextExchange);
102    
103            // copy results back to the original exchange
104            ExchangeHelper.copyResults(exchange, nextExchange);
105    
106            callback.done(true);
107            return true;
108        }
109    
110        private boolean process(final Exchange original, final Exchange exchange, final AsyncCallback callback,
111                                final Iterator<Processor> processors, final AsyncProcessor asyncProcessor) {
112            // this does the actual processing so log at trace level
113            LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
114    
115            // implement asynchronous routing logic in callback so we can have the callback being
116            // triggered and then continue routing where we left
117            boolean sync = AsyncProcessorHelper.process(asyncProcessor, exchange, new AsyncCallback() {
118                public void done(boolean doneSync) {
119                    // we only have to handle async completion of the pipeline
120                    if (doneSync) {
121                        return;
122                    }
123    
124                    // continue processing the pipeline asynchronously
125                    Exchange nextExchange = exchange;
126                    while (continueRouting(processors, nextExchange)) {
127                        AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next());
128    
129                        // check for error if so we should break out
130                        if (!continueProcessing(nextExchange, "so breaking out of pipeline", LOG)) {
131                            break;
132                        }
133    
134                        nextExchange = createNextExchange(nextExchange);
135                        doneSync = process(original, nextExchange, callback, processors, processor);
136                        if (!doneSync) {
137                            LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
138                            return;
139                        }
140                    }
141    
142                    ExchangeHelper.copyResults(original, nextExchange);
143                    LOG.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), original);
144                    callback.done(false);
145                }
146            });
147    
148            return sync;
149        }
150    
151        /**
152         * Strategy method to create the next exchange from the previous exchange.
153         * <p/>
154         * Remember to copy the original exchange id otherwise correlation of ids in the log is a problem
155         *
156         * @param previousExchange the previous exchange
157         * @return a new exchange
158         */
159        protected Exchange createNextExchange(Exchange previousExchange) {
160            Exchange answer = previousExchange;
161    
162            // now lets set the input of the next exchange to the output of the
163            // previous message if it is not null
164            if (answer.hasOut()) {
165                answer.setIn(answer.getOut());
166                answer.setOut(null);
167            }
168            return answer;
169        }
170    
171        protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) {
172            boolean answer = true;
173    
174            Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
175            if (stop != null) {
176                boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
177                if (doStop) {
178                    LOG.debug("ExchangeId: {} is marked to stop routing: {}", exchange.getExchangeId(), exchange);
179                    answer = false;
180                }
181            } else {
182                // continue if there are more processors to route
183                answer = it.hasNext();
184            }
185    
186            LOG.trace("ExchangeId: {} should continue routing: {}", exchange.getExchangeId(), answer);
187            return answer;
188        }
189    
190        @Override
191        public String toString() {
192            return "Pipeline[" + getProcessors() + "]";
193        }
194    
195        @Override
196        public String getTraceLabel() {
197            return "pipeline";
198        }
199    }