001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.Iterator; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.ExecutorService; 026 027import javax.xml.bind.annotation.XmlAccessType; 028import javax.xml.bind.annotation.XmlAccessorType; 029import javax.xml.bind.annotation.XmlAttribute; 030import javax.xml.bind.annotation.XmlElement; 031import javax.xml.bind.annotation.XmlElementRef; 032import javax.xml.bind.annotation.XmlRootElement; 033import javax.xml.bind.annotation.XmlTransient; 034 035import org.apache.camel.Predicate; 036import org.apache.camel.Processor; 037import org.apache.camel.processor.CamelInternalProcessor; 038import org.apache.camel.processor.OnCompletionProcessor; 039import org.apache.camel.spi.AsPredicate; 040import org.apache.camel.spi.Metadata; 041import org.apache.camel.spi.RouteContext; 042 043/** 044 * Route to be executed when normal route processing completes 045 * 046 * @version 047 */ 048@Metadata(label = "configuration") 049@XmlRootElement(name = "onCompletion") 050@XmlAccessorType(XmlAccessType.FIELD) 051public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> { 052 @XmlAttribute @Metadata(defaultValue = "AfterConsumer") 053 private OnCompletionMode mode; 054 @XmlAttribute 055 private Boolean onCompleteOnly; 056 @XmlAttribute 057 private Boolean onFailureOnly; 058 @XmlElement(name = "onWhen") @AsPredicate 059 private WhenDefinition onWhen; 060 @XmlAttribute 061 private Boolean parallelProcessing; 062 @XmlAttribute 063 private String executorServiceRef; 064 @XmlAttribute(name = "useOriginalMessage") 065 private Boolean useOriginalMessagePolicy; 066 @XmlElementRef 067 private List<ProcessorDefinition<?>> outputs = new ArrayList<>(); 068 @XmlTransient 069 private ExecutorService executorService; 070 @XmlTransient 071 private Boolean routeScoped; 072 // TODO: in Camel 3.0 the OnCompletionDefinition should not contain state and OnCompletion processors 073 @XmlTransient 074 private final Map<String, Processor> onCompletions = new HashMap<>(); 075 076 public OnCompletionDefinition() { 077 } 078 079 public boolean isRouteScoped() { 080 // is context scoped by default 081 return routeScoped != null ? routeScoped : false; 082 } 083 084 public Processor getOnCompletion(String routeId) { 085 return onCompletions.get(routeId); 086 } 087 088 public Collection<Processor> getOnCompletions() { 089 return onCompletions.values(); 090 } 091 092 @Override 093 public String toString() { 094 return "onCompletion[" + getOutputs() + "]"; 095 } 096 097 @Override 098 public String getShortName() { 099 return "onCompletion"; 100 } 101 102 @Override 103 public String getLabel() { 104 return "onCompletion"; 105 } 106 107 @Override 108 public boolean isAbstract() { 109 return true; 110 } 111 112 @Override 113 public boolean isTopLevelOnly() { 114 return true; 115 } 116 117 @Override 118 public Processor createProcessor(RouteContext routeContext) throws Exception { 119 // assign whether this was a route scoped onCompletion or not 120 // we need to know this later when setting the parent, as only route scoped should have parent 121 // Note: this logic can possible be removed when the Camel routing engine decides at runtime 122 // to apply onCompletion in a more dynamic fashion than current code base 123 // and therefore is in a better position to decide among context/route scoped OnCompletion at runtime 124 if (routeScoped == null) { 125 routeScoped = super.getParent() != null; 126 } 127 128 boolean isOnCompleteOnly = getOnCompleteOnly() != null && getOnCompleteOnly(); 129 boolean isOnFailureOnly = getOnFailureOnly() != null && getOnFailureOnly(); 130 boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing(); 131 boolean original = getUseOriginalMessagePolicy() != null && getUseOriginalMessagePolicy(); 132 133 if (isOnCompleteOnly && isOnFailureOnly) { 134 throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); 135 } 136 if (original) { 137 // ensure allow original is turned on 138 routeContext.setAllowUseOriginalMessage(true); 139 } 140 141 String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); 142 143 Processor childProcessor = this.createChildProcessor(routeContext, true); 144 145 // wrap the on completion route in a unit of work processor 146 CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor); 147 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); 148 149 onCompletions.put(routeId, internal); 150 151 Predicate when = null; 152 if (onWhen != null) { 153 when = onWhen.getExpression().createPredicate(routeContext); 154 } 155 156 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing); 157 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, isParallelProcessing); 158 159 // should be after consumer by default 160 boolean afterConsumer = mode == null || mode == OnCompletionMode.AfterConsumer; 161 162 OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), internal, 163 threadPool, shutdownThreadPool, isOnCompleteOnly, isOnFailureOnly, when, original, afterConsumer); 164 return answer; 165 } 166 167 /** 168 * Removes all existing {@link org.apache.camel.model.OnCompletionDefinition} from the definition. 169 * <p/> 170 * This is used to let route scoped <tt>onCompletion</tt> overrule any global <tt>onCompletion</tt>. 171 * Hence we remove all existing as they are global. 172 * 173 * @param definition the parent definition that is the route 174 */ 175 public void removeAllOnCompletionDefinition(ProcessorDefinition<?> definition) { 176 for (Iterator<ProcessorDefinition<?>> it = definition.getOutputs().iterator(); it.hasNext();) { 177 ProcessorDefinition<?> out = it.next(); 178 if (out instanceof OnCompletionDefinition) { 179 it.remove(); 180 } 181 } 182 } 183 184 @Override 185 public ProcessorDefinition<?> end() { 186 // pop parent block, as we added our self as block to parent when synchronized was defined in the route 187 getParent().popBlock(); 188 return super.end(); 189 } 190 191 /** 192 * Sets the mode to be after route is done (default due backwards compatible). 193 * <p/> 194 * This executes the on completion work <i>after</i> the route consumer have written response 195 * back to the callee (if its InOut mode). 196 * 197 * @return the builder 198 */ 199 public OnCompletionDefinition modeAfterConsumer() { 200 setMode(OnCompletionMode.AfterConsumer); 201 return this; 202 } 203 204 /** 205 * Sets the mode to be before consumer is done. 206 * <p/> 207 * This allows the on completion work to execute <i>before</i> the route consumer, writes any response 208 * back to the callee (if its InOut mode). 209 * 210 * @return the builder 211 */ 212 public OnCompletionDefinition modeBeforeConsumer() { 213 setMode(OnCompletionMode.BeforeConsumer); 214 return this; 215 } 216 217 /** 218 * Will only synchronize when the {@link org.apache.camel.Exchange} completed successfully (no errors). 219 * 220 * @return the builder 221 */ 222 public OnCompletionDefinition onCompleteOnly() { 223 boolean isOnFailureOnly = getOnFailureOnly() != null && getOnFailureOnly(); 224 if (isOnFailureOnly) { 225 throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); 226 } 227 // must define return type as OutputDefinition and not this type to avoid end user being able 228 // to invoke onFailureOnly/onCompleteOnly more than once 229 setOnCompleteOnly(Boolean.TRUE); 230 setOnFailureOnly(Boolean.FALSE); 231 return this; 232 } 233 234 /** 235 * Will only synchronize when the {@link org.apache.camel.Exchange} ended with failure (exception or FAULT message). 236 * 237 * @return the builder 238 */ 239 public OnCompletionDefinition onFailureOnly() { 240 boolean isOnCompleteOnly = getOnCompleteOnly() != null && getOnCompleteOnly(); 241 if (isOnCompleteOnly) { 242 throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); 243 } 244 // must define return type as OutputDefinition and not this type to avoid end user being able 245 // to invoke onFailureOnly/onCompleteOnly more than once 246 setOnCompleteOnly(Boolean.FALSE); 247 setOnFailureOnly(Boolean.TRUE); 248 return this; 249 } 250 251 /** 252 * Sets an additional predicate that should be true before the onCompletion is triggered. 253 * <p/> 254 * To be used for fine grained controlling whether a completion callback should be invoked or not 255 * 256 * @param predicate predicate that determines true or false 257 * @return the builder 258 */ 259 public OnCompletionDefinition onWhen(@AsPredicate Predicate predicate) { 260 setOnWhen(new WhenDefinition(predicate)); 261 return this; 262 } 263 264 /** 265 * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion. 266 * <p/> 267 * By default this feature is off. 268 * 269 * @return the builder 270 */ 271 public OnCompletionDefinition useOriginalBody() { 272 setUseOriginalMessagePolicy(Boolean.TRUE); 273 return this; 274 } 275 276 /** 277 * To use a custom Thread Pool to be used for parallel processing. 278 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 279 */ 280 public OnCompletionDefinition executorService(ExecutorService executorService) { 281 setExecutorService(executorService); 282 return this; 283 } 284 285 /** 286 * Refers to a custom Thread Pool to be used for parallel processing. 287 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 288 */ 289 public OnCompletionDefinition executorServiceRef(String executorServiceRef) { 290 setExecutorServiceRef(executorServiceRef); 291 return this; 292 } 293 294 /** 295 * If enabled then the on completion process will run asynchronously by a separate thread from a thread pool. 296 * By default this is false, meaning the on completion process will run synchronously using the same caller thread as from the route. 297 * 298 * @return the builder 299 */ 300 public OnCompletionDefinition parallelProcessing() { 301 setParallelProcessing(true); 302 return this; 303 } 304 305 /** 306 * If enabled then the on completion process will run asynchronously by a separate thread from a thread pool. 307 * By default this is false, meaning the on completion process will run synchronously using the same caller thread as from the route. 308 * 309 * @return the builder 310 */ 311 public OnCompletionDefinition parallelProcessing(boolean parallelProcessing) { 312 setParallelProcessing(parallelProcessing); 313 return this; 314 } 315 316 public List<ProcessorDefinition<?>> getOutputs() { 317 return outputs; 318 } 319 320 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 321 this.outputs = outputs; 322 } 323 324 public boolean isOutputSupported() { 325 return true; 326 } 327 328 public OnCompletionMode getMode() { 329 return mode; 330 } 331 332 /** 333 * Sets the on completion mode. 334 * <p/> 335 * The default value is AfterConsumer 336 */ 337 public void setMode(OnCompletionMode mode) { 338 this.mode = mode; 339 } 340 341 public Boolean getOnCompleteOnly() { 342 return onCompleteOnly; 343 } 344 345 public void setOnCompleteOnly(Boolean onCompleteOnly) { 346 this.onCompleteOnly = onCompleteOnly; 347 } 348 349 public Boolean getOnFailureOnly() { 350 return onFailureOnly; 351 } 352 353 public void setOnFailureOnly(Boolean onFailureOnly) { 354 this.onFailureOnly = onFailureOnly; 355 } 356 357 public WhenDefinition getOnWhen() { 358 return onWhen; 359 } 360 361 public void setOnWhen(WhenDefinition onWhen) { 362 this.onWhen = onWhen; 363 } 364 365 public ExecutorService getExecutorService() { 366 return executorService; 367 } 368 369 public void setExecutorService(ExecutorService executorService) { 370 this.executorService = executorService; 371 } 372 373 public String getExecutorServiceRef() { 374 return executorServiceRef; 375 } 376 377 public void setExecutorServiceRef(String executorServiceRef) { 378 this.executorServiceRef = executorServiceRef; 379 } 380 381 public Boolean getUseOriginalMessagePolicy() { 382 return useOriginalMessagePolicy; 383 } 384 385 /** 386 * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion. 387 * <p/> 388 * By default this feature is off. 389 */ 390 public void setUseOriginalMessagePolicy(Boolean useOriginalMessagePolicy) { 391 this.useOriginalMessagePolicy = useOriginalMessagePolicy; 392 } 393 394 public Boolean getParallelProcessing() { 395 return parallelProcessing; 396 } 397 398 public void setParallelProcessing(Boolean parallelProcessing) { 399 this.parallelProcessing = parallelProcessing; 400 } 401 402}