001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.HashMap;
022    import java.util.Iterator;
023    import java.util.List;
024    import java.util.Map;
025    import java.util.concurrent.ExecutorService;
026    
027    import javax.xml.bind.annotation.XmlAccessType;
028    import javax.xml.bind.annotation.XmlAccessorType;
029    import javax.xml.bind.annotation.XmlAttribute;
030    import javax.xml.bind.annotation.XmlElement;
031    import javax.xml.bind.annotation.XmlElementRef;
032    import javax.xml.bind.annotation.XmlRootElement;
033    import javax.xml.bind.annotation.XmlTransient;
034    
035    import org.apache.camel.Predicate;
036    import org.apache.camel.Processor;
037    import org.apache.camel.processor.OnCompletionProcessor;
038    import org.apache.camel.processor.UnitOfWorkProcessor;
039    import org.apache.camel.spi.RouteContext;
040    
041    /**
042     * Represents an XML <onCompletion/> element
043     *
044     * @version 
045     */
046    @XmlRootElement(name = "onCompletion")
047    @XmlAccessorType(XmlAccessType.FIELD)
048    public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> {
049        @XmlAttribute
050        private Boolean onCompleteOnly;
051        @XmlAttribute
052        private Boolean onFailureOnly;
053        @XmlElement(name = "onWhen")
054        private WhenDefinition onWhen;
055        @XmlAttribute
056        private String executorServiceRef;
057        @XmlAttribute(name = "useOriginalMessage")
058        private Boolean useOriginalMessagePolicy;
059        @XmlElementRef
060        private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
061        @XmlTransient
062        private ExecutorService executorService;
063        @XmlTransient
064        private Boolean routeScoped;
065        // TODO: in Camel 3.0 the OnCompletionDefinition should not contain state and OnCompletion processors
066        @XmlTransient
067        private final Map<String, Processor> onCompletions = new HashMap<String, Processor>();
068    
069        public OnCompletionDefinition() {
070        }
071    
072        public boolean isRouteScoped() {
073            // is context scoped by default
074            return routeScoped != null ? routeScoped : false;
075        }
076    
077        public Processor getOnCompletion(String routeId) {
078            return onCompletions.get(routeId);
079        }
080    
081        public Collection<Processor> getOnCompletions() {
082            return onCompletions.values();
083        }
084    
085        @Override
086        public String toString() {
087            return "onCompletion[" + getOutputs() + "]";
088        }
089    
090        @Override
091        public String getShortName() {
092            return "onCompletion";
093        }
094    
095        @Override
096        public String getLabel() {
097            return "onCompletion";
098        }
099    
100        @Override
101        public boolean isAbstract() {
102            return true;
103        }
104    
105        @Override
106        public Processor createProcessor(RouteContext routeContext) throws Exception {
107            // assign whether this was a route scoped onCompletion or not
108            // we need to know this later when setting the parent, as only route scoped should have parent
109            // Note: this logic can possible be removed when the Camel routing engine decides at runtime
110            // to apply onCompletion in a more dynamic fashion than current code base
111            // and therefore is in a better position to decide among context/route scoped OnCompletion at runtime
112            if (routeScoped == null) {
113                routeScoped = super.getParent() != null;
114            }
115    
116            if (isOnCompleteOnly() && isOnFailureOnly()) {
117                throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
118            }
119    
120            Processor childProcessor = this.createChildProcessor(routeContext, true);
121            // wrap the on completion route in a unit of work processor
122            childProcessor = new UnitOfWorkProcessor(routeContext, childProcessor);
123    
124            String id = routeContext.getRoute().getId();
125            onCompletions.put(id, childProcessor);
126    
127            Predicate when = null;
128            if (onWhen != null) {
129                when = onWhen.getExpression().createPredicate(routeContext);
130            }
131    
132            // executor service is mandatory for on completion
133            boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
134            ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, true);
135    
136            // should be false by default
137            boolean original = getUseOriginalMessagePolicy() != null ? getUseOriginalMessagePolicy() : false;
138            OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor,
139                    threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original);
140            return answer;
141        }
142    
143        /**
144         * Removes all existing {@link org.apache.camel.model.OnCompletionDefinition} from the definition.
145         * <p/>
146         * This is used to let route scoped <tt>onCompletion</tt> overrule any global <tt>onCompletion</tt>.
147         * Hence we remove all existing as they are global.
148         *
149         * @param definition the parent definition that is the route
150         */
151        public void removeAllOnCompletionDefinition(ProcessorDefinition<?> definition) {
152            for (Iterator<ProcessorDefinition<?>> it = definition.getOutputs().iterator(); it.hasNext();) {
153                ProcessorDefinition<?> out = it.next();
154                if (out instanceof OnCompletionDefinition) {
155                    it.remove();
156                }
157            }
158        }
159    
160        @Override
161        public ProcessorDefinition<?> end() {
162            // pop parent block, as we added our self as block to parent when synchronized was defined in the route
163            getParent().popBlock();
164            return super.end();
165        }
166    
167        /**
168         * Will only synchronize when the {@link org.apache.camel.Exchange} completed successfully (no errors).
169         *
170         * @return the builder
171         */
172        public OnCompletionDefinition onCompleteOnly() {
173            if (isOnFailureOnly()) {
174                throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
175            }
176            // must define return type as OutputDefinition and not this type to avoid end user being able
177            // to invoke onFailureOnly/onCompleteOnly more than once
178            setOnCompleteOnly(Boolean.TRUE);
179            setOnFailureOnly(Boolean.FALSE);
180            return this;
181        }
182    
183        /**
184         * Will only synchronize when the {@link org.apache.camel.Exchange} ended with failure (exception or FAULT message).
185         *
186         * @return the builder
187         */
188        public OnCompletionDefinition onFailureOnly() {
189            if (isOnCompleteOnly()) {
190                throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this);
191            }
192            // must define return type as OutputDefinition and not this type to avoid end user being able
193            // to invoke onFailureOnly/onCompleteOnly more than once
194            setOnCompleteOnly(Boolean.FALSE);
195            setOnFailureOnly(Boolean.TRUE);
196            return this;
197        }
198    
199        /**
200         * Sets an additional predicate that should be true before the onCompletion is triggered.
201         * <p/>
202         * To be used for fine grained controlling whether a completion callback should be invoked or not
203         *
204         * @param predicate predicate that determines true or false
205         * @return the builder
206         */
207        public OnCompletionDefinition onWhen(Predicate predicate) {
208            setOnWhen(new WhenDefinition(predicate));
209            return this;
210        }
211    
212        /**
213         * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion.
214         * <p/>
215         * By default this feature is off.
216         *
217         * @return the builder
218         */
219        public OnCompletionDefinition useOriginalBody() {
220            setUseOriginalMessagePolicy(Boolean.TRUE);
221            return this;
222        }
223    
224        public OnCompletionDefinition executorService(ExecutorService executorService) {
225            setExecutorService(executorService);
226            return this;
227        }
228    
229        public OnCompletionDefinition executorServiceRef(String executorServiceRef) {
230            setExecutorServiceRef(executorServiceRef);
231            return this;
232        }
233    
234        public List<ProcessorDefinition<?>> getOutputs() {
235            return outputs;
236        }
237    
238        public void setOutputs(List<ProcessorDefinition<?>> outputs) {
239            this.outputs = outputs;
240        }
241    
242        public boolean isOutputSupported() {
243            return true;
244        }
245    
246        public Boolean getOnCompleteOnly() {
247            return onCompleteOnly;
248        }
249    
250        public void setOnCompleteOnly(Boolean onCompleteOnly) {
251            this.onCompleteOnly = onCompleteOnly;
252        }
253    
254        public boolean isOnCompleteOnly() {
255            return onCompleteOnly != null && onCompleteOnly;
256        }
257    
258        public Boolean getOnFailureOnly() {
259            return onFailureOnly;
260        }
261    
262        public void setOnFailureOnly(Boolean onFailureOnly) {
263            this.onFailureOnly = onFailureOnly;
264        }
265    
266        public boolean isOnFailureOnly() {
267            return onFailureOnly != null && onFailureOnly;
268        }
269    
270        public WhenDefinition getOnWhen() {
271            return onWhen;
272        }
273    
274        public void setOnWhen(WhenDefinition onWhen) {
275            this.onWhen = onWhen;
276        }
277    
278        public ExecutorService getExecutorService() {
279            return executorService;
280        }
281    
282        public void setExecutorService(ExecutorService executorService) {
283            this.executorService = executorService;
284        }
285    
286        public String getExecutorServiceRef() {
287            return executorServiceRef;
288        }
289    
290        public void setExecutorServiceRef(String executorServiceRef) {
291            this.executorServiceRef = executorServiceRef;
292        }
293    
294        public Boolean getUseOriginalMessagePolicy() {
295            return useOriginalMessagePolicy != null;
296        }
297    
298        public void setUseOriginalMessagePolicy(Boolean useOriginalMessagePolicy) {
299            this.useOriginalMessagePolicy = useOriginalMessagePolicy;
300        }
301    
302    }