001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.Iterator;
022    import java.util.List;
023    
024    import org.apache.camel.AsyncCallback;
025    import org.apache.camel.AsyncProcessor;
026    import org.apache.camel.Exchange;
027    import org.apache.camel.Navigate;
028    import org.apache.camel.Processor;
029    import org.apache.camel.Traceable;
030    import org.apache.camel.support.ServiceSupport;
031    import org.apache.camel.util.AsyncProcessorConverterHelper;
032    import org.apache.camel.util.AsyncProcessorHelper;
033    import org.apache.camel.util.ExchangeHelper;
034    import org.apache.camel.util.ServiceHelper;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * Implements try/catch/finally type processing
040     *
041     * @version 
042     */
043    public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable {
044        private static final transient Logger LOG = LoggerFactory.getLogger(TryProcessor.class);
045    
046        protected final AsyncProcessor tryProcessor;
047        protected final DoCatchProcessor catchProcessor;
048        protected final DoFinallyProcessor finallyProcessor;
049        private List<AsyncProcessor> processors;
050    
051        public TryProcessor(Processor tryProcessor, List<CatchProcessor> catchClauses, Processor finallyProcessor) {
052            this.tryProcessor = AsyncProcessorConverterHelper.convert(tryProcessor);
053            this.catchProcessor = new DoCatchProcessor(catchClauses);
054            this.finallyProcessor = new DoFinallyProcessor(finallyProcessor);
055        }
056    
057        public String toString() {
058            String finallyText = (finallyProcessor == null) ? "" : " Finally {" + finallyProcessor + "}";
059            return "Try {" + tryProcessor + "} " + (catchProcessor != null ? catchProcessor : "") + finallyText;
060        }
061    
062        public String getTraceLabel() {
063            return "doTry";
064        }
065    
066        public void process(Exchange exchange) throws Exception {
067            AsyncProcessorHelper.process(this, exchange);
068        }
069    
070        public boolean process(Exchange exchange, AsyncCallback callback) {
071            Iterator<AsyncProcessor> processors = getProcessors().iterator();
072    
073            while (continueRouting(processors, exchange)) {
074                ExchangeHelper.prepareOutToIn(exchange);
075    
076                // process the next processor
077                AsyncProcessor processor = processors.next();
078                boolean sync = process(exchange, callback, processor, processors);
079    
080                // continue as long its being processed synchronously
081                if (!sync) {
082                    LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
083                    // the remainder of the try .. catch .. finally will be completed async
084                    // so we break out now, then the callback will be invoked which then continue routing from where we left here
085                    return false;
086                }
087    
088                LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
089            }
090    
091            ExchangeHelper.prepareOutToIn(exchange);
092            LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
093            callback.done(true);
094            return true;
095        }
096    
097        protected boolean process(final Exchange exchange, final AsyncCallback callback,
098                                  final AsyncProcessor processor, final Iterator<AsyncProcessor> processors) {
099            // this does the actual processing so log at trace level
100            LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
101    
102            // implement asynchronous routing logic in callback so we can have the callback being
103            // triggered and then continue routing where we left
104            boolean sync = AsyncProcessorHelper.process(processor, exchange, new AsyncCallback() {
105                public void done(boolean doneSync) {
106                    // we only have to handle async completion of the pipeline
107                    if (doneSync) {
108                        return;
109                    }
110    
111                    // continue processing the try .. catch .. finally asynchronously
112                    while (continueRouting(processors, exchange)) {
113                        ExchangeHelper.prepareOutToIn(exchange);
114    
115                        // process the next processor
116                        AsyncProcessor processor = processors.next();
117                        doneSync = process(exchange, callback, processor, processors);
118    
119                        if (!doneSync) {
120                            LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
121                            // the remainder of the try .. catch .. finally will be completed async
122                            // so we break out now, then the callback will be invoked which then continue routing from where we left here
123                            return;
124                        }
125                    }
126    
127                    ExchangeHelper.prepareOutToIn(exchange);
128                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
129                    callback.done(false);
130                }
131            });
132    
133            return sync;
134        }
135    
136        protected Collection<AsyncProcessor> getProcessors() {
137            return processors;
138        }
139    
140        protected boolean continueRouting(Iterator<AsyncProcessor> it, Exchange exchange) {
141            Object stop = exchange.getProperty(Exchange.ROUTE_STOP);
142            if (stop != null) {
143                boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop);
144                if (doStop) {
145                    LOG.debug("Exchange is marked to stop routing: {}", exchange);
146                    return false;
147                }
148            }
149    
150            // continue if there are more processors to route
151            return it.hasNext();
152        }
153    
154        protected void doStart() throws Exception {
155            processors = new ArrayList<AsyncProcessor>();
156            processors.add(tryProcessor);
157            processors.add(catchProcessor);
158            processors.add(finallyProcessor);
159            ServiceHelper.startServices(tryProcessor, catchProcessor, finallyProcessor);
160        }
161    
162        protected void doStop() throws Exception {
163            ServiceHelper.stopServices(finallyProcessor, catchProcessor, tryProcessor);
164            processors.clear();
165        }
166    
167        public List<Processor> next() {
168            if (!hasNext()) {
169                return null;
170            }
171            List<Processor> answer = new ArrayList<Processor>();
172            if (tryProcessor != null) {
173                answer.add(tryProcessor);
174            }
175            if (catchProcessor != null) {
176                answer.add(catchProcessor);
177            }
178            if (finallyProcessor != null) {
179                answer.add(finallyProcessor);
180            }
181            return answer;
182        }
183    
184        public boolean hasNext() {
185            return tryProcessor != null;
186        }
187    
188        /**
189         * Processor to handle do catch supporting asynchronous routing engine
190         */
191        private final class DoCatchProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable {
192    
193            private final List<CatchProcessor> catchClauses;
194    
195            private DoCatchProcessor(List<CatchProcessor> catchClauses) {
196                this.catchClauses = catchClauses;
197            }
198    
199            public void process(Exchange exchange) throws Exception {
200                AsyncProcessorHelper.process(this, exchange);
201            }
202    
203            public boolean process(final Exchange exchange, final AsyncCallback callback) {
204                Exception e = exchange.getException();
205    
206                if (catchClauses == null || e == null) {
207                    return true;
208                }
209    
210                // find a catch clause to use
211                CatchProcessor processor = null;
212                for (CatchProcessor catchClause : catchClauses) {
213                    Throwable caught = catchClause.catches(exchange, e);
214                    if (caught != null) {
215                        if (LOG.isTraceEnabled()) {
216                            LOG.trace("This TryProcessor catches the exception: {} caused by: {}", caught.getClass().getName(), e.getMessage());
217                        }
218                        processor = catchClause;
219                        break;
220                    }
221                }
222    
223                if (processor != null) {
224                    // create the handle processor which performs the actual logic
225                    // this processor just lookup the right catch clause to use and then let the
226                    // HandleDoCatchProcessor do all the hard work (separate of concerns)
227                    HandleDoCatchProcessor cool = new HandleDoCatchProcessor(processor);
228                    return AsyncProcessorHelper.process(cool, exchange, callback);
229                } else {
230                    if (LOG.isTraceEnabled()) {
231                        LOG.trace("This TryProcessor does not catch the exception: {} caused by: {}", e.getClass().getName(), e.getMessage());
232                    }
233                }
234    
235                return true;
236            }
237    
238            @Override
239            protected void doStart() throws Exception {
240                ServiceHelper.startService(catchClauses);
241            }
242    
243            @Override
244            protected void doStop() throws Exception {
245                ServiceHelper.stopServices(catchClauses);
246            }
247    
248            @Override
249            public String toString() {
250                return "Catches{" + catchClauses + "}";
251            }
252    
253            public String getTraceLabel() {
254                return "doCatch";
255            }
256    
257            public List<Processor> next() {
258                List<Processor> answer = new ArrayList<Processor>();
259                if (catchProcessor != null) {
260                    answer.addAll(catchClauses);
261                }
262                return answer;
263            }
264    
265            public boolean hasNext() {
266                return catchClauses != null && catchClauses.size() > 0;
267            }
268        }
269    
270        /**
271         * Processor to handle do finally supporting asynchronous routing engine
272         */
273        private final class DoFinallyProcessor extends DelegateAsyncProcessor implements Traceable {
274    
275            private DoFinallyProcessor(Processor processor) {
276                super(processor);
277            }
278    
279            @Override
280            protected boolean processNext(final Exchange exchange, final AsyncCallback callback) {
281                // clear exception so finally block can be executed
282                final Exception e = exchange.getException();
283                exchange.setException(null);
284                // but store the caught exception as a property
285                if (e != null) {
286                    exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
287                }
288                // store the last to endpoint as the failure endpoint
289                if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
290                    exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
291                }
292    
293                boolean sync = super.processNext(exchange, new AsyncCallback() {
294                    public void done(boolean doneSync) {
295                        // we only have to handle async completion of the pipeline
296                        if (doneSync) {
297                            return;
298                        }
299    
300                        if (e == null) {
301                            exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
302                        } else {
303                            // set exception back on exchange
304                            exchange.setException(e);
305                            exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
306                        }
307    
308                        // signal callback to continue routing async
309                        ExchangeHelper.prepareOutToIn(exchange);
310                        LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
311                        callback.done(false);
312                    }
313                });
314    
315                if (sync) {
316                    if (e == null) {
317                        exchange.removeProperty(Exchange.FAILURE_ENDPOINT);
318                    } else {
319                        // set exception back on exchange
320                        exchange.setException(e);
321                        exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e);
322                    }
323                }
324    
325                return sync;
326            }
327    
328            @Override
329            public String toString() {
330                return "Finally{" + getProcessor() + "}";
331            }
332    
333            public String getTraceLabel() {
334                return "doFinally";
335            }
336        }
337    
338        /**
339         * Processor to handle do catch supporting asynchronous routing engine
340         */
341        private final class HandleDoCatchProcessor extends DelegateAsyncProcessor {
342    
343            private final CatchProcessor catchClause;
344    
345            private HandleDoCatchProcessor(CatchProcessor processor) {
346                super(processor);
347                this.catchClause = processor;
348            }
349    
350            @Override
351            protected boolean processNext(final Exchange exchange, final AsyncCallback callback) {
352                final Exception caught = exchange.getException();
353                if (caught == null) {
354                    return true;
355                }
356    
357                // store the last to endpoint as the failure endpoint
358                if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) {
359                    exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT));
360                }
361                // give the rest of the pipeline another chance
362                exchange.setProperty(Exchange.EXCEPTION_CAUGHT, caught);
363                exchange.setException(null);
364                // and we should not be regarded as exhausted as we are in a try .. catch block
365                exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
366    
367                // is the exception handled by the catch clause
368                final Boolean handled = catchClause.handles(exchange);
369    
370                if (LOG.isDebugEnabled()) {
371                    LOG.debug("The exception is handled: {} for the exception: {} caused by: {}",
372                            new Object[]{handled, caught.getClass().getName(), caught.getMessage()});
373                }
374    
375                boolean sync = super.processNext(exchange, new AsyncCallback() {
376                    public void done(boolean doneSync) {
377                        // we only have to handle async completion of the pipeline
378                        if (doneSync) {
379                            return;
380                        }
381    
382                        if (!handled) {
383                            if (exchange.getException() == null) {
384                                exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
385                            }
386                        }
387                        // always clear redelivery exhausted in a catch clause
388                        exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
389    
390                        // signal callback to continue routing async
391                        ExchangeHelper.prepareOutToIn(exchange);
392                        callback.done(false);
393                    }
394                });
395    
396                if (sync) {
397                    // set exception back on exchange
398                    if (!handled) {
399                        if (exchange.getException() == null) {
400                            exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class));
401                        }
402                    }
403                    // always clear redelivery exhausted in a catch clause
404                    exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED);
405                }
406    
407                return sync;
408            }
409        }
410    
411    }