/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.model;

import java.util.concurrent.ExecutorService;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;

import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.Splitter;
import org.apache.camel.processor.SubUnitOfWorkProcessor;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.CamelContextHelper;

/**
 * Represents an XML &lt;split/&gt; element.
 * <p>
 * The definition holds the split expression (inherited from {@link ExpressionNode}) together with the
 * options that are handed to the {@link Splitter} built by {@link #createProcessor(RouteContext)}.
 * Options backed by a wrapper type ({@link Boolean}, {@link Long}) are {@code null} until configured;
 * the {@code isXxx()} accessors treat {@code null} as {@code false}.
 *
 * @version
 */
@XmlRootElement(name = "split")
@XmlAccessorType(XmlAccessType.FIELD)
public class SplitDefinition extends ExpressionNode implements ExecutorServiceAwareDefinition<SplitDefinition> {
    @XmlTransient
    private AggregationStrategy aggregationStrategy;
    @XmlTransient
    private ExecutorService executorService;
    @XmlAttribute
    private Boolean parallelProcessing;
    @XmlAttribute
    private String strategyRef;
    @XmlAttribute
    private String executorServiceRef;
    @XmlAttribute
    private Boolean streaming;
    @XmlAttribute
    private Boolean stopOnException;
    @XmlAttribute
    private Long timeout;
    @XmlAttribute
    private String onPrepareRef;
    @XmlTransient
    private Processor onPrepare;
    @XmlAttribute
    private Boolean shareUnitOfWork;

    /**
     * Creates a definition without an expression.
     */
    public SplitDefinition() {
    }

    /**
     * Creates a definition that splits using the given expression.
     *
     * @param expression the expression handed to the super class
     */
    public SplitDefinition(Expression expression) {
        super(expression);
    }

    /**
     * Creates a definition that splits using the given expression definition.
     *
     * @param expression the expression definition handed to the super class
     */
    public SplitDefinition(ExpressionDefinition expression) {
        super(expression);
    }

    @Override
    public String toString() {
        return "Split[" + getExpression() + " -> " + getOutputs() + "]";
    }

    @Override
    public String getShortName() {
        return "split";
    }

    @Override
    public String getLabel() {
        return "split[" + getExpression() + "]";
    }

    /**
     * Builds the {@link Splitter} described by this definition.
     * <p>
     * As a side effect the resolved aggregation strategy is stored back into this definition, and so
     * is the {@code onPrepare} processor when {@link #getOnPrepareRef()} is set (a configured
     * reference replaces any processor set directly).
     *
     * @param routeContext the route context used for lookups and for creating the child processor
     * @return the splitter, wrapped in a {@link SubUnitOfWorkProcessor} when
     *         {@link #isShareUnitOfWork()} is enabled
     * @throws IllegalArgumentException if a timeout greater than zero is configured while
     *                                  parallel processing is not enabled
     * @throws Exception if creating the child processor, the expression or a lookup fails
     */
    @Override
    public Processor createProcessor(RouteContext routeContext) throws Exception {
        // Validate the option combination first, so an invalid definition is rejected before
        // the child processor is built, the aggregationStrategy field is overwritten or a
        // thread pool is requested from ProcessorDefinitionHelper.
        Long configuredTimeout = getTimeout();
        long timeoutMillis = configuredTimeout != null ? configuredTimeout : 0L;
        if (timeoutMillis > 0 && !isParallelProcessing()) {
            throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled.");
        }

        Processor childProcessor = this.createChildProcessor(routeContext, true);
        aggregationStrategy = createAggregationStrategy(routeContext);

        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, isParallelProcessing());
        ExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredExecutorService(routeContext, "Split", this, isParallelProcessing());

