001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor;
018
019import java.util.concurrent.atomic.AtomicBoolean;
020import java.util.concurrent.atomic.AtomicInteger;
021
022import org.apache.camel.AsyncCallback;
023import org.apache.camel.Exchange;
024import org.apache.camel.Expression;
025import org.apache.camel.Predicate;
026import org.apache.camel.Processor;
027import org.apache.camel.Traceable;
028import org.apache.camel.spi.IdAware;
029import org.apache.camel.util.ExchangeHelper;
030import org.slf4j.Logger;
031import org.slf4j.LoggerFactory;
032
033import static org.apache.camel.processor.PipelineHelper.continueProcessing;
034
035/**
036 * The processor which sends messages in a loop.
037 */
038public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware {
039    private static final Logger LOG = LoggerFactory.getLogger(LoopProcessor.class);
040
041    private String id;
042    private final Expression expression;
043    private final Predicate predicate;
044    private final boolean copy;
045
046    public LoopProcessor(Processor processor, Expression expression, Predicate predicate, boolean copy) {
047        super(processor);
048        this.expression = expression;
049        this.predicate = predicate;
050        this.copy = copy;
051    }
052
053    @Override
054    public boolean process(Exchange exchange, AsyncCallback callback) {
055        // use atomic integer to be able to pass reference and keep track on the values
056        AtomicInteger index = new AtomicInteger();
057        AtomicInteger count = new AtomicInteger();
058        AtomicBoolean doWhile = new AtomicBoolean();
059
060        try {
061            if (expression != null) {
062                // Intermediate conversion to String is needed when direct conversion to Integer is not available
063                // but evaluation result is a textual representation of a numeric value.
064                String text = expression.evaluate(exchange, String.class);
065                int num = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
066                count.set(num);
067            } else {
068                boolean result = predicate.matches(exchange);
069                doWhile.set(result);
070            }
071        } catch (Exception e) {
072            exchange.setException(e);
073            callback.done(true);
074            return true;
075        }
076
077        // we hold on to the original Exchange in case it's needed for copies
078        final Exchange original = exchange;
079        
080        // per-iteration exchange
081        Exchange target = exchange;
082
083        // set the size before we start
084        if (predicate == null) {
085            exchange.setProperty(Exchange.LOOP_SIZE, count);
086        }
087
088        // loop synchronously
089        while ((predicate != null && doWhile.get())  || (index.get() < count.get())) {
090
091            // and prepare for next iteration
092            // if (!copy) target = exchange; else copy of original
093            target = prepareExchange(exchange, index.get(), original);
094            boolean sync = process(target, callback, index, count, doWhile, original);
095
096            if (!sync) {
097                LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
098                // the remainder of the routing slip will be completed async
099                // so we break out now, then the callback will be invoked which then continue routing from where we left here
100                return false;
101            }
102
103            LOG.trace("Processing exchangeId: {} is continued being processed synchronously", target.getExchangeId());
104
105            // check for error if so we should break out
106            if (!continueProcessing(target, "so breaking out of loop", LOG)) {
107                break;
108            }
109
110            // increment counter before next loop
111            index.getAndIncrement();
112
113            // evaluate predicate
114            if (predicate != null) {
115                try {
116                    boolean result = predicate.matches(exchange);
117                    doWhile.set(result);
118                } catch (Exception e) {
119                    // break out looping due that exception
120                    exchange.setException(e);
121                    doWhile.set(false);
122                }
123            }
124        }
125
126        // we are done so prepare the result
127        ExchangeHelper.copyResults(exchange, target);
128        LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
129        callback.done(true);
130        return true;
131    }
132
133    protected boolean process(final Exchange exchange, final AsyncCallback callback,
134                              final AtomicInteger index, final AtomicInteger count, final AtomicBoolean doWhile,
135                              final Exchange original) {
136
137        // set current index as property
138        LOG.debug("LoopProcessor: iteration #{}", index.get());
139        exchange.setProperty(Exchange.LOOP_INDEX, index.get());
140        
141        boolean sync = processor.process(exchange, new AsyncCallback() {
142            public void done(boolean doneSync) {
143                // we only have to handle async completion of the routing slip
144                if (doneSync) {
145                    return;
146                }
147
148                Exchange target = exchange;
149
150                // increment index as we have just processed once
151                index.getAndIncrement();
152
153                // continue looping asynchronously
154                while ((predicate != null && doWhile.get())  || (index.get() < count.get())) {
155
156                    // and prepare for next iteration
157                    target = prepareExchange(exchange, index.get(), original);
158
159                    // process again
160                    boolean sync = process(target, callback, index, count, doWhile, original);
161                    if (!sync) {
162                        LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
163                        // the remainder of the routing slip will be completed async
164                        // so we break out now, then the callback will be invoked which then continue routing from where we left here
165                        return;
166                    }
167
168                    // check for error if so we should break out
169                    if (!continueProcessing(target, "so breaking out of loop", LOG)) {
170                        break;
171                    }
172
173                    // increment counter before next loop
174                    index.getAndIncrement();
175
176                    // evaluate predicate
177                    if (predicate != null) {
178                        try {
179                            boolean result = predicate.matches(exchange);
180                            doWhile.set(result);
181                        } catch (Exception e) {
182                            // break out looping due that exception
183                            exchange.setException(e);
184                            doWhile.set(false);
185                        }
186                    }
187                }
188
189                // we are done so prepare the result
190                ExchangeHelper.copyResults(exchange, target);
191                LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
192                callback.done(false);
193            }
194        });
195
196        return sync;
197    }
198
199    /**
200     * Prepares the exchange for the next iteration
201     *
202     * @param exchange the exchange
203     * @param index the index of the next iteration
204     * @return the exchange to use
205     */
206    protected Exchange prepareExchange(Exchange exchange, int index, Exchange original) {
207        if (copy) {
208            // use a copy but let it reuse the same exchange id so it appear as one exchange
209            // use the original exchange rather than the looping exchange (esp. with the async routing engine)
210            return ExchangeHelper.createCopy(original, true);
211        } else {
212            ExchangeHelper.prepareOutToIn(exchange);
213            return exchange;
214        }
215    }
216
217    public Expression getExpression() {
218        return expression;
219    }
220
221    public Predicate getPredicate() {
222        return predicate;
223    }
224
225    public boolean isCopy() {
226        return copy;
227    }
228
229    public String getTraceLabel() {
230        if (predicate != null) {
231            return "loopWhile[" + predicate + "]";
232        } else {
233            return "loop[" + expression + "]";
234        }
235    }
236
237    public String getId() {
238        return id;
239    }
240
241    public void setId(String id) {
242        this.id = id;
243    }
244
245    @Override
246    public String toString() {
247        if (predicate != null) {
248            return "Loop[while: " + predicate + " do: " + getProcessor() + "]";
249        } else {
250            return "Loop[for: " + expression + " times do: " + getProcessor() + "]";
251        }
252    }
253}