001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.concurrent.atomic.AtomicInteger;
020    
021    import org.apache.camel.AsyncCallback;
022    import org.apache.camel.Exchange;
023    import org.apache.camel.Expression;
024    import org.apache.camel.NoTypeConversionAvailableException;
025    import org.apache.camel.Processor;
026    import org.apache.camel.Traceable;
027    import org.apache.camel.util.ExchangeHelper;
028    import org.slf4j.Logger;
029    import org.slf4j.LoggerFactory;
030    
031    /**
032     * The processor which sends messages in a loop.
033     */
034    public class LoopProcessor extends DelegateAsyncProcessor implements Traceable {
035        private static final Logger LOG = LoggerFactory.getLogger(LoopProcessor.class);
036    
037        private final Expression expression;
038        private final boolean copy;
039    
040        public LoopProcessor(Processor processor, Expression expression, boolean copy) {
041            super(processor);
042            this.expression = expression;
043            this.copy = copy;
044        }
045    
046        @Override
047        public boolean process(Exchange exchange, AsyncCallback callback) {
048            // use atomic integer to be able to pass reference and keep track on the values
049            AtomicInteger index = new AtomicInteger();
050            AtomicInteger count = new AtomicInteger();
051    
052            // Intermediate conversion to String is needed when direct conversion to Integer is not available
053            // but evaluation result is a textual representation of a numeric value.
054            String text = expression.evaluate(exchange, String.class);
055            try {
056                int num = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
057                count.set(num);
058            } catch (NoTypeConversionAvailableException e) {
059                exchange.setException(e);
060                callback.done(true);
061                return true;
062            }
063    
064            Exchange target = exchange;
065    
066            // set the size before we start
067            exchange.setProperty(Exchange.LOOP_SIZE, count);
068    
069            // loop synchronously
070            while (index.get() < count.get()) {
071    
072                // and prepare for next iteration
073                target = prepareExchange(exchange, index.get());
074                boolean sync = process(target, callback, index, count);
075    
076                if (!sync) {
077                    LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
078                    // the remainder of the routing slip will be completed async
079                    // so we break out now, then the callback will be invoked which then continue routing from where we left here
080                    return false;
081                }
082    
083                LOG.trace("Processing exchangeId: {} is continued being processed synchronously", target.getExchangeId());
084    
085                // increment counter before next loop
086                index.getAndIncrement();
087            }
088    
089            // we are done so prepare the result
090            ExchangeHelper.copyResults(exchange, target);
091            LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
092            callback.done(true);
093            return true;
094        }
095    
096        protected boolean process(final Exchange exchange, final AsyncCallback callback,
097                                  final AtomicInteger index, final AtomicInteger count) {
098    
099            // set current index as property
100            LOG.debug("LoopProcessor: iteration #{}", index.get());
101            exchange.setProperty(Exchange.LOOP_INDEX, index.get());
102    
103            boolean sync = processNext(exchange, new AsyncCallback() {
104                public void done(boolean doneSync) {
105                    // we only have to handle async completion of the routing slip
106                    if (doneSync) {
107                        return;
108                    }
109    
110                    Exchange target = exchange;
111    
112                    // increment index as we have just processed once
113                    index.getAndIncrement();
114    
115                    // continue looping asynchronously
116                    while (index.get() < count.get()) {
117    
118                        // and prepare for next iteration
119                        target = prepareExchange(exchange, index.get());
120    
121                        // process again
122                        boolean sync = process(target, callback, index, count);
123                        if (!sync) {
124                            LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
125                            // the remainder of the routing slip will be completed async
126                            // so we break out now, then the callback will be invoked which then continue routing from where we left here
127                            return;
128                        }
129    
130                        // increment counter before next loop
131                        index.getAndIncrement();
132                    }
133    
134                    // we are done so prepare the result
135                    ExchangeHelper.copyResults(exchange, target);
136                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
137                    callback.done(false);
138                }
139            });
140    
141            return sync;
142        }
143    
144        /**
145         * Prepares the exchange for the next iteration
146         *
147         * @param exchange the exchange
148         * @param index the index of the next iteration
149         * @return the exchange to use
150         */
151        protected Exchange prepareExchange(Exchange exchange, int index) {
152            if (copy) {
153                // use a copy but let it reuse the same exchange id so it appear as one exchange
154                return ExchangeHelper.createCopy(exchange, true);
155            } else {
156                ExchangeHelper.prepareOutToIn(exchange);
157                return exchange;
158            }
159        }
160    
161        public Expression getExpression() {
162            return expression;
163        }
164    
165        public String getTraceLabel() {
166            return "loop[" + expression + "]";
167        }
168    
169        @Override
170        public String toString() {
171            return "Loop[for: " + expression + " times do: " + getProcessor() + "]";
172        }
173    }