001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import java.util.ArrayList;
020    import java.util.Collections;
021    import java.util.Deque;
022    import java.util.LinkedList;
023    import java.util.List;
024    import java.util.Queue;
025    
026    import org.apache.camel.AsyncCallback;
027    import org.apache.camel.Exchange;
028    import org.apache.camel.Processor;
029    import org.apache.camel.ShutdownRunningTask;
030    import org.apache.camel.impl.ScheduledBatchPollingConsumer;
031    import org.apache.camel.util.CastUtils;
032    import org.apache.camel.util.ObjectHelper;
033    import org.apache.camel.util.StopWatch;
034    import org.apache.camel.util.TimeUtils;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * Base class for file consumers.
040     */
041    public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer {
042        protected final transient Logger log = LoggerFactory.getLogger(getClass());
043        protected GenericFileEndpoint<T> endpoint;
044        protected GenericFileOperations<T> operations;
045        protected boolean loggedIn;
046        protected String fileExpressionResult;
047        protected volatile ShutdownRunningTask shutdownRunningTask;
048        protected volatile int pendingExchanges;
049        protected Processor customProcessor;
050        protected boolean eagerLimitMaxMessagesPerPoll = true;
051    
052        public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
053            super(endpoint, processor);
054            this.endpoint = endpoint;
055            this.operations = operations;
056        }
057    
058        public Processor getCustomProcessor() {
059            return customProcessor;
060        }
061    
062        /**
063         * Use a custom processor to process the exchange.
064         * <p/>
065         * Only set this if you need to do custom processing, instead of the regular processing.
066         * <p/>
067         * This is for example used to browse file endpoints by leveraging the file consumer to poll
068         * the directory to gather the list of exchanges. But to avoid processing the files regularly
069         * we can use a custom processor.
070         *
071         * @param processor a custom processor
072         */
073        public void setCustomProcessor(Processor processor) {
074            this.customProcessor = processor;
075        }
076    
077        public boolean isEagerLimitMaxMessagesPerPoll() {
078            return eagerLimitMaxMessagesPerPoll;
079        }
080    
081        public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) {
082            this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
083        }
084    
085        /**
086         * Poll for files
087         */
088        protected int poll() throws Exception {
089            // must reset for each poll
090            fileExpressionResult = null;
091            shutdownRunningTask = null;
092            pendingExchanges = 0;
093    
094            // before we poll is there anything we need to check?
095            // such as are we connected to the FTP Server still?
096            if (!prePollCheck()) {
097                log.debug("Skipping poll as pre poll check returned false");
098                return 0;
099            }
100    
101            // gather list of files to process
102            List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
103            String name = endpoint.getConfiguration().getDirectory();
104    
105            // time how long time it takes to poll
106            StopWatch stop = new StopWatch();
107            boolean limitHit = !pollDirectory(name, files, 0);
108            long delta = stop.stop();
109            if (log.isDebugEnabled()) {
110                log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name);
111            }
112    
113            // log if we hit the limit
114            if (limitHit) {
115                log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll);
116            }
117    
118            // sort files using file comparator if provided
119            if (endpoint.getSorter() != null) {
120                Collections.sort(files, endpoint.getSorter());
121            }
122    
123            // sort using build in sorters so we can use expressions
124            // use a linked list so we can deque the exchanges
125            LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
126            for (GenericFile<T> file : files) {
127                Exchange exchange = endpoint.createExchange(file);
128                endpoint.configureExchange(exchange);
129                endpoint.configureMessage(file, exchange.getIn());
130                exchanges.add(exchange);
131            }
132            // sort files using exchange comparator if provided
133            if (endpoint.getSortBy() != null) {
134                Collections.sort(exchanges, endpoint.getSortBy());
135            }
136    
137            // use a queue for the exchanges
138            Deque<Exchange> q = exchanges;
139    
140            // we are not eager limiting, but we have configured a limit, so cut the list of files
141            if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) {
142                if (files.size() > maxMessagesPerPoll) {
143                    log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll);
144                    // must first remove excessive files from the in progress repository
145                    removeExcessiveInProgressFiles(q, maxMessagesPerPoll);
146                }
147            }
148    
149            // consume files one by one
150            int total = exchanges.size();
151            if (total > 0) {
152                log.debug("Total {} files to consume", total);
153            }
154    
155            int polledMessages = processBatch(CastUtils.cast(q));
156    
157            postPollCheck();
158    
159            return polledMessages;
160        }
161    
162        public int processBatch(Queue<Object> exchanges) {
163            int total = exchanges.size();
164    
165            // limit if needed
166            if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
167                log.debug("Limiting to maximum messages to poll {} as there was {} messages in this poll.", maxMessagesPerPoll, total);
168                total = maxMessagesPerPoll;
169            }
170    
171            for (int index = 0; index < total && isBatchAllowed(); index++) {
172                // only loop if we are started (allowed to run)
173                // use poll to remove the head so it does not consume memory even after we have processed it
174                Exchange exchange = (Exchange) exchanges.poll();
175                // add current index and total as properties
176                exchange.setProperty(Exchange.BATCH_INDEX, index);
177                exchange.setProperty(Exchange.BATCH_SIZE, total);
178                exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
179    
180                // update pending number of exchanges
181                pendingExchanges = total - index - 1;
182    
183                // process the current exchange
184                if (customProcessor != null) {
185                    // use a custom processor
186                    customProcessExchange(exchange, customProcessor);
187                } else {
188                    // process the exchange regular
189                    processExchange(exchange);
190                }
191            }
192    
193            // drain any in progress files as we are done with this batch
194            removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0);
195    
196            return total;
197        }
198    
199        protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
200            // remove the file from the in progress list in case the batch was limited by max messages per poll
201            while (exchanges.size() > limit) {
202                // must remove last
203                Exchange exchange = exchanges.removeLast();
204                GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class);
205                String key = file.getAbsoluteFilePath();
206                endpoint.getInProgressRepository().remove(key);
207            }
208        }
209    
210    
211        /**
212         * Whether or not we can continue polling for more files
213         *
214         * @param fileList  the current list of gathered files
215         * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit
216         */
217        public boolean canPollMoreFiles(List<?> fileList) {
218            // at this point we should not limit if we are not eager
219            if (!eagerLimitMaxMessagesPerPoll) {
220                return true;
221            }
222    
223            if (maxMessagesPerPoll <= 0) {
224                // no limitation
225                return true;
226            }
227    
228            // then only poll if we haven't reached the max limit
229            return fileList.size() < maxMessagesPerPoll;
230        }
231    
232        /**
233         * Override if required. Perform some checks (and perhaps actions) before we poll.
234         *
235         * @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll.
236         */
237        protected boolean prePollCheck() throws Exception {
238            return true;
239        }
240    
241        /**
242         * Override if required. Perform some checks (and perhaps actions) after we have polled.
243         */
244        protected void postPollCheck() {
245            // noop
246        }
247    
    /**
     * Polls the given directory for files to process.
     * <p/>
     * Implementations add the files they find to <tt>fileList</tt>. The poll starts this
     * method with the configured directory and a depth of <tt>0</tt>.
     *
     * @param fileName current directory or file
     * @param fileList current list of files gathered
     * @param depth    the current depth of the directory (will start from 0)
     * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit
     */
    protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth);