        if (onPrepareRef != null) {
            onPrepare = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), onPrepareRef, Processor.class);
        }

        Expression exp = getExpression().createExpression(routeContext);

        Splitter answer = new Splitter(routeContext.getCamelContext(), exp, childProcessor, aggregationStrategy,
                            isParallelProcessing(), threadPool, shutdownThreadPool, isStreaming(), isStopOnException(),
                            timeoutMillis, onPrepare, isShareUnitOfWork());
        if (isShareUnitOfWork()) {
            // wrap answer in a sub unit of work, since we share the unit of work
            return new SubUnitOfWorkProcessor(answer);
        }
        return answer;
    }

    /**
     * Resolves the aggregation strategy to use: the one set directly wins, otherwise
     * {@link #getStrategyRef()} is looked up when present.
     *
     * @param routeContext the route context providing the camel context for the lookup
     * @return the strategy, or {@code null} if neither a strategy nor a reference is configured
     */
    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
        AggregationStrategy strategy = getAggregationStrategy();
        if (strategy == null && strategyRef != null) {
            strategy = CamelContextHelper.mandatoryLookup(routeContext.getCamelContext(), strategyRef, AggregationStrategy.class);
        }
        return strategy;
    }

    // Fluent API
    // -------------------------------------------------------------------------

    /**
     * Set the aggregationStrategy
     *
     * @param aggregationStrategy the strategy to use
     * @return the builder
     */
    public SplitDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
        setAggregationStrategy(aggregationStrategy);
        return this;
    }

    /**
     * Set the aggregationStrategy
     *
     * @param aggregationStrategyRef a reference to a strategy to lookup
     * @return the builder
     */
    public SplitDefinition aggregationStrategyRef(String aggregationStrategyRef) {
        setStrategyRef(aggregationStrategyRef);
        return this;
    }

    /**
     * Doing the splitting work in parallel
     *
     * @return the builder
     */
    public SplitDefinition parallelProcessing() {
        setParallelProcessing(true);
        return this;
    }

    /**
     * Enables streaming.
     * See {@link org.apache.camel.model.SplitDefinition#isStreaming()} for more information
     *
     * @return the builder
     */
    public SplitDefinition streaming() {
        setStreaming(true);
        return this;
    }

    /**
     * Will now stop further processing if an exception or failure occurred during processing of an
     * {@link org.apache.camel.Exchange} and the caused exception will be thrown.
     * <p>
     * Will also stop if processing the exchange failed (has a fault message) or an exception
     * was thrown and handled by the error handler (such as using onException). In all situations
     * the splitter will stop further processing. This is the same behavior as in pipeline, which
     * is used by the routing engine.
     * <p>
     * The default behavior is to <b>not</b> stop but continue processing till the end
     *
     * @return the builder
     */
    public SplitDefinition stopOnException() {
        setStopOnException(true);
        return this;
    }

    /**
     * Sets the executor service on this definition.
     *
     * @param executorService the executor service
     * @return the builder
     */
    public SplitDefinition executorService(ExecutorService executorService) {
        setExecutorService(executorService);
        return this;
    }

    /**
     * Sets a reference to an executor service on this definition.
     *
     * @param executorServiceRef the reference; how it is resolved is decided by
     *                           {@code ProcessorDefinitionHelper} when the processor is created
     * @return the builder
     */
    public SplitDefinition executorServiceRef(String executorServiceRef) {
        setExecutorServiceRef(executorServiceRef);
        return this;
    }

    /**
     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be sent.
     * This can be used to deep-clone messages that should be sent, or any custom logic needed before
     * the exchange is sent.
     *
     * @param onPrepare the processor
     * @return the builder
     */
    public SplitDefinition onPrepare(Processor onPrepare) {
        setOnPrepare(onPrepare);
        return this;
    }

    /**
     * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be sent.
     * This can be used to deep-clone messages that should be sent, or any custom logic needed before
     * the exchange is sent.
     *
     * @param onPrepareRef reference to the processor to lookup in the {@link org.apache.camel.spi.Registry}
     * @return the builder
     */
    public SplitDefinition onPrepareRef(String onPrepareRef) {
        setOnPrepareRef(onPrepareRef);
        return this;
    }

    /**
     * Sets a timeout value in millis to use when using parallelProcessing.
     * <p>
     * A value greater than zero requires parallel processing to be enabled, otherwise
     * {@link #createProcessor(RouteContext)} fails with an {@link IllegalArgumentException}.
     *
     * @param timeout timeout in millis
     * @return the builder
     */
    public SplitDefinition timeout(long timeout) {
        setTimeout(timeout);
        return this;
    }

    /**
     * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and each of the sub messages.
     *
     * @return the builder.
     * @see org.apache.camel.spi.SubUnitOfWork
     */
    public SplitDefinition shareUnitOfWork() {
        setShareUnitOfWork(true);
        return this;
    }

    // Properties
    //-------------------------------------------------------------------------

    public AggregationStrategy getAggregationStrategy() {
        return aggregationStrategy;
    }

    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    public Boolean getParallelProcessing() {
        return parallelProcessing;
    }

    public void setParallelProcessing(Boolean parallelProcessing) {
        this.parallelProcessing = parallelProcessing;
    }

    /**
     * @return {@code true} only if parallel processing was explicitly enabled; an unset value
     *         counts as {@code false}
     */
    public boolean isParallelProcessing() {
        return parallelProcessing != null && parallelProcessing;
    }

    public Boolean getStreaming() {
        return streaming;
    }

    public void setStreaming(Boolean streaming) {
        this.streaming = streaming;
    }

    /**
     * The splitter should use streaming -- exchanges are being sent as the data for them becomes available.
     * This improves throughput and memory usage, but it has a drawback:
     * - the sent exchanges will no longer contain the {@link org.apache.camel.Exchange#SPLIT_SIZE} header property
     *
     * @return whether or not streaming should be used
     */
    public boolean isStreaming() {
        return streaming != null && streaming;
    }

    public Boolean getStopOnException() {
        return stopOnException;
    }

    public void setStopOnException(Boolean stopOnException) {
        this.stopOnException = stopOnException;
    }

    /**
     * Whether processing should stop on the first exception; an unset value counts as {@code false}.
     * <p>
     * Unlike the sibling {@code isXxx()} accessors this one returns the boxed type. The signature is
     * kept as-is so existing callers keep compiling.
     *
     * @return {@link Boolean#TRUE} or {@link Boolean#FALSE}, never {@code null}
     */
    public Boolean isStopOnException() {
        return stopOnException != null && stopOnException;
    }

    public ExecutorService getExecutorService() {
        return executorService;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public String getStrategyRef() {
        return strategyRef;
    }

    public void setStrategyRef(String strategyRef) {
        this.strategyRef = strategyRef;
    }

    public String getExecutorServiceRef() {
        return executorServiceRef;
    }

    public void setExecutorServiceRef(String executorServiceRef) {
        this.executorServiceRef = executorServiceRef;
    }

    /**
     * @return the configured timeout in millis, or {@code null} if none was set
     */
    public Long getTimeout() {
        return timeout;
    }

    public void setTimeout(Long timeout) {
        this.timeout = timeout;
    }

    public String getOnPrepareRef() {
        return onPrepareRef;
    }

    public void setOnPrepareRef(String onPrepareRef) {
        this.onPrepareRef = onPrepareRef;
    }

    public Processor getOnPrepare() {
        return onPrepare;
    }

    public void setOnPrepare(Processor onPrepare) {
        this.onPrepare = onPrepare;
    }

    public Boolean getShareUnitOfWork() {
        return shareUnitOfWork;
    }

    public void setShareUnitOfWork(Boolean shareUnitOfWork) {
        this.shareUnitOfWork = shareUnitOfWork;
    }

    /**
     * @return {@code true} only if sharing the unit of work was explicitly enabled; an unset value
     *         counts as {@code false}
     */
    public boolean isShareUnitOfWork() {
        return shareUnitOfWork != null && shareUnitOfWork;
    }
}