001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.io.Closeable; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Scanner; 027import java.util.concurrent.ExecutorService; 028 029import org.apache.camel.AsyncCallback; 030import org.apache.camel.AsyncProcessor; 031import org.apache.camel.CamelContext; 032import org.apache.camel.Exchange; 033import org.apache.camel.Expression; 034import org.apache.camel.Message; 035import org.apache.camel.Processor; 036import org.apache.camel.RuntimeCamelException; 037import org.apache.camel.Traceable; 038import org.apache.camel.processor.aggregate.AggregationStrategy; 039import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; 040import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy; 041import org.apache.camel.spi.RouteContext; 042import org.apache.camel.util.ExchangeHelper; 043import org.apache.camel.util.IOHelper; 044import org.apache.camel.util.ObjectHelper; 045 046import static org.apache.camel.util.ObjectHelper.notNull; 047 048/** 049 * Implements a dynamic <a 050 * href="http://camel.apache.org/splitter.html">Splitter</a> pattern 051 * where an expression is evaluated to iterate through each of the parts of a 052 * message and then each part is then send to some endpoint. 053 * 054 * @version 055 */ 056public class Splitter extends MulticastProcessor implements AsyncProcessor, Traceable { 057 058 private final Expression expression; 059 060 public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) { 061 this(camelContext, expression, destination, aggregationStrategy, false, null, false, false, false, 0, null, false); 062 } 063 064 @Deprecated 065 public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, 066 boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, 067 boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork) { 068 this(camelContext, expression, destination, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, 069 streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, false); 070 } 071 072 public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing, 073 ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, 074 boolean useSubUnitOfWork, boolean parallelAggregate) { 075 this(camelContext, expression, destination, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, 076 onPrepare, useSubUnitOfWork, false, false); 077 } 078 079 public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing, 080 ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, 081 boolean useSubUnitOfWork, boolean parallelAggregate, boolean stopOnAggregateException) { 082 super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, 083 timeout, onPrepare, useSubUnitOfWork, parallelAggregate, stopOnAggregateException); 084 this.expression = expression; 085 notNull(expression, "expression"); 086 notNull(destination, "destination"); 087 } 088 089 @Override 090 public String toString() { 091 return "Splitter[on: " + expression + " to: " + getProcessors().iterator().next() + " aggregate: " + getAggregationStrategy() + "]"; 092 } 093 094 @Override 095 public String getTraceLabel() { 096 return "split[" + expression + "]"; 097 } 098 099 @Override 100 public boolean process(Exchange exchange, final AsyncCallback callback) { 101 final AggregationStrategy strategy = getAggregationStrategy(); 102 103 // set original exchange if not already pre-configured 104 if (strategy instanceof UseOriginalAggregationStrategy) { 105 UseOriginalAggregationStrategy original = (UseOriginalAggregationStrategy) strategy; 106 if (original.getOriginal() == null) { 107 original.setOriginal(exchange); 108 } 109 } 110 111 // if no custom aggregation strategy is being used then fallback to keep the original 112 // and propagate exceptions which is done by a per exchange specific aggregation strategy 113 // to ensure it supports async routing 114 if (strategy == null) { 115 AggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true); 116 if (isShareUnitOfWork()) { 117 original = new ShareUnitOfWorkAggregationStrategy(original); 118 } 119 setAggregationStrategyOnExchange(exchange, original); 120 } 121 122 return super.process(exchange, callback); 123 } 124 125 @Override 126 protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { 127 Object value = expression.evaluate(exchange, Object.class); 128 if (exchange.getException() != null) { 129 // force any exceptions occurred during evaluation to be thrown 130 throw exchange.getException(); 131 } 132 133 Iterable<ProcessorExchangePair> answer; 134 if (isStreaming()) { 135 answer = createProcessorExchangePairsIterable(exchange, value); 136 } else { 137 answer = createProcessorExchangePairsList(exchange, value); 138 } 139 if (exchange.getException() != null) { 140 // force any exceptions occurred during creation of exchange paris to be thrown 141 // before returning the answer; 142 throw exchange.getException(); 143 } 144 145 return answer; 146 } 147 148 private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, final Object value) { 149 return new SplitterIterable(exchange, value); 150 } 151 152 private final class SplitterIterable implements Iterable<ProcessorExchangePair>, Closeable { 153 154 // create a copy which we use as master to copy during splitting 155 // this avoids any side effect reflected upon the incoming exchange 156 final Object value; 157 final Iterator<?> iterator; 158 private final Exchange copy; 159 private final RouteContext routeContext; 160 private final Exchange original; 161 162 private SplitterIterable(Exchange exchange, Object value) { 163 this.original = exchange; 164 this.value = value; 165 this.iterator = ObjectHelper.createIterator(value); 166 this.copy = copyExchangeNoAttachments(exchange, true); 167 this.routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; 168 } 169 170 @Override 171 public Iterator<ProcessorExchangePair> iterator() { 172 return new Iterator<ProcessorExchangePair>() { 173 private int index; 174 private boolean closed; 175 176 public boolean hasNext() { 177 if (closed) { 178 return false; 179 } 180 181 boolean answer = iterator.hasNext(); 182 if (!answer) { 183 // we are now closed 184 closed = true; 185 // nothing more so we need to close the expression value in case it needs to be 186 try { 187 close(); 188 } catch (IOException e) { 189 throw new RuntimeCamelException("Scanner aborted because of an IOException!", e); 190 } 191 } 192 return answer; 193 } 194 195 public ProcessorExchangePair next() { 196 Object part = iterator.next(); 197 if (part != null) { 198 // create a correlated copy as the new exchange to be routed in the splitter from the copy 199 // and do not share the unit of work 200 Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false); 201 // If the splitter has an aggregation strategy 202 // then the StreamCache created by the child routes must not be 203 // closed by the unit of work of the child route, but by the unit of 204 // work of the parent route or grand parent route or grand grand parent route... (in case of nesting). 205 // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set. 206 if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) { 207 newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork()); 208 } 209 // if we share unit of work, we need to prepare the child exchange 210 if (isShareUnitOfWork()) { 211 prepareSharedUnitOfWork(newExchange, copy); 212 } 213 if (part instanceof Message) { 214 newExchange.setIn((Message) part); 215 } else { 216 Message in = newExchange.getIn(); 217 in.setBody(part); 218 } 219 return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext); 220 } else { 221 return null; 222 } 223 } 224 225 public void remove() { 226 throw new UnsupportedOperationException("Remove is not supported by this iterator"); 227 } 228 }; 229 } 230 231 @Override 232 public void close() throws IOException { 233 if (value instanceof Scanner) { 234 // special for Scanner which implement the Closeable since JDK7 235 Scanner scanner = (Scanner) value; 236 scanner.close(); 237 IOException ioException = scanner.ioException(); 238 if (ioException != null) { 239 throw ioException; 240 } 241 } else if (value instanceof Closeable) { 242 // we should throw out the exception here 243 IOHelper.closeWithException((Closeable) value); 244 } 245 } 246 247 } 248 249 private Iterable<ProcessorExchangePair> createProcessorExchangePairsList(Exchange exchange, Object value) { 250 List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(); 251 252 // reuse iterable and add it to the result list 253 Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairsIterable(exchange, value); 254 try { 255 for (ProcessorExchangePair pair : pairs) { 256 if (pair != null) { 257 result.add(pair); 258 } 259 } 260 } finally { 261 if (pairs instanceof Closeable) { 262 IOHelper.close((Closeable) pairs, "Splitter:ProcessorExchangePairs"); 263 } 264 } 265 266 return result; 267 } 268 269 @Override 270 protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, 271 Iterator<ProcessorExchangePair> it) { 272 // do not share unit of work 273 exchange.setUnitOfWork(null); 274 275 exchange.setProperty(Exchange.SPLIT_INDEX, index); 276 if (allPairs instanceof Collection) { 277 // non streaming mode, so we know the total size already 278 exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>) allPairs).size()); 279 } 280 if (it.hasNext()) { 281 exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE); 282 } else { 283 exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE); 284 // streaming mode, so set total size when we are complete based on the index 285 exchange.setProperty(Exchange.SPLIT_SIZE, index + 1); 286 } 287 } 288 289 @Override 290 protected Integer getExchangeIndex(Exchange exchange) { 291 return exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class); 292 } 293 294 public Expression getExpression() { 295 return expression; 296 } 297 298 private static Exchange copyExchangeNoAttachments(Exchange exchange, boolean preserveExchangeId) { 299 Exchange answer = ExchangeHelper.createCopy(exchange, preserveExchangeId); 300 // we do not want attachments for the splitted sub-messages 301 answer.getIn().setAttachmentObjects(null); 302 // we do not want to copy the message history for splitted sub-messages 303 answer.getProperties().remove(Exchange.MESSAGE_HISTORY); 304 return answer; 305 } 306}