001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.model;
018
019import java.util.ArrayList;
020import java.util.Collection;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.ExecutorService;
026
027import javax.xml.bind.annotation.XmlAccessType;
028import javax.xml.bind.annotation.XmlAccessorType;
029import javax.xml.bind.annotation.XmlAttribute;
030import javax.xml.bind.annotation.XmlElement;
031import javax.xml.bind.annotation.XmlElementRef;
032import javax.xml.bind.annotation.XmlRootElement;
033import javax.xml.bind.annotation.XmlTransient;
034
035import org.apache.camel.Predicate;
036import org.apache.camel.Processor;
037import org.apache.camel.processor.CamelInternalProcessor;
038import org.apache.camel.processor.OnCompletionProcessor;
039import org.apache.camel.spi.AsPredicate;
040import org.apache.camel.spi.Metadata;
041import org.apache.camel.spi.RouteContext;
042
043/**
044 * Route to be executed when normal route processing completes
045 *
046 * @version 
047 */
048@Metadata(label = "configuration")
049@XmlRootElement(name = "onCompletion")
050@XmlAccessorType(XmlAccessType.FIELD)
051public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> {
052    @XmlAttribute @Metadata(defaultValue = "AfterConsumer")
053    private OnCompletionMode mode;
054    @XmlAttribute
055    private Boolean onCompleteOnly;
056    @XmlAttribute
057    private Boolean onFailureOnly;
058    @XmlElement(name = "onWhen") @AsPredicate
059    private WhenDefinition onWhen;
060    @XmlAttribute
061    private Boolean parallelProcessing;
062    @XmlAttribute
063    private String executorServiceRef;
064    @XmlAttribute(name = "useOriginalMessage")
065    private Boolean useOriginalMessagePolicy;
066    @XmlElementRef
067    private List<ProcessorDefinition<?>> outputs = new ArrayList<>();
068    @XmlTransient
069    private ExecutorService executorService;
070    @XmlTransient
071    private Boolean routeScoped;
072    // TODO: in Camel 3.0 the OnCompletionDefinition should not contain state and OnCompletion processors
073    @XmlTransient
074    private final Map<String, Processor> onCompletions = new HashMap<>();
075
076    public OnCompletionDefinition() {
077    }
078
079    public boolean isRouteScoped() {
080        // is context scoped by default
081        return routeScoped != null ? routeScoped : false;
082    }
083
084    public Processor getOnCompletion(String routeId) {
085        return onCompletions.get(routeId);
086    }
087
088    public Collection<Processor> getOnCompletions() {
089        return onCompletions.values();
090    }
091
092    @Override
093    public String toString() {
094        return "onCompletion[" + getOutputs() + "]";
095    }
096
097    @Override
098    public String getShortName() {
099        return "onCompletion";
100    }
101
102    @Override
103    public String getLabel() {
104        return "onCompletion";
105    }
106
107    @Override
108    public boolean isAbstract() {
109        return true;
110    }
111
112    @Override
113    public boolean isTopLevelOnly() {
114        return true;
115    }
116
117    @Override
118    public Processor createProcessor(RouteContext routeContext) throws Exception {
119        // assign whether this was a route scoped onCompletion or not
120        // we need to know this later when setting the parent, as only route scoped should have parent
121        // Note: this logic can possible be removed when the Camel routing engine decides at runtime
122        // to apply onCompletion in a more dynamic fashion than current code base
123        // and therefore is in a better position to decide among context/route scoped OnCompletion at runtime
124        if (routeScoped == null) {
125            routeScoped = super.getParent() != null;
126        }
127
128        boolean isOnCompleteOnly = getOnCompleteOnly() != null && getOnCompleteOnly();
129        boolean isOnFailureOnly = getOnFailureOnly() != null && getOnFailureOnly();
130        boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing();
131        boolean original = getUseOriginalMessagePolicy() != null && getUseOriginalMessagePolicy();
132
133        if (isOnCompleteOnly && isOnFailureOnly) {
134            throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
135        }
136        if (original) {
137            // ensure allow original is turned on
138            routeContext.setAllowUseOriginalMessage(true);
139        }
140
141        String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory());
142
143        Processor childProcessor = this.createChildProcessor(routeContext, true);
144
145        // wrap the on completion route in a unit of work processor
146        CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor);
147        internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext));
148
149        onCompletions.put(routeId, internal);
150
151        Predicate when = null;
152        if (onWhen != null) {
153            when = onWhen.getExpression().createPredicate(routeContext);
154        }
155
156        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing);
157        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, isParallelProcessing);
158
159        // should be after consumer by default
160        boolean afterConsumer = mode == null || mode == OnCompletionMode.AfterConsumer;
161
162        OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), internal,
163                threadPool, shutdownThreadPool, isOnCompleteOnly, isOnFailureOnly, when, original, afterConsumer);
164        return answer;
165    }
166
167    /**
168     * Removes all existing {@link org.apache.camel.model.OnCompletionDefinition} from the definition.
169     * <p/>
170     * This is used to let route scoped <tt>onCompletion</tt> overrule any global <tt>onCompletion</tt>.
171     * Hence we remove all existing as they are global.
172     *
173     * @param definition the parent definition that is the route
174     */
175    public void removeAllOnCompletionDefinition(ProcessorDefinition<?> definition) {
176        for (Iterator<ProcessorDefinition<?>> it = definition.getOutputs().iterator(); it.hasNext();) {
177            ProcessorDefinition<?> out = it.next();
178            if (out instanceof OnCompletionDefinition) {
179                it.remove();
180            }
181        }
182    }
183
184    @Override
185    public ProcessorDefinition<?> end() {
186        // pop parent block, as we added our self as block to parent when synchronized was defined in the route
187        getParent().popBlock();
188        return super.end();
189    }
190
191    /**
192     * Sets the mode to be after route is done (default due backwards compatible).
193     * <p/>
194     * This executes the on completion work <i>after</i> the route consumer have written response
195     * back to the callee (if its InOut mode).
196     *
197     * @return the builder
198     */
199    public OnCompletionDefinition modeAfterConsumer() {
200        setMode(OnCompletionMode.AfterConsumer);
201        return this;
202    }
203
204    /**
205     * Sets the mode to be before consumer is done.
206     * <p/>
207     * This allows the on completion work to execute <i>before</i> the route consumer, writes any response
208     * back to the callee (if its InOut mode).
209     *
210     * @return the builder
211     */
212    public OnCompletionDefinition modeBeforeConsumer() {
213        setMode(OnCompletionMode.BeforeConsumer);
214        return this;
215    }
216
217    /**
218     * Will only synchronize when the {@link org.apache.camel.Exchange} completed successfully (no errors).
219     *
220     * @return the builder
221     */
222    public OnCompletionDefinition onCompleteOnly() {
223        boolean isOnFailureOnly = getOnFailureOnly() != null && getOnFailureOnly();
224        if (isOnFailureOnly) {
225            throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
226        }
227        // must define return type as OutputDefinition and not this type to avoid end user being able
228        // to invoke onFailureOnly/onCompleteOnly more than once
229        setOnCompleteOnly(Boolean.TRUE);
230        setOnFailureOnly(Boolean.FALSE);
231        return this;
232    }
233
234    /**
235     * Will only synchronize when the {@link org.apache.camel.Exchange} ended with failure (exception or FAULT message).
236     *
237     * @return the builder
238     */
239    public OnCompletionDefinition onFailureOnly() {
240        boolean isOnCompleteOnly = getOnCompleteOnly() != null && getOnCompleteOnly();
241        if (isOnCompleteOnly) {
242            throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
243        }
244        // must define return type as OutputDefinition and not this type to avoid end user being able
245        // to invoke onFailureOnly/onCompleteOnly more than once
246        setOnCompleteOnly(Boolean.FALSE);
247        setOnFailureOnly(Boolean.TRUE);
248        return this;
249    }
250
251    /**
252     * Sets an additional predicate that should be true before the onCompletion is triggered.
253     * <p/>
254     * To be used for fine grained controlling whether a completion callback should be invoked or not
255     *
256     * @param predicate predicate that determines true or false
257     * @return the builder
258     */
259    public OnCompletionDefinition onWhen(@AsPredicate Predicate predicate) {
260        setOnWhen(new WhenDefinition(predicate));
261        return this;
262    }
263
264    /**
265     * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion.
266     * <p/>
267     * By default this feature is off.
268     *
269     * @return the builder
270     */
271    public OnCompletionDefinition useOriginalBody() {
272        setUseOriginalMessagePolicy(Boolean.TRUE);
273        return this;
274    }
275
276    /**
277     * To use a custom Thread Pool to be used for parallel processing.
278     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
279     */
280    public OnCompletionDefinition executorService(ExecutorService executorService) {
281        setExecutorService(executorService);
282        return this;
283    }
284
285    /**
286     * Refers to a custom Thread Pool to be used for parallel processing.
287     * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well.
288     */
289    public OnCompletionDefinition executorServiceRef(String executorServiceRef) {
290        setExecutorServiceRef(executorServiceRef);
291        return this;
292    }
293
294    /**
295     * If enabled then the on completion process will run asynchronously by a separate thread from a thread pool.
296     * By default this is false, meaning the on completion process will run synchronously using the same caller thread as from the route.
297     *
298     * @return the builder
299     */
300    public OnCompletionDefinition parallelProcessing() {
301        setParallelProcessing(true);
302        return this;
303    }
304
305    /**
306     * If enabled then the on completion process will run asynchronously by a separate thread from a thread pool.
307     * By default this is false, meaning the on completion process will run synchronously using the same caller thread as from the route.
308     *
309     * @return the builder
310     */
311    public OnCompletionDefinition parallelProcessing(boolean parallelProcessing) {
312        setParallelProcessing(parallelProcessing);
313        return this;
314    }
315
316    public List<ProcessorDefinition<?>> getOutputs() {
317        return outputs;
318    }
319
320    public void setOutputs(List<ProcessorDefinition<?>> outputs) {
321        this.outputs = outputs;
322    }
323
324    public boolean isOutputSupported() {
325        return true;
326    }
327
328    public OnCompletionMode getMode() {
329        return mode;
330    }
331
332    /**
333     * Sets the on completion mode.
334     * <p/>
335     * The default value is AfterConsumer
336     */
337    public void setMode(OnCompletionMode mode) {
338        this.mode = mode;
339    }
340
341    public Boolean getOnCompleteOnly() {
342        return onCompleteOnly;
343    }
344
345    public void setOnCompleteOnly(Boolean onCompleteOnly) {
346        this.onCompleteOnly = onCompleteOnly;
347    }
348
349    public Boolean getOnFailureOnly() {
350        return onFailureOnly;
351    }
352
353    public void setOnFailureOnly(Boolean onFailureOnly) {
354        this.onFailureOnly = onFailureOnly;
355    }
356
357    public WhenDefinition getOnWhen() {
358        return onWhen;
359    }
360
361    public void setOnWhen(WhenDefinition onWhen) {
362        this.onWhen = onWhen;
363    }
364
365    public ExecutorService getExecutorService() {
366        return executorService;
367    }
368
369    public void setExecutorService(ExecutorService executorService) {
370        this.executorService = executorService;
371    }
372
373    public String getExecutorServiceRef() {
374        return executorServiceRef;
375    }
376
377    public void setExecutorServiceRef(String executorServiceRef) {
378        this.executorServiceRef = executorServiceRef;
379    }
380
381    public Boolean getUseOriginalMessagePolicy() {
382        return useOriginalMessagePolicy;
383    }
384
385    /**
386     * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion.
387     * <p/>
388     * By default this feature is off.
389     */
390    public void setUseOriginalMessagePolicy(Boolean useOriginalMessagePolicy) {
391        this.useOriginalMessagePolicy = useOriginalMessagePolicy;
392    }
393
394    public Boolean getParallelProcessing() {
395        return parallelProcessing;
396    }
397
398    public void setParallelProcessing(Boolean parallelProcessing) {
399        this.parallelProcessing = parallelProcessing;
400    }
401
402}