001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.Deque;
022import java.util.LinkedList;
023import java.util.List;
024import java.util.Queue;
025import java.util.regex.Pattern;
026
027import org.apache.camel.AsyncCallback;
028import org.apache.camel.Exchange;
029import org.apache.camel.Processor;
030import org.apache.camel.ShutdownRunningTask;
031import org.apache.camel.impl.ScheduledBatchPollingConsumer;
032import org.apache.camel.util.CastUtils;
033import org.apache.camel.util.ObjectHelper;
034import org.apache.camel.util.StopWatch;
035import org.apache.camel.util.TimeUtils;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039/**
040 * Base class for file consumers.
041 */
042public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer {
043    protected final Logger log = LoggerFactory.getLogger(getClass());
044    protected GenericFileEndpoint<T> endpoint;
045    protected GenericFileOperations<T> operations;
046    protected volatile boolean loggedIn;
047    protected String fileExpressionResult;
048    protected volatile ShutdownRunningTask shutdownRunningTask;
049    protected volatile int pendingExchanges;
050    protected Processor customProcessor;
051    protected boolean eagerLimitMaxMessagesPerPoll = true;
052    protected volatile boolean prepareOnStartup;
053    private final Pattern includePattern;
054    private final Pattern excludePattern;
055
056    public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
057        super(endpoint, processor);
058        this.endpoint = endpoint;
059        this.operations = operations;
060
061        if (endpoint.getInclude() != null) {
062            this.includePattern = Pattern.compile(endpoint.getInclude(), Pattern.CASE_INSENSITIVE);
063        } else {
064            this.includePattern = null;
065        }
066        if (endpoint.getExclude() != null) {
067            this.excludePattern = Pattern.compile(endpoint.getExclude(), Pattern.CASE_INSENSITIVE);
068        } else {
069            this.excludePattern = null;
070        }
071    }
072
073    public Processor getCustomProcessor() {
074        return customProcessor;
075    }
076
077    /**
078     * Use a custom processor to process the exchange.
079     * <p/>
080     * Only set this if you need to do custom processing, instead of the regular processing.
081     * <p/>
082     * This is for example used to browse file endpoints by leveraging the file consumer to poll
083     * the directory to gather the list of exchanges. But to avoid processing the files regularly
084     * we can use a custom processor.
085     *
086     * @param processor a custom processor
087     */
088    public void setCustomProcessor(Processor processor) {
089        this.customProcessor = processor;
090    }
091
092    public boolean isEagerLimitMaxMessagesPerPoll() {
093        return eagerLimitMaxMessagesPerPoll;
094    }
095
096    public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) {
097        this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
098    }
099
100    /**
101     * Poll for files
102     */
103    protected int poll() throws Exception {
104        // must prepare on startup the very first time
105        if (!prepareOnStartup) {
106            // prepare on startup
107            endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint);
108            prepareOnStartup = true;
109        }
110
111        // must reset for each poll
112        fileExpressionResult = null;
113        shutdownRunningTask = null;
114        pendingExchanges = 0;
115
116        // before we poll is there anything we need to check?
117        // such as are we connected to the FTP Server still?
118        if (!prePollCheck()) {
119            log.debug("Skipping poll as pre poll check returned false");
120            return 0;
121        }
122
123        // gather list of files to process
124        List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
125        String name = endpoint.getConfiguration().getDirectory();
126
127        // time how long time it takes to poll
128        StopWatch stop = new StopWatch();
129        boolean limitHit;
130        try {
131            limitHit = !pollDirectory(name, files, 0);
132        } catch (Exception e) {
133            // during poll directory we add files to the in progress repository, in case of any exception thrown after this work
134            // we must then drain the in progress files before rethrowing the exception
135            log.debug("Error occurred during poll directory: " + name + " due " + e.getMessage() + ". Removing " + files.size() + " files marked as in-progress.");
136            removeExcessiveInProgressFiles(files);
137            throw e;
138        }
139
140        long delta = stop.stop();
141        if (log.isDebugEnabled()) {
142            log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name);
143        }
144
145        // log if we hit the limit
146        if (limitHit) {
147            log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll);
148        }
149
150        // sort files using file comparator if provided
151        if (endpoint.getSorter() != null) {
152            Collections.sort(files, endpoint.getSorter());
153        }
154
155        // sort using build in sorters so we can use expressions
156        // use a linked list so we can dequeue the exchanges
157        LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
158        for (GenericFile<T> file : files) {
159            Exchange exchange = endpoint.createExchange(file);
160            endpoint.configureExchange(exchange);
161            endpoint.configureMessage(file, exchange.getIn());
162            exchanges.add(exchange);
163        }
164        // sort files using exchange comparator if provided
165        if (endpoint.getSortBy() != null) {
166            Collections.sort(exchanges, endpoint.getSortBy());
167        }
168        if (endpoint.isShuffle()) {
169            Collections.shuffle(exchanges);
170        }
171
172        // use a queue for the exchanges
173        Deque<Exchange> q = exchanges;
174
175        // we are not eager limiting, but we have configured a limit, so cut the list of files
176        if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) {
177            if (files.size() > maxMessagesPerPoll) {
178                log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll);
179                // must first remove excessive files from the in progress repository
180                removeExcessiveInProgressFiles(q, maxMessagesPerPoll);
181            }
182        }
183
184        // consume files one by one
185        int total = exchanges.size();
186        if (total > 0) {
187            log.debug("Total {} files to consume", total);
188        }
189
190        int polledMessages = processBatch(CastUtils.cast(q));
191
192        postPollCheck(polledMessages);
193
194        return polledMessages;
195    }
196
197    public int processBatch(Queue<Object> exchanges) {
198        int total = exchanges.size();
199        int answer = total;
200
201        // limit if needed
202        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
203            log.debug("Limiting to maximum messages to poll {} as there was {} messages in this poll.", maxMessagesPerPoll, total);
204            total = maxMessagesPerPoll;
205        }
206
207        for (int index = 0; index < total && isBatchAllowed(); index++) {
208            // only loop if we are started (allowed to run)
209            // use poll to remove the head so it does not consume memory even after we have processed it
210            Exchange exchange = (Exchange) exchanges.poll();
211            // add current index and total as properties
212            exchange.setProperty(Exchange.BATCH_INDEX, index);
213            exchange.setProperty(Exchange.BATCH_SIZE, total);
214            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
215
216            // update pending number of exchanges
217            pendingExchanges = total - index - 1;
218
219            // process the current exchange
220            boolean started;
221            if (customProcessor != null) {
222                // use a custom processor
223                started = customProcessExchange(exchange, customProcessor);
224            } else {
225                // process the exchange regular
226                started = processExchange(exchange);
227            }
228
229            // if we did not start process the file then decrement the counter
230            if (!started) {
231                answer--;
232            }
233        }
234
235        // drain any in progress files as we are done with this batch
236        removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0);
237
238        return answer;
239    }
240
241    /**
242     * Drain any in progress files as we are done with this batch
243     *
244     * @param exchanges  the exchanges
245     * @param limit      the limit
246     */
247    protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
248        // remove the file from the in progress list in case the batch was limited by max messages per poll
249        while (exchanges.size() > limit) {
250            // must remove last
251            Exchange exchange = exchanges.removeLast();
252            GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class);
253            String key = file.getAbsoluteFilePath();
254            endpoint.getInProgressRepository().remove(key);
255        }
256    }
257
258    /**
259     * Drain any in progress files as we are done with the files
260     *
261     * @param files  the files
262     */
263    protected void removeExcessiveInProgressFiles(List<GenericFile<T>> files) {
264        for (GenericFile file : files) {
265            String key = file.getAbsoluteFilePath();
266            endpoint.getInProgressRepository().remove(key);
267        }
268    }
269
270    /**
271     * Whether or not we can continue polling for more files
272     *
273     * @param fileList  the current list of gathered files
274     * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit
275     */
276    public boolean canPollMoreFiles(List<?> fileList) {
277        // at this point we should not limit if we are not eager
278        if (!eagerLimitMaxMessagesPerPoll) {
279            return true;
280        }
281
282        if (maxMessagesPerPoll <= 0) {
283            // no limitation
284            return true;
285        }
286
287        // then only poll if we haven't reached the max limit
288        return fileList.size() < maxMessagesPerPoll;
289    }
290
291    /**
292     * Override if required. Perform some checks (and perhaps actions) before we poll.
293     *
294     * @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll.
295     */
296    protected boolean prePollCheck() throws Exception {
297        return true;
298    }
299
300    /**
301     * Override if required. Perform some checks (and perhaps actions) after we have polled.
302     *
303     * @param polledMessages number of polled messages
304     */
305    protected void postPollCheck(int polledMessages) {
306        // noop
307    }
308
309    /**
310     * Polls the given directory for files to process
311     *
312     * @param fileName current directory or file
313     * @param fileList current list of files gathered
314     * @param depth the current depth of the directory (will start from 0)
315     * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit
316     */
317    protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth);
318
319    /**
320     * Sets the operations to be used.
321     * <p/>
322     * Can be used to set a fresh operations in case of recovery attempts
323     *
324     * @param operations the operations
325     */
326    public void setOperations(GenericFileOperations<T> operations) {
327        this.operations = operations;
328    }
329
330    /**
331     * Whether to ignore if the file cannot be retrieved.
332     * <p/>
333     * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved.
334     * <p/>
335     * This method allows to suppress this and just ignore that.
336     *
337     * @param name        the file name
338     * @param exchange    the exchange
339     * @param cause       optional exception occurred during retrieving file
340     * @return <tt>true</tt> to ignore, <tt>false</tt> is the default.
341     */
342    protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) {
343        return false;
344    }
345
346    /**
347     * Processes the exchange
348     *
349     * @param exchange the exchange
350     * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started
351     * to be processed, for some reason (not found, or aborted etc)
352     */
353    protected boolean processExchange(final Exchange exchange) {
354        GenericFile<T> file = getExchangeFileProperty(exchange);
355        log.trace("Processing file: {}", file);
356
357        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
358        // and then the file name would be changed
359        String absoluteFileName = file.getAbsoluteFilePath();
360
361        // check if we can begin processing the file
362        final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
363
364        Exception beginCause = null;
365        boolean begin = false;
366        try {
367            begin = processStrategy.begin(operations, endpoint, exchange, file);
368        } catch (Exception e) {
369            beginCause = e;
370        }
371
372        if (!begin) {
373            // no something was wrong, so we need to abort and remove the file from the in progress list
374            Exception abortCause = null;
375            log.debug("{} cannot begin processing file: {}", endpoint, file);
376            try {
377                // abort
378                processStrategy.abort(operations, endpoint, exchange, file);
379            } catch (Exception e) {
380                abortCause = e;
381            } finally {
382                // begin returned false, so remove file from the in progress list as its no longer in progress
383                endpoint.getInProgressRepository().remove(absoluteFileName);
384            }
385            if (beginCause != null) {
386                String msg = endpoint + " cannot begin processing file: " + file + " due to: " + beginCause.getMessage();
387                handleException(msg, beginCause);
388            }
389            if (abortCause != null) {
390                String msg2 = endpoint + " cannot abort processing file: " + file + " due to: " + abortCause.getMessage();
391                handleException(msg2, abortCause);
392            }
393            return false;
394        }
395
396        // must use file from exchange as it can be updated due the
397        // preMoveNamePrefix/preMoveNamePostfix options
398        final GenericFile<T> target = getExchangeFileProperty(exchange);
399        // must use full name when downloading so we have the correct path
400        final String name = target.getAbsoluteFilePath();
401        try {
402            
403            if (isRetrieveFile()) {
404                // retrieve the file using the stream
405                log.trace("Retrieving file: {} from: {}", name, endpoint);
406    
407                // retrieve the file and check it was a success
408                boolean retrieved;
409                Exception cause = null;
410                try {
411                    retrieved = operations.retrieveFile(name, exchange);
412                } catch (Exception e) {
413                    retrieved = false;
414                    cause = e;
415                }
416
417                if (!retrieved) {
418                    if (ignoreCannotRetrieveFile(name, exchange, cause)) {
419                        log.trace("Cannot retrieve file {} maybe it does not exists. Ignoring.", name);
420                        // remove file from the in progress list as we could not retrieve it, but should ignore
421                        endpoint.getInProgressRepository().remove(absoluteFileName);
422                        return false;
423                    } else {
424                        // throw exception to handle the problem with retrieving the file
425                        // then if the method return false or throws an exception is handled the same in here
426                        // as in both cases an exception is being thrown
427                        if (cause != null && cause instanceof GenericFileOperationFailedException) {
428                            throw cause;
429                        } else {
430                            throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint, cause);
431                        }
432                    }
433                }
434    
435                log.trace("Retrieved file: {} from: {}", name, endpoint);                
436            } else {
437                log.trace("Skipped retrieval of file: {} from: {}", name, endpoint);
438                exchange.getIn().setBody(null);
439            }
440
441            // register on completion callback that does the completion strategies
442            // (for instance to move the file after we have processed it)
443            exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));
444
445            log.debug("About to process file: {} using exchange: {}", target, exchange);
446
447            if (endpoint.isSynchronous()) {
448                // process synchronously
449                getProcessor().process(exchange);
450            } else {
451                // process the exchange using the async consumer to support async routing engine
452                // which can be supported by this file consumer as all the done work is
453                // provided in the GenericFileOnCompletion
454                getAsyncProcessor().process(exchange, new AsyncCallback() {
455                    public void done(boolean doneSync) {
456                        // noop
457                        if (log.isTraceEnabled()) {
458                            log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
459                        }
460                    }
461                });
462            }
463
464        } catch (Exception e) {
465            // remove file from the in progress list due to failure
466            // (cannot be in finally block due to GenericFileOnCompletion will remove it
467            // from in progress when it takes over and processes the file, which may happen
468            // by another thread at a later time. So its only safe to remove it if there was an exception)
469            endpoint.getInProgressRepository().remove(absoluteFileName);
470
471            String msg = "Error processing file " + file + " due to " + e.getMessage();
472            handleException(msg, e);
473        }
474
475        return true;
476    }
477
478    /**
479     * Override if required.  Files are retrieved / returns true by default
480     *
481     * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files.
482     */
483    protected boolean isRetrieveFile() {
484        return true;
485    }
486
487    /**
488     * Processes the exchange using a custom processor.
489     *
490     * @param exchange the exchange
491     * @param processor the custom processor
492     */
493    protected boolean customProcessExchange(final Exchange exchange, final Processor processor) {
494        GenericFile<T> file = getExchangeFileProperty(exchange);
495        log.trace("Custom processing file: {}", file);
496
497        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
498        // and then the file name would be changed
499        String absoluteFileName = file.getAbsoluteFilePath();
500
501        try {
502            // process using the custom processor
503            processor.process(exchange);
504        } catch (Exception e) {
505            if (log.isDebugEnabled()) {
506                log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e);
507            }
508            handleException(e);
509        } finally {
510            // always remove file from the in progress list as its no longer in progress
511            // use the original file name that was used to add it to the repository
512            // as the name can be different when using preMove option
513            endpoint.getInProgressRepository().remove(absoluteFileName);
514        }
515
516        return true;
517    }
518
519    /**
520     * Strategy for validating if the given remote file should be included or not
521     *
522     * @param file        the file
523     * @param isDirectory whether the file is a directory or a file
524     * @param files       files in the directory
525     * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
526     */
527    protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) {
528        String absoluteFilePath = file.getAbsoluteFilePath();
529
530        if (!isMatched(file, isDirectory, files)) {
531            log.trace("File did not match. Will skip this file: {}", file);
532            return false;
533        }
534
535        // directory is always valid
536        if (isDirectory) {
537            return true;
538        }
539
540        // check if file is already in progress
541        if (endpoint.getInProgressRepository().contains(absoluteFilePath)) {
542            if (log.isTraceEnabled()) {
543                log.trace("Skipping as file is already in progress: {}", file.getFileName());
544            }
545            return false;
546        }
547
548        // if its a file then check we have the file in the idempotent registry already
549        if (endpoint.isIdempotent()) {
550            // use absolute file path as default key, but evaluate if an expression key was configured
551            String key = file.getAbsoluteFilePath();
552            if (endpoint.getIdempotentKey() != null) {
553                Exchange dummy = endpoint.createExchange(file);
554                key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
555            }
556            if (key != null && endpoint.getIdempotentRepository().contains(key)) {
557                log.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", key, file);
558                return false;
559            }
560        }
561
562        // okay so final step is to be able to add atomic as in-progress, so we are the
563        // only thread processing this file
564        return endpoint.getInProgressRepository().add(absoluteFilePath);
565    }
566
567    /**
568     * Strategy to perform file matching based on endpoint configuration.
569     * <p/>
570     * Will always return <tt>false</tt> for certain files/folders:
571     * <ul>
572     * <li>Starting with a dot</li>
573     * <li>lock files</li>
574     * </ul>
575     * And then <tt>true</tt> for directories.
576     *
577     * @param file        the file
578     * @param isDirectory whether the file is a directory or a file
579     * @param files       files in the directory
580     * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
581     */
582    protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) {
583        String name = file.getFileNameOnly();
584
585        // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
586        if (name.startsWith(".")) {
587            return false;
588        }
589
590        // lock files should be skipped
591        if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
592            return false;
593        }
594
595        if (endpoint.getFilter() != null) {
596            if (!endpoint.getFilter().accept(file)) {
597                return false;
598            }
599        }
600
601        if (endpoint.getAntFilter() != null) {
602            if (!endpoint.getAntFilter().accept(file)) {
603                return false;
604            }
605        }
606
607        // directories are regarded as matched if filter accepted them
608        if (isDirectory) {
609            return true;
610        }
611
612        // exclude take precedence over include
613        if (excludePattern != null)  {
614            if (excludePattern.matcher(name).matches()) {
615                return false;
616            }
617        }
618        if (includePattern != null)  {
619            if (!includePattern.matcher(name).matches()) {
620                return false;
621            }
622        }
623
624        // use file expression for a simple dynamic file filter
625        if (endpoint.getFileName() != null) {
626            fileExpressionResult = evaluateFileExpression();
627            if (fileExpressionResult != null) {
628                if (!name.equals(fileExpressionResult)) {
629                    return false;
630                }
631            }
632        }
633
634        // if done file name is enabled, then the file is only valid if a done file exists
635        if (endpoint.getDoneFileName() != null) {
636            // done file must be in same path as the file
637            String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath());
638            ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
639
640            // is it a done file name?
641            if (endpoint.isDoneFile(file.getFileNameOnly())) {
642                log.trace("Skipping done file: {}", file);
643                return false;
644            }
645
646            if (!isMatched(file, doneFileName, files)) {
647                return false;
648            }
649        }
650
651        return true;
652    }
653
654    /**
655     * Strategy to perform file matching based on endpoint configuration in terms of done file name.
656     *
657     * @param file         the file
658     * @param doneFileName the done file name (without any paths)
659     * @param files        files in the directory
660     * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
661     */
662    protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files);
663
664    /**
665     * Is the given file already in progress.
666     *
667     * @param file the file
668     * @return <tt>true</tt> if the file is already in progress
669     * @deprecated no longer in use, use {@link org.apache.camel.component.file.GenericFileEndpoint#getInProgressRepository()} instead.
670     */
671    @Deprecated
672    protected boolean isInProgress(GenericFile<T> file) {
673        String key = file.getAbsoluteFilePath();
674        // must use add, to have operation as atomic
675        return !endpoint.getInProgressRepository().add(key);
676    }
677
678    protected String evaluateFileExpression() {
679        if (fileExpressionResult == null && endpoint.getFileName() != null) {
680            // create a dummy exchange as Exchange is needed for expression evaluation
681            Exchange dummy = endpoint.createExchange();
682            fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
683        }
684        return fileExpressionResult;
685    }
686
687    @SuppressWarnings("unchecked")
688    private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
689        return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
690    }
691
692    @Override
693    protected void doStart() throws Exception {
694        super.doStart();
695    }
696
697    @Override
698    protected void doStop() throws Exception {
699        prepareOnStartup = false;
700        super.doStop();
701    }
702}