001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.ExecutorService; 022import javax.xml.bind.annotation.XmlAccessType; 023import javax.xml.bind.annotation.XmlAccessorType; 024import javax.xml.bind.annotation.XmlAttribute; 025import javax.xml.bind.annotation.XmlRootElement; 026import javax.xml.bind.annotation.XmlTransient; 027 028import org.apache.camel.CamelContextAware; 029import org.apache.camel.Processor; 030import org.apache.camel.processor.CamelInternalProcessor; 031import org.apache.camel.processor.MulticastProcessor; 032import org.apache.camel.processor.aggregate.AggregationStrategy; 033import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; 034import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; 035import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; 036import org.apache.camel.spi.Metadata; 037import org.apache.camel.spi.RouteContext; 038import org.apache.camel.util.CamelContextHelper; 039 040/** 041 * Routes the same message to multiple paths either sequentially or in parallel. 042 * 043 * @version 044 */ 045@Metadata(label = "eip,routing") 046@XmlRootElement(name = "multicast") 047@XmlAccessorType(XmlAccessType.FIELD) 048public class MulticastDefinition extends OutputDefinition<MulticastDefinition> implements ExecutorServiceAwareDefinition<MulticastDefinition> { 049 @XmlAttribute 050 private Boolean parallelProcessing; 051 @XmlAttribute 052 private String strategyRef; 053 @XmlAttribute 054 private String strategyMethodName; 055 @XmlAttribute 056 private Boolean strategyMethodAllowNull; 057 @XmlTransient 058 private ExecutorService executorService; 059 @XmlAttribute 060 private String executorServiceRef; 061 @XmlAttribute 062 private Boolean streaming; 063 @XmlAttribute 064 private Boolean stopOnException; 065 @XmlAttribute @Metadata(defaultValue = "0") 066 private Long timeout; 067 @XmlTransient 068 private AggregationStrategy aggregationStrategy; 069 @XmlAttribute 070 private String onPrepareRef; 071 @XmlTransient 072 private Processor onPrepare; 073 @XmlAttribute 074 private Boolean shareUnitOfWork; 075 @XmlAttribute 076 private Boolean parallelAggregate; 077 078 public MulticastDefinition() { 079 } 080 081 @Override 082 public String toString() { 083 return "Multicast[" + getOutputs() + "]"; 084 } 085 086 @Override 087 public String getLabel() { 088 return "multicast"; 089 } 090 091 @Override 092 public Processor createProcessor(RouteContext routeContext) throws Exception { 093 Processor answer = this.createChildProcessor(routeContext, true); 094 095 // force the answer as a multicast processor even if there is only one child processor in the multicast 096 if (!(answer instanceof MulticastProcessor)) { 097 List<Processor> list = new ArrayList<Processor>(1); 098 list.add(answer); 099 answer = createCompositeProcessor(routeContext, list); 100 } 101 return answer; 102 } 103 104 // Fluent API 105 // ------------------------------------------------------------------------- 106 107 /** 108 * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast. 109 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy. 110 * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception 111 * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option. 112 */ 113 public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 114 setAggregationStrategy(aggregationStrategy); 115 return this; 116 } 117 118 /** 119 * Sets a reference to the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast. 120 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy 121 * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception 122 * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option. 123 */ 124 public MulticastDefinition aggregationStrategyRef(String aggregationStrategyRef) { 125 setStrategyRef(aggregationStrategyRef); 126 return this; 127 } 128 129 /** 130 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 131 * 132 * @param methodName the method name to call 133 * @return the builder 134 */ 135 public MulticastDefinition aggregationStrategyMethodName(String methodName) { 136 setStrategyMethodName(methodName); 137 return this; 138 } 139 140 /** 141 * If this option is false then the aggregate method is not used if there was no data to enrich. 142 * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy 143 * 144 * @return the builder 145 */ 146 public MulticastDefinition aggregationStrategyMethodAllowNull() { 147 setStrategyMethodAllowNull(true); 148 return this; 149 } 150 151 /** 152 * If enabled then sending messages to the multicasts occurs concurrently. 153 * Note the caller thread will still wait until all messages has been fully processed, before it continues. 154 * Its only the sending and processing the replies from the multicasts which happens concurrently. 155 * 156 * @return the builder 157 */ 158 public MulticastDefinition parallelProcessing() { 159 setParallelProcessing(true); 160 return this; 161 } 162 163 /** 164 * If enabled then sending messages to the multicasts occurs concurrently. 165 * Note the caller thread will still wait until all messages has been fully processed, before it continues. 166 * Its only the sending and processing the replies from the multicasts which happens concurrently. 167 * 168 * @return the builder 169 */ 170 public MulticastDefinition parallelProcessing(boolean parallelProcessing) { 171 setParallelProcessing(parallelProcessing); 172 return this; 173 } 174 175 /** 176 * If enabled then the aggregate method on AggregationStrategy can be called concurrently. 177 * Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. 178 * By default this is false meaning that Camel synchronizes the call to the aggregate method. 179 * Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe. 180 * 181 * @return the builder 182 */ 183 public MulticastDefinition parallelAggregate() { 184 setParallelAggregate(true); 185 return this; 186 } 187 188 /** 189 * If enabled then Camel will process replies out-of-order, eg in the order they come back. 190 * If disabled, Camel will process replies in the same order as defined by the multicast. 191 * 192 * @return the builder 193 */ 194 public MulticastDefinition streaming() { 195 setStreaming(true); 196 return this; 197 } 198 199 /** 200 * Will now stop further processing if an exception or failure occurred during processing of an 201 * {@link org.apache.camel.Exchange} and the caused exception will be thrown. 202 * <p/> 203 * Will also stop if processing the exchange failed (has a fault message) or an exception 204 * was thrown and handled by the error handler (such as using onException). In all situations 205 * the multicast will stop further processing. This is the same behavior as in pipeline, which 206 * is used by the routing engine. 207 * <p/> 208 * The default behavior is to <b>not</b> stop but continue processing till the end 209 * 210 * @return the builder 211 */ 212 public MulticastDefinition stopOnException() { 213 setStopOnException(true); 214 return this; 215 } 216 217 /** 218 * To use a custom Thread Pool to be used for parallel processing. 219 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 220 */ 221 public MulticastDefinition executorService(ExecutorService executorService) { 222 setExecutorService(executorService); 223 return this; 224 } 225 226 /** 227 * Refers to a custom Thread Pool to be used for parallel processing. 228 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 229 */ 230 public MulticastDefinition executorServiceRef(String executorServiceRef) { 231 setExecutorServiceRef(executorServiceRef); 232 return this; 233 } 234 235 /** 236 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 237 * This can be used to deep-clone messages that should be send, or any custom logic needed before 238 * the exchange is send. 239 * 240 * @param onPrepare the processor 241 * @return the builder 242 */ 243 public MulticastDefinition onPrepare(Processor onPrepare) { 244 setOnPrepare(onPrepare); 245 return this; 246 } 247 248 /** 249 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 250 * This can be used to deep-clone messages that should be send, or any custom logic needed before 251 * the exchange is send. 252 * 253 * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry} 254 * @return the builder 255 */ 256 public MulticastDefinition onPrepareRef(String onPrepareRef) { 257 setOnPrepareRef(onPrepareRef); 258 return this; 259 } 260 261 /** 262 * Sets a total timeout specified in millis, when using parallel processing. 263 * If the Multicast hasn't been able to send and process all replies within the given timeframe, 264 * then the timeout triggers and the Multicast breaks out and continues. 265 * Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. 266 * If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel 267 * to shut down in a graceful manner may continue to run. So use this option with a bit of care. 268 * 269 * @param timeout timeout in millis 270 * @return the builder 271 */ 272 public MulticastDefinition timeout(long timeout) { 273 setTimeout(timeout); 274 return this; 275 } 276 277 /** 278 * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages. 279 * Multicast will by default not share unit of work between the parent exchange and each multicasted exchange. 280 * This means each sub exchange has its own individual unit of work. 281 * 282 * @return the builder. 283 * @see org.apache.camel.spi.SubUnitOfWork 284 */ 285 public MulticastDefinition shareUnitOfWork() { 286 setShareUnitOfWork(true); 287 return this; 288 } 289 290 protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception { 291 final AggregationStrategy strategy = createAggregationStrategy(routeContext); 292 293 boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing(); 294 boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork(); 295 boolean isStreaming = getStreaming() != null && getStreaming(); 296 boolean isStopOnException = getStopOnException() != null && getStopOnException(); 297 boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate(); 298 299 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing); 300 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing); 301 302 long timeout = getTimeout() != null ? getTimeout() : 0; 303 if (timeout > 0 && !isParallelProcessing) { 304 throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled."); 305 } 306 if (onPrepareRef != null) { 307 onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class); 308 } 309 310 MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, strategy, isParallelProcessing, 311 threadPool, shutdownThreadPool, isStreaming, isStopOnException, timeout, onPrepare, isShareUnitOfWork, isParallelAggregate); 312 return answer; 313 } 314 315 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 316 AggregationStrategy strategy = getAggregationStrategy(); 317 if (strategy == null && strategyRef != null) { 318 Object aggStrategy = routeContext.lookup(strategyRef, Object.class); 319 if (aggStrategy instanceof AggregationStrategy) { 320 strategy = (AggregationStrategy) aggStrategy; 321 } else if (aggStrategy != null) { 322 AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName()); 323 if (getStrategyMethodAllowNull() != null) { 324 adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); 325 adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); 326 } 327 strategy = adapter; 328 } else { 329 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); 330 } 331 } 332 333 if (strategy == null) { 334 // default to use latest aggregation strategy 335 strategy = new UseLatestAggregationStrategy(); 336 } 337 338 if (strategy instanceof CamelContextAware) { 339 ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); 340 } 341 342 if (shareUnitOfWork != null && shareUnitOfWork) { 343 // wrap strategy in share unit of work 344 strategy = new ShareUnitOfWorkAggregationStrategy(strategy); 345 } 346 347 return strategy; 348 } 349 350 public AggregationStrategy getAggregationStrategy() { 351 return aggregationStrategy; 352 } 353 354 public MulticastDefinition setAggregationStrategy(AggregationStrategy aggregationStrategy) { 355 this.aggregationStrategy = aggregationStrategy; 356 return this; 357 } 358 359 public Boolean getParallelProcessing() { 360 return parallelProcessing; 361 } 362 363 public void setParallelProcessing(Boolean parallelProcessing) { 364 this.parallelProcessing = parallelProcessing; 365 } 366 367 public Boolean getStreaming() { 368 return streaming; 369 } 370 371 public void setStreaming(Boolean streaming) { 372 this.streaming = streaming; 373 } 374 375 public Boolean getStopOnException() { 376 return stopOnException; 377 } 378 379 public void setStopOnException(Boolean stopOnException) { 380 this.stopOnException = stopOnException; 381 } 382 383 public ExecutorService getExecutorService() { 384 return executorService; 385 } 386 387 public void setExecutorService(ExecutorService executorService) { 388 this.executorService = executorService; 389 } 390 391 public String getStrategyRef() { 392 return strategyRef; 393 } 394 395 /** 396 * Refers to an AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast. 397 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy 398 */ 399 public void setStrategyRef(String strategyRef) { 400 this.strategyRef = strategyRef; 401 } 402 403 public String getStrategyMethodName() { 404 return strategyMethodName; 405 } 406 407 /** 408 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 409 */ 410 public void setStrategyMethodName(String strategyMethodName) { 411 this.strategyMethodName = strategyMethodName; 412 } 413 414 public Boolean getStrategyMethodAllowNull() { 415 return strategyMethodAllowNull; 416 } 417 418 /** 419 * If this option is false then the aggregate method is not used if there was no data to enrich. 420 * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy 421 */ 422 public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { 423 this.strategyMethodAllowNull = strategyMethodAllowNull; 424 } 425 426 public String getExecutorServiceRef() { 427 return executorServiceRef; 428 } 429 430 /** 431 * Refers to a custom Thread Pool to be used for parallel processing. 432 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 433 */ 434 public void setExecutorServiceRef(String executorServiceRef) { 435 this.executorServiceRef = executorServiceRef; 436 } 437 438 public Long getTimeout() { 439 return timeout; 440 } 441 442 public void setTimeout(Long timeout) { 443 this.timeout = timeout; 444 } 445 446 public String getOnPrepareRef() { 447 return onPrepareRef; 448 } 449 450 public void setOnPrepareRef(String onPrepareRef) { 451 this.onPrepareRef = onPrepareRef; 452 } 453 454 public Processor getOnPrepare() { 455 return onPrepare; 456 } 457 458 public void setOnPrepare(Processor onPrepare) { 459 this.onPrepare = onPrepare; 460 } 461 462 public Boolean getShareUnitOfWork() { 463 return shareUnitOfWork; 464 } 465 466 public void setShareUnitOfWork(Boolean shareUnitOfWork) { 467 this.shareUnitOfWork = shareUnitOfWork; 468 } 469 470 public Boolean getParallelAggregate() { 471 return parallelAggregate; 472 } 473 474 public void setParallelAggregate(Boolean parallelAggregate) { 475 this.parallelAggregate = parallelAggregate; 476 } 477 478}