001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.io.Closeable;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Scanner;
027import java.util.concurrent.ExecutorService;
028
029import org.apache.camel.AsyncCallback;
030import org.apache.camel.AsyncProcessor;
031import org.apache.camel.CamelContext;
032import org.apache.camel.Exchange;
033import org.apache.camel.Expression;
034import org.apache.camel.Message;
035import org.apache.camel.Processor;
036import org.apache.camel.RuntimeCamelException;
037import org.apache.camel.Traceable;
038import org.apache.camel.processor.aggregate.AggregationStrategy;
039import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
040import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
041import org.apache.camel.spi.RouteContext;
042import org.apache.camel.util.ExchangeHelper;
043import org.apache.camel.util.IOHelper;
044import org.apache.camel.util.ObjectHelper;
045
046import static org.apache.camel.util.ObjectHelper.notNull;
047
048/**
049 * Implements a dynamic <a
050 * href="http://camel.apache.org/splitter.html">Splitter</a> pattern
051 * where an expression is evaluated to iterate through each of the parts of a
052 * message and then each part is then send to some endpoint.
053 *
054 * @version 
055 */
056public class Splitter extends MulticastProcessor implements AsyncProcessor, Traceable {
057
058    private final Expression expression;
059
060    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
061        this(camelContext, expression, destination, aggregationStrategy, false, null, false, false, false, 0, null, false);
062    }
063
064    @Deprecated
065    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
066                    boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
067                    boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork) {
068        this(camelContext, expression, destination, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
069                streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, false);
070    }
071
072    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
073                    ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare,
074                    boolean useSubUnitOfWork, boolean parallelAggregate) {
075        this(camelContext, expression, destination, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout,
076             onPrepare, useSubUnitOfWork, false, false);
077    }
078
079    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy, boolean parallelProcessing,
080                    ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare,
081                    boolean useSubUnitOfWork, boolean parallelAggregate, boolean stopOnAggregateException) {
082        super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException,
083              timeout, onPrepare, useSubUnitOfWork, parallelAggregate, stopOnAggregateException);
084        this.expression = expression;
085        notNull(expression, "expression");
086        notNull(destination, "destination");
087    }
088
089    @Override
090    public String toString() {
091        return "Splitter[on: " + expression + " to: " + getProcessors().iterator().next() + " aggregate: " + getAggregationStrategy() + "]";
092    }
093
094    @Override
095    public String getTraceLabel() {
096        return "split[" + expression + "]";
097    }
098
099    @Override
100    public boolean process(Exchange exchange, final AsyncCallback callback) {
101        final AggregationStrategy strategy = getAggregationStrategy();
102
103        // set original exchange if not already pre-configured
104        if (strategy instanceof UseOriginalAggregationStrategy) {
105            UseOriginalAggregationStrategy original = (UseOriginalAggregationStrategy) strategy;
106            if (original.getOriginal() == null) {
107                original.setOriginal(exchange);
108            }
109        }
110
111        // if no custom aggregation strategy is being used then fallback to keep the original
112        // and propagate exceptions which is done by a per exchange specific aggregation strategy
113        // to ensure it supports async routing
114        if (strategy == null) {
115            AggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true);
116            if (isShareUnitOfWork()) {
117                original = new ShareUnitOfWorkAggregationStrategy(original);
118            }
119            setAggregationStrategyOnExchange(exchange, original);
120        }
121
122        return super.process(exchange, callback);
123    }
124
125    @Override
126    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
127        Object value = expression.evaluate(exchange, Object.class);
128        if (exchange.getException() != null) {
129            // force any exceptions occurred during evaluation to be thrown
130            throw exchange.getException();
131        }
132
133        Iterable<ProcessorExchangePair> answer;
134        if (isStreaming()) {
135            answer = createProcessorExchangePairsIterable(exchange, value);
136        } else {
137            answer = createProcessorExchangePairsList(exchange, value);
138        }
139        if (exchange.getException() != null) {
140            // force any exceptions occurred during creation of exchange paris to be thrown
141            // before returning the answer;
142            throw exchange.getException();
143        }
144
145        return answer;
146    }
147
148    private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, final Object value) {
149        return new SplitterIterable(exchange, value);
150    }
151
152    private final class SplitterIterable implements Iterable<ProcessorExchangePair>, Closeable {
153
154        // create a copy which we use as master to copy during splitting
155        // this avoids any side effect reflected upon the incoming exchange
156        final Object value;
157        final Iterator<?> iterator;
158        private final Exchange copy;
159        private final RouteContext routeContext;
160        private final Exchange original;
161
162        private SplitterIterable(Exchange exchange, Object value) {
163            this.original = exchange;
164            this.value = value;
165            this.iterator = ObjectHelper.createIterator(value);
166            this.copy = copyExchangeNoAttachments(exchange, true);
167            this.routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
168        }
169
170        @Override
171        public Iterator<ProcessorExchangePair> iterator() {
172            return new Iterator<ProcessorExchangePair>() {
173                private int index;
174                private boolean closed;
175
176                public boolean hasNext() {
177                    if (closed) {
178                        return false;
179                    }
180
181                    boolean answer = iterator.hasNext();
182                    if (!answer) {
183                        // we are now closed
184                        closed = true;
185                        // nothing more so we need to close the expression value in case it needs to be
186                        try {
187                            close();
188                        } catch (IOException e) {
189                            throw new RuntimeCamelException("Scanner aborted because of an IOException!", e);
190                        }
191                    }
192                    return answer;
193                }
194
195                public ProcessorExchangePair next() {
196                    Object part = iterator.next();
197                    if (part != null) {
198                        // create a correlated copy as the new exchange to be routed in the splitter from the copy
199                        // and do not share the unit of work
200                        Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
201                        // If the splitter has an aggregation strategy
202                        // then the StreamCache created by the child routes must not be
203                        // closed by the unit of work of the child route, but by the unit of
204                        // work of the parent route or grand parent route or grand grand parent route... (in case of nesting).
205                        // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set.
206                        if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
207                            newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork());
208                        }
209                        // if we share unit of work, we need to prepare the child exchange
210                        if (isShareUnitOfWork()) {
211                            prepareSharedUnitOfWork(newExchange, copy);
212                        }
213                        if (part instanceof Message) {
214                            newExchange.setIn((Message) part);
215                        } else {
216                            Message in = newExchange.getIn();
217                            in.setBody(part);
218                        }
219                        return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
220                    } else {
221                        return null;
222                    }
223                }
224
225                public void remove() {
226                    throw new UnsupportedOperationException("Remove is not supported by this iterator");
227                }
228            };
229        }
230
231        @Override
232        public void close() throws IOException {
233            if (value instanceof Scanner) {
234                // special for Scanner which implement the Closeable since JDK7 
235                Scanner scanner = (Scanner) value;
236                scanner.close();
237                IOException ioException = scanner.ioException();
238                if (ioException != null) {
239                    throw ioException;
240                }
241            } else if (value instanceof Closeable) {
242                // we should throw out the exception here   
243                IOHelper.closeWithException((Closeable) value);
244            }
245        }
246       
247    }
248
249    private Iterable<ProcessorExchangePair> createProcessorExchangePairsList(Exchange exchange, Object value) {
250        List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>();
251
252        // reuse iterable and add it to the result list
253        Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairsIterable(exchange, value);
254        try {
255            for (ProcessorExchangePair pair : pairs) {
256                if (pair != null) {
257                    result.add(pair);
258                }
259            }
260        } finally {
261            if (pairs instanceof Closeable) {
262                IOHelper.close((Closeable) pairs, "Splitter:ProcessorExchangePairs");
263            }
264        }
265
266        return result;
267    }
268
269    @Override
270    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
271                                     Iterator<ProcessorExchangePair> it) {
272        // do not share unit of work
273        exchange.setUnitOfWork(null);
274
275        exchange.setProperty(Exchange.SPLIT_INDEX, index);
276        if (allPairs instanceof Collection) {
277            // non streaming mode, so we know the total size already
278            exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>) allPairs).size());
279        }
280        if (it.hasNext()) {
281            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
282        } else {
283            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
284            // streaming mode, so set total size when we are complete based on the index
285            exchange.setProperty(Exchange.SPLIT_SIZE, index + 1);
286        }
287    }
288
289    @Override
290    protected Integer getExchangeIndex(Exchange exchange) {
291        return exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
292    }
293
294    public Expression getExpression() {
295        return expression;
296    }
297    
298    private static Exchange copyExchangeNoAttachments(Exchange exchange, boolean preserveExchangeId) {
299        Exchange answer = ExchangeHelper.createCopy(exchange, preserveExchangeId);
300        // we do not want attachments for the splitted sub-messages
301        answer.getIn().setAttachmentObjects(null);
302        // we do not want to copy the message history for splitted sub-messages
303        answer.getProperties().remove(Exchange.MESSAGE_HISTORY);
304        return answer;
305    }
306}