001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.HashMap; 022 import java.util.Iterator; 023 import java.util.List; 024 import java.util.Map; 025 import java.util.concurrent.ExecutorService; 026 027 import javax.xml.bind.annotation.XmlAccessType; 028 import javax.xml.bind.annotation.XmlAccessorType; 029 import javax.xml.bind.annotation.XmlAttribute; 030 import javax.xml.bind.annotation.XmlElement; 031 import javax.xml.bind.annotation.XmlElementRef; 032 import javax.xml.bind.annotation.XmlRootElement; 033 import javax.xml.bind.annotation.XmlTransient; 034 035 import org.apache.camel.Predicate; 036 import org.apache.camel.Processor; 037 import org.apache.camel.processor.OnCompletionProcessor; 038 import org.apache.camel.processor.UnitOfWorkProcessor; 039 import org.apache.camel.spi.RouteContext; 040 041 /** 042 * Represents an XML <onCompletion/> element 043 * 044 * @version 045 */ 046 @XmlRootElement(name = "onCompletion") 047 @XmlAccessorType(XmlAccessType.FIELD) 048 public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> { 049 @XmlAttribute 050 private Boolean onCompleteOnly; 051 @XmlAttribute 052 private Boolean onFailureOnly; 053 @XmlElement(name = "onWhen") 054 private WhenDefinition onWhen; 055 @XmlAttribute 056 private String executorServiceRef; 057 @XmlAttribute(name = "useOriginalMessage") 058 private Boolean useOriginalMessagePolicy; 059 @XmlElementRef 060 private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); 061 @XmlTransient 062 private ExecutorService executorService; 063 @XmlTransient 064 private Boolean routeScoped; 065 // TODO: in Camel 3.0 the OnCompletionDefinition should not contain state and OnCompletion processors 066 @XmlTransient 067 private final Map<String, Processor> onCompletions = new HashMap<String, Processor>(); 068 069 public OnCompletionDefinition() { 070 } 071 072 public boolean isRouteScoped() { 073 // is context scoped by default 074 return routeScoped != null ? routeScoped : false; 075 } 076 077 public Processor getOnCompletion(String routeId) { 078 return onCompletions.get(routeId); 079 } 080 081 public Collection<Processor> getOnCompletions() { 082 return onCompletions.values(); 083 } 084 085 @Override 086 public String toString() { 087 return "onCompletion[" + getOutputs() + "]"; 088 } 089 090 @Override 091 public String getShortName() { 092 return "onCompletion"; 093 } 094 095 @Override 096 public String getLabel() { 097 return "onCompletion"; 098 } 099 100 @Override 101 public boolean isAbstract() { 102 return true; 103 } 104 105 @Override 106 public Processor createProcessor(RouteContext routeContext) throws Exception { 107 // assign whether this was a route scoped onCompletion or not 108 // we need to know this later when setting the parent, as only route scoped should have parent 109 // Note: this logic can possible be removed when the Camel routing engine decides at runtime 110 // to apply onCompletion in a more dynamic fashion than current code base 111 // and therefore is in a better position to decide among context/route scoped OnCompletion at runtime 112 if (routeScoped == null) { 113 routeScoped = super.getParent() != null; 114 } 115 116 if (isOnCompleteOnly() && isOnFailureOnly()) { 117 throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); 118 } 119 120 Processor childProcessor = this.createChildProcessor(routeContext, true); 121 // wrap the on completion route in a unit of work processor 122 childProcessor = new UnitOfWorkProcessor(routeContext, childProcessor); 123 124 String id = routeContext.getRoute().getId(); 125 onCompletions.put(id, childProcessor); 126 127 Predicate when = null; 128 if (onWhen != null) { 129 when = onWhen.getExpression().createPredicate(routeContext); 130 } 131 132 // executor service is mandatory for on completion 133 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true); 134 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, true); 135 136 // should be false by default 137 boolean original = getUseOriginalMessagePolicy() != null ? getUseOriginalMessagePolicy() : false; 138 OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), childProcessor, 139 threadPool, shutdownThreadPool, isOnCompleteOnly(), isOnFailureOnly(), when, original); 140 return answer; 141 } 142 143 /** 144 * Removes all existing {@link org.apache.camel.model.OnCompletionDefinition} from the definition. 145 * <p/> 146 * This is used to let route scoped <tt>onCompletion</tt> overrule any global <tt>onCompletion</tt>. 147 * Hence we remove all existing as they are global. 148 * 149 * @param definition the parent definition that is the route 150 */ 151 public void removeAllOnCompletionDefinition(ProcessorDefinition<?> definition) { 152 for (Iterator<ProcessorDefinition<?>> it = definition.getOutputs().iterator(); it.hasNext();) { 153 ProcessorDefinition<?> out = it.next(); 154 if (out instanceof OnCompletionDefinition) { 155 it.remove(); 156 } 157 } 158 } 159 160 @Override 161 public ProcessorDefinition<?> end() { 162 // pop parent block, as we added our self as block to parent when synchronized was defined in the route 163 getParent().popBlock(); 164 return super.end(); 165 } 166 167 /** 168 * Will only synchronize when the {@link org.apache.camel.Exchange} completed successfully (no errors). 169 * 170 * @return the builder 171 */ 172 public OnCompletionDefinition onCompleteOnly() { 173 if (isOnFailureOnly()) { 174 throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); 175 } 176 // must define return type as OutputDefinition and not this type to avoid end user being able 177 // to invoke onFailureOnly/onCompleteOnly more than once 178 setOnCompleteOnly(Boolean.TRUE); 179 setOnFailureOnly(Boolean.FALSE); 180 return this; 181 } 182 183 /** 184 * Will only synchronize when the {@link org.apache.camel.Exchange} ended with failure (exception or FAULT message). 185 * 186 * @return the builder 187 */ 188 public OnCompletionDefinition onFailureOnly() { 189 if (isOnCompleteOnly()) { 190 throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); 191 } 192 // must define return type as OutputDefinition and not this type to avoid end user being able 193 // to invoke onFailureOnly/onCompleteOnly more than once 194 setOnCompleteOnly(Boolean.FALSE); 195 setOnFailureOnly(Boolean.TRUE); 196 return this; 197 } 198 199 /** 200 * Sets an additional predicate that should be true before the onCompletion is triggered. 201 * <p/> 202 * To be used for fine grained controlling whether a completion callback should be invoked or not 203 * 204 * @param predicate predicate that determines true or false 205 * @return the builder 206 */ 207 public OnCompletionDefinition onWhen(Predicate predicate) { 208 setOnWhen(new WhenDefinition(predicate)); 209 return this; 210 } 211 212 /** 213 * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion. 214 * <p/> 215 * By default this feature is off. 216 * 217 * @return the builder 218 */ 219 public OnCompletionDefinition useOriginalBody() { 220 setUseOriginalMessagePolicy(Boolean.TRUE); 221 return this; 222 } 223 224 public OnCompletionDefinition executorService(ExecutorService executorService) { 225 setExecutorService(executorService); 226 return this; 227 } 228 229 public OnCompletionDefinition executorServiceRef(String executorServiceRef) { 230 setExecutorServiceRef(executorServiceRef); 231 return this; 232 } 233 234 public List<ProcessorDefinition<?>> getOutputs() { 235 return outputs; 236 } 237 238 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 239 this.outputs = outputs; 240 } 241 242 public boolean isOutputSupported() { 243 return true; 244 } 245 246 public Boolean getOnCompleteOnly() { 247 return onCompleteOnly; 248 } 249 250 public void setOnCompleteOnly(Boolean onCompleteOnly) { 251 this.onCompleteOnly = onCompleteOnly; 252 } 253 254 public boolean isOnCompleteOnly() { 255 return onCompleteOnly != null && onCompleteOnly; 256 } 257 258 public Boolean getOnFailureOnly() { 259 return onFailureOnly; 260 } 261 262 public void setOnFailureOnly(Boolean onFailureOnly) { 263 this.onFailureOnly = onFailureOnly; 264 } 265 266 public boolean isOnFailureOnly() { 267 return onFailureOnly != null && onFailureOnly; 268 } 269 270 public WhenDefinition getOnWhen() { 271 return onWhen; 272 } 273 274 public void setOnWhen(WhenDefinition onWhen) { 275 this.onWhen = onWhen; 276 } 277 278 public ExecutorService getExecutorService() { 279 return executorService; 280 } 281 282 public void setExecutorService(ExecutorService executorService) { 283 this.executorService = executorService; 284 } 285 286 public String getExecutorServiceRef() { 287 return executorServiceRef; 288 } 289 290 public void setExecutorServiceRef(String executorServiceRef) { 291 this.executorServiceRef = executorServiceRef; 292 } 293 294 public Boolean getUseOriginalMessagePolicy() { 295 return useOriginalMessagePolicy != null; 296 } 297 298 public void setUseOriginalMessagePolicy(Boolean useOriginalMessagePolicy) { 299 this.useOriginalMessagePolicy = useOriginalMessagePolicy; 300 } 301 302 }