001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 import java.util.ArrayList; 020 import java.util.Collection; 021 import java.util.Iterator; 022 import java.util.LinkedList; 023 import java.util.List; 024 import java.util.Queue; 025 import java.util.concurrent.ConcurrentLinkedQueue; 026 import java.util.concurrent.TimeUnit; 027 import java.util.concurrent.locks.Condition; 028 import java.util.concurrent.locks.Lock; 029 import java.util.concurrent.locks.ReentrantLock; 030 031 import org.apache.camel.CamelContext; 032 import org.apache.camel.CamelExchangeException; 033 import org.apache.camel.Exchange; 034 import org.apache.camel.Expression; 035 import org.apache.camel.Navigate; 036 import org.apache.camel.Predicate; 037 import org.apache.camel.Processor; 038 import org.apache.camel.impl.LoggingExceptionHandler; 039 import org.apache.camel.spi.ExceptionHandler; 040 import org.apache.camel.support.ServiceSupport; 041 import org.apache.camel.util.ObjectHelper; 042 import org.apache.camel.util.ServiceHelper; 043 import org.slf4j.Logger; 044 import org.slf4j.LoggerFactory; 045 046 /** 047 * A base class for any kind of {@link Processor} which implements some kind of batch processing. 048 * 049 * @version 050 * @deprecated may be removed in the future when we overhaul the resequencer EIP 051 */ 052 @Deprecated 053 public class BatchProcessor extends ServiceSupport implements Processor, Navigate<Processor> { 054 055 public static final long DEFAULT_BATCH_TIMEOUT = 1000L; 056 public static final int DEFAULT_BATCH_SIZE = 100; 057 058 private static final Logger LOG = LoggerFactory.getLogger(BatchProcessor.class); 059 060 private long batchTimeout = DEFAULT_BATCH_TIMEOUT; 061 private int batchSize = DEFAULT_BATCH_SIZE; 062 private int outBatchSize; 063 private boolean groupExchanges; 064 private boolean batchConsumer; 065 private boolean ignoreInvalidExchanges; 066 private Predicate completionPredicate; 067 private Expression expression; 068 069 private final CamelContext camelContext; 070 private final Processor processor; 071 private final Collection<Exchange> collection; 072 private ExceptionHandler exceptionHandler; 073 074 private final BatchSender sender; 075 076 public BatchProcessor(CamelContext camelContext, Processor processor, Collection<Exchange> collection, Expression expression) { 077 ObjectHelper.notNull(camelContext, "camelContext"); 078 ObjectHelper.notNull(processor, "processor"); 079 ObjectHelper.notNull(collection, "collection"); 080 ObjectHelper.notNull(expression, "expression"); 081 082 // wrap processor in UnitOfWork so what we send out of the batch runs in a UoW 083 this.camelContext = camelContext; 084 this.processor = new UnitOfWorkProcessor(processor); 085 this.collection = collection; 086 this.expression = expression; 087 this.sender = new BatchSender(); 088 } 089 090 @Override 091 public String toString() { 092 return "BatchProcessor[to: " + processor + "]"; 093 } 094 095 // Properties 096 // ------------------------------------------------------------------------- 097 public ExceptionHandler getExceptionHandler() { 098 if (exceptionHandler == null) { 099 exceptionHandler = new LoggingExceptionHandler(getClass()); 100 } 101 return exceptionHandler; 102 } 103 104 public void setExceptionHandler(ExceptionHandler exceptionHandler) { 105 this.exceptionHandler = exceptionHandler; 106 } 107 108 public int getBatchSize() { 109 return batchSize; 110 } 111 112 /** 113 * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor will 114 * process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}. 115 * 116 * @param batchSize the size 117 */ 118 public void setBatchSize(int batchSize) { 119 // setting batch size to 0 or negative is like disabling it, so we set it as the max value 120 // as the code logic is dependent on a batch size having 1..n value 121 if (batchSize <= 0) { 122 LOG.debug("Disabling batch size, will only be triggered by timeout"); 123 this.batchSize = Integer.MAX_VALUE; 124 } else { 125 this.batchSize = batchSize; 126 } 127 } 128 129 public int getOutBatchSize() { 130 return outBatchSize; 131 } 132 133 /** 134 * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the 135 * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain 136 * number of exchanges has been collected. By default this feature is <b>not</b> enabled. 137 * 138 * @param outBatchSize the size 139 */ 140 public void setOutBatchSize(int outBatchSize) { 141 this.outBatchSize = outBatchSize; 142 } 143 144 public long getBatchTimeout() { 145 return batchTimeout; 146 } 147 148 public void setBatchTimeout(long batchTimeout) { 149 this.batchTimeout = batchTimeout; 150 } 151 152 public boolean isGroupExchanges() { 153 return groupExchanges; 154 } 155 156 public void setGroupExchanges(boolean groupExchanges) { 157 this.groupExchanges = groupExchanges; 158 } 159 160 public boolean isBatchConsumer() { 161 return batchConsumer; 162 } 163 164 public void setBatchConsumer(boolean batchConsumer) { 165 this.batchConsumer = batchConsumer; 166 } 167 168 public boolean isIgnoreInvalidExchanges() { 169 return ignoreInvalidExchanges; 170 } 171 172 public void setIgnoreInvalidExchanges(boolean ignoreInvalidExchanges) { 173 this.ignoreInvalidExchanges = ignoreInvalidExchanges; 174 } 175 176 public Predicate getCompletionPredicate() { 177 return completionPredicate; 178 } 179 180 public void setCompletionPredicate(Predicate completionPredicate) { 181 this.completionPredicate = completionPredicate; 182 } 183 184 public Processor getProcessor() { 185 return processor; 186 } 187 188 public List<Processor> next() { 189 if (!hasNext()) { 190 return null; 191 } 192 List<Processor> answer = new ArrayList<Processor>(1); 193 answer.add(processor); 194 return answer; 195 } 196 197 public boolean hasNext() { 198 return processor != null; 199 } 200 201 /** 202 * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in 203 * the in queue should be drained to the "out" collection. 204 */ 205 private boolean isInBatchCompleted(int num) { 206 return num >= batchSize; 207 } 208 209 /** 210 * A strategy method to decide if the "out" batch is completed. That is, whether the resulting exchange in 211 * the out collection should be sent. 212 */ 213 private boolean isOutBatchCompleted() { 214 if (outBatchSize == 0) { 215 // out batch is disabled, so go ahead and send. 216 return true; 217 } 218 return collection.size() > 0 && collection.size() >= outBatchSize; 219 } 220 221 /** 222 * Strategy Method to process an exchange in the batch. This method allows derived classes to perform 223 * custom processing before or after an individual exchange is processed 224 */ 225 protected void processExchange(Exchange exchange) throws Exception { 226 processor.process(exchange); 227 if (exchange.getException() != null) { 228 getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, exchange.getException()); 229 } 230 } 231 232 protected void doStart() throws Exception { 233 ServiceHelper.startServices(processor); 234 sender.start(); 235 } 236 237 protected void doStop() throws Exception { 238 sender.cancel(); 239 ServiceHelper.stopServices(sender); 240 ServiceHelper.stopServices(processor); 241 collection.clear(); 242 } 243 244 /** 245 * Enqueues an exchange for later batch processing. 246 */ 247 public void process(Exchange exchange) throws Exception { 248 249 // if batch consumer is enabled then we need to adjust the batch size 250 // with the size from the batch consumer 251 if (isBatchConsumer()) { 252 int size = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class); 253 if (batchSize != size) { 254 batchSize = size; 255 LOG.trace("Using batch consumer completion, so setting batch size to: {}", batchSize); 256 } 257 } 258 259 // validate that the exchange can be used 260 if (!isValid(exchange)) { 261 if (isIgnoreInvalidExchanges()) { 262 LOG.debug("Invalid Exchange. This Exchange will be ignored: {}", exchange); 263 return; 264 } else { 265 throw new CamelExchangeException("Exchange is not valid to be used by the BatchProcessor", exchange); 266 } 267 } 268 269 // exchange is valid so enqueue the exchange 270 sender.enqueueExchange(exchange); 271 } 272 273 /** 274 * Is the given exchange valid to be used. 275 * 276 * @param exchange the given exchange 277 * @return <tt>true</tt> if valid, <tt>false</tt> otherwise 278 */ 279 private boolean isValid(Exchange exchange) { 280 Object result = null; 281 try { 282 result = expression.evaluate(exchange, Object.class); 283 } catch (Exception e) { 284 // ignore 285 } 286 return result != null; 287 } 288 289 /** 290 * Sender thread for queued-up exchanges. 291 */ 292 private class BatchSender extends Thread { 293 294 private Queue<Exchange> queue; 295 private Lock queueLock = new ReentrantLock(); 296 private boolean exchangeEnqueued; 297 private final Queue<String> completionPredicateMatched = new ConcurrentLinkedQueue<String>(); 298 private Condition exchangeEnqueuedCondition = queueLock.newCondition(); 299 300 public BatchSender() { 301 super(camelContext.getExecutorServiceManager().resolveThreadName("Batch Sender")); 302 this.queue = new LinkedList<Exchange>(); 303 } 304 305 @Override 306 public void run() { 307 // Wait until one of either: 308 // * an exchange being queued; 309 // * the batch timeout expiring; or 310 // * the thread being cancelled. 311 // 312 // If an exchange is queued then we need to determine whether the 313 // batch is complete. If it is complete then we send out the batched 314 // exchanges. Otherwise we move back into our wait state. 315 // 316 // If the batch times out then we send out the batched exchanges 317 // collected so far. 318 // 319 // If we receive an interrupt then all blocking operations are 320 // interrupted and our thread terminates. 321 // 322 // The goal of the following algorithm in terms of synchronisation 323 // is to provide fine grained locking i.e. retaining the lock only 324 // when required. Special consideration is given to releasing the 325 // lock when calling an overloaded method i.e. sendExchanges. 326 // Unlocking is important as the process of sending out the exchanges 327 // would otherwise block new exchanges from being queued. 328 329 queueLock.lock(); 330 try { 331 do { 332 try { 333 if (!exchangeEnqueued) { 334 LOG.trace("Waiting for new exchange to arrive or batchTimeout to occur after {} ms.", batchTimeout); 335 exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS); 336 } 337 338 // if the completion predicate was triggered then there is an exchange id which denotes when to complete 339 String id = null; 340 if (!completionPredicateMatched.isEmpty()) { 341 id = completionPredicateMatched.poll(); 342 } 343 344 if (id != null || !exchangeEnqueued) { 345 if (id != null) { 346 LOG.trace("Collecting exchanges to be aggregated triggered by completion predicate"); 347 } else { 348 LOG.trace("Collecting exchanges to be aggregated triggered by batch timeout"); 349 } 350 drainQueueTo(collection, batchSize, id); 351 } else { 352 exchangeEnqueued = false; 353 boolean drained = false; 354 while (isInBatchCompleted(queue.size())) { 355 drained = true; 356 drainQueueTo(collection, batchSize, id); 357 } 358 if (drained) { 359 LOG.trace("Collecting exchanges to be aggregated triggered by new exchanges received"); 360 } 361 362 if (!isOutBatchCompleted()) { 363 continue; 364 } 365 } 366 367 queueLock.unlock(); 368 try { 369 try { 370 sendExchanges(); 371 } catch (Throwable t) { 372 // a fail safe to handle all exceptions being thrown 373 getExceptionHandler().handleException(t); 374 } 375 } finally { 376 queueLock.lock(); 377 } 378 379 } catch (InterruptedException e) { 380 break; 381 } 382 383 } while (isRunAllowed()); 384 385 } finally { 386 queueLock.unlock(); 387 } 388 } 389 390 /** 391 * This method should be called with queueLock held 392 */ 393 private void drainQueueTo(Collection<Exchange> collection, int batchSize, String exchangeId) { 394 for (int i = 0; i < batchSize; ++i) { 395 Exchange e = queue.poll(); 396 if (e != null) { 397 try { 398 collection.add(e); 399 } catch (Exception t) { 400 e.setException(t); 401 } catch (Throwable t) { 402 getExceptionHandler().handleException(t); 403 } 404 if (exchangeId != null && exchangeId.equals(e.getExchangeId())) { 405 // this batch is complete so stop draining 406 break; 407 } 408 } else { 409 break; 410 } 411 } 412 } 413 414 public void cancel() { 415 interrupt(); 416 } 417 418 public void enqueueExchange(Exchange exchange) { 419 LOG.debug("Received exchange to be batched: {}", exchange); 420 queueLock.lock(); 421 try { 422 // pre test whether the completion predicate matched 423 if (completionPredicate != null) { 424 boolean matches = completionPredicate.matches(exchange); 425 if (matches) { 426 LOG.trace("Exchange matched completion predicate: {}", exchange); 427 // add this exchange to the list of exchanges which marks the batch as complete 428 completionPredicateMatched.add(exchange.getExchangeId()); 429 } 430 } 431 queue.add(exchange); 432 exchangeEnqueued = true; 433 exchangeEnqueuedCondition.signal(); 434 } finally { 435 queueLock.unlock(); 436 } 437 } 438 439 private void sendExchanges() throws Exception { 440 Iterator<Exchange> iter = collection.iterator(); 441 while (iter.hasNext()) { 442 Exchange exchange = iter.next(); 443 iter.remove(); 444 try { 445 LOG.debug("Sending aggregated exchange: {}", exchange); 446 processExchange(exchange); 447 } catch (Throwable t) { 448 // must catch throwable to avoid growing memory 449 getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, t); 450 } 451 } 452 } 453 } 454 455 }