001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor; 018 019import java.util.concurrent.atomic.AtomicBoolean; 020import java.util.concurrent.atomic.AtomicInteger; 021 022import org.apache.camel.AsyncCallback; 023import org.apache.camel.Exchange; 024import org.apache.camel.Expression; 025import org.apache.camel.Predicate; 026import org.apache.camel.Processor; 027import org.apache.camel.Traceable; 028import org.apache.camel.spi.IdAware; 029import org.apache.camel.util.ExchangeHelper; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033import static org.apache.camel.processor.PipelineHelper.continueProcessing; 034 035/** 036 * The processor which sends messages in a loop. 037 */ 038public class LoopProcessor extends DelegateAsyncProcessor implements Traceable, IdAware { 039 private static final Logger LOG = LoggerFactory.getLogger(LoopProcessor.class); 040 041 private String id; 042 private final Expression expression; 043 private final Predicate predicate; 044 private final boolean copy; 045 046 public LoopProcessor(Processor processor, Expression expression, Predicate predicate, boolean copy) { 047 super(processor); 048 this.expression = expression; 049 this.predicate = predicate; 050 this.copy = copy; 051 } 052 053 @Override 054 public boolean process(Exchange exchange, AsyncCallback callback) { 055 // use atomic integer to be able to pass reference and keep track on the values 056 AtomicInteger index = new AtomicInteger(); 057 AtomicInteger count = new AtomicInteger(); 058 AtomicBoolean doWhile = new AtomicBoolean(); 059 060 try { 061 if (expression != null) { 062 // Intermediate conversion to String is needed when direct conversion to Integer is not available 063 // but evaluation result is a textual representation of a numeric value. 064 String text = expression.evaluate(exchange, String.class); 065 int num = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text); 066 count.set(num); 067 } else { 068 boolean result = predicate.matches(exchange); 069 doWhile.set(result); 070 } 071 } catch (Exception e) { 072 exchange.setException(e); 073 callback.done(true); 074 return true; 075 } 076 077 // we hold on to the original Exchange in case it's needed for copies 078 final Exchange original = exchange; 079 080 // per-iteration exchange 081 Exchange target = exchange; 082 083 // set the size before we start 084 if (predicate == null) { 085 exchange.setProperty(Exchange.LOOP_SIZE, count); 086 } 087 088 // loop synchronously 089 while ((predicate != null && doWhile.get()) || (index.get() < count.get())) { 090 091 // and prepare for next iteration 092 // if (!copy) target = exchange; else copy of original 093 target = prepareExchange(exchange, index.get(), original); 094 boolean sync = process(target, callback, index, count, doWhile, original); 095 096 if (!sync) { 097 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId()); 098 // the remainder of the routing slip will be completed async 099 // so we break out now, then the callback will be invoked which then continue routing from where we left here 100 return false; 101 } 102 103 LOG.trace("Processing exchangeId: {} is continued being processed synchronously", target.getExchangeId()); 104 105 // check for error if so we should break out 106 if (!continueProcessing(target, "so breaking out of loop", LOG)) { 107 break; 108 } 109 110 // increment counter before next loop 111 index.getAndIncrement(); 112 113 // evaluate predicate 114 if (predicate != null) { 115 try { 116 boolean result = predicate.matches(exchange); 117 doWhile.set(result); 118 } catch (Exception e) { 119 // break out looping due that exception 120 exchange.setException(e); 121 doWhile.set(false); 122 } 123 } 124 } 125 126 // we are done so prepare the result 127 ExchangeHelper.copyResults(exchange, target); 128 LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 129 callback.done(true); 130 return true; 131 } 132 133 protected boolean process(final Exchange exchange, final AsyncCallback callback, 134 final AtomicInteger index, final AtomicInteger count, final AtomicBoolean doWhile, 135 final Exchange original) { 136 137 // set current index as property 138 LOG.debug("LoopProcessor: iteration #{}", index.get()); 139 exchange.setProperty(Exchange.LOOP_INDEX, index.get()); 140 141 boolean sync = processor.process(exchange, new AsyncCallback() { 142 public void done(boolean doneSync) { 143 // we only have to handle async completion of the routing slip 144 if (doneSync) { 145 return; 146 } 147 148 Exchange target = exchange; 149 150 // increment index as we have just processed once 151 index.getAndIncrement(); 152 153 // continue looping asynchronously 154 while ((predicate != null && doWhile.get()) || (index.get() < count.get())) { 155 156 // and prepare for next iteration 157 target = prepareExchange(exchange, index.get(), original); 158 159 // process again 160 boolean sync = process(target, callback, index, count, doWhile, original); 161 if (!sync) { 162 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId()); 163 // the remainder of the routing slip will be completed async 164 // so we break out now, then the callback will be invoked which then continue routing from where we left here 165 return; 166 } 167 168 // check for error if so we should break out 169 if (!continueProcessing(target, "so breaking out of loop", LOG)) { 170 break; 171 } 172 173 // increment counter before next loop 174 index.getAndIncrement(); 175 176 // evaluate predicate 177 if (predicate != null) { 178 try { 179 boolean result = predicate.matches(exchange); 180 doWhile.set(result); 181 } catch (Exception e) { 182 // break out looping due that exception 183 exchange.setException(e); 184 doWhile.set(false); 185 } 186 } 187 } 188 189 // we are done so prepare the result 190 ExchangeHelper.copyResults(exchange, target); 191 LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 192 callback.done(false); 193 } 194 }); 195 196 return sync; 197 } 198 199 /** 200 * Prepares the exchange for the next iteration 201 * 202 * @param exchange the exchange 203 * @param index the index of the next iteration 204 * @return the exchange to use 205 */ 206 protected Exchange prepareExchange(Exchange exchange, int index, Exchange original) { 207 if (copy) { 208 // use a copy but let it reuse the same exchange id so it appear as one exchange 209 // use the original exchange rather than the looping exchange (esp. with the async routing engine) 210 return ExchangeHelper.createCopy(original, true); 211 } else { 212 ExchangeHelper.prepareOutToIn(exchange); 213 return exchange; 214 } 215 } 216 217 public Expression getExpression() { 218 return expression; 219 } 220 221 public Predicate getPredicate() { 222 return predicate; 223 } 224 225 public boolean isCopy() { 226 return copy; 227 } 228 229 public String getTraceLabel() { 230 if (predicate != null) { 231 return "loopWhile[" + predicate + "]"; 232 } else { 233 return "loop[" + expression + "]"; 234 } 235 } 236 237 public String getId() { 238 return id; 239 } 240 241 public void setId(String id) { 242 this.id = id; 243 } 244 245 @Override 246 public String toString() { 247 if (predicate != null) { 248 return "Loop[while: " + predicate + " do: " + getProcessor() + "]"; 249 } else { 250 return "Loop[for: " + expression + " times do: " + getProcessor() + "]"; 251 } 252 } 253}