001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.io.Closeable;
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Scanner;
027import java.util.concurrent.ExecutorService;
028
029import org.apache.camel.AsyncCallback;
030import org.apache.camel.AsyncProcessor;
031import org.apache.camel.CamelContext;
032import org.apache.camel.Exchange;
033import org.apache.camel.Expression;
034import org.apache.camel.Message;
035import org.apache.camel.Processor;
036import org.apache.camel.RuntimeCamelException;
037import org.apache.camel.Traceable;
038import org.apache.camel.processor.aggregate.AggregationStrategy;
039import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
040import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
041import org.apache.camel.spi.RouteContext;
042import org.apache.camel.util.ExchangeHelper;
043import org.apache.camel.util.IOHelper;
044import org.apache.camel.util.ObjectHelper;
045
046import static org.apache.camel.util.ObjectHelper.notNull;
047
048/**
049 * Implements a dynamic <a
050 * href="http://camel.apache.org/splitter.html">Splitter</a> pattern
051 * where an expression is evaluated to iterate through each of the parts of a
052 * message and then each part is then send to some endpoint.
053 *
054 * @version 
055 */
056public class Splitter extends MulticastProcessor implements AsyncProcessor, Traceable {
057
058    private final Expression expression;
059
060    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy) {
061        this(camelContext, expression, destination, aggregationStrategy, false, null, false, false, false, 0, null, false);
062    }
063
064    @Deprecated
065    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
066                    boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
067                    boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork) {
068        this(camelContext, expression, destination, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService,
069                streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, false);
070    }
071
072    public Splitter(CamelContext camelContext, Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
073                    boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService,
074                    boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean useSubUnitOfWork,
075                    boolean parallelAggregate) {
076        super(camelContext, Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService,
077                shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, useSubUnitOfWork, parallelAggregate);
078        this.expression = expression;
079        notNull(expression, "expression");
080        notNull(destination, "destination");
081    }
082
083    @Override
084    public String toString() {
085        return "Splitter[on: " + expression + " to: " + getProcessors().iterator().next() + " aggregate: " + getAggregationStrategy() + "]";
086    }
087
088    @Override
089    public String getTraceLabel() {
090        return "split[" + expression + "]";
091    }
092
093    @Override
094    public boolean process(Exchange exchange, final AsyncCallback callback) {
095        final AggregationStrategy strategy = getAggregationStrategy();
096
097        // if no custom aggregation strategy is being used then fallback to keep the original
098        // and propagate exceptions which is done by a per exchange specific aggregation strategy
099        // to ensure it supports async routing
100        if (strategy == null) {
101            AggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true);
102            if (isShareUnitOfWork()) {
103                original = new ShareUnitOfWorkAggregationStrategy(original);
104            }
105            setAggregationStrategyOnExchange(exchange, original);
106        }
107
108        return super.process(exchange, callback);
109    }
110
111    @Override
112    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
113        Object value = expression.evaluate(exchange, Object.class);
114        if (exchange.getException() != null) {
115            // force any exceptions occurred during evaluation to be thrown
116            throw exchange.getException();
117        }
118
119        Iterable<ProcessorExchangePair> answer;
120        if (isStreaming()) {
121            answer = createProcessorExchangePairsIterable(exchange, value);
122        } else {
123            answer = createProcessorExchangePairsList(exchange, value);
124        }
125        if (exchange.getException() != null) {
126            // force any exceptions occurred during creation of exchange paris to be thrown
127            // before returning the answer;
128            throw exchange.getException();
129        }
130
131        return answer;
132    }
133
134    private Iterable<ProcessorExchangePair> createProcessorExchangePairsIterable(final Exchange exchange, final Object value) {
135        return new SplitterIterable(exchange, value);
136    }
137
138    private final class SplitterIterable implements Iterable<ProcessorExchangePair>, Closeable {
139
140        // create a copy which we use as master to copy during splitting
141        // this avoids any side effect reflected upon the incoming exchange
142        final Object value;
143        final Iterator<?> iterator;
144        private final Exchange copy;
145        private final RouteContext routeContext;
146        private final Exchange original;
147
148        private SplitterIterable(Exchange exchange, Object value) {
149            this.original = exchange;
150            this.value = value;
151            this.iterator = ObjectHelper.createIterator(value);
152            this.copy = copyExchangeNoAttachments(exchange, true);
153            this.routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
154        }
155
156        @Override
157        public Iterator<ProcessorExchangePair> iterator() {
158            return new Iterator<ProcessorExchangePair>() {
159                private int index;
160                private boolean closed;
161
162                public boolean hasNext() {
163                    if (closed) {
164                        return false;
165                    }
166
167                    boolean answer = iterator.hasNext();
168                    if (!answer) {
169                        // we are now closed
170                        closed = true;
171                        // nothing more so we need to close the expression value in case it needs to be
172                        try {
173                            close();
174                        } catch (IOException e) {
175                            throw new RuntimeCamelException("Scanner aborted because of an IOException!", e);
176                        }
177                    }
178                    return answer;
179                }
180
181                public ProcessorExchangePair next() {
182                    Object part = iterator.next();
183                    if (part != null) {
184                        // create a correlated copy as the new exchange to be routed in the splitter from the copy
185                        // and do not share the unit of work
186                        Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false);
187                        // If the splitter has an aggregation strategy
188                        // then the StreamCache created by the child routes must not be
189                        // closed by the unit of work of the child route, but by the unit of
190                        // work of the parent route or grand parent route or grand grand parent route... (in case of nesting).
191                        // Therefore, set the unit of work of the parent route as stream cache unit of work, if not already set.
192                        if (newExchange.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) {
193                            newExchange.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, original.getUnitOfWork());
194                        }
195                        // if we share unit of work, we need to prepare the child exchange
196                        if (isShareUnitOfWork()) {
197                            prepareSharedUnitOfWork(newExchange, copy);
198                        }
199                        if (part instanceof Message) {
200                            newExchange.setIn((Message) part);
201                        } else {
202                            Message in = newExchange.getIn();
203                            in.setBody(part);
204                        }
205                        return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext);
206                    } else {
207                        return null;
208                    }
209                }
210
211                public void remove() {
212                    throw new UnsupportedOperationException("Remove is not supported by this iterator");
213                }
214            };
215        }
216
217        @Override
218        public void close() throws IOException {
219            if (value instanceof Scanner) {
220                // special for Scanner which implement the Closeable since JDK7 
221                Scanner scanner = (Scanner) value;
222                scanner.close();
223                IOException ioException = scanner.ioException();
224                if (ioException != null) {
225                    throw ioException;
226                }
227            } else if (value instanceof Closeable) {
228                // we should throw out the exception here   
229                IOHelper.closeWithException((Closeable) value);
230            }
231        }
232       
233    }
234
235    private Iterable<ProcessorExchangePair> createProcessorExchangePairsList(Exchange exchange, Object value) {
236        List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>();
237
238        // reuse iterable and add it to the result list
239        Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairsIterable(exchange, value);
240        try {
241            for (ProcessorExchangePair pair : pairs) {
242                if (pair != null) {
243                    result.add(pair);
244                }
245            }
246        } finally {
247            if (pairs instanceof Closeable) {
248                IOHelper.close((Closeable) pairs, "Splitter:ProcessorExchangePairs");
249            }
250        }
251
252        return result;
253    }
254
255    @Override
256    protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs,
257                                     Iterator<ProcessorExchangePair> it) {
258        // do not share unit of work
259        exchange.setUnitOfWork(null);
260
261        exchange.setProperty(Exchange.SPLIT_INDEX, index);
262        if (allPairs instanceof Collection) {
263            // non streaming mode, so we know the total size already
264            exchange.setProperty(Exchange.SPLIT_SIZE, ((Collection<?>) allPairs).size());
265        }
266        if (it.hasNext()) {
267            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
268        } else {
269            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
270            // streaming mode, so set total size when we are complete based on the index
271            exchange.setProperty(Exchange.SPLIT_SIZE, index + 1);
272        }
273    }
274
275    @Override
276    protected Integer getExchangeIndex(Exchange exchange) {
277        return exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
278    }
279
280    public Expression getExpression() {
281        return expression;
282    }
283    
284    private static Exchange copyExchangeNoAttachments(Exchange exchange, boolean preserveExchangeId) {
285        Exchange answer = ExchangeHelper.createCopy(exchange, preserveExchangeId);
286        // we do not want attachments for the splitted sub-messages
287        answer.getIn().setAttachments(null);
288        // we do not want to copy the message history for splitted sub-messages
289        answer.getProperties().remove(Exchange.MESSAGE_HISTORY);
290        return answer;
291    }
292}