001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.model; 018 019import java.util.ArrayList; 020import java.util.List; 021import java.util.concurrent.ExecutorService; 022import javax.xml.bind.annotation.XmlAccessType; 023import javax.xml.bind.annotation.XmlAccessorType; 024import javax.xml.bind.annotation.XmlAttribute; 025import javax.xml.bind.annotation.XmlRootElement; 026import javax.xml.bind.annotation.XmlTransient; 027 028import org.apache.camel.CamelContextAware; 029import org.apache.camel.Expression; 030import org.apache.camel.Processor; 031import org.apache.camel.model.language.ExpressionDefinition; 032import org.apache.camel.processor.EvaluateExpressionProcessor; 033import org.apache.camel.processor.Pipeline; 034import org.apache.camel.processor.RecipientList; 035import org.apache.camel.processor.aggregate.AggregationStrategy; 036import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; 037import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; 038import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; 039import org.apache.camel.spi.Metadata; 040import org.apache.camel.spi.RouteContext; 041import org.apache.camel.util.CamelContextHelper; 042 043/** 044 * Routes messages to a number of dynamically specified recipients (dynamic to) 045 * 046 * @version 047 */ 048@Metadata(label = "eip,endpoint,routing") 049@XmlRootElement(name = "recipientList") 050@XmlAccessorType(XmlAccessType.FIELD) 051public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> extends NoOutputExpressionNode implements ExecutorServiceAwareDefinition<RecipientListDefinition<Type>> { 052 @XmlTransient 053 private AggregationStrategy aggregationStrategy; 054 @XmlTransient 055 private ExecutorService executorService; 056 @XmlAttribute @Metadata(defaultValue = ",") 057 private String delimiter; 058 @XmlAttribute 059 private Boolean parallelProcessing; 060 @XmlAttribute 061 private String strategyRef; 062 @XmlAttribute 063 private String strategyMethodName; 064 @XmlAttribute 065 private Boolean strategyMethodAllowNull; 066 @XmlAttribute 067 private String executorServiceRef; 068 @XmlAttribute 069 private Boolean stopOnException; 070 @XmlAttribute 071 private Boolean ignoreInvalidEndpoints; 072 @XmlAttribute 073 private Boolean streaming; 074 @XmlAttribute @Metadata(defaultValue = "0") 075 private Long timeout; 076 @XmlAttribute 077 private String onPrepareRef; 078 @XmlTransient 079 private Processor onPrepare; 080 @XmlAttribute 081 private Boolean shareUnitOfWork; 082 @XmlAttribute 083 private Integer cacheSize; 084 @XmlAttribute 085 private Boolean parallelAggregate; 086 087 public RecipientListDefinition() { 088 } 089 090 public RecipientListDefinition(ExpressionDefinition expression) { 091 super(expression); 092 } 093 094 public RecipientListDefinition(Expression expression) { 095 super(expression); 096 } 097 098 @Override 099 public String toString() { 100 return "RecipientList[" + getExpression() + "]"; 101 } 102 103 @Override 104 public String getLabel() { 105 return "recipientList[" + getExpression() + "]"; 106 } 107 108 @Override 109 public Processor createProcessor(RouteContext routeContext) throws Exception { 110 final Expression expression = getExpression().createExpression(routeContext); 111 112 boolean isParallelProcessing = getParallelProcessing() != null && getParallelProcessing(); 113 boolean isStreaming = getStreaming() != null && getStreaming(); 114 boolean isParallelAggregate = getParallelAggregate() != null && getParallelAggregate(); 115 boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork(); 116 boolean isStopOnException = getStopOnException() != null && getStopOnException(); 117 boolean isIgnoreInvalidEndpoints = getIgnoreInvalidEndpoints() != null && getIgnoreInvalidEndpoints(); 118 119 RecipientList answer; 120 if (delimiter != null) { 121 answer = new RecipientList(routeContext.getCamelContext(), expression, delimiter); 122 } else { 123 answer = new RecipientList(routeContext.getCamelContext(), expression); 124 } 125 answer.setAggregationStrategy(createAggregationStrategy(routeContext)); 126 answer.setParallelProcessing(isParallelProcessing); 127 answer.setParallelAggregate(isParallelAggregate); 128 answer.setStreaming(isStreaming); 129 answer.setShareUnitOfWork(isShareUnitOfWork); 130 answer.setStopOnException(isStopOnException); 131 answer.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints); 132 if (getCacheSize() != null) { 133 answer.setCacheSize(getCacheSize()); 134 } 135 if (onPrepareRef != null) { 136 onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class); 137 } 138 if (onPrepare != null) { 139 answer.setOnPrepare(onPrepare); 140 } 141 if (getTimeout() != null) { 142 answer.setTimeout(getTimeout()); 143 } 144 145 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing); 146 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing); 147 answer.setExecutorService(threadPool); 148 answer.setShutdownExecutorService(shutdownThreadPool); 149 long timeout = getTimeout() != null ? getTimeout() : 0; 150 if (timeout > 0 && !isParallelProcessing) { 151 throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled."); 152 } 153 154 // create a pipeline with two processors 155 // the first is the eval processor which evaluates the expression to use 156 // the second is the recipient list 157 List<Processor> pipe = new ArrayList<Processor>(2); 158 159 // the eval processor must be wrapped in error handler, so in case there was an 160 // error during evaluation, the error handler can deal with it 161 // the recipient list is not in error handler, as its has its own special error handling 162 // when sending to the recipients individually 163 Processor evalProcessor = new EvaluateExpressionProcessor(expression); 164 evalProcessor = super.wrapInErrorHandler(routeContext, evalProcessor); 165 166 pipe.add(evalProcessor); 167 pipe.add(answer); 168 169 // wrap in nested pipeline so this appears as one processor 170 // (threads definition does this as well) 171 return new Pipeline(routeContext.getCamelContext(), pipe) { 172 @Override 173 public String toString() { 174 return "RecipientList[" + expression + "]"; 175 } 176 }; 177 } 178 179 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 180 AggregationStrategy strategy = getAggregationStrategy(); 181 if (strategy == null && strategyRef != null) { 182 Object aggStrategy = routeContext.lookup(strategyRef, Object.class); 183 if (aggStrategy instanceof AggregationStrategy) { 184 strategy = (AggregationStrategy) aggStrategy; 185 } else if (aggStrategy != null) { 186 AggregationStrategyBeanAdapter adapter = new AggregationStrategyBeanAdapter(aggStrategy, getStrategyMethodName()); 187 if (getStrategyMethodAllowNull() != null) { 188 adapter.setAllowNullNewExchange(getStrategyMethodAllowNull()); 189 adapter.setAllowNullOldExchange(getStrategyMethodAllowNull()); 190 } 191 strategy = adapter; 192 } else { 193 throw new IllegalArgumentException("Cannot find AggregationStrategy in Registry with name: " + strategyRef); 194 } 195 } 196 197 if (strategy == null) { 198 // default to use latest aggregation strategy 199 strategy = new UseLatestAggregationStrategy(); 200 } 201 202 if (strategy instanceof CamelContextAware) { 203 ((CamelContextAware) strategy).setCamelContext(routeContext.getCamelContext()); 204 } 205 206 if (shareUnitOfWork != null && shareUnitOfWork) { 207 // wrap strategy in share unit of work 208 strategy = new ShareUnitOfWorkAggregationStrategy(strategy); 209 } 210 211 return strategy; 212 } 213 214 // Fluent API 215 // ------------------------------------------------------------------------- 216 217 @Override 218 @SuppressWarnings("unchecked") 219 public Type end() { 220 // allow end() to return to previous type so you can continue in the DSL 221 return (Type) super.end(); 222 } 223 224 /** 225 * Delimiter used if the Expression returned multiple endpoints. Can be turned off using the value <tt>false</tt>. 226 * <p/> 227 * The default value is , 228 * 229 * @param delimiter the delimiter 230 * @return the builder 231 */ 232 public RecipientListDefinition<Type> delimiter(String delimiter) { 233 setDelimiter(delimiter); 234 return this; 235 } 236 237 /** 238 * Sets the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList. 239 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy 240 */ 241 public RecipientListDefinition<Type> aggregationStrategy(AggregationStrategy aggregationStrategy) { 242 setAggregationStrategy(aggregationStrategy); 243 return this; 244 } 245 246 /** 247 * Sets a reference to the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList. 248 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy 249 */ 250 public RecipientListDefinition<Type> aggregationStrategyRef(String aggregationStrategyRef) { 251 setStrategyRef(aggregationStrategyRef); 252 return this; 253 } 254 255 /** 256 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 257 * 258 * @param methodName the method name to call 259 * @return the builder 260 */ 261 public RecipientListDefinition<Type> aggregationStrategyMethodName(String methodName) { 262 setStrategyMethodName(methodName); 263 return this; 264 } 265 266 /** 267 * If this option is false then the aggregate method is not used if there was no data to enrich. 268 * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy 269 * 270 * @return the builder 271 */ 272 public RecipientListDefinition<Type> aggregationStrategyMethodAllowNull() { 273 setStrategyMethodAllowNull(true); 274 return this; 275 } 276 277 /** 278 * Ignore the invalidate endpoint exception when try to create a producer with that endpoint 279 * 280 * @return the builder 281 */ 282 public RecipientListDefinition<Type> ignoreInvalidEndpoints() { 283 setIgnoreInvalidEndpoints(true); 284 return this; 285 } 286 287 /** 288 * If enabled then sending messages to the recipients occurs concurrently. 289 * Note the caller thread will still wait until all messages has been fully processed, before it continues. 290 * Its only the sending and processing the replies from the recipients which happens concurrently. 291 * 292 * @return the builder 293 */ 294 public RecipientListDefinition<Type> parallelProcessing() { 295 setParallelProcessing(true); 296 return this; 297 } 298 299 /** 300 * If enabled then sending messages to the recipients occurs concurrently. 301 * Note the caller thread will still wait until all messages has been fully processed, before it continues. 302 * Its only the sending and processing the replies from the recipients which happens concurrently. 303 * 304 * @return the builder 305 */ 306 public RecipientListDefinition<Type> parallelProcessing(boolean parallelProcessing) { 307 setParallelProcessing(parallelProcessing); 308 return this; 309 } 310 311 /** 312 * If enabled then the aggregate method on AggregationStrategy can be called concurrently. 313 * Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. 314 * By default this is false meaning that Camel synchronizes the call to the aggregate method. 315 * Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe. 316 * 317 * @return the builder 318 */ 319 public RecipientListDefinition<Type> parallelAggregate() { 320 setParallelAggregate(true); 321 return this; 322 } 323 324 /** 325 * If enabled then Camel will process replies out-of-order, eg in the order they come back. 326 * If disabled, Camel will process replies in the same order as defined by the recipient list. 327 * 328 * @return the builder 329 */ 330 public RecipientListDefinition<Type> streaming() { 331 setStreaming(true); 332 return this; 333 } 334 335 /** 336 * Will now stop further processing if an exception or failure occurred during processing of an 337 * {@link org.apache.camel.Exchange} and the caused exception will be thrown. 338 * <p/> 339 * Will also stop if processing the exchange failed (has a fault message) or an exception 340 * was thrown and handled by the error handler (such as using onException). In all situations 341 * the recipient list will stop further processing. This is the same behavior as in pipeline, which 342 * is used by the routing engine. 343 * <p/> 344 * The default behavior is to <b>not</b> stop but continue processing till the end 345 * 346 * @return the builder 347 */ 348 public RecipientListDefinition<Type> stopOnException() { 349 setStopOnException(true); 350 return this; 351 } 352 353 /** 354 * To use a custom Thread Pool to be used for parallel processing. 355 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 356 */ 357 public RecipientListDefinition<Type> executorService(ExecutorService executorService) { 358 setExecutorService(executorService); 359 return this; 360 } 361 362 /** 363 * Refers to a custom Thread Pool to be used for parallel processing. 364 * Notice if you set this option, then parallel processing is automatic implied, and you do not have to enable that option as well. 365 */ 366 public RecipientListDefinition<Type> executorServiceRef(String executorServiceRef) { 367 setExecutorServiceRef(executorServiceRef); 368 return this; 369 } 370 371 /** 372 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be used send. 373 * This can be used to deep-clone messages that should be send, or any custom logic needed before 374 * the exchange is send. 375 * 376 * @param onPrepare the processor 377 * @return the builder 378 */ 379 public RecipientListDefinition<Type> onPrepare(Processor onPrepare) { 380 setOnPrepare(onPrepare); 381 return this; 382 } 383 384 /** 385 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 386 * This can be used to deep-clone messages that should be send, or any custom logic needed before 387 * the exchange is send. 388 * 389 * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry} 390 * @return the builder 391 */ 392 public RecipientListDefinition<Type> onPrepareRef(String onPrepareRef) { 393 setOnPrepareRef(onPrepareRef); 394 return this; 395 } 396 397 /** 398 * Sets a total timeout specified in millis, when using parallel processing. 399 * If the Recipient List hasn't been able to send and process all replies within the given timeframe, 400 * then the timeout triggers and the Recipient List breaks out and continues. 401 * Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. 402 * If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel 403 * to shut down in a graceful manner may continue to run. So use this option with a bit of care. 404 * 405 * @param timeout timeout in millis 406 * @return the builder 407 */ 408 public RecipientListDefinition<Type> timeout(long timeout) { 409 setTimeout(timeout); 410 return this; 411 } 412 413 /** 414 * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages. 415 * Recipient List will by default not share unit of work between the parent exchange and each recipient exchange. 416 * This means each sub exchange has its own individual unit of work. 417 * 418 * @return the builder. 419 * @see org.apache.camel.spi.SubUnitOfWork 420 */ 421 public RecipientListDefinition<Type> shareUnitOfWork() { 422 setShareUnitOfWork(true); 423 return this; 424 } 425 426 /** 427 * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used 428 * to cache and reuse producers when using this recipient list, when uris are reused. 429 * 430 * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. 431 * @return the builder 432 */ 433 public RecipientListDefinition<Type> cacheSize(int cacheSize) { 434 setCacheSize(cacheSize); 435 return this; 436 } 437 438 // Properties 439 //------------------------------------------------------------------------- 440 441 442 /** 443 * Expression that returns which endpoints (url) to send the message to (the recipients). 444 * If the expression return an empty value then the message is not sent to any recipients. 445 */ 446 @Override 447 public void setExpression(ExpressionDefinition expression) { 448 // override to include javadoc what the expression is used for 449 super.setExpression(expression); 450 } 451 452 public String getDelimiter() { 453 return delimiter; 454 } 455 456 public void setDelimiter(String delimiter) { 457 this.delimiter = delimiter; 458 } 459 460 public Boolean getParallelProcessing() { 461 return parallelProcessing; 462 } 463 464 public void setParallelProcessing(Boolean parallelProcessing) { 465 this.parallelProcessing = parallelProcessing; 466 } 467 468 public String getStrategyRef() { 469 return strategyRef; 470 } 471 472 /** 473 * Sets a reference to the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList. 474 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy 475 */ 476 public void setStrategyRef(String strategyRef) { 477 this.strategyRef = strategyRef; 478 } 479 480 public String getStrategyMethodName() { 481 return strategyMethodName; 482 } 483 484 /** 485 * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. 486 */ 487 public void setStrategyMethodName(String strategyMethodName) { 488 this.strategyMethodName = strategyMethodName; 489 } 490 491 public Boolean getStrategyMethodAllowNull() { 492 return strategyMethodAllowNull; 493 } 494 495 /** 496 * If this option is false then the aggregate method is not used if there was no data to enrich. 497 * If this option is true then null values is used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy 498 */ 499 public void setStrategyMethodAllowNull(Boolean strategyMethodAllowNull) { 500 this.strategyMethodAllowNull = strategyMethodAllowNull; 501 } 502 503 public String getExecutorServiceRef() { 504 return executorServiceRef; 505 } 506 507 public void setExecutorServiceRef(String executorServiceRef) { 508 this.executorServiceRef = executorServiceRef; 509 } 510 511 public Boolean getIgnoreInvalidEndpoints() { 512 return ignoreInvalidEndpoints; 513 } 514 515 public void setIgnoreInvalidEndpoints(Boolean ignoreInvalidEndpoints) { 516 this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; 517 } 518 519 public Boolean getStopOnException() { 520 return stopOnException; 521 } 522 523 public void setStopOnException(Boolean stopOnException) { 524 this.stopOnException = stopOnException; 525 } 526 527 public AggregationStrategy getAggregationStrategy() { 528 return aggregationStrategy; 529 } 530 531 /** 532 * Sets the AggregationStrategy to be used to assemble the replies from the recipients, into a single outgoing message from the RecipientList. 533 * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy 534 */ 535 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 536 this.aggregationStrategy = aggregationStrategy; 537 } 538 539 public ExecutorService getExecutorService() { 540 return executorService; 541 } 542 543 public void setExecutorService(ExecutorService executorService) { 544 this.executorService = executorService; 545 } 546 547 public Boolean getStreaming() { 548 return streaming; 549 } 550 551 public void setStreaming(Boolean streaming) { 552 this.streaming = streaming; 553 } 554 555 public Long getTimeout() { 556 return timeout; 557 } 558 559 public void setTimeout(Long timeout) { 560 this.timeout = timeout; 561 } 562 563 public String getOnPrepareRef() { 564 return onPrepareRef; 565 } 566 567 public void setOnPrepareRef(String onPrepareRef) { 568 this.onPrepareRef = onPrepareRef; 569 } 570 571 public Processor getOnPrepare() { 572 return onPrepare; 573 } 574 575 public void setOnPrepare(Processor onPrepare) { 576 this.onPrepare = onPrepare; 577 } 578 579 public Boolean getShareUnitOfWork() { 580 return shareUnitOfWork; 581 } 582 583 public void setShareUnitOfWork(Boolean shareUnitOfWork) { 584 this.shareUnitOfWork = shareUnitOfWork; 585 } 586 587 public Integer getCacheSize() { 588 return cacheSize; 589 } 590 591 public void setCacheSize(Integer cacheSize) { 592 this.cacheSize = cacheSize; 593 } 594 595 public Boolean getParallelAggregate() { 596 return parallelAggregate; 597 } 598 599 public void setParallelAggregate(Boolean parallelAggregate) { 600 this.parallelAggregate = parallelAggregate; 601 } 602}