001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.List; 020 import java.util.concurrent.ExecutorService; 021 022 import javax.xml.bind.annotation.XmlAccessType; 023 import javax.xml.bind.annotation.XmlAccessorType; 024 import javax.xml.bind.annotation.XmlAttribute; 025 import javax.xml.bind.annotation.XmlRootElement; 026 import javax.xml.bind.annotation.XmlTransient; 027 028 import org.apache.camel.Processor; 029 import org.apache.camel.processor.MulticastProcessor; 030 import org.apache.camel.processor.SubUnitOfWorkProcessor; 031 import org.apache.camel.processor.aggregate.AggregationStrategy; 032 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy; 033 import org.apache.camel.spi.RouteContext; 034 import org.apache.camel.util.CamelContextHelper; 035 036 /** 037 * Represents an XML <multicast/> element 038 * 039 * @version 040 */ 041 @XmlRootElement(name = "multicast") 042 @XmlAccessorType(XmlAccessType.FIELD) 043 public class MulticastDefinition extends OutputDefinition<MulticastDefinition> implements ExecutorServiceAwareDefinition<MulticastDefinition> { 044 @XmlAttribute 045 private Boolean parallelProcessing; 046 @XmlAttribute 047 private String strategyRef; 048 @XmlTransient 049 private ExecutorService executorService; 050 @XmlAttribute 051 private String executorServiceRef; 052 @XmlAttribute 053 private Boolean streaming; 054 @XmlAttribute 055 private Boolean stopOnException; 056 @XmlAttribute 057 private Long timeout; 058 @XmlTransient 059 private AggregationStrategy aggregationStrategy; 060 @XmlAttribute 061 private String onPrepareRef; 062 @XmlTransient 063 private Processor onPrepare; 064 @XmlAttribute 065 private Boolean shareUnitOfWork; 066 067 public MulticastDefinition() { 068 } 069 070 @Override 071 public String toString() { 072 return "Multicast[" + getOutputs() + "]"; 073 } 074 075 @Override 076 public String getLabel() { 077 return "multicast"; 078 } 079 080 @Override 081 public String getShortName() { 082 return "multicast"; 083 } 084 085 @Override 086 public Processor createProcessor(RouteContext routeContext) throws Exception { 087 return this.createChildProcessor(routeContext, true); 088 } 089 090 // Fluent API 091 // ------------------------------------------------------------------------- 092 093 /** 094 * Set the multicasting aggregationStrategy 095 * 096 * @return the builder 097 */ 098 public MulticastDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { 099 setAggregationStrategy(aggregationStrategy); 100 return this; 101 } 102 103 /** 104 * Set the aggregationStrategy 105 * 106 * @param aggregationStrategyRef a reference to a strategy to lookup 107 * @return the builder 108 */ 109 public MulticastDefinition aggregationStrategyRef(String aggregationStrategyRef) { 110 setStrategyRef(aggregationStrategyRef); 111 return this; 112 } 113 114 /** 115 * Uses the {@link java.util.concurrent.ExecutorService} to do the multicasting work 116 * 117 * @return the builder 118 */ 119 public MulticastDefinition parallelProcessing() { 120 setParallelProcessing(true); 121 return this; 122 } 123 124 /** 125 * Aggregates the responses as the are done (e.g. out of order sequence) 126 * 127 * @return the builder 128 */ 129 public MulticastDefinition streaming() { 130 setStreaming(true); 131 return this; 132 } 133 134 /** 135 * Will now stop further processing if an exception or failure occurred during processing of an 136 * {@link org.apache.camel.Exchange} and the caused exception will be thrown. 137 * <p/> 138 * Will also stop if processing the exchange failed (has a fault message) or an exception 139 * was thrown and handled by the error handler (such as using onException). In all situations 140 * the multicast will stop further processing. This is the same behavior as in pipeline, which 141 * is used by the routing engine. 142 * <p/> 143 * The default behavior is to <b>not</b> stop but continue processing till the end 144 * 145 * @return the builder 146 */ 147 public MulticastDefinition stopOnException() { 148 setStopOnException(true); 149 return this; 150 } 151 152 public MulticastDefinition executorService(ExecutorService executorService) { 153 setExecutorService(executorService); 154 return this; 155 } 156 157 public MulticastDefinition executorServiceRef(String executorServiceRef) { 158 setExecutorServiceRef(executorServiceRef); 159 return this; 160 } 161 162 /** 163 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 164 * This can be used to deep-clone messages that should be send, or any custom logic needed before 165 * the exchange is send. 166 * 167 * @param onPrepare the processor 168 * @return the builder 169 */ 170 public MulticastDefinition onPrepare(Processor onPrepare) { 171 setOnPrepare(onPrepare); 172 return this; 173 } 174 175 /** 176 * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. 177 * This can be used to deep-clone messages that should be send, or any custom logic needed before 178 * the exchange is send. 179 * 180 * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry} 181 * @return the builder 182 */ 183 public MulticastDefinition onPrepareRef(String onPrepareRef) { 184 setOnPrepareRef(onPrepareRef); 185 return this; 186 } 187 188 /** 189 * Sets a timeout value in millis to use when using parallelProcessing. 190 * 191 * @param timeout timeout in millis 192 * @return the builder 193 */ 194 public MulticastDefinition timeout(long timeout) { 195 setTimeout(timeout); 196 return this; 197 } 198 199 /** 200 * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages. 201 * 202 * @return the builder. 203 * @see org.apache.camel.spi.SubUnitOfWork 204 */ 205 public MulticastDefinition shareUnitOfWork() { 206 setShareUnitOfWork(true); 207 return this; 208 } 209 210 protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception { 211 if (strategyRef != null) { 212 aggregationStrategy = routeContext.mandatoryLookup(strategyRef, AggregationStrategy.class); 213 } 214 if (aggregationStrategy == null) { 215 // default to use latest aggregation strategy 216 aggregationStrategy = new UseLatestAggregationStrategy(); 217 } 218 219 boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing()); 220 ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Multicast", this, isParallelProcessing()); 221 222 long timeout = getTimeout() != null ? getTimeout() : 0; 223 if (timeout > 0 && !isParallelProcessing()) { 224 throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled."); 225 } 226 if (onPrepareRef != null) { 227 onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class); 228 } 229 230 MulticastProcessor answer = new MulticastProcessor(routeContext.getCamelContext(), list, aggregationStrategy, isParallelProcessing(), 231 threadPool, shutdownThreadPool, isStreaming(), isStopOnException(), timeout, onPrepare, isShareUnitOfWork()); 232 if (isShareUnitOfWork()) { 233 // wrap answer in a sub unit of work, since we share the unit of work 234 return new SubUnitOfWorkProcessor(answer); 235 } 236 return answer; 237 } 238 239 public AggregationStrategy getAggregationStrategy() { 240 return aggregationStrategy; 241 } 242 243 public MulticastDefinition setAggregationStrategy(AggregationStrategy aggregationStrategy) { 244 this.aggregationStrategy = aggregationStrategy; 245 return this; 246 } 247 248 public Boolean getParallelProcessing() { 249 return parallelProcessing; 250 } 251 252 public void setParallelProcessing(Boolean parallelProcessing) { 253 this.parallelProcessing = parallelProcessing; 254 } 255 256 public boolean isParallelProcessing() { 257 return parallelProcessing != null && parallelProcessing; 258 } 259 260 public Boolean getStreaming() { 261 return streaming; 262 } 263 264 public void setStreaming(Boolean streaming) { 265 this.streaming = streaming; 266 } 267 268 public boolean isStreaming() { 269 return streaming != null && streaming; 270 } 271 272 public Boolean getStopOnException() { 273 return stopOnException; 274 } 275 276 public void setStopOnException(Boolean stopOnException) { 277 this.stopOnException = stopOnException; 278 } 279 280 public Boolean isStopOnException() { 281 return stopOnException != null && stopOnException; 282 } 283 284 public ExecutorService getExecutorService() { 285 return executorService; 286 } 287 288 public void setExecutorService(ExecutorService executorService) { 289 this.executorService = executorService; 290 } 291 292 public String getStrategyRef() { 293 return strategyRef; 294 } 295 296 public void setStrategyRef(String strategyRef) { 297 this.strategyRef = strategyRef; 298 } 299 300 public String getExecutorServiceRef() { 301 return executorServiceRef; 302 } 303 304 public void setExecutorServiceRef(String executorServiceRef) { 305 this.executorServiceRef = executorServiceRef; 306 } 307 308 public Long getTimeout() { 309 return timeout; 310 } 311 312 public void setTimeout(Long timeout) { 313 this.timeout = timeout; 314 } 315 316 public String getOnPrepareRef() { 317 return onPrepareRef; 318 } 319 320 public void setOnPrepareRef(String onPrepareRef) { 321 this.onPrepareRef = onPrepareRef; 322 } 323 324 public Processor getOnPrepare() { 325 return onPrepare; 326 } 327 328 public void setOnPrepare(Processor onPrepare) { 329 this.onPrepare = onPrepare; 330 } 331 332 public Boolean getShareUnitOfWork() { 333 return shareUnitOfWork; 334 } 335 336 public void setShareUnitOfWork(Boolean shareUnitOfWork) { 337 this.shareUnitOfWork = shareUnitOfWork; 338 } 339 340 public boolean isShareUnitOfWork() { 341 return shareUnitOfWork != null && shareUnitOfWork; 342 } 343 344 }