001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.concurrent.Callable;
020import java.util.concurrent.ExecutorService;
021
022import org.apache.camel.AsyncCallback;
023import org.apache.camel.AsyncProcessor;
024import org.apache.camel.CamelContext;
025import org.apache.camel.Exchange;
026import org.apache.camel.ExchangePattern;
027import org.apache.camel.Message;
028import org.apache.camel.Ordered;
029import org.apache.camel.Predicate;
030import org.apache.camel.Processor;
031import org.apache.camel.Route;
032import org.apache.camel.Traceable;
033import org.apache.camel.spi.IdAware;
034import org.apache.camel.support.ServiceSupport;
035import org.apache.camel.support.SynchronizationAdapter;
036import org.apache.camel.util.AsyncProcessorHelper;
037import org.apache.camel.util.ExchangeHelper;
038import org.apache.camel.util.ServiceHelper;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import static org.apache.camel.util.ObjectHelper.notNull;
043
044/**
045 * Processor implementing <a href="http://camel.apache.org/oncompletion.html">onCompletion</a>.
046 *
047 * @version 
048 */
049public class OnCompletionProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
050
051    private static final Logger LOG = LoggerFactory.getLogger(OnCompletionProcessor.class);
052    private final CamelContext camelContext;
053    private String id;
054    private final Processor processor;
055    private final ExecutorService executorService;
056    private final boolean shutdownExecutorService;
057    private final boolean onCompleteOnly;
058    private final boolean onFailureOnly;
059    private final Predicate onWhen;
060    private final boolean useOriginalBody;
061    private final boolean afterConsumer;
062
063    public OnCompletionProcessor(CamelContext camelContext, Processor processor, ExecutorService executorService, boolean shutdownExecutorService,
064                                 boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen, boolean useOriginalBody, boolean afterConsumer) {
065        notNull(camelContext, "camelContext");
066        notNull(processor, "processor");
067        this.camelContext = camelContext;
068        this.processor = processor;
069        this.executorService = executorService;
070        this.shutdownExecutorService = shutdownExecutorService;
071        this.onCompleteOnly = onCompleteOnly;
072        this.onFailureOnly = onFailureOnly;
073        this.onWhen = onWhen;
074        this.useOriginalBody = useOriginalBody;
075        this.afterConsumer = afterConsumer;
076    }
077
078    @Override
079    protected void doStart() throws Exception {
080        ServiceHelper.startService(processor);
081    }
082
083    @Override
084    protected void doStop() throws Exception {
085        ServiceHelper.stopService(processor);
086    }
087
088    @Override
089    protected void doShutdown() throws Exception {
090        ServiceHelper.stopAndShutdownService(processor);
091        if (shutdownExecutorService) {
092            getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
093        }
094    }
095
096    public CamelContext getCamelContext() {
097        return camelContext;
098    }
099
100    public String getId() {
101        return id;
102    }
103
104    public void setId(String id) {
105        this.id = id;
106    }
107
108    public void process(Exchange exchange) throws Exception {
109        AsyncProcessorHelper.process(this, exchange);
110    }
111
112    public boolean process(Exchange exchange, AsyncCallback callback) {
113        if (processor != null) {
114            // register callback
115            if (afterConsumer) {
116                exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronizationAfterConsumer());
117            } else {
118                exchange.getUnitOfWork().addSynchronization(new OnCompletionSynchronizationBeforeConsumer());
119            }
120        }
121
122        callback.done(true);
123        return true;
124    }
125
126    protected boolean isCreateCopy() {
127        // we need to create a correlated copy if we run in parallel mode or is in after consumer mode (as the UoW would be done on the original exchange otherwise)
128        return executorService != null || afterConsumer;
129    }
130
131    /**
132     * Processes the exchange by the processors
133     *
134     * @param processor the processor
135     * @param exchange the exchange
136     */
137    protected static void doProcess(Processor processor, Exchange exchange) {
138        // must remember some properties which we cannot use during onCompletion processing
139        // as otherwise we may cause issues
140        // but keep the caused exception stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange
141        Object stop = exchange.removeProperty(Exchange.ROUTE_STOP);
142        Object failureHandled = exchange.removeProperty(Exchange.FAILURE_HANDLED);
143        Object errorhandlerHandled = exchange.removeProperty(Exchange.ERRORHANDLER_HANDLED);
144        Object rollbackOnly = exchange.removeProperty(Exchange.ROLLBACK_ONLY);
145        Object rollbackOnlyLast = exchange.removeProperty(Exchange.ROLLBACK_ONLY_LAST);
146
147        Exception cause = exchange.getException();
148        exchange.setException(null);
149
150        try {
151            processor.process(exchange);
152        } catch (Exception e) {
153            exchange.setException(e);
154        } finally {
155            // restore the options
156            if (stop != null) {
157                exchange.setProperty(Exchange.ROUTE_STOP, stop);
158            }
159            if (failureHandled != null) {
160                exchange.setProperty(Exchange.FAILURE_HANDLED, failureHandled);
161            }
162            if (errorhandlerHandled != null) {
163                exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, errorhandlerHandled);
164            }
165            if (rollbackOnly != null) {
166                exchange.setProperty(Exchange.ROLLBACK_ONLY, rollbackOnly);
167            }
168            if (rollbackOnlyLast != null) {
169                exchange.setProperty(Exchange.ROLLBACK_ONLY_LAST, rollbackOnlyLast);
170            }
171            if (cause != null) {
172                exchange.setException(cause);
173            }
174        }
175    }
176
177    /**
178     * Prepares the {@link Exchange} to send as onCompletion.
179     *
180     * @param exchange the current exchange
181     * @return the exchange to be routed in onComplete
182     */
183    protected Exchange prepareExchange(Exchange exchange) {
184        Exchange answer;
185
186        if (isCreateCopy()) {
187            // for asynchronous routing we must use a copy as we dont want it
188            // to cause side effects of the original exchange
189            // (the original thread will run in parallel)
190            answer = ExchangeHelper.createCorrelatedCopy(exchange, false);
191            if (answer.hasOut()) {
192                // move OUT to IN (pipes and filters)
193                answer.setIn(answer.getOut());
194                answer.setOut(null);
195            }
196            // set MEP to InOnly as this onCompletion is a fire and forget
197            answer.setPattern(ExchangePattern.InOnly);
198        } else {
199            // use the exchange as-is
200            answer = exchange;
201        }
202
203        if (useOriginalBody) {
204            LOG.trace("Using the original IN message instead of current");
205
206            Message original = ExchangeHelper.getOriginalInMessage(exchange);
207            answer.setIn(original);
208        }
209
210        // add a header flag to indicate its a on completion exchange
211        answer.setProperty(Exchange.ON_COMPLETION, Boolean.TRUE);
212
213        return answer;
214    }
215
216    private final class OnCompletionSynchronizationAfterConsumer extends SynchronizationAdapter implements Ordered {
217
218        public int getOrder() {
219            // we want to be last
220            return Ordered.LOWEST;
221        }
222
223        @Override
224        public void onComplete(final Exchange exchange) {
225            if (onFailureOnly) {
226                return;
227            }
228
229            if (onWhen != null && !onWhen.matches(exchange)) {
230                // predicate did not match so do not route the onComplete
231                return;
232            }
233
234            // must use a copy as we dont want it to cause side effects of the original exchange
235            final Exchange copy = prepareExchange(exchange);
236
237            if (executorService != null) {
238                executorService.submit(new Callable<Exchange>() {
239                    public Exchange call() throws Exception {
240                        LOG.debug("Processing onComplete: {}", copy);
241                        doProcess(processor, copy);
242                        return copy;
243                    }
244                });
245            } else {
246                // run without thread-pool
247                LOG.debug("Processing onComplete: {}", copy);
248                doProcess(processor, copy);
249            }
250        }
251
252        public void onFailure(final Exchange exchange) {
253            if (onCompleteOnly) {
254                return;
255            }
256
257            if (onWhen != null && !onWhen.matches(exchange)) {
258                // predicate did not match so do not route the onComplete
259                return;
260            }
261
262
263            // must use a copy as we dont want it to cause side effects of the original exchange
264            final Exchange copy = prepareExchange(exchange);
265            final Exception original = copy.getException();
266            final boolean originalFault = copy.hasOut() ? copy.getOut().isFault() : copy.getIn().isFault();
267            // must remove exception otherwise onFailure routing will fail as well
268            // the caused exception is stored as a property (Exchange.EXCEPTION_CAUGHT) on the exchange
269            copy.setException(null);
270            // must clear fault otherwise onFailure routing will fail as well
271            if (copy.hasOut()) {
272                copy.getOut().setFault(false);
273            } else {
274                copy.getIn().setFault(false);
275            }
276
277            if (executorService != null) {
278                executorService.submit(new Callable<Exchange>() {
279                    public Exchange call() throws Exception {
280                        LOG.debug("Processing onFailure: {}", copy);
281                        doProcess(processor, copy);
282                        // restore exception after processing
283                        copy.setException(original);
284                        return null;
285                    }
286                });
287            } else {
288                // run without thread-pool
289                LOG.debug("Processing onFailure: {}", copy);
290                doProcess(processor, copy);
291                // restore exception after processing
292                copy.setException(original);
293                // restore fault after processing
294                if (copy.hasOut()) {
295                    copy.getOut().setFault(originalFault);
296                } else {
297                    copy.getIn().setFault(originalFault);
298                }
299            }
300        }
301
302        @Override
303        public String toString() {
304            if (!onCompleteOnly && !onFailureOnly) {
305                return "onCompleteOrFailure";
306            } else if (onCompleteOnly) {
307                return "onCompleteOnly";
308            } else {
309                return "onFailureOnly";
310            }
311        }
312    }
313
314    private final class OnCompletionSynchronizationBeforeConsumer extends SynchronizationAdapter implements Ordered {
315
316        public int getOrder() {
317            // we want to be last
318            return Ordered.LOWEST;
319        }
320
321        @Override
322        public void onAfterRoute(Route route, Exchange exchange) {
323            if (exchange.isFailed() && onCompleteOnly) {
324                return;
325            }
326
327            if (!exchange.isFailed() && onFailureOnly) {
328                return;
329            }
330
331            if (onWhen != null && !onWhen.matches(exchange)) {
332                // predicate did not match so do not route the onComplete
333                return;
334            }
335
336            // must use a copy as we dont want it to cause side effects of the original exchange
337            final Exchange copy = prepareExchange(exchange);
338
339            if (executorService != null) {
340                executorService.submit(new Callable<Exchange>() {
341                    public Exchange call() throws Exception {
342                        LOG.debug("Processing onAfterRoute: {}", copy);
343                        doProcess(processor, copy);
344                        return copy;
345                    }
346                });
347            } else {
348                // run without thread-pool
349                LOG.debug("Processing onAfterRoute: {}", copy);
350                doProcess(processor, copy);
351            }
352        }
353
354        @Override
355        public String toString() {
356            return "onAfterRoute";
357        }
358    }
359
360    @Override
361    public String toString() {
362        return "OnCompletionProcessor[" + processor + "]";
363    }
364
365    public String getTraceLabel() {
366        return "onCompletion";
367    }
368}