257    
258        /**
259         * Sets the operations to be used.
260         * <p/>
261         * Can be used to set a fresh operations in case of recovery attempts
262         *
263         * @param operations the operations
264         */
265        public void setOperations(GenericFileOperations<T> operations) {
266            this.operations = operations;
267        }
268    
269        /**
270         * Processes the exchange
271         *
272         * @param exchange the exchange
273         */
274        protected void processExchange(final Exchange exchange) {
275            GenericFile<T> file = getExchangeFileProperty(exchange);
276            log.trace("Processing file: {}", file);
277    
278            // must extract the absolute name before the begin strategy as the file could potentially be pre moved
279            // and then the file name would be changed
280            String absoluteFileName = file.getAbsoluteFilePath();
281    
282            // check if we can begin processing the file
283            try {
284                final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
285    
286                boolean begin = processStrategy.begin(operations, endpoint, exchange, file);
287                if (!begin) {
288                    log.debug("{} cannot begin processing file: {}", endpoint, file);
289                    try {
290                        // abort
291                        processStrategy.abort(operations, endpoint, exchange, file);
292                    } finally {
293                        // begin returned false, so remove file from the in progress list as its no longer in progress
294                        endpoint.getInProgressRepository().remove(absoluteFileName);
295                    }
296                    return;
297                }
298            } catch (Exception e) {
299                // remove file from the in progress list due to failure
300                endpoint.getInProgressRepository().remove(absoluteFileName);
301    
302                String msg = endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage();
303                handleException(msg, e);
304                return;
305            }
306    
307            // must use file from exchange as it can be updated due the
308            // preMoveNamePrefix/preMoveNamePostfix options
309            final GenericFile<T> target = getExchangeFileProperty(exchange);
310            // must use full name when downloading so we have the correct path
311            final String name = target.getAbsoluteFilePath();
312            try {
313                // retrieve the file using the stream
314                log.trace("Retrieving file: {} from: {}", name, endpoint);
315    
316                // retrieve the file and check it was a success
317                boolean retrieved = operations.retrieveFile(name, exchange);
318                if (!retrieved) {
319                    // throw exception to handle the problem with retrieving the file
320                    // then if the method return false or throws an exception is handled the same in here
321                    // as in both cases an exception is being thrown
322                    throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint);
323                }
324    
325                log.trace("Retrieved file: {} from: {}", name, endpoint);
326    
327                // register on completion callback that does the completion strategies
328                // (for instance to move the file after we have processed it)
329                exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));
330    
331                log.debug("About to process file: {} using exchange: {}", target, exchange);
332    
333                // process the exchange using the async consumer to support async routing engine
334                // which can be supported by this file consumer as all the done work is
335                // provided in the GenericFileOnCompletion
336                getAsyncProcessor().process(exchange, new AsyncCallback() {
337                    public void done(boolean doneSync) {
338                        // noop
339                        if (log.isTraceEnabled()) {
340                            log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
341                        }
342                    }
343                });
344    
345            } catch (Exception e) {
346                // remove file from the in progress list due to failure
347                // (cannot be in finally block due to GenericFileOnCompletion will remove it
348                // from in progress when it takes over and processes the file, which may happen
349                // by another thread at a later time. So its only safe to remove it if there was an exception)
350                endpoint.getInProgressRepository().remove(absoluteFileName);
351    
352                String msg = "Error processing file " + file + " due to " + e.getMessage();
353                handleException(msg, e);
354            }
355        }
356    
357        /**
358         * Processes the exchange using a custom processor.
359         *
360         * @param exchange the exchange
361         * @param processor the custom processor
362         */
363        protected void customProcessExchange(final Exchange exchange, final Processor processor) {
364            GenericFile<T> file = getExchangeFileProperty(exchange);
365            log.trace("Custom processing file: {}", file);
366    
367            // must extract the absolute name before the begin strategy as the file could potentially be pre moved
368            // and then the file name would be changed
369            String absoluteFileName = file.getAbsoluteFilePath();
370    
371            try {
372                // process using the custom processor
373                processor.process(exchange);
374            } catch (Exception e) {
375                if (log.isDebugEnabled()) {
376                    log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e);
377                }
378                handleException(e);
379            } finally {
380                // always remove file from the in progress list as its no longer in progress
381                // use the original file name that was used to add it to the repository
382                // as the name can be different when using preMove option
383                endpoint.getInProgressRepository().remove(absoluteFileName);
384            }
385        }
386    
387        /**
388         * Strategy for validating if the given remote file should be included or not
389         *
390         * @param file        the file
391         * @param isDirectory whether the file is a directory or a file
392         * @param files       files in the directory
393         * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
394         */
395        protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) {
396            if (!isMatched(file, isDirectory, files)) {
397                log.trace("File did not match. Will skip this file: {}", file);
398                return false;
399            } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) {
400                log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: {}", file);
401                return false;
402            }
403    
404            // file matched
405            return true;
406        }
407    
408        /**
409         * Strategy to perform file matching based on endpoint configuration.
410         * <p/>
411         * Will always return <tt>false</tt> for certain files/folders:
412         * <ul>
413         * <li>Starting with a dot</li>
414         * <li>lock files</li>
415         * </ul>
416         * And then <tt>true</tt> for directories.
417         *
418         * @param file        the file
419         * @param isDirectory whether the file is a directory or a file
420         * @param files       files in the directory
421         * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
422         */
423        protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) {
424            String name = file.getFileNameOnly();
425    
426            // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
427            if (name.startsWith(".")) {
428                return false;
429            }
430    
431            // lock files should be skipped
432            if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
433                return false;
434            }
435    
436            if (endpoint.getFilter() != null) {
437                if (!endpoint.getFilter().accept(file)) {
438                    return false;
439                }
440            }
441    
442            if (endpoint.getAntFilter() != null) {
443                if (!endpoint.getAntFilter().accept(file)) {
444                    return false;
445                }
446            }
447    
448            // directories are regarded as matched if filter accepted them
449            if (isDirectory) {
450                return true;
451            }
452    
453            if (ObjectHelper.isNotEmpty(endpoint.getExclude())) {
454                if (name.matches(endpoint.getExclude())) {
455                    return false;
456                }
457            }
458    
459            if (ObjectHelper.isNotEmpty(endpoint.getInclude())) {
460                if (!name.matches(endpoint.getInclude())) {
461                    return false;
462                }
463            }
464    
465            // use file expression for a simple dynamic file filter
466            if (endpoint.getFileName() != null) {
467                evaluateFileExpression();
468                if (fileExpressionResult != null) {
469                    if (!name.equals(fileExpressionResult)) {
470                        return false;
471                    }
472                }
473            }
474    
475            // if done file name is enabled, then the file is only valid if a done file exists
476            if (endpoint.getDoneFileName() != null) {
477                // done file must be in same path as the file
478                String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath());
479                ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
480    
481                // is it a done file name?
482                if (endpoint.isDoneFile(file.getFileNameOnly())) {
483                    log.trace("Skipping done file: {}", file);
484                    return false;
485                }
486    
487                if (!isMatched(file, doneFileName, files)) {
488                    return false;
489                }
490            }
491    
492            return true;
493        }
494    
    /**
     * Strategy to perform file matching based on endpoint configuration in terms of done file name.
     * <p/>
     * Only invoked when a done file name is configured on the endpoint, and only for files
     * which are not done files themselves.
     *
     * @param file         the file
     * @param doneFileName the done file name (without any paths)
     * @param files        files in the directory
     * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
     */
    protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files);
504    
505        /**
506         * Is the given file already in progress.
507         *
508         * @param file the file
509         * @return <tt>true</tt> if the file is already in progress
510         */
511        protected boolean isInProgress(GenericFile<T> file) {
512            String key = file.getAbsoluteFilePath();
513            return !endpoint.getInProgressRepository().add(key);
514        }
515    
516        private void evaluateFileExpression() {
517            if (fileExpressionResult == null) {
518                // create a dummy exchange as Exchange is needed for expression evaluation
519                Exchange dummy = endpoint.createExchange();
520                fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
521            }
522        }
523    
524        @SuppressWarnings("unchecked")
525        private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
526            return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
527        }
528    
529        @Override
530        protected void doStart() throws Exception {
531            super.doStart();
532    
533            // prepare on startup
534            endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint);
535        }
536    }