001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.Iterator;
022    import java.util.LinkedList;
023    import java.util.List;
024    import java.util.Queue;
025    import java.util.concurrent.ConcurrentLinkedQueue;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.locks.Condition;
028    import java.util.concurrent.locks.Lock;
029    import java.util.concurrent.locks.ReentrantLock;
030    
031    import org.apache.camel.CamelContext;
032    import org.apache.camel.CamelExchangeException;
033    import org.apache.camel.Exchange;
034    import org.apache.camel.Expression;
035    import org.apache.camel.Navigate;
036    import org.apache.camel.Predicate;
037    import org.apache.camel.Processor;
038    import org.apache.camel.impl.LoggingExceptionHandler;
039    import org.apache.camel.spi.ExceptionHandler;
040    import org.apache.camel.support.ServiceSupport;
041    import org.apache.camel.util.ObjectHelper;
042    import org.apache.camel.util.ServiceHelper;
043    import org.slf4j.Logger;
044    import org.slf4j.LoggerFactory;
045    
046    /**
047     * A base class for any kind of {@link Processor} which implements some kind of batch processing.
048     * 
049     * @version 
050     * @deprecated may be removed in the future when we overhaul the resequencer EIP
051     */
052    @Deprecated
053    public class BatchProcessor extends ServiceSupport implements Processor, Navigate<Processor> {
054    
055        public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
056        public static final int DEFAULT_BATCH_SIZE = 100;
057    
058        private static final Logger LOG = LoggerFactory.getLogger(BatchProcessor.class);
059    
060        private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
061        private int batchSize = DEFAULT_BATCH_SIZE;
062        private int outBatchSize;
063        private boolean groupExchanges;
064        private boolean batchConsumer;
065        private boolean ignoreInvalidExchanges;
066        private Predicate completionPredicate;
067        private Expression expression;
068    
069        private final CamelContext camelContext;
070        private final Processor processor;
071        private final Collection<Exchange> collection;
072        private ExceptionHandler exceptionHandler;
073    
074        private final BatchSender sender;
075    
076        public BatchProcessor(CamelContext camelContext, Processor processor, Collection<Exchange> collection, Expression expression) {
077            ObjectHelper.notNull(camelContext, "camelContext");
078            ObjectHelper.notNull(processor, "processor");
079            ObjectHelper.notNull(collection, "collection");
080            ObjectHelper.notNull(expression, "expression");
081    
082            // wrap processor in UnitOfWork so what we send out of the batch runs in a UoW
083            this.camelContext = camelContext;
084            this.processor = new UnitOfWorkProcessor(processor);
085            this.collection = collection;
086            this.expression = expression;
087            this.sender = new BatchSender();
088        }
089    
090        @Override
091        public String toString() {
092            return "BatchProcessor[to: " + processor + "]";
093        }
094    
095        // Properties
096        // -------------------------------------------------------------------------
097        public ExceptionHandler getExceptionHandler() {
098            if (exceptionHandler == null) {
099                exceptionHandler = new LoggingExceptionHandler(getClass());
100            }
101            return exceptionHandler;
102        }
103    
104        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
105            this.exceptionHandler = exceptionHandler;
106        }
107    
108        public int getBatchSize() {
109            return batchSize;
110        }
111    
112        /**
113         * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor will
114         * process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
115         * 
116         * @param batchSize the size
117         */
118        public void setBatchSize(int batchSize) {
119            // setting batch size to 0 or negative is like disabling it, so we set it as the max value
120            // as the code logic is dependent on a batch size having 1..n value
121            if (batchSize <= 0) {
122                LOG.debug("Disabling batch size, will only be triggered by timeout");
123                this.batchSize = Integer.MAX_VALUE;
124            } else {
125                this.batchSize = batchSize;
126            }
127        }
128    
129        public int getOutBatchSize() {
130            return outBatchSize;
131        }
132    
133        /**
134         * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the
135         * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain
136         * number of exchanges has been collected. By default this feature is <b>not</b> enabled.
137         * 
138         * @param outBatchSize the size
139         */
140        public void setOutBatchSize(int outBatchSize) {
141            this.outBatchSize = outBatchSize;
142        }
143    
144        public long getBatchTimeout() {
145            return batchTimeout;
146        }
147    
148        public void setBatchTimeout(long batchTimeout) {
149            this.batchTimeout = batchTimeout;
150        }
151    
152        public boolean isGroupExchanges() {
153            return groupExchanges;
154        }
155    
156        public void setGroupExchanges(boolean groupExchanges) {
157            this.groupExchanges = groupExchanges;
158        }
159    
160        public boolean isBatchConsumer() {
161            return batchConsumer;
162        }
163    
164        public void setBatchConsumer(boolean batchConsumer) {
165            this.batchConsumer = batchConsumer;
166        }
167    
168        public boolean isIgnoreInvalidExchanges() {
169            return ignoreInvalidExchanges;
170        }
171    
172        public void setIgnoreInvalidExchanges(boolean ignoreInvalidExchanges) {
173            this.ignoreInvalidExchanges = ignoreInvalidExchanges;
174        }
175    
176        public Predicate getCompletionPredicate() {
177            return completionPredicate;
178        }
179    
180        public void setCompletionPredicate(Predicate completionPredicate) {
181            this.completionPredicate = completionPredicate;
182        }
183    
184        public Processor getProcessor() {
185            return processor;
186        }
187    
188        public List<Processor> next() {
189            if (!hasNext()) {
190                return null;
191            }
192            List<Processor> answer = new ArrayList<Processor>(1);
193            answer.add(processor);
194            return answer;
195        }
196    
197        public boolean hasNext() {
198            return processor != null;
199        }
200    
201        /**
202         * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in
203         * the in queue should be drained to the "out" collection.
204         */
205        private boolean isInBatchCompleted(int num) {
206            return num >= batchSize;
207        }
208    
209        /**
210         * A strategy method to decide if the "out" batch is completed. That is, whether the resulting exchange in
211         * the out collection should be sent.
212         */
213        private boolean isOutBatchCompleted() {
214            if (outBatchSize == 0) {
215                // out batch is disabled, so go ahead and send.
216                return true;
217            }
218            return collection.size() > 0 && collection.size() >= outBatchSize;
219        }
220    
221        /**
222         * Strategy Method to process an exchange in the batch. This method allows derived classes to perform
223         * custom processing before or after an individual exchange is processed
224         */
225        protected void processExchange(Exchange exchange) throws Exception {
226            processor.process(exchange);
227            if (exchange.getException() != null) {
228                getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, exchange.getException());
229            }
230        }
231    
232        protected void doStart() throws Exception {
233            ServiceHelper.startServices(processor);
234            sender.start();
235        }
236    
237        protected void doStop() throws Exception {
238            sender.cancel();
239            ServiceHelper.stopServices(sender);
240            ServiceHelper.stopServices(processor);
241            collection.clear();
242        }
243    
244        /**
245         * Enqueues an exchange for later batch processing.
246         */
247        public void process(Exchange exchange) throws Exception {
248    
249            // if batch consumer is enabled then we need to adjust the batch size
250            // with the size from the batch consumer
251            if (isBatchConsumer()) {
252                int size = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class);
253                if (batchSize != size) {
254                    batchSize = size;
255                    LOG.trace("Using batch consumer completion, so setting batch size to: {}", batchSize);
256                }
257            }
258    
259            // validate that the exchange can be used
260            if (!isValid(exchange)) {
261                if (isIgnoreInvalidExchanges()) {
262                    LOG.debug("Invalid Exchange. This Exchange will be ignored: {}", exchange);
263                    return;
264                } else {
265                    throw new CamelExchangeException("Exchange is not valid to be used by the BatchProcessor", exchange);
266                }
267            }
268    
269            // exchange is valid so enqueue the exchange
270            sender.enqueueExchange(exchange);
271        }
272    
273        /**
274         * Is the given exchange valid to be used.
275         *
276         * @param exchange the given exchange
277         * @return <tt>true</tt> if valid, <tt>false</tt> otherwise
278         */
279        private boolean isValid(Exchange exchange) {
280            Object result = null;
281            try {
282                result = expression.evaluate(exchange, Object.class);
283            } catch (Exception e) {
284                // ignore
285            }
286            return result != null;
287        }
288    
289        /**
290         * Sender thread for queued-up exchanges.
291         */
292        private class BatchSender extends Thread {
293    
294            private Queue<Exchange> queue;
295            private Lock queueLock = new ReentrantLock();
296            private boolean exchangeEnqueued;
297            private final Queue<String> completionPredicateMatched = new ConcurrentLinkedQueue<String>();
298            private Condition exchangeEnqueuedCondition = queueLock.newCondition();
299    
300            public BatchSender() {
301                super(camelContext.getExecutorServiceManager().resolveThreadName("Batch Sender"));
302                this.queue = new LinkedList<Exchange>();
303            }
304    
305            @Override
306            public void run() {
307                // Wait until one of either:
308                // * an exchange being queued;
309                // * the batch timeout expiring; or
310                // * the thread being cancelled.
311                //
312                // If an exchange is queued then we need to determine whether the
313                // batch is complete. If it is complete then we send out the batched
314                // exchanges. Otherwise we move back into our wait state.
315                //
316                // If the batch times out then we send out the batched exchanges
317                // collected so far.
318                //
319                // If we receive an interrupt then all blocking operations are
320                // interrupted and our thread terminates.
321                //
322                // The goal of the following algorithm in terms of synchronisation
323                // is to provide fine grained locking i.e. retaining the lock only
324                // when required. Special consideration is given to releasing the
325                // lock when calling an overloaded method i.e. sendExchanges. 
326                // Unlocking is important as the process of sending out the exchanges
327                // would otherwise block new exchanges from being queued.
328    
329                queueLock.lock();
330                try {
331                    do {
332                        try {
333                            if (!exchangeEnqueued) {
334                                LOG.trace("Waiting for new exchange to arrive or batchTimeout to occur after {} ms.", batchTimeout);
335                                exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
336                            }
337    
338                            // if the completion predicate was triggered then there is an exchange id which denotes when to complete
339                            String id = null;
340                            if (!completionPredicateMatched.isEmpty()) {
341                                id = completionPredicateMatched.poll();
342                            }
343    
344                            if (id != null || !exchangeEnqueued) {
345                                if (id != null) {
346                                    LOG.trace("Collecting exchanges to be aggregated triggered by completion predicate");
347                                } else {
348                                    LOG.trace("Collecting exchanges to be aggregated triggered by batch timeout");
349                                }
350                                drainQueueTo(collection, batchSize, id);
351                            } else {
352                                exchangeEnqueued = false;
353                                boolean drained = false;
354                                while (isInBatchCompleted(queue.size())) {
355                                    drained = true;
356                                    drainQueueTo(collection, batchSize, id);
357                                }
358                                if (drained) {
359                                    LOG.trace("Collecting exchanges to be aggregated triggered by new exchanges received");
360                                }
361    
362                                if (!isOutBatchCompleted()) {
363                                    continue;
364                                }
365                            }
366    
367                            queueLock.unlock();
368                            try {
369                                try {
370                                    sendExchanges();
371                                } catch (Throwable t) {
372                                    // a fail safe to handle all exceptions being thrown
373                                    getExceptionHandler().handleException(t);
374                                }
375                            } finally {
376                                queueLock.lock();
377                            }
378    
379                        } catch (InterruptedException e) {
380                            break;
381                        }
382    
383                    } while (isRunAllowed());
384    
385                } finally {
386                    queueLock.unlock();
387                }
388            }
389    
390            /**
391             * This method should be called with queueLock held
392             */
393            private void drainQueueTo(Collection<Exchange> collection, int batchSize, String exchangeId) {
394                for (int i = 0; i < batchSize; ++i) {
395                    Exchange e = queue.poll();
396                    if (e != null) {
397                        try {
398                            collection.add(e);
399                        } catch (Exception t) {
400                            e.setException(t);
401                        } catch (Throwable t) {
402                            getExceptionHandler().handleException(t);
403                        }
404                        if (exchangeId != null && exchangeId.equals(e.getExchangeId())) {
405                            // this batch is complete so stop draining
406                            break;
407                        }
408                    } else {
409                        break;
410                    }
411                }
412            }
413    
414            public void cancel() {
415                interrupt();
416            }
417    
418            public void enqueueExchange(Exchange exchange) {
419                LOG.debug("Received exchange to be batched: {}", exchange);
420                queueLock.lock();
421                try {
422                    // pre test whether the completion predicate matched
423                    if (completionPredicate != null) {
424                        boolean matches = completionPredicate.matches(exchange);
425                        if (matches) {
426                            LOG.trace("Exchange matched completion predicate: {}", exchange);
427                            // add this exchange to the list of exchanges which marks the batch as complete
428                            completionPredicateMatched.add(exchange.getExchangeId());
429                        }
430                    }
431                    queue.add(exchange);
432                    exchangeEnqueued = true;
433                    exchangeEnqueuedCondition.signal();
434                } finally {
435                    queueLock.unlock();
436                }
437            }
438            
439            private void sendExchanges() throws Exception {
440                Iterator<Exchange> iter = collection.iterator();
441                while (iter.hasNext()) {
442                    Exchange exchange = iter.next();
443                    iter.remove();
444                    try {
445                        LOG.debug("Sending aggregated exchange: {}", exchange);
446                        processExchange(exchange);
447                    } catch (Throwable t) {
448                        // must catch throwable to avoid growing memory
449                        getExceptionHandler().handleException("Error processing aggregated exchange: " + exchange, t);
450                    }
451                }
452            }
453        }
454    
455    }