001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.ExecutorService; 022 023import javax.xml.bind.annotation.XmlAccessType; 024import javax.xml.bind.annotation.XmlAccessorType; 025import javax.xml.bind.annotation.XmlAttribute; 026import javax.xml.bind.annotation.XmlRootElement; 027import javax.xml.bind.annotation.XmlTransient; 028 029import org.apache.camel.CamelContextAware; 030import org.apache.camel.Processor; 031import org.apache.camel.builder.AggregationStrategyClause; 032import org.apache.camel.builder.ProcessClause; 033import org.apache.camel.processor.MulticastProcessor; 034import org.apache.camel.processor.aggregate.AggregationStrategy; 035import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; 036import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; 037import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; 038import org.apache.camel.spi.Metadata; 039import org.apache.camel.spi.RouteContext; 040import org.apache.camel.util.CamelContextHelper; 041 042/** 043 * Routes the same message to multiple paths either sequentially or in parallel. 044 * 045 * @version 046 */ 047@Metadata(label = "eip,routing") 048@XmlRootElement(name = "multicast") 049@XmlAccessorType(XmlAccessType.FIELD) 050public class MulticastDefinition extends OutputDefinition<MulticastDefinition> implements ExecutorServiceAwareDefinition<MulticastDefinition> { 051 @XmlAttribute 052 private Boolean parallelProcessing; 053 @XmlAttribute 054 private String strategyRef; 055 @XmlAttribute 056 private String strategyMethodName; 057 @XmlAttribute 058 private Boolean strategyMethodAllowNull; 059 @XmlTransient 060 private ExecutorService executorService; 061 @XmlAttribute 062 private String executorServiceRef; 063 @XmlAttribute 064 private Boolean streaming; 065 @XmlAttribute 066 private Boolean stopOnException; 067 @XmlAttribute @Metadata(defaultValue = "0") 068 private Long timeout; 069 @XmlTransient 070 private AggregationStrategy aggregationStrategy; 071 @XmlAttribute 072 private String onPrepareRef; 073 @XmlTransient 074 private Processor onPrepare; 075 @XmlAttribute 076 private Boolean shareUnitOfWork; 077 @XmlAttribute 078 private Boolean parallelAggregate; 079 @XmlAttribute 080 private Boolean stopOnAggregateException; 081 082 public MulticastDefinition() { 083 } 084 085 @Override 086 public String toString() { 087 return "Multicast[" + getOutputs() + "]"; 088 } 089 090 @Override 091 public String getShortName() { 092 return "multicast"; 093 } 094 095 @Override 096 public String getLabel() { 097 return "multicast"; 098 } 099 100 @Override 101 public Processor createProcessor(RouteContext routeContext) throws Exception { 102 Processor answer = this.createChildProcessor(routeContext, true); 103 104 // force the answer as a multicast processor even if there is only one child processor in the multicast 105 if (!(answer instanceof MulticastProcessor)) { 106 List<Processor> list = new ArrayList<>(1); 107 list.add(answer); 108 answer = createCompositeProcessor(routeContext, list); 109 } 110 return answer; 111 } 112 113 // Fluent API 114 // ------------------------------------------------------------------------- 115 116 /** 117 * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast using a fluent builder. 118 */ 119 public AggregationStrategyClause<MulticastDefinition> aggregationStrategy() { 120 AggregationStrategyClause<MulticastDefinition> clause = new AggregationStrategyClause<>(this); 121 setAggregationStrategy(clause); 122 return clause; 123 } 124 125 /** 126 * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast. 127 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy. 128 * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception 129 * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option. 130 */ 131 public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 132 setAggregationStrategy(aggregationStrategy); 133 return this; 134 } 135 136 /** 137 * Sets a reference to the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast. 138 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy 139 * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception 140 * is not handled by the error handler. The error handler can be enabled to react if enabling the shareUnitOfWork option. 141 */ 142 public MulticastDefinition aggregationStrategyRef(String aggregationStrategyRef) { 143 setStrategyRef(aggregationStrategyRef); 144 return this; 145 } 146 147 /** 148 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 149 * 150 * @param methodName the method name to call 151 * @return the builder 152 */ 153 public MulticastDefinition aggregationStrategyMethodName(String methodName) { 154 setStrategyMethodName(methodName); 155 return this; 156 } 157 158 /** 159 * If this option is false then the aggregate method is not used if there was no data to enrich. 160 * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy 161 * 162 * @return the builder 163 */ 164 public MulticastDefinition aggregationStrategyMethodAllowNull() { 165 setStrategyMethodAllowNull(true); 166 return this; 167 } 168 169 /** 170 * If enabled then sending messages to the multicasts occurs concurrently. 171 * Note the caller thread will still wait until all messages has been fully processed, before it continues. 172 * Its only the sending and processing the replies from the multicasts which happens concurrently. 173 * 174 * @return the builder 175 */ 176 public MulticastDefinition parallelProcessing() { 177 setParallelProcessing(true); 178 return this; 179 } 180 181 /** 182 * If enabled then sending messages to the multicasts occurs concurrently. 183 * Note the caller thread will still wait until all messages has been fully processed, before it continues. 184 * Its only the sending and processing the replies from the multicasts which happens concurrently. 185 * 186 * @return the builder 187 */ 188 public MulticastDefinition parallelProcessing(boolean parallelProcessing) { 189 setParallelProcessing(parallelProcessing); 190 return this; 191 } 192 193 /** 194 * If enabled then the aggregate method on AggregationStrategy can be called concurrently. 195 * Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. 196 * By default this is false meaning that Camel synchronizes the call to the aggregate method. 197 * Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe. 198 * 199 * @return the builder 200 */ 201 public MulticastDefinition parallelAggregate() { 202 setParallelAggregate(true); 203 return this; 204 } 205 206 /** 207 * If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used. 208 * Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used. 209 * Enabling this option allows to work around this behavior. 210 * 211 * The default value is <code>false</code> for the sake of backward compatibility. 212 * 213 * @return the builder 214 */ 215 public MulticastDefinition stopOnAggregateException() { 216 setStopOnAggregateException(true); 217 return this; 218 } 219 220 /** 221 * If enabled then Camel will process replies out-of-order, eg in the order they come back. 222 * If disabled, Camel will process replies in the same order as defined by the multicast. 223 * 224 * @return the builder 225 */ 226 public MulticastDefinition streaming() { 227 setStreaming(true); 228 return this; 229 } 230 231 /** 232 * Will now stop further processing if an exception or failure occurred during processing of an 233 * {@link org.apache.camel.Exchange} and the caused exception will be thrown. 234 * <p/> 235 * Will also stop if processing the exchange failed (has a fault message) or an exception 236 * was thrown and handled by the error handler (such as using onException). In all situations 237 * the multicast will stop further processing. This is the same behavior as in pipeline, which 238 * is used by the routing engine. 239 * <p/> 240 * The default behavior is to <b>not</b> stop but continue processing till the end 241 * 242 * @return the builder 243 */ 244 public MulticastDefinition stopOnException() { 245 setStopOnException(true); 246 return this; 247 } 248 249 /** 250 * To use a custom Thread Pool to be used for parallel processing. 251 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 252 */ 253 public MulticastDefinition executorService(ExecutorService executorService) { 254 setExecutorService(executorService); 255 return this; 256 } 257 258 /** 259 * Refers to a custom Thread Pool to be used for parallel processing. 260 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 261 */ 262 public MulticastDefinition executorServiceRef(String executorServiceRef) { 263 setExecutorServiceRef(executorServiceRef); 264 return this; 265 } 266 267 /** 268 * Set the {@link Processor} to use when preparing the {@link org.apache.camel.Exchange} to be send using a fluent builder. 269 */ 270 public ProcessClause<MulticastDefinition> onPrepare() { 271 ProcessClause<MulticastDefinition> clause = new ProcessClause<>(this); 272 setOnPrepare(clause); 273 return clause; 274 } 275 276 /** 277 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 278 * This can be used to deep-clone messages that should be send, or any custom logic needed before 279 * the exchange is send. 280 * 281 * @param onPrepare the processor 282 * @return the builder 283 */ 284 public MulticastDefinition onPrepare(Processor onPrepare) { 285 setOnPrepare(onPrepare); 286 return this; 287 } 288 289 /** 290 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 291 * This can be used to deep-clone messages that should be send, or any custom logic needed before 292 * the exchange is send. 293 * 294 * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry} 295 * @return the builder 296 */ 297 public MulticastDefinition onPrepareRef(String onPrepareRef) { 298 setOnPrepareRef(onPrepareRef); 299 return this; 300 } 301 302 /** 303 * Sets a total timeout specified in millis, when using parallel processing. 304 * If the Multicast hasn't been able to send and process all replies within the given timeframe, 305 * then the timeout triggers and the Multicast breaks out and continues. 306 * Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. 307 * If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel 308 * to shut down in a graceful manner may continue to run. So use this option with a bit of care. 309 * 310 * @param timeout timeout in millis 311 * @return the builder 312 */ 313 public MulticastDefinition timeout(long timeout) { 314 setTimeout(timeout); 315 return this; 316 } 317 318 /** 319 * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages. 320 * Multicast will by default not share unit of work between the parent exchange and each multicasted exchange. 321 * This means each sub exchange has its own individual unit of work. 322 * 323 * @return the builder. 324 * @see org.apache.camel.spi.SubUnitOfWork 325 */ 326 public MulticastDefinition shareUnitOfWork() { 327 setShareUnitOfWork(true); 328 return this; 329 } 330 331 protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception { 332 final AggregationStrategy strategy = createAggregationStrategy(routeContext); 333 334 boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing(); 335 boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork(); 336 boolean isStreaming = getStreaming() != null && getStreaming(); 337 boolean isStopOnException = getStopOnException() != null && getStopOnException(); 338 boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate(); 339 boolean isStopOnAggregateException = getStopOnAggregateException() != null && getStopOnAggregateException(); 340 341 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing); 342 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing); 343 344 long timeout = getTimeout() != null ? getTimeout() : 0; 345 if (timeout > 0 && !isParallelProcessing) { 346 throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled."); 347 } 348 if (onPrepareRef != null) { 349 onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class); 350 } 351 352 MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, strategy, isParallelProcessing, 353 threadPool, shutdownThreadPool, isStreaming, isStopOnException, timeout, onPrepare, isShareUnitOfWork, isParallelAggregate, isStopOnAggregateException); 354 return answer; 355 } 356 357 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 358 AggregationStrategy strategy = getAggregationStrategy(); 359 if (strategy == null && strategyRef != null) { 360 Object aggStrategy = routeContext.lookup(strategyRef, Object.class); 361 if (aggStrategy instanceof AggregationStrategy) { 362 strategy = (AggregationStrategy) aggStrategy; 363 } else if (aggStrategy != null) { 364 AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName()); 365 if (getStrategyMethodAllowNull() != null) { 366 adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); 367 adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); 368 } 369 strategy = adapter; 370 } else { 371 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); 372 } 373 } 374 375 if (strategy == null) { 376 // default to use latest aggregation strategy 377 strategy = new UseLatestAggregationStrategy(); 378 } 379 380 if (strategy instanceof CamelContextAware) { 381 ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); 382 } 383 384 if (shareUnitOfWork != null && shareUnitOfWork) { 385 // wrap strategy in share unit of work 386 strategy = new ShareUnitOfWorkAggregationStrategy(strategy); 387 } 388 389 return strategy; 390 } 391 392 public AggregationStrategy getAggregationStrategy() { 393 return aggregationStrategy; 394 } 395 396 public MulticastDefinition setAggregationStrategy(AggregationStrategy aggregationStrategy) { 397 this.aggregationStrategy = aggregationStrategy; 398 return this; 399 } 400 401 public Boolean getParallelProcessing() { 402 return parallelProcessing; 403 } 404 405 public void setParallelProcessing(Boolean parallelProcessing) { 406 this.parallelProcessing = parallelProcessing; 407 } 408 409 public Boolean getStreaming() { 410 return streaming; 411 } 412 413 public void setStreaming(Boolean streaming) { 414 this.streaming = streaming; 415 } 416 417 public Boolean getStopOnException() { 418 return stopOnException; 419 } 420 421 public void setStopOnException(Boolean stopOnException) { 422 this.stopOnException = stopOnException; 423 } 424 425 public ExecutorService getExecutorService() { 426 return executorService; 427 } 428 429 public void setExecutorService(ExecutorService executorService) { 430 this.executorService = executorService; 431 } 432 433 public String getStrategyRef() { 434 return strategyRef; 435 } 436 437 /** 438 * Refers to an AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast. 439 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy 440 */ 441 public void setStrategyRef(String strategyRef) { 442 this.strategyRef = strategyRef; 443 } 444 445 public String getStrategyMethodName() { 446 return strategyMethodName; 447 } 448 449 /** 450 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 451 */ 452 public void setStrategyMethodName(String strategyMethodName) { 453 this.strategyMethodName = strategyMethodName; 454 } 455 456 public Boolean getStrategyMethodAllowNull() { 457 return strategyMethodAllowNull; 458 } 459 460 /** 461 * If this option is false then the aggregate method is not used if there was no data to enrich. 462 * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy 463 */ 464 public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { 465 this.strategyMethodAllowNull = strategyMethodAllowNull; 466 } 467 468 public String getExecutorServiceRef() { 469 return executorServiceRef; 470 } 471 472 /** 473 * Refers to a custom Thread Pool to be used for parallel processing. 474 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 475 */ 476 public void setExecutorServiceRef(String executorServiceRef) { 477 this.executorServiceRef = executorServiceRef; 478 } 479 480 public Long getTimeout() { 481 return timeout; 482 } 483 484 public void setTimeout(Long timeout) { 485 this.timeout = timeout; 486 } 487 488 public String getOnPrepareRef() { 489 return onPrepareRef; 490 } 491 492 public void setOnPrepareRef(String onPrepareRef) { 493 this.onPrepareRef = onPrepareRef; 494 } 495 496 public Processor getOnPrepare() { 497 return onPrepare; 498 } 499 500 public void setOnPrepare(Processor onPrepare) { 501 this.onPrepare = onPrepare; 502 } 503 504 public Boolean getShareUnitOfWork() { 505 return shareUnitOfWork; 506 } 507 508 public void setShareUnitOfWork(Boolean shareUnitOfWork) { 509 this.shareUnitOfWork = shareUnitOfWork; 510 } 511 512 public Boolean getParallelAggregate() { 513 return parallelAggregate; 514 } 515 516 public void setParallelAggregate(Boolean parallelAggregate) { 517 this.parallelAggregate = parallelAggregate; 518 } 519 520 public Boolean getStopOnAggregateException() { 521 return stopOnAggregateException; 522 } 523 524 public void setStopOnAggregateException(Boolean stopOnAggregateException) { 525 this.stopOnAggregateException = stopOnAggregateException; 526 } 527 528}