001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.ExecutorService; 022 023import javax.xml.bind.annotation.XmlAccessType; 024import javax.xml.bind.annotation.XmlAccessorType; 025import javax.xml.bind.annotation.XmlAttribute; 026import javax.xml.bind.annotation.XmlRootElement; 027import javax.xml.bind.annotation.XmlTransient; 028 029import org.apache.camel.CamelContextAware; 030import org.apache.camel.Expression; 031import org.apache.camel.Processor; 032import org.apache.camel.builder.ProcessClause; 033import org.apache.camel.model.language.ExpressionDefinition; 034import org.apache.camel.processor.EvaluateExpressionProcessor; 035import org.apache.camel.processor.Pipeline; 036import org.apache.camel.processor.RecipientList; 037import org.apache.camel.processor.aggregate.AggregationStrategy; 038import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; 039import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; 040import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; 041import org.apache.camel.spi.Metadata; 042import org.apache.camel.spi.RouteContext; 043import org.apache.camel.util.CamelContextHelper; 044 045/** 046 * Routes messages to a number of dynamically specified recipients (dynamic to) 047 * 048 * @version 049 */ 050@Metadata(label = "eip,endpoint,routing") 051@XmlRootElement(name = "recipientList") 052@XmlAccessorType(XmlAccessType.FIELD) 053public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> extends NoOutputExpressionNode implements ExecutorServiceAwareDefinition<RecipientListDefinition<Type>> { 054 @XmlTransient 055 private AggregationStrategy aggregationStrategy; 056 @XmlTransient 057 private ExecutorService executorService; 058 @XmlAttribute @Metadata(defaultValue = ",") 059 private String delimiter; 060 @XmlAttribute 061 private Boolean parallelProcessing; 062 @XmlAttribute 063 private String strategyRef; 064 @XmlAttribute 065 private String strategyMethodName; 066 @XmlAttribute 067 private Boolean strategyMethodAllowNull; 068 @XmlAttribute 069 private String executorServiceRef; 070 @XmlAttribute 071 private Boolean stopOnException; 072 @XmlAttribute 073 private Boolean ignoreInvalidEndpoints; 074 @XmlAttribute 075 private Boolean streaming; 076 @XmlAttribute @Metadata(defaultValue = "0") 077 private Long timeout; 078 @XmlAttribute 079 private String onPrepareRef; 080 @XmlTransient 081 private Processor onPrepare; 082 @XmlAttribute 083 private Boolean shareUnitOfWork; 084 @XmlAttribute 085 private Integer cacheSize; 086 @XmlAttribute 087 private Boolean parallelAggregate; 088 @XmlAttribute 089 private Boolean stopOnAggregateException; 090 091 public RecipientListDefinition() { 092 } 093 094 public RecipientListDefinition(ExpressionDefinition expression) { 095 super(expression); 096 } 097 098 public RecipientListDefinition(Expression expression) { 099 super(expression); 100 } 101 102 @Override 103 public String toString() { 104 return "RecipientList[" + getExpression() + "]"; 105 } 106 107 @Override 108 public String getShortName() { 109 return "recipientList"; 110 } 111 112 @Override 113 public String getLabel() { 114 return "recipientList[" + getExpression() + "]"; 115 } 116 117 @Override 118 public Processor createProcessor(RouteContext routeContext) throws Exception { 119 final Expression expression = getExpression().createExpression(routeContext); 120 121 boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing(); 122 boolean isStreaming = getStreaming() != null && getStreaming(); 123 boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate(); 124 boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork(); 125 boolean isStopOnException = getStopOnException() != null && getStopOnException(); 126 boolean isIgnoreInvalidEndpoints = getIgnoreInvalidEndpoints() != null && getIgnoreInvalidEndpoints(); 127 boolean isStopOnAggregateException = getStopOnAggregateException() != null && getStopOnAggregateException(); 128 129 RecipientList answer; 130 if (delimiter != null) { 131 answer = new RecipientList(routeContext.getCamelContext(), expression, delimiter); 132 } else { 133 answer = new RecipientList(routeContext.getCamelContext(), expression); 134 } 135 answer.setAggregationStrategy(createAggregationStrategy(routeContext)); 136 answer.setParallelProcessing(isParallelProcessing); 137 answer.setParallelAggregate(isParallelAggregate); 138 answer.setStreaming(isStreaming); 139 answer.setShareUnitOfWork(isShareUnitOfWork); 140 answer.setStopOnException(isStopOnException); 141 answer.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints); 142 answer.setStopOnAggregateException(isStopOnAggregateException); 143 if (getCacheSize() != null) { 144 answer.setCacheSize(getCacheSize()); 145 } 146 if (onPrepareRef != null) { 147 onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class); 148 } 149 if (onPrepare != null) { 150 answer.setOnPrepare(onPrepare); 151 } 152 if (getTimeout() != null) { 153 answer.setTimeout(getTimeout()); 154 } 155 156 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing); 157 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing); 158 answer.setExecutorService(threadPool); 159 answer.setShutdownExecutorService(shutdownThreadPool); 160 long timeout = getTimeout() != null ? getTimeout() : 0; 161 if (timeout > 0 && !isParallelProcessing) { 162 throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled."); 163 } 164 165 // create a pipeline with two processors 166 // the first is the eval processor which evaluates the expression to use 167 // the second is the recipient list 168 List<Processor> pipe = new ArrayList<>(2); 169 170 // the eval processor must be wrapped in error handler, so in case there was an 171 // error during evaluation, the error handler can deal with it 172 // the recipient list is not in error handler, as its has its own special error handling 173 // when sending to the recipients individually 174 Processor evalProcessor = new EvaluateExpressionProcessor(expression); 175 evalProcessor = super.wrapInErrorHandler(routeContext, evalProcessor); 176 177 pipe.add(evalProcessor); 178 pipe.add(answer); 179 180 // wrap in nested pipeline so this appears as one processor 181 // (threads definition does this as well) 182 return new Pipeline(routeContext.getCamelContext(), pipe) { 183 @Override 184 public String toString() { 185 return "RecipientList[" + expression + "]"; 186 } 187 }; 188 } 189 190 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 191 AggregationStrategy strategy = getAggregationStrategy(); 192 if (strategy == null && strategyRef != null) { 193 Object aggStrategy = routeContext.lookup(strategyRef, Object.class); 194 if (aggStrategy instanceof AggregationStrategy) { 195 strategy = (AggregationStrategy) aggStrategy; 196 } else if (aggStrategy != null) { 197 AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName()); 198 if (getStrategyMethodAllowNull() != null) { 199 adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); 200 adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); 201 } 202 strategy = adapter; 203 } else { 204 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); 205 } 206 } 207 208 if (strategy == null) { 209 // default to use latest aggregation strategy 210 strategy = new UseLatestAggregationStrategy(); 211 } 212 213 if (strategy instanceof CamelContextAware) { 214 ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); 215 } 216 217 if (shareUnitOfWork != null && shareUnitOfWork) { 218 // wrap strategy in share unit of work 219 strategy = new ShareUnitOfWorkAggregationStrategy(strategy); 220 } 221 222 return strategy; 223 } 224 225 // Fluent API 226 // ------------------------------------------------------------------------- 227 228 @Override 229 @SuppressWarnings("unchecked") 230 public Type end() { 231 // allow end() to return to previous type so you can continue in the DSL 232 return (Type) super.end(); 233 } 234 235 /** 236 * Delimiter used if the Expression returned multiple endpoints. Can be turned off using the value <tt>false</tt>. 237 * <p/> 238 * The default value is , 239 * 240 * @param delimiter the delimiter 241 * @return the builder 242 */ 243 public RecipientListDefinition<Type> delimiter(String delimiter) { 244 setDelimiter(delimiter); 245 return this; 246 } 247 248 /** 249 * Sets the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList. 250 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy 251 */ 252 public RecipientListDefinition<Type> aggregationStrategy(AggregationStrategy aggregationStrategy) { 253 setAggregationStrategy(aggregationStrategy); 254 return this; 255 } 256 257 /** 258 * Sets a reference to the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList. 259 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy 260 */ 261 public RecipientListDefinition<Type> aggregationStrategyRef(String aggregationStrategyRef) { 262 setStrategyRef(aggregationStrategyRef); 263 return this; 264 } 265 266 /** 267 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 268 * 269 * @param methodName the method name to call 270 * @return the builder 271 */ 272 public RecipientListDefinition<Type> aggregationStrategyMethodName(String methodName) { 273 setStrategyMethodName(methodName); 274 return this; 275 } 276 277 /** 278 * If this option is false then the aggregate method is not used if there was no data to enrich. 279 * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy 280 * 281 * @return the builder 282 */ 283 public RecipientListDefinition<Type> aggregationStrategyMethodAllowNull() { 284 setStrategyMethodAllowNull(true); 285 return this; 286 } 287 288 /** 289 * Ignore the invalidate endpoint exception when try to create a producer with that endpoint 290 * 291 * @return the builder 292 */ 293 public RecipientListDefinition<Type> ignoreInvalidEndpoints() { 294 setIgnoreInvalidEndpoints(true); 295 return this; 296 } 297 298 /** 299 * If enabled then sending messages to the recipients occurs concurrently. 300 * Note the caller thread will still wait until all messages has been fully processed, before it continues. 301 * Its only the sending and processing the replies from the recipients which happens concurrently. 302 * 303 * @return the builder 304 */ 305 public RecipientListDefinition<Type> parallelProcessing() { 306 setParallelProcessing(true); 307 return this; 308 } 309 310 /** 311 * If enabled then sending messages to the recipients occurs concurrently. 312 * Note the caller thread will still wait until all messages has been fully processed, before it continues. 313 * Its only the sending and processing the replies from the recipients which happens concurrently. 314 * 315 * @return the builder 316 */ 317 public RecipientListDefinition<Type> parallelProcessing(boolean parallelProcessing) { 318 setParallelProcessing(parallelProcessing); 319 return this; 320 } 321 322 /** 323 * If enabled then the aggregate method on AggregationStrategy can be called concurrently. 324 * Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. 325 * By default this is false meaning that Camel synchronizes the call to the aggregate method. 326 * Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe. 327 * 328 * @return the builder 329 */ 330 public RecipientListDefinition<Type> parallelAggregate() { 331 setParallelAggregate(true); 332 return this; 333 } 334 335 /** 336 * If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used. 337 * Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used. 338 * Enabling this option allows to work around this behavior. 339 * 340 * The default value is <code>false</code> for the sake of backward compatibility. 341 * 342 * @return the builder 343 */ 344 public RecipientListDefinition<Type> stopOnAggregateException() { 345 setStopOnAggregateException(true); 346 return this; 347 } 348 349 /** 350 * If enabled then Camel will process replies out-of-order, eg in the order they come back. 351 * If disabled, Camel will process replies in the same order as defined by the recipient list. 352 * 353 * @return the builder 354 */ 355 public RecipientListDefinition<Type> streaming() { 356 setStreaming(true); 357 return this; 358 } 359 360 /** 361 * Will now stop further processing if an exception or failure occurred during processing of an 362 * {@link org.apache.camel.Exchange} and the caused exception will be thrown. 363 * <p/> 364 * Will also stop if processing the exchange failed (has a fault message) or an exception 365 * was thrown and handled by the error handler (such as using onException). In all situations 366 * the recipient list will stop further processing. This is the same behavior as in pipeline, which 367 * is used by the routing engine. 368 * <p/> 369 * The default behavior is to <b>not</b> stop but continue processing till the end 370 * 371 * @return the builder 372 */ 373 public RecipientListDefinition<Type> stopOnException() { 374 setStopOnException(true); 375 return this; 376 } 377 378 /** 379 * To use a custom Thread Pool to be used for parallel processing. 380 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 381 */ 382 public RecipientListDefinition<Type> executorService(ExecutorService executorService) { 383 setExecutorService(executorService); 384 return this; 385 } 386 387 /** 388 * Refers to a custom Thread Pool to be used for parallel processing. 389 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 390 */ 391 public RecipientListDefinition<Type> executorServiceRef(String executorServiceRef) { 392 setExecutorServiceRef(executorServiceRef); 393 return this; 394 } 395 396 /** 397 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be used send. 398 * This can be used to deep-clone messages that should be send, or any custom logic needed before 399 * the exchange is send. 400 * 401 * @param onPrepare the processor 402 * @return the builder 403 */ 404 public RecipientListDefinition<Type> onPrepare(Processor onPrepare) { 405 setOnPrepare(onPrepare); 406 return this; 407 } 408 409 /** 410 * Sets the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be used send using a fluent buidler. 411 */ 412 public ProcessClause<RecipientListDefinition<Type>> onPrepare() { 413 ProcessClause<RecipientListDefinition<Type>> clause = new ProcessClause<>(this); 414 setOnPrepare(clause); 415 return clause; 416 } 417 418 /** 419 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 420 * This can be used to deep-clone messages that should be send, or any custom logic needed before 421 * the exchange is send. 422 * 423 * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry} 424 * @return the builder 425 */ 426 public RecipientListDefinition<Type> onPrepareRef(String onPrepareRef) { 427 setOnPrepareRef(onPrepareRef); 428 return this; 429 } 430 431 /** 432 * Sets a total timeout specified in millis, when using parallel processing. 433 * If the Recipient List hasn't been able to send and process all replies within the given timeframe, 434 * then the timeout triggers and the Recipient List breaks out and continues. 435 * Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. 436 * If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel 437 * to shut down in a graceful manner may continue to run. So use this option with a bit of care. 438 * 439 * @param timeout timeout in millis 440 * @return the builder 441 */ 442 public RecipientListDefinition<Type> timeout(long timeout) { 443 setTimeout(timeout); 444 return this; 445 } 446 447 /** 448 * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages. 449 * Recipient List will by default not share unit of work between the parent exchange and each recipient exchange. 450 * This means each sub exchange has its own individual unit of work. 451 * 452 * @return the builder. 453 * @see org.apache.camel.spi.SubUnitOfWork 454 */ 455 public RecipientListDefinition<Type> shareUnitOfWork() { 456 setShareUnitOfWork(true); 457 return this; 458 } 459 460 /** 461 * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used 462 * to cache and reuse producers when using this recipient list, when uris are reused. 463 * 464 * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. 465 * @return the builder 466 */ 467 public RecipientListDefinition<Type> cacheSize(int cacheSize) { 468 setCacheSize(cacheSize); 469 return this; 470 } 471 472 // Properties 473 //------------------------------------------------------------------------- 474 475 476 /** 477 * Expression that returns which endpoints (url) to send the message to (the recipients). 478 * If the expression return an empty value then the message is not sent to any recipients. 479 */ 480 @Override 481 public void setExpression(ExpressionDefinition expression) { 482 // override to include javadoc what the expression is used for 483 super.setExpression(expression); 484 } 485 486 public String getDelimiter() { 487 return delimiter; 488 } 489 490 public void setDelimiter(String delimiter) { 491 this.delimiter = delimiter; 492 } 493 494 public Boolean getParallelProcessing() { 495 return parallelProcessing; 496 } 497 498 public void setParallelProcessing(Boolean parallelProcessing) { 499 this.parallelProcessing = parallelProcessing; 500 } 501 502 public String getStrategyRef() { 503 return strategyRef; 504 } 505 506 /** 507 * Sets a reference to the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList. 508 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy 509 */ 510 public void setStrategyRef(String strategyRef) { 511 this.strategyRef = strategyRef; 512 } 513 514 public String getStrategyMethodName() { 515 return strategyMethodName; 516 } 517 518 /** 519 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 520 */ 521 public void setStrategyMethodName(String strategyMethodName) { 522 this.strategyMethodName = strategyMethodName; 523 } 524 525 public Boolean getStrategyMethodAllowNull() { 526 return strategyMethodAllowNull; 527 } 528 529 /** 530 * If this option is false then the aggregate method is not used if there was no data to enrich. 531 * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy 532 */ 533 public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { 534 this.strategyMethodAllowNull = strategyMethodAllowNull; 535 } 536 537 public String getExecutorServiceRef() { 538 return executorServiceRef; 539 } 540 541 public void setExecutorServiceRef(String executorServiceRef) { 542 this.executorServiceRef = executorServiceRef; 543 } 544 545 public Boolean getIgnoreInvalidEndpoints() { 546 return ignoreInvalidEndpoints; 547 } 548 549 public void setIgnoreInvalidEndpoints(Boolean ignoreInvalidEndpoints) { 550 this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; 551 } 552 553 public Boolean getStopOnException() { 554 return stopOnException; 555 } 556 557 public void setStopOnException(Boolean stopOnException) { 558 this.stopOnException = stopOnException; 559 } 560 561 public AggregationStrategy getAggregationStrategy() { 562 return aggregationStrategy; 563 } 564 565 /** 566 * Sets the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList. 567 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy 568 */ 569 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 570 this.aggregationStrategy = aggregationStrategy; 571 } 572 573 public ExecutorService getExecutorService() { 574 return executorService; 575 } 576 577 public void setExecutorService(ExecutorService executorService) { 578 this.executorService = executorService; 579 } 580 581 public Boolean getStreaming() { 582 return streaming; 583 } 584 585 public void setStreaming(Boolean streaming) { 586 this.streaming = streaming; 587 } 588 589 public Long getTimeout() { 590 return timeout; 591 } 592 593 public void setTimeout(Long timeout) { 594 this.timeout = timeout; 595 } 596 597 public String getOnPrepareRef() { 598 return onPrepareRef; 599 } 600 601 public void setOnPrepareRef(String onPrepareRef) { 602 this.onPrepareRef = onPrepareRef; 603 } 604 605 public Processor getOnPrepare() { 606 return onPrepare; 607 } 608 609 public void setOnPrepare(Processor onPrepare) { 610 this.onPrepare = onPrepare; 611 } 612 613 public Boolean getShareUnitOfWork() { 614 return shareUnitOfWork; 615 } 616 617 public void setShareUnitOfWork(Boolean shareUnitOfWork) { 618 this.shareUnitOfWork = shareUnitOfWork; 619 } 620 621 public Integer getCacheSize() { 622 return cacheSize; 623 } 624 625 public void setCacheSize(Integer cacheSize) { 626 this.cacheSize = cacheSize; 627 } 628 629 public Boolean getParallelAggregate() { 630 return parallelAggregate; 631 } 632 633 public void setParallelAggregate(Boolean parallelAggregate) { 634 this.parallelAggregate = parallelAggregate; 635 } 636 637 public Boolean getStopOnAggregateException() { 638 return stopOnAggregateException; 639 } 640 641 public void setStopOnAggregateException(Boolean stopOnAggregateException) { 642 this.stopOnAggregateException = stopOnAggregateException; 643 } 644}