001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.Collection; 021import java.util.HashMap; 022import java.util.Iterator; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.ExecutorService; 026import javax.xml.bind.annotation.XmlAccessType; 027import javax.xml.bind.annotation.XmlAccessorType; 028import javax.xml.bind.annotation.XmlAttribute; 029import javax.xml.bind.annotation.XmlElement; 030import javax.xml.bind.annotation.XmlElementRef; 031import javax.xml.bind.annotation.XmlRootElement; 032import javax.xml.bind.annotation.XmlTransient; 033 034import org.apache.camel.Predicate; 035import org.apache.camel.Processor; 036import org.apache.camel.processor.CamelInternalProcessor; 037import org.apache.camel.processor.OnCompletionProcessor; 038import org.apache.camel.spi.Metadata; 039import org.apache.camel.spi.RouteContext; 040 041/** 042 * Route to be executed when normal route processing completes 043 * 044 * @version 045 */ 046@Metadata(label = "configuration") 047@XmlRootElement(name = "onCompletion") 048@XmlAccessorType(XmlAccessType.FIELD) 049public class OnCompletionDefinition extends ProcessorDefinition<OnCompletionDefinition> implements ExecutorServiceAwareDefinition<OnCompletionDefinition> { 050 @XmlAttribute @Metadata(defaultValue = "AfterConsumer") 051 private OnCompletionMode mode; 052 @XmlAttribute 053 private Boolean onCompleteOnly; 054 @XmlAttribute 055 private Boolean onFailureOnly; 056 @XmlElement(name = "onWhen") 057 private WhenDefinition onWhen; 058 @XmlAttribute 059 private Boolean parallelProcessing; 060 @XmlAttribute 061 private String executorServiceRef; 062 @XmlAttribute(name = "useOriginalMessage") 063 private Boolean useOriginalMessagePolicy; 064 @XmlElementRef 065 private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>(); 066 @XmlTransient 067 private ExecutorService executorService; 068 @XmlTransient 069 private Boolean routeScoped; 070 // TODO: in Camel 3.0 the OnCompletionDefinition should not contain state and OnCompletion processors 071 @XmlTransient 072 private final Map<String, Processor> onCompletions = new HashMap<String, Processor>(); 073 074 public OnCompletionDefinition() { 075 } 076 077 public boolean isRouteScoped() { 078 // is context scoped by default 079 return routeScoped != null ? routeScoped : false; 080 } 081 082 public Processor getOnCompletion(String routeId) { 083 return onCompletions.get(routeId); 084 } 085 086 public Collection<Processor> getOnCompletions() { 087 return onCompletions.values(); 088 } 089 090 @Override 091 public String toString() { 092 return "onCompletion[" + getOutputs() + "]"; 093 } 094 095 @Override 096 public String getLabel() { 097 return "onCompletion"; 098 } 099 100 @Override 101 public boolean isAbstract() { 102 return true; 103 } 104 105 @Override 106 public boolean isTopLevelOnly() { 107 return true; 108 } 109 110 @Override 111 public Processor createProcessor(RouteContext routeContext) throws Exception { 112 // assign whether this was a route scoped onCompletion or not 113 // we need to know this later when setting the parent, as only route scoped should have parent 114 // Note: this logic can possible be removed when the Camel routing engine decides at runtime 115 // to apply onCompletion in a more dynamic fashion than current code base 116 // and therefore is in a better position to decide among context/route scoped OnCompletion at runtime 117 if (routeScoped == null) { 118 routeScoped = super.getParent() != null; 119 } 120 121 boolean isOnCompleteOnly = getOnCompleteOnly() != null && getOnCompleteOnly(); 122 boolean isOnFailureOnly = getOnFailureOnly() != null && getOnFailureOnly(); 123 boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing(); 124 boolean original = getUseOriginalMessagePolicy() != null && getUseOriginalMessagePolicy(); 125 126 if (isOnCompleteOnly && isOnFailureOnly) { 127 throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); 128 } 129 if (original) { 130 // ensure allow original is turned on 131 routeContext.setAllowUseOriginalMessage(true); 132 } 133 134 String routeId = routeContext.getRoute().idOrCreate(routeContext.getCamelContext().getNodeIdFactory()); 135 136 Processor childProcessor = this.createChildProcessor(routeContext, true); 137 138 // wrap the on completion route in a unit of work processor 139 CamelInternalProcessor internal = new CamelInternalProcessor(childProcessor); 140 internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); 141 142 onCompletions.put(routeId, internal); 143 144 Predicate when = null; 145 if (onWhen != null) { 146 when = onWhen.getExpression().createPredicate(routeContext); 147 } 148 149 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing); 150 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "OnCompletion", this, isParallelProcessing); 151 152 // should be after consumer by default 153 boolean afterConsumer = mode == null || mode == OnCompletionMode.AfterConsumer; 154 155 OnCompletionProcessor answer = new OnCompletionProcessor(routeContext.getCamelContext(), internal, 156 threadPool, shutdownThreadPool, isOnCompleteOnly, isOnFailureOnly, when, original, afterConsumer); 157 return answer; 158 } 159 160 /** 161 * Removes all existing {@link org.apache.camel.model.OnCompletionDefinition} from the definition. 162 * <p/> 163 * This is used to let route scoped <tt>onCompletion</tt> overrule any global <tt>onCompletion</tt>. 164 * Hence we remove all existing as they are global. 165 * 166 * @param definition the parent definition that is the route 167 */ 168 public void removeAllOnCompletionDefinition(ProcessorDefinition<?> definition) { 169 for (Iterator<ProcessorDefinition<?>> it = definition.getOutputs().iterator(); it.hasNext();) { 170 ProcessorDefinition<?> out = it.next(); 171 if (out instanceof OnCompletionDefinition) { 172 it.remove(); 173 } 174 } 175 } 176 177 @Override 178 public ProcessorDefinition<?> end() { 179 // pop parent block, as we added our self as block to parent when synchronized was defined in the route 180 getParent().popBlock(); 181 return super.end(); 182 } 183 184 /** 185 * Sets the mode to be after route is done (default due backwards compatible). 186 * <p/> 187 * This executes the on completion work <i>after</i> the route consumer have written response 188 * back to the callee (if its InOut mode). 189 * 190 * @return the builder 191 */ 192 public OnCompletionDefinition modeAfterConsumer() { 193 setMode(OnCompletionMode.AfterConsumer); 194 return this; 195 } 196 197 /** 198 * Sets the mode to be before consumer is done. 199 * <p/> 200 * This allows the on completion work to execute <i>before</i> the route consumer, writes any response 201 * back to the callee (if its InOut mode). 202 * 203 * @return the builder 204 */ 205 public OnCompletionDefinition modeBeforeConsumer() { 206 setMode(OnCompletionMode.BeforeConsumer); 207 return this; 208 } 209 210 /** 211 * Will only synchronize when the {@link org.apache.camel.Exchange} completed successfully (no errors). 212 * 213 * @return the builder 214 */ 215 public OnCompletionDefinition onCompleteOnly() { 216 boolean isOnFailureOnly = getOnFailureOnly() != null && getOnFailureOnly(); 217 if (isOnFailureOnly) { 218 throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); 219 } 220 // must define return type as OutputDefinition and not this type to avoid end user being able 221 // to invoke onFailureOnly/onCompleteOnly more than once 222 setOnCompleteOnly(Boolean.TRUE); 223 setOnFailureOnly(Boolean.FALSE); 224 return this; 225 } 226 227 /** 228 * Will only synchronize when the {@link org.apache.camel.Exchange} ended with failure (exception or FAULT message). 229 * 230 * @return the builder 231 */ 232 public OnCompletionDefinition onFailureOnly() { 233 boolean isOnCompleteOnly = getOnCompleteOnly() != null && getOnCompleteOnly(); 234 if (isOnCompleteOnly) { 235 throw new IllegalArgumentException("Both onCompleteOnly and onFailureOnly cannot be true. Only one of them can be true. On node: " + this); 236 } 237 // must define return type as OutputDefinition and not this type to avoid end user being able 238 // to invoke onFailureOnly/onCompleteOnly more than once 239 setOnCompleteOnly(Boolean.FALSE); 240 setOnFailureOnly(Boolean.TRUE); 241 return this; 242 } 243 244 /** 245 * Sets an additional predicate that should be true before the onCompletion is triggered. 246 * <p/> 247 * To be used for fine grained controlling whether a completion callback should be invoked or not 248 * 249 * @param predicate predicate that determines true or false 250 * @return the builder 251 */ 252 public OnCompletionDefinition onWhen(Predicate predicate) { 253 setOnWhen(new WhenDefinition(predicate)); 254 return this; 255 } 256 257 /** 258 * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion. 259 * <p/> 260 * By default this feature is off. 261 * 262 * @return the builder 263 */ 264 public OnCompletionDefinition useOriginalBody() { 265 setUseOriginalMessagePolicy(Boolean.TRUE); 266 return this; 267 } 268 269 /** 270 * To use a custom Thread Pool to be used for parallel processing. 271 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 272 */ 273 public OnCompletionDefinition executorService(ExecutorService executorService) { 274 setExecutorService(executorService); 275 return this; 276 } 277 278 /** 279 * Refers to a custom Thread Pool to be used for parallel processing. 280 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 281 */ 282 public OnCompletionDefinition executorServiceRef(String executorServiceRef) { 283 setExecutorServiceRef(executorServiceRef); 284 return this; 285 } 286 287 /** 288 * If enabled then the on completion process will run asynchronously by a separate thread from a thread pool. 289 * By default this is false, meaning the on completion process will run synchronously using the same caller thread as from the route. 290 * 291 * @return the builder 292 */ 293 public OnCompletionDefinition parallelProcessing() { 294 setParallelProcessing(true); 295 return this; 296 } 297 298 /** 299 * If enabled then the on completion process will run asynchronously by a separate thread from a thread pool. 300 * By default this is false, meaning the on completion process will run synchronously using the same caller thread as from the route. 301 * 302 * @return the builder 303 */ 304 public OnCompletionDefinition parallelProcessing(boolean parallelProcessing) { 305 setParallelProcessing(parallelProcessing); 306 return this; 307 } 308 309 public List<ProcessorDefinition<?>> getOutputs() { 310 return outputs; 311 } 312 313 public void setOutputs(List<ProcessorDefinition<?>> outputs) { 314 this.outputs = outputs; 315 } 316 317 public boolean isOutputSupported() { 318 return true; 319 } 320 321 public OnCompletionMode getMode() { 322 return mode; 323 } 324 325 /** 326 * Sets the on completion mode. 327 * <p/> 328 * The default value is AfterConsumer 329 */ 330 public void setMode(OnCompletionMode mode) { 331 this.mode = mode; 332 } 333 334 public Boolean getOnCompleteOnly() { 335 return onCompleteOnly; 336 } 337 338 public void setOnCompleteOnly(Boolean onCompleteOnly) { 339 this.onCompleteOnly = onCompleteOnly; 340 } 341 342 public Boolean getOnFailureOnly() { 343 return onFailureOnly; 344 } 345 346 public void setOnFailureOnly(Boolean onFailureOnly) { 347 this.onFailureOnly = onFailureOnly; 348 } 349 350 public WhenDefinition getOnWhen() { 351 return onWhen; 352 } 353 354 public void setOnWhen(WhenDefinition onWhen) { 355 this.onWhen = onWhen; 356 } 357 358 public ExecutorService getExecutorService() { 359 return executorService; 360 } 361 362 public void setExecutorService(ExecutorService executorService) { 363 this.executorService = executorService; 364 } 365 366 public String getExecutorServiceRef() { 367 return executorServiceRef; 368 } 369 370 public void setExecutorServiceRef(String executorServiceRef) { 371 this.executorServiceRef = executorServiceRef; 372 } 373 374 public Boolean getUseOriginalMessagePolicy() { 375 return useOriginalMessagePolicy; 376 } 377 378 /** 379 * Will use the original input body when an {@link org.apache.camel.Exchange} for this on completion. 380 * <p/> 381 * By default this feature is off. 382 */ 383 public void setUseOriginalMessagePolicy(Boolean useOriginalMessagePolicy) { 384 this.useOriginalMessagePolicy = useOriginalMessagePolicy; 385 } 386 387 public Boolean getParallelProcessing() { 388 return parallelProcessing; 389 } 390 391 public void setParallelProcessing(Boolean parallelProcessing) { 392 this.parallelProcessing = parallelProcessing; 393 } 394 395}