001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.ExecutorService;
026import javax.xml.bind.annotation.XmlAccessType;
027import javax.xml.bind.annotation.XmlAccessorType;
028import javax.xml.bind.annotation.XmlAttribute;
029import javax.xml.bind.annotation.XmlElement;
030import javax.xml.bind.annotation.XmlElementRef;
031import javax.xml.bind.annotation.XmlRootElement;
032import javax.xml.bind.annotation.XmlTransient;
033
034import org.apache.camel.Predicate;
035import org.apache.camel.Processor;
036import org.apache.camel.processor.CamelInternalProcessor;
037import org.apache.camel.processor.OnCompletionProcessor;
038import org.apache.camel.spi.Metadata;
039import org.apache.camel.spi.RouteContext;
040
041/**
042 * Route to be executed when normal route processing completes
043 *
044 * @version 
045 */
046@Metadata(label = "configuration")
047@XmlRootElement(name = "onCompletion")
048@XmlAccessorType(XmlAccessType.FIELD)
049public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> {
050    @XmlAttribute @Metadata(defaultValue = "AfterConsumer")
051    private OnCompletionMode mode;
052    @XmlAttribute
053    private Boolean onCompleteOnly;
054    @XmlAttribute
055    private Boolean onFailureOnly;
056    @XmlElement(name = "onWhen")
057    private WhenDefinition onWhen;
058    @XmlAttribute
059    private Boolean parallelProcessing;
060    @XmlAttribute
061    private String executorServiceRef;
062    @XmlAttribute(name = "useOriginalMessage")
063    private Boolean useOriginalMessagePolicy;
064    @XmlElementRef
065    private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
066    @XmlTransient
067    private ExecutorService executorService;
068    @XmlTransient
069    private Boolean routeScoped;
070    // TODO: in Camel 3.0 the OnCompletionDefinition should not contain state and OnCompletion processors
071    @XmlTransient
072    private final Map<String, Processor> onCompletions = new HashMap<String, Processor>();
073
074    public OnCompletionDefinition() {
075    }
076
077    public boolean isRouteScoped() {
078        // is context scoped by default
079        return routeScoped != null ? routeScoped : false;
080    }
081
082    public Processor getOnCompletion(String routeId) {
083        return onCompletions.get(routeId);
084    }
085
086    public Collection<Processor> getOnCompletions() {
087        return onCompletions.values();
088    }
089
090    @Override
091    public String toString() {
092        return "onCompletion[" + getOutputs() + "]";
093    }
094
095    @Override
096    public String getLabel() {
097        return "onCompletion";
098    }
099
100    @Override
101    public boolean isAbstract() {
102        return true;
103    }
104
105    @Override
106    public boolean isTopLevelOnly() {
107        return true;
108    }
109
110    @Override
111    public Processor createProcessor(RouteContext routeContext) throws Exception {
112        // assign whether this was a route scoped onCompletion or not
113        // we need to know this later when setting the parent, as only route scoped should have parent
114        // Note: this logic can possible be removed when the Camel routing engine decides at runtime
115        // to apply onCompletion in a more dynamic fashion than current code base
116        // and therefore is in a better position to decide among context/route scoped OnCompletion at runtime
117        if (routeScoped == null) {
118            routeScoped = super.getParent() != null;
119        }
120
121        boolean isOnCompleteOnly = getOnCompleteOnly() != null && getOnCompleteOnly();
122        boolean isOnFailureOnly = getOnFailureOnly() != null && getOnFailureOnly();
123        boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing();
124        boolean original = getUseOriginalMessagePolicy() != null && getUseOriginalMessagePolicy();
125
126        if (isOnCompleteOnly && isOnFailureOnly) {
127            throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
128        }
129        if (original) {
130            // ensure allow original is turned on
131            routeContext.setAllowUseOriginalMessage(true);
132        }
133
134        String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
135
136        Processor childProcessor = this.createChildProcessor(routeContext, true);
137
138        // wrap the on completion route in a unit of work processor
139        CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor);
140        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
141
142        onCompletions.put(routeId, internal);
143
144        Predicate when = null;
145        if (onWhen != null) {
146            when = onWhen.getExpression().createPredicate(routeContext);
147        }
148
149        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing);
150        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, isParallelProcessing);
151
152        // should be after consumer by default
153        boolean afterConsumer = mode == null || mode == OnCompletionMode.AfterConsumer;
154
155        OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), internal,
156                threadPool, shutdownThreadPool, isOnCompleteOnly, isOnFailureOnly, when, original, afterConsumer);
157        return answer;
158    }
159
160    /**
161     * Removes all existing {@link org.apache.camel.model.OnCompletionDefinition} from the definition.
162     * <p/>
163     * This is used to let route scoped <tt>onCompletion</tt> overrule any global <tt>onCompletion</tt>.
164     * Hence we remove all existing as they are global.
165     *
166     * @param definition the parent definition that is the route
167     */
168    public void removeAllOnCompletionDefinition(ProcessorDefinition<?> definition) {
169        for (Iterator<ProcessorDefinition<?>> it = definition.getOutputs().iterator(); it.hasNext();) {
170            ProcessorDefinition<?> out = it.next();
171            if (out instanceof OnCompletionDefinition) {
172                it.remove();
173            }
174        }
175    }
176
177    @Override
178    public ProcessorDefinition<?> end() {
179        // pop parent block, as we added our self as block to parent when synchronized was defined in the route
180        getParent().popBlock();
181        return super.end();
182    }
183
184    /**
185     * Sets the mode to be after route is done (default due backwards compatible).
186     * <p/>
187     * This executes the on completion work <i>after</i> the route consumer have written response
188     * back to the callee (if its InOut mode).
189     *
190     * @return the builder
191     */
192    public OnCompletionDefinition modeAfterConsumer() {
193        setMode(OnCompletionMode.AfterConsumer);
194        return this;
195    }
196
197    /**
198     * Sets the mode to be before consumer is done.
199     * <p/>
200     * This allows the on completion work to execute <i>before</i> the route consumer, writes any response
201     * back to the callee (if its InOut mode).
202     *
203     * @return the builder
204     */
205    public OnCompletionDefinition modeBeforeConsumer() {
206        setMode(OnCompletionMode.BeforeConsumer);
207        return this;
208    }
209
210    /**
211     * Will only synchronize when the {@link org.apache.camel.Exchange} completed successfully (no errors).
212     *
213     * @return the builder
214     */
215    public OnCompletionDefinition onCompleteOnly() {
216        boolean isOnFailureOnly = getOnFailureOnly() != null && getOnFailureOnly();
217        if (isOnFailureOnly) {
218            throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
219        }
220        // must define return type as OutputDefinition and not this type to avoid end user being able
221        // to invoke onFailureOnly/onCompleteOnly more than once
222        setOnCompleteOnly(Boolean.TRUE);
223        setOnFailureOnly(Boolean.FALSE);
224        return this;
225    }
226
227    /**
228     * Will only synchronize when the {@link org.apache.camel.Exchange} ended with failure (exception or FAULT message).
229     *
230     * @return the builder
231     */
232    public OnCompletionDefinition onFailureOnly() {
233        boolean isOnCompleteOnly = getOnCompleteOnly() != null && getOnCompleteOnly();
234        if (isOnCompleteOnly) {
235            throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
236        }
237        // must define return type as OutputDefinition and not this type to avoid end user being able
238        // to invoke onFailureOnly/onCompleteOnly more than once
239        setOnCompleteOnly(Boolean.FALSE);
240        setOnFailureOnly(Boolean.TRUE);
241        return this;
242    }
243
244    /**
245     * Sets an additional predicate that should be true before the onCompletion is triggered.
246     * <p/>
247     * To be used for fine grained controlling whether a completion callback should be invoked or not
248     *
249     * @param predicate predicate that determines true or false
250     * @return the builder
251     */
252    public OnCompletionDefinition onWhen(Predicate predicate) {
253        setOnWhen(new WhenDefinition(predicate));
254        return this;
255    }
256
257    /**
258     * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion.
259     * <p/>
260     * By default this feature is off.
261     *
262     * @return the builder
263     */
264    public OnCompletionDefinition useOriginalBody() {
265        setUseOriginalMessagePolicy(Boolean.TRUE);
266        return this;
267    }
268
269    /**
270     * To use a custom Thread Pool to be used for parallel processing.
271     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
272     */
273    public OnCompletionDefinition executorService(ExecutorService executorService) {
274        setExecutorService(executorService);
275        return this;
276    }
277
278    /**
279     * Refers to a custom Thread Pool to be used for parallel processing.
280     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
281     */
282    public OnCompletionDefinition executorServiceRef(String executorServiceRef) {
283        setExecutorServiceRef(executorServiceRef);
284        return this;
285    }
286
287    /**
288     * If enabled then the on completion process will run asynchronously by a separate thread from a thread pool.
289     * By default this is false, meaning the on completion process will run synchronously using the same caller thread as from the route.
290     *
291     * @return the builder
292     */
293    public OnCompletionDefinition parallelProcessing() {
294        setParallelProcessing(true);
295        return this;
296    }
297
298    /**
299     * If enabled then the on completion process will run asynchronously by a separate thread from a thread pool.
300     * By default this is false, meaning the on completion process will run synchronously using the same caller thread as from the route.
301     *
302     * @return the builder
303     */
304    public OnCompletionDefinition parallelProcessing(boolean parallelProcessing) {
305        setParallelProcessing(parallelProcessing);
306        return this;
307    }
308
309    public List<ProcessorDefinition<?>> getOutputs() {
310        return outputs;
311    }
312
313    public void setOutputs(List<ProcessorDefinition<?>> outputs) {
314        this.outputs = outputs;
315    }
316
317    public boolean isOutputSupported() {
318        return true;
319    }
320
321    public OnCompletionMode getMode() {
322        return mode;
323    }
324
325    /**
326     * Sets the on completion mode.
327     * <p/>
328     * The default value is AfterConsumer
329     */
330    public void setMode(OnCompletionMode mode) {
331        this.mode = mode;
332    }
333
334    public Boolean getOnCompleteOnly() {
335        return onCompleteOnly;
336    }
337
338    public void setOnCompleteOnly(Boolean onCompleteOnly) {
339        this.onCompleteOnly = onCompleteOnly;
340    }
341
342    public Boolean getOnFailureOnly() {
343        return onFailureOnly;
344    }
345
346    public void setOnFailureOnly(Boolean onFailureOnly) {
347        this.onFailureOnly = onFailureOnly;
348    }
349
350    public WhenDefinition getOnWhen() {
351        return onWhen;
352    }
353
354    public void setOnWhen(WhenDefinition onWhen) {
355        this.onWhen = onWhen;
356    }
357
358    public ExecutorService getExecutorService() {
359        return executorService;
360    }
361
362    public void setExecutorService(ExecutorService executorService) {
363        this.executorService = executorService;
364    }
365
366    public String getExecutorServiceRef() {
367        return executorServiceRef;
368    }
369
370    public void setExecutorServiceRef(String executorServiceRef) {
371        this.executorServiceRef = executorServiceRef;
372    }
373
374    public Boolean getUseOriginalMessagePolicy() {
375        return useOriginalMessagePolicy;
376    }
377
378    /**
379     * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion.
380     * <p/>
381     * By default this feature is off.
382     */
383    public void setUseOriginalMessagePolicy(Boolean useOriginalMessagePolicy) {
384        this.useOriginalMessagePolicy = useOriginalMessagePolicy;
385    }
386
387    public Boolean getParallelProcessing() {
388        return parallelProcessing;
389    }
390
391    public void setParallelProcessing(Boolean parallelProcessing) {
392        this.parallelProcessing = parallelProcessing;
393    }
394
395}