001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.concurrent.atomic.AtomicInteger; 020 021 import org.apache.camel.AsyncCallback; 022 import org.apache.camel.Exchange; 023 import org.apache.camel.Expression; 024 import org.apache.camel.NoTypeConversionAvailableException; 025 import org.apache.camel.Processor; 026 import org.apache.camel.Traceable; 027 import org.apache.camel.util.ExchangeHelper; 028 import org.slf4j.Logger; 029 import org.slf4j.LoggerFactory; 030 031 /** 032 * The processor which sends messages in a loop. 033 */ 034 public class LoopProcessor extends DelegateAsyncProcessor implements Traceable { 035 private static final Logger LOG = LoggerFactory.getLogger(LoopProcessor.class); 036 037 private final Expression expression; 038 private final boolean copy; 039 040 public LoopProcessor(Processor processor, Expression expression, boolean copy) { 041 super(processor); 042 this.expression = expression; 043 this.copy = copy; 044 } 045 046 @Override 047 public boolean process(Exchange exchange, AsyncCallback callback) { 048 // use atomic integer to be able to pass reference and keep track on the values 049 AtomicInteger index = new AtomicInteger(); 050 AtomicInteger count = new AtomicInteger(); 051 052 // Intermediate conversion to String is needed when direct conversion to Integer is not available 053 // but evaluation result is a textual representation of a numeric value. 054 String text = expression.evaluate(exchange, String.class); 055 try { 056 int num = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text); 057 count.set(num); 058 } catch (NoTypeConversionAvailableException e) { 059 exchange.setException(e); 060 callback.done(true); 061 return true; 062 } 063 064 Exchange target = exchange; 065 066 // set the size before we start 067 exchange.setProperty(Exchange.LOOP_SIZE, count); 068 069 // loop synchronously 070 while (index.get() < count.get()) { 071 072 // and prepare for next iteration 073 target = prepareExchange(exchange, index.get()); 074 boolean sync = process(target, callback, index, count); 075 076 if (!sync) { 077 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId()); 078 // the remainder of the routing slip will be completed async 079 // so we break out now, then the callback will be invoked which then continue routing from where we left here 080 return false; 081 } 082 083 LOG.trace("Processing exchangeId: {} is continued being processed synchronously", target.getExchangeId()); 084 085 // increment counter before next loop 086 index.getAndIncrement(); 087 } 088 089 // we are done so prepare the result 090 ExchangeHelper.copyResults(exchange, target); 091 LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 092 callback.done(true); 093 return true; 094 } 095 096 protected boolean process(final Exchange exchange, final AsyncCallback callback, 097 final AtomicInteger index, final AtomicInteger count) { 098 099 // set current index as property 100 LOG.debug("LoopProcessor: iteration #{}", index.get()); 101 exchange.setProperty(Exchange.LOOP_INDEX, index.get()); 102 103 boolean sync = processNext(exchange, new AsyncCallback() { 104 public void done(boolean doneSync) { 105 // we only have to handle async completion of the routing slip 106 if (doneSync) { 107 return; 108 } 109 110 Exchange target = exchange; 111 112 // increment index as we have just processed once 113 index.getAndIncrement(); 114 115 // continue looping asynchronously 116 while (index.get() < count.get()) { 117 118 // and prepare for next iteration 119 target = prepareExchange(exchange, index.get()); 120 121 // process again 122 boolean sync = process(target, callback, index, count); 123 if (!sync) { 124 LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId()); 125 // the remainder of the routing slip will be completed async 126 // so we break out now, then the callback will be invoked which then continue routing from where we left here 127 return; 128 } 129 130 // increment counter before next loop 131 index.getAndIncrement(); 132 } 133 134 // we are done so prepare the result 135 ExchangeHelper.copyResults(exchange, target); 136 LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); 137 callback.done(false); 138 } 139 }); 140 141 return sync; 142 } 143 144 /** 145 * Prepares the exchange for the next iteration 146 * 147 * @param exchange the exchange 148 * @param index the index of the next iteration 149 * @return the exchange to use 150 */ 151 protected Exchange prepareExchange(Exchange exchange, int index) { 152 if (copy) { 153 // use a copy but let it reuse the same exchange id so it appear as one exchange 154 return ExchangeHelper.createCopy(exchange, true); 155 } else { 156 ExchangeHelper.prepareOutToIn(exchange); 157 return exchange; 158 } 159 } 160 161 public Expression getExpression() { 162 return expression; 163 } 164 165 public String getTraceLabel() { 166 return "loop[" + expression + "]"; 167 } 168 169 @Override 170 public String toString() { 171 return "Loop[for: " + expression + " times do: " + getProcessor() + "]"; 172 } 173 }