001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.io.Closeable; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Collections; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Scanner; 027import java.util.concurrent.ExecutorService; 028 029import org.apache.camel.AsyncCallback; 030import org.apache.camel.AsyncProcessor; 031import org.apache.camel.CamelContext; 032import org.apache.camel.Exchange; 033import org.apache.camel.Expression; 034import org.apache.camel.Message; 035import org.apache.camel.Processor; 036import org.apache.camel.RuntimeCamelException; 037import org.apache.camel.Traceable; 038import org.apache.camel.processor.aggregate.AggregationStrategy; 039import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy; 040import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy; 041import org.apache.camel.spi.RouteContext; 042import org.apache.camel.util.ExchangeHelper; 043import org.apache.camel.util.IOHelper; 044import org.apache.camel.util.ObjectHelper; 045 046import static org.apache.camel.util.ObjectHelper.notNull; 047 048/** 049 * Implements a dynamic <a 050 * href="http://camel.apache.org/splitter.html">Splitter</a> pattern 051 * where an expression is evaluated to iterate through each of the parts of a 052 * message and then each part is then send to some endpoint. 053 * 054 * @version 055 */ 056public class Splitter extends MulticastProcessor implements AsyncProcessor, Traceable { 057 058 private final Expression expression; 059 060 public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) { 061 this(camelContext, expression, destination, aggregationStrategy, false, null, false, false, false, 0, null, false); 062 } 063 064 @Deprecated 065 public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, 066 boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, 067 boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork) { 068 this(camelContext, expression, destination, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, 069 streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, false); 070 } 071 072 public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, 073 boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, 074 boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork, 075 boolean parallelAggregate) { 076 super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, 077 shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, parallelAggregate); 078 this.expression = expression; 079 notNull(expression, "expression"); 080 notNull(destination, "destination"); 081 } 082 083 @Override 084 public String toString() { 085 return "Splitter[on: " + expression + " to: " + getProcessors().iterator().next() + " aggregate: " + getAggregationStrategy() + "]"; 086 } 087 088 @Override 089 public String getTraceLabel() { 090 return "split[" + expression + "]"; 091 } 092 093 @Override 094 public boolean process(Exchange exchange, final AsyncCallback callback) { 095 final AggregationStrategy strategy = getAggregationStrategy(); 096 097 // if no custom aggregation strategy is being used then fallback to keep the original 098 // and propagate exceptions which is done by a per exchange specific aggregation strategy 099 // to ensure it supports async routing 100 if (strategy == null) { 101 AggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true); 102 if (isShareUnitOfWork()) { 103 original = new ShareUnitOfWorkAggregationStrategy(original); 104 } 105 setAggregationStrategyOnExchange(exchange, original); 106 } 107 108 return super.process(exchange, callback); 109 } 110 111 @Override 112 protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { 113 Object value = expression.evaluate(exchange, Object.class); 114 if (exchange.getException() != null) { 115 // force any exceptions occurred during evaluation to be thrown 116 throw exchange.getException(); 117 } 118 119 Iterable<ProcessorExchangePair> answer; 120 if (isStreaming()) { 121 answer = createProcessorExchangePairsIterable(exchange, value); 122 } else { 123 answer = createProcessorExchangePairsList(exchange, value); 124 } 125 if (exchange.getException() != null) { 126 // force any exceptions occurred during creation of exchange paris to be thrown 127 // before returning the answer; 128 throw exchange.getException(); 129 } 130 131 return answer; 132 } 133 134 private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, final Object value) { 135 return new SplitterIterable(exchange, value); 136 } 137 138 private final class SplitterIterable implements Iterable<ProcessorExchangePair>, Closeable { 139 140 // create a copy which we use as master to copy during splitting 141 // this avoids any side effect reflected upon the incoming exchange 142 final Object value; 143 final Iterator<?> iterator; 144 private final Exchange copy; 145 private final RouteContext routeContext; 146 private final Exchange original; 147 148 private SplitterIterable(Exchange exchange, Object value) { 149 this.original = exchange; 150 this.value = value; 151 this.iterator = ObjectHelper.createIterator(value); 152 this.copy = copyExchangeNoAttachments(exchange, true); 153 this.routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; 154 } 155 156 @Override 157 public Iterator<ProcessorExchangePair> iterator() { 158 return new Iterator<ProcessorExchangePair>() { 159 private int index; 160 private boolean closed; 161 162 public boolean hasNext() { 163 if (closed) { 164 return false; 165 } 166 167 boolean answer = iterator.hasNext(); 168 if (!answer) { 169 // we are now closed 170 closed = true; 171 // nothing more so we need to close the expression value in case it needs to be 172 try { 173 close(); 174 } catch (IOException e) { 175 throw new RuntimeCamelException("Scanner aborted because of an IOException!", e); 176 } 177 } 178 return answer; 179 } 180 181 public ProcessorExchangePair next() { 182 Object part = iterator.next(); 183 if (part != null) { 184 // create a correlated copy as the new exchange to be routed in the splitter from the copy 185 // and do not share the unit of work 186 Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false); 187 // If the splitter has an aggregation strategy 188 // then the StreamCache created by the child routes must not be 189 // closed by the unit of work of the child route, but by the unit of 190 // work of the parent route or grand parent route or grand grand parent route... (in case of nesting). 191 // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set. 192 if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) { 193 newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork()); 194 } 195 // if we share unit of work, we need to prepare the child exchange 196 if (isShareUnitOfWork()) { 197 prepareSharedUnitOfWork(newExchange, copy); 198 } 199 if (part instanceof Message) { 200 newExchange.setIn((Message) part); 201 } else { 202 Message in = newExchange.getIn(); 203 in.setBody(part); 204 } 205 return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext); 206 } else { 207 return null; 208 } 209 } 210 211 public void remove() { 212 throw new UnsupportedOperationException("Remove is not supported by this iterator"); 213 } 214 }; 215 } 216 217 @Override 218 public void close() throws IOException { 219 if (value instanceof Scanner) { 220 // special for Scanner which implement the Closeable since JDK7 221 Scanner scanner = (Scanner) value; 222 scanner.close(); 223 IOException ioException = scanner.ioException(); 224 if (ioException != null) { 225 throw ioException; 226 } 227 } else if (value instanceof Closeable) { 228 // we should throw out the exception here 229 IOHelper.closeWithException((Closeable) value); 230 } 231 } 232 233 } 234 235 private Iterable<ProcessorExchangePair> createProcessorExchangePairsList(Exchange exchange, Object value) { 236 List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(); 237 238 // reuse iterable and add it to the result list 239 Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairsIterable(exchange, value); 240 try { 241 for (ProcessorExchangePair pair : pairs) { 242 if (pair != null) { 243 result.add(pair); 244 } 245 } 246 } finally { 247 if (pairs instanceof Closeable) { 248 IOHelper.close((Closeable) pairs, "Splitter:ProcessorExchangePairs"); 249 } 250 } 251 252 return result; 253 } 254 255 @Override 256 protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, 257 Iterator<ProcessorExchangePair> it) { 258 // do not share unit of work 259 exchange.setUnitOfWork(null); 260 261 exchange.setProperty(Exchange.SPLIT_INDEX, index); 262 if (allPairs instanceof Collection) { 263 // non streaming mode, so we know the total size already 264 exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>) allPairs).size()); 265 } 266 if (it.hasNext()) { 267 exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE); 268 } else { 269 exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE); 270 // streaming mode, so set total size when we are complete based on the index 271 exchange.setProperty(Exchange.SPLIT_SIZE, index + 1); 272 } 273 } 274 275 @Override 276 protected Integer getExchangeIndex(Exchange exchange) { 277 return exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class); 278 } 279 280 public Expression getExpression() { 281 return expression; 282 } 283 284 private static Exchange copyExchangeNoAttachments(Exchange exchange, boolean preserveExchangeId) { 285 Exchange answer = ExchangeHelper.createCopy(exchange, preserveExchangeId); 286 // we do not want attachments for the splitted sub-messages 287 answer.getIn().setAttachmentObjects(null); 288 // we do not want to copy the message history for splitted sub-messages 289 answer.getProperties().remove(Exchange.MESSAGE_HISTORY); 290 return answer; 291 } 292}