001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.List; 021 import java.util.concurrent.ExecutorService; 022 023 import javax.xml.bind.annotation.XmlAccessType; 024 import javax.xml.bind.annotation.XmlAccessorType; 025 import javax.xml.bind.annotation.XmlAttribute; 026 import javax.xml.bind.annotation.XmlRootElement; 027 import javax.xml.bind.annotation.XmlTransient; 028 029 import org.apache.camel.Expression; 030 import org.apache.camel.Processor; 031 import org.apache.camel.model.language.ExpressionDefinition; 032 import org.apache.camel.processor.EvaluateExpressionProcessor; 033 import org.apache.camel.processor.Pipeline; 034 import org.apache.camel.processor.RecipientList; 035 import org.apache.camel.processor.aggregate.AggregationStrategy; 036 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; 037 import org.apache.camel.spi.RouteContext; 038 import org.apache.camel.util.CamelContextHelper; 039 040 /** 041 * Represents an XML <recipientList/> element 042 * 043 * @version 044 */ 045 @XmlRootElement(name = "recipientList") 046 @XmlAccessorType(XmlAccessType.FIELD) 047 public class RecipientListDefinition<Type extends ProcessorDefinition<Type>> extends NoOutputExpressionNode implements ExecutorServiceAwareDefinition<RecipientListDefinition<Type>> { 048 @XmlTransient 049 private AggregationStrategy aggregationStrategy; 050 @XmlTransient 051 private ExecutorService executorService; 052 @XmlAttribute 053 private String delimiter; 054 @XmlAttribute 055 private Boolean parallelProcessing; 056 @XmlAttribute 057 private String strategyRef; 058 @XmlAttribute 059 private String executorServiceRef; 060 @XmlAttribute 061 private Boolean stopOnException; 062 @XmlAttribute 063 private Boolean ignoreInvalidEndpoints; 064 @XmlAttribute 065 private Boolean streaming; 066 @XmlAttribute 067 private Long timeout; 068 @XmlAttribute 069 private String onPrepareRef; 070 @XmlTransient 071 private Processor onPrepare; 072 @XmlAttribute 073 private Boolean shareUnitOfWork; 074 075 public RecipientListDefinition() { 076 } 077 078 public RecipientListDefinition(ExpressionDefinition expression) { 079 super(expression); 080 } 081 082 public RecipientListDefinition(Expression expression) { 083 super(expression); 084 } 085 086 @Override 087 public String toString() { 088 return "RecipientList[" + getExpression() + "]"; 089 } 090 091 @Override 092 public String getShortName() { 093 return "recipientList"; 094 } 095 096 @Override 097 public String getLabel() { 098 return "recipientList[" + getExpression() + "]"; 099 } 100 101 @Override 102 public Processor createProcessor(RouteContext routeContext) throws Exception { 103 final Expression expression = getExpression().createExpression(routeContext); 104 105 RecipientList answer; 106 if (delimiter != null) { 107 answer = new RecipientList(routeContext.getCamelContext(), expression, delimiter); 108 } else { 109 answer = new RecipientList(routeContext.getCamelContext(), expression); 110 } 111 answer.setAggregationStrategy(createAggregationStrategy(routeContext)); 112 answer.setParallelProcessing(isParallelProcessing()); 113 answer.setStreaming(isStreaming()); 114 answer.setShareUnitOfWork(isShareUnitOfWork()); 115 if (onPrepareRef != null) { 116 onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class); 117 } 118 if (onPrepare != null) { 119 answer.setOnPrepare(onPrepare); 120 } 121 if (stopOnException != null) { 122 answer.setStopOnException(isStopOnException()); 123 } 124 if (ignoreInvalidEndpoints != null) { 125 answer.setIgnoreInvalidEndpoints(ignoreInvalidEndpoints); 126 } 127 if (getTimeout() != null) { 128 answer.setTimeout(getTimeout()); 129 } 130 131 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing()); 132 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "RecipientList", this, isParallelProcessing()); 133 answer.setExecutorService(threadPool); 134 answer.setShutdownExecutorService(shutdownThreadPool); 135 long timeout = getTimeout() != null ? getTimeout() : 0; 136 if (timeout > 0 && !isParallelProcessing()) { 137 throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled."); 138 } 139 140 // create a pipeline with two processors 141 // the first is the eval processor which evaluates the expression to use 142 // the second is the recipient list 143 List<Processor> pipe = new ArrayList<Processor>(2); 144 145 // the eval processor must be wrapped in error handler, so in case there was an 146 // error during evaluation, the error handler can deal with it 147 // the recipient list is not in error handler, as its has its own special error handling 148 // when sending to the recipients individually 149 Processor evalProcessor = new EvaluateExpressionProcessor(expression); 150 evalProcessor = super.wrapInErrorHandler(routeContext, evalProcessor); 151 152 pipe.add(evalProcessor); 153 pipe.add(answer); 154 155 // wrap in nested pipeline so this appears as one processor 156 // (threads definition does this as well) 157 return new Pipeline(routeContext.getCamelContext(), pipe) { 158 @Override 159 public String toString() { 160 return "RecipientList[" + expression + "]"; 161 } 162 }; 163 } 164 165 private AggregationStrategy createAggregationStrategy(RouteContext routeContext) { 166 if (aggregationStrategy == null && strategyRef != null) { 167 aggregationStrategy = routeContext.mandatoryLookup(strategyRef, AggregationStrategy.class); 168 } 169 if (aggregationStrategy == null) { 170 // fallback to use latest 171 aggregationStrategy = new UseLatestAggregationStrategy(); 172 } 173 return aggregationStrategy; 174 } 175 176 // Fluent API 177 // ------------------------------------------------------------------------- 178 179 @Override 180 @SuppressWarnings("unchecked") 181 public Type end() { 182 // allow end() to return to previous type so you can continue in the DSL 183 return (Type) super.end(); 184 } 185 186 /** 187 * Set the delimiter 188 * 189 * @param delimiter the delimiter 190 * @return the builder 191 */ 192 public RecipientListDefinition<Type> delimiter(String delimiter) { 193 setDelimiter(delimiter); 194 return this; 195 } 196 197 /** 198 * Set the aggregationStrategy 199 * 200 * @param aggregationStrategy the strategy 201 * @return the builder 202 */ 203 public RecipientListDefinition<Type> aggregationStrategy(AggregationStrategy aggregationStrategy) { 204 setAggregationStrategy(aggregationStrategy); 205 return this; 206 } 207 208 /** 209 * Set the aggregationStrategy 210 * 211 * @param aggregationStrategyRef a reference to a strategy to lookup 212 * @return the builder 213 */ 214 public RecipientListDefinition<Type> aggregationStrategyRef(String aggregationStrategyRef) { 215 setStrategyRef(aggregationStrategyRef); 216 return this; 217 } 218 219 /** 220 * Ignore the invalidate endpoint exception when try to create a producer with that endpoint 221 * 222 * @return the builder 223 */ 224 public RecipientListDefinition<Type> ignoreInvalidEndpoints() { 225 setIgnoreInvalidEndpoints(true); 226 return this; 227 } 228 229 /** 230 * Doing the recipient list work in parallel 231 * 232 * @return the builder 233 */ 234 public RecipientListDefinition<Type> parallelProcessing() { 235 setParallelProcessing(true); 236 return this; 237 } 238 239 /** 240 * Doing the recipient list work in streaming model 241 * 242 * @return the builder 243 */ 244 public RecipientListDefinition<Type> streaming() { 245 setStreaming(true); 246 return this; 247 } 248 249 /** 250 * Will now stop further processing if an exception or failure occurred during processing of an 251 * {@link org.apache.camel.Exchange} and the caused exception will be thrown. 252 * <p/> 253 * Will also stop if processing the exchange failed (has a fault message) or an exception 254 * was thrown and handled by the error handler (such as using onException). In all situations 255 * the recipient list will stop further processing. This is the same behavior as in pipeline, which 256 * is used by the routing engine. 257 * <p/> 258 * The default behavior is to <b>not</b> stop but continue processing till the end 259 * 260 * @return the builder 261 */ 262 public RecipientListDefinition<Type> stopOnException() { 263 setStopOnException(true); 264 return this; 265 } 266 267 public RecipientListDefinition<Type> executorService(ExecutorService executorService) { 268 setExecutorService(executorService); 269 return this; 270 } 271 272 public RecipientListDefinition<Type> executorServiceRef(String executorServiceRef) { 273 setExecutorServiceRef(executorServiceRef); 274 return this; 275 } 276 277 /** 278 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be used send. 279 * This can be used to deep-clone messages that should be send, or any custom logic needed before 280 * the exchange is send. 281 * 282 * @param onPrepare the processor 283 * @return the builder 284 */ 285 public RecipientListDefinition<Type> onPrepare(Processor onPrepare) { 286 setOnPrepare(onPrepare); 287 return this; 288 } 289 290 /** 291 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 292 * This can be used to deep-clone messages that should be send, or any custom logic needed before 293 * the exchange is send. 294 * 295 * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry} 296 * @return the builder 297 */ 298 public RecipientListDefinition<Type> onPrepareRef(String onPrepareRef) { 299 setOnPrepareRef(onPrepareRef); 300 return this; 301 } 302 303 /** 304 * Sets a timeout value in millis to use when using parallelProcessing. 305 * 306 * @param timeout timeout in millis 307 * @return the builder 308 */ 309 public RecipientListDefinition<Type> timeout(long timeout) { 310 setTimeout(timeout); 311 return this; 312 } 313 314 /** 315 * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages. 316 * 317 * @return the builder. 318 * @see org.apache.camel.spi.SubUnitOfWork 319 */ 320 public RecipientListDefinition<Type> shareUnitOfWork() { 321 setShareUnitOfWork(true); 322 return this; 323 } 324 325 // Properties 326 //------------------------------------------------------------------------- 327 328 public String getDelimiter() { 329 return delimiter; 330 } 331 332 public void setDelimiter(String delimiter) { 333 this.delimiter = delimiter; 334 } 335 336 public Boolean getParallelProcessing() { 337 return parallelProcessing; 338 } 339 340 public void setParallelProcessing(Boolean parallelProcessing) { 341 this.parallelProcessing = parallelProcessing; 342 } 343 344 public boolean isParallelProcessing() { 345 return parallelProcessing != null && parallelProcessing; 346 } 347 348 public String getStrategyRef() { 349 return strategyRef; 350 } 351 352 public void setStrategyRef(String strategyRef) { 353 this.strategyRef = strategyRef; 354 } 355 356 public String getExecutorServiceRef() { 357 return executorServiceRef; 358 } 359 360 public void setExecutorServiceRef(String executorServiceRef) { 361 this.executorServiceRef = executorServiceRef; 362 } 363 364 public Boolean getIgnoreInvalidEndpoints() { 365 return ignoreInvalidEndpoints; 366 } 367 368 public void setIgnoreInvalidEndpoints(Boolean ignoreInvalidEndpoints) { 369 this.ignoreInvalidEndpoints = ignoreInvalidEndpoints; 370 } 371 372 public boolean isIgnoreInvalidEndpoints() { 373 return ignoreInvalidEndpoints != null && ignoreInvalidEndpoints; 374 } 375 376 public Boolean getStopOnException() { 377 return stopOnException; 378 } 379 380 public void setStopOnException(Boolean stopOnException) { 381 this.stopOnException = stopOnException; 382 } 383 384 public boolean isStopOnException() { 385 return stopOnException != null && stopOnException; 386 } 387 388 public AggregationStrategy getAggregationStrategy() { 389 return aggregationStrategy; 390 } 391 392 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { 393 this.aggregationStrategy = aggregationStrategy; 394 } 395 396 public ExecutorService getExecutorService() { 397 return executorService; 398 } 399 400 public void setExecutorService(ExecutorService executorService) { 401 this.executorService = executorService; 402 } 403 404 public Boolean getStreaming() { 405 return streaming; 406 } 407 408 public void setStreaming(Boolean streaming) { 409 this.streaming = streaming; 410 } 411 412 public boolean isStreaming() { 413 return streaming != null && streaming; 414 } 415 416 public Long getTimeout() { 417 return timeout; 418 } 419 420 public void setTimeout(Long timeout) { 421 this.timeout = timeout; 422 } 423 424 public String getOnPrepareRef() { 425 return onPrepareRef; 426 } 427 428 public void setOnPrepareRef(String onPrepareRef) { 429 this.onPrepareRef = onPrepareRef; 430 } 431 432 public Processor getOnPrepare() { 433 return onPrepare; 434 } 435 436 public void setOnPrepare(Processor onPrepare) { 437 this.onPrepare = onPrepare; 438 } 439 440 public Boolean getShareUnitOfWork() { 441 return shareUnitOfWork; 442 } 443 444 public void setShareUnitOfWork(Boolean shareUnitOfWork) { 445 this.shareUnitOfWork = shareUnitOfWork; 446 } 447 448 public boolean isShareUnitOfWork() { 449 return shareUnitOfWork != null && shareUnitOfWork; 450 } 451 452 }