001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.Deque;
022import java.util.LinkedList;
023import java.util.List;
024import java.util.Queue;
025import java.util.regex.Pattern;
026
027import org.apache.camel.AsyncCallback;
028import org.apache.camel.Exchange;
029import org.apache.camel.Message;
030import org.apache.camel.Processor;
031import org.apache.camel.ShutdownRunningTask;
032import org.apache.camel.impl.ScheduledBatchPollingConsumer;
033import org.apache.camel.util.CastUtils;
034import org.apache.camel.util.ObjectHelper;
035import org.apache.camel.util.StopWatch;
036import org.apache.camel.util.TimeUtils;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * Base class for file consumers.
042 */
043public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer {
044    protected final Logger log = LoggerFactory.getLogger(getClass());
045    protected GenericFileEndpoint<T> endpoint;
046    protected GenericFileOperations<T> operations;
047    protected volatile boolean loggedIn;
048    protected String fileExpressionResult;
049    protected volatile ShutdownRunningTask shutdownRunningTask;
050    protected volatile int pendingExchanges;
051    protected Processor customProcessor;
052    protected boolean eagerLimitMaxMessagesPerPoll = true;
053    protected volatile boolean prepareOnStartup;
054    private final Pattern includePattern;
055    private final Pattern excludePattern;
056
057    public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) {
058        super(endpoint, processor);
059        this.endpoint = endpoint;
060        this.operations = operations;
061
062        if (endpoint.getInclude() != null) {
063            this.includePattern = Pattern.compile(endpoint.getInclude(), Pattern.CASE_INSENSITIVE);
064        } else {
065            this.includePattern = null;
066        }
067        if (endpoint.getExclude() != null) {
068            this.excludePattern = Pattern.compile(endpoint.getExclude(), Pattern.CASE_INSENSITIVE);
069        } else {
070            this.excludePattern = null;
071        }
072    }
073
074    public Processor getCustomProcessor() {
075        return customProcessor;
076    }
077
078    /**
079     * Use a custom processor to process the exchange.
080     * <p/>
081     * Only set this if you need to do custom processing, instead of the regular processing.
082     * <p/>
083     * This is for example used to browse file endpoints by leveraging the file consumer to poll
084     * the directory to gather the list of exchanges. But to avoid processing the files regularly
085     * we can use a custom processor.
086     *
087     * @param processor a custom processor
088     */
089    public void setCustomProcessor(Processor processor) {
090        this.customProcessor = processor;
091    }
092
093    public boolean isEagerLimitMaxMessagesPerPoll() {
094        return eagerLimitMaxMessagesPerPoll;
095    }
096
097    public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) {
098        this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
099    }
100
101    /**
102     * Poll for files
103     */
104    protected int poll() throws Exception {
105        // must prepare on startup the very first time
106        if (!prepareOnStartup) {
107            // prepare on startup
108            endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint);
109            prepareOnStartup = true;
110        }
111
112        // must reset for each poll
113        fileExpressionResult = null;
114        shutdownRunningTask = null;
115        pendingExchanges = 0;
116
117        // before we poll is there anything we need to check?
118        // such as are we connected to the FTP Server still?
119        if (!prePollCheck()) {
120            log.debug("Skipping poll as pre poll check returned false");
121            return 0;
122        }
123
124        // gather list of files to process
125        List<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
126        String name = endpoint.getConfiguration().getDirectory();
127
128        // time how long it takes to poll
129        StopWatch stop = new StopWatch();
130        boolean limitHit;
131        try {
132            limitHit = !pollDirectory(name, files, 0);
133        } catch (Exception e) {
134            // during poll directory we add files to the in progress repository, in case of any exception thrown after this work
135            // we must then drain the in progress files before rethrowing the exception
136            log.debug("Error occurred during poll directory: " + name + " due " + e.getMessage() + ". Removing " + files.size() + " files marked as in-progress.");
137            removeExcessiveInProgressFiles(files);
138            throw e;
139        }
140
141        long delta = stop.stop();
142        if (log.isDebugEnabled()) {
143            log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name);
144        }
145
146        // log if we hit the limit
147        if (limitHit) {
148            log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll);
149        }
150
151        // sort files using file comparator if provided
152        if (endpoint.getSorter() != null) {
153            Collections.sort(files, endpoint.getSorter());
154        }
155
156        // sort using build in sorters so we can use expressions
157        // use a linked list so we can dequeue the exchanges
158        LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
159        for (GenericFile<T> file : files) {
160            Exchange exchange = endpoint.createExchange(file);
161            endpoint.configureExchange(exchange);
162            endpoint.configureMessage(file, exchange.getIn());
163            exchanges.add(exchange);
164        }
165        // sort files using exchange comparator if provided
166        if (endpoint.getSortBy() != null) {
167            Collections.sort(exchanges, endpoint.getSortBy());
168        }
169        if (endpoint.isShuffle()) {
170            Collections.shuffle(exchanges);
171        }
172
173        // use a queue for the exchanges
174        Deque<Exchange> q = exchanges;
175
176        // we are not eager limiting, but we have configured a limit, so cut the list of files
177        if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) {
178            if (files.size() > maxMessagesPerPoll) {
179                log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll);
180                // must first remove excessive files from the in progress repository
181                removeExcessiveInProgressFiles(q, maxMessagesPerPoll);
182            }
183        }
184
185        // consume files one by one
186        int total = exchanges.size();
187        if (total > 0) {
188            log.debug("Total {} files to consume", total);
189        }
190
191        int polledMessages = processBatch(CastUtils.cast(q));
192
193        postPollCheck(polledMessages);
194
195        return polledMessages;
196    }
197
198    public int processBatch(Queue<Object> exchanges) {
199        int total = exchanges.size();
200        int answer = total;
201
202        // limit if needed
203        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
204            log.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", maxMessagesPerPoll, total);
205            total = maxMessagesPerPoll;
206        }
207
208        for (int index = 0; index < total && isBatchAllowed(); index++) {
209            // only loop if we are started (allowed to run)
210            // use poll to remove the head so it does not consume memory even after we have processed it
211            Exchange exchange = (Exchange) exchanges.poll();
212            // add current index and total as properties
213            exchange.setProperty(Exchange.BATCH_INDEX, index);
214            exchange.setProperty(Exchange.BATCH_SIZE, total);
215            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
216
217            // update pending number of exchanges
218            pendingExchanges = total - index - 1;
219
220            // process the current exchange
221            boolean started;
222            if (customProcessor != null) {
223                // use a custom processor
224                started = customProcessExchange(exchange, customProcessor);
225            } else {
226                // process the exchange regular
227                started = processExchange(exchange);
228            }
229
230            // if we did not start process the file then decrement the counter
231            if (!started) {
232                answer--;
233            }
234        }
235
236        // drain any in progress files as we are done with this batch
237        removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0);
238
239        return answer;
240    }
241
242    /**
243     * Drain any in progress files as we are done with this batch
244     *
245     * @param exchanges  the exchanges
246     * @param limit      the limit
247     */
248    protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
249        // remove the file from the in progress list in case the batch was limited by max messages per poll
250        while (exchanges.size() > limit) {
251            // must remove last
252            Exchange exchange = exchanges.removeLast();
253            GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class);
254            String key = file.getAbsoluteFilePath();
255            endpoint.getInProgressRepository().remove(key);
256        }
257    }
258
259    /**
260     * Drain any in progress files as we are done with the files
261     *
262     * @param files  the files
263     */
264    protected void removeExcessiveInProgressFiles(List<GenericFile<T>> files) {
265        for (GenericFile file : files) {
266            String key = file.getAbsoluteFilePath();
267            endpoint.getInProgressRepository().remove(key);
268        }
269    }
270
271    /**
272     * Whether or not we can continue polling for more files
273     *
274     * @param fileList  the current list of gathered files
275     * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit
276     */
277    public boolean canPollMoreFiles(List<?> fileList) {
278        // at this point we should not limit if we are not eager
279        if (!eagerLimitMaxMessagesPerPoll) {
280            return true;
281        }
282
283        if (maxMessagesPerPoll <= 0) {
284            // no limitation
285            return true;
286        }
287
288        // then only poll if we haven't reached the max limit
289        return fileList.size() < maxMessagesPerPoll;
290    }
291
292    /**
293     * Override if required. Perform some checks (and perhaps actions) before we poll.
294     *
295     * @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll.
296     */
297    protected boolean prePollCheck() throws Exception {
298        return true;
299    }
300
301    /**
302     * Override if required. Perform some checks (and perhaps actions) after we have polled.
303     *
304     * @param polledMessages number of polled messages
305     */
306    protected void postPollCheck(int polledMessages) {
307        // noop
308    }
309
310    /**
311     * Polls the given directory for files to process
312     *
313     * @param fileName current directory or file
314     * @param fileList current list of files gathered
315     * @param depth the current depth of the directory (will start from 0)
316     * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit
317     */
318    protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth);
319
320    /**
321     * Sets the operations to be used.
322     * <p/>
323     * Can be used to set a fresh operations in case of recovery attempts
324     *
325     * @param operations the operations
326     */
327    public void setOperations(GenericFileOperations<T> operations) {
328        this.operations = operations;
329    }
330
331    /**
332     * Whether to ignore if the file cannot be retrieved.
333     * <p/>
334     * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved.
335     * <p/>
336     * This method allows to suppress this and just ignore that.
337     *
338     * @param name        the file name
339     * @param exchange    the exchange
340     * @param cause       optional exception occurred during retrieving file
341     * @return <tt>true</tt> to ignore, <tt>false</tt> is the default.
342     */
343    protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) {
344        return false;
345    }
346
347    /**
348     * Processes the exchange
349     *
350     * @param exchange the exchange
351     * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started
352     * to be processed, for some reason (not found, or aborted etc)
353     */
354    protected boolean processExchange(final Exchange exchange) {
355        GenericFile<T> file = getExchangeFileProperty(exchange);
356        log.trace("Processing file: {}", file);
357
358        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
359        // and then the file name would be changed
360        String absoluteFileName = file.getAbsoluteFilePath();
361
362        // check if we can begin processing the file
363        final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
364
365        Exception beginCause = null;
366        boolean begin = false;
367        try {
368            begin = processStrategy.begin(operations, endpoint, exchange, file);
369        } catch (Exception e) {
370            beginCause = e;
371        }
372
373        if (!begin) {
374            // no something was wrong, so we need to abort and remove the file from the in progress list
375            Exception abortCause = null;
376            log.debug("{} cannot begin processing file: {}", endpoint, file);
377            try {
378                // abort
379                processStrategy.abort(operations, endpoint, exchange, file);
380            } catch (Exception e) {
381                abortCause = e;
382            } finally {
383                // begin returned false, so remove file from the in progress list as its no longer in progress
384                endpoint.getInProgressRepository().remove(absoluteFileName);
385            }
386            if (beginCause != null) {
387                String msg = endpoint + " cannot begin processing file: " + file + " due to: " + beginCause.getMessage();
388                handleException(msg, beginCause);
389            }
390            if (abortCause != null) {
391                String msg2 = endpoint + " cannot abort processing file: " + file + " due to: " + abortCause.getMessage();
392                handleException(msg2, abortCause);
393            }
394            return false;
395        }
396
397        // must use file from exchange as it can be updated due the
398        // preMoveNamePrefix/preMoveNamePostfix options
399        final GenericFile<T> target = getExchangeFileProperty(exchange);
400
401        // we can begin processing the file so update file headers on the Camel message
402        // in case it took some time to acquire read lock, and file size/timestamp has been updated since etc
403        updateFileHeaders(target, exchange.getIn());
404
405        // must use full name when downloading so we have the correct path
406        final String name = target.getAbsoluteFilePath();
407        try {
408            
409            if (isRetrieveFile()) {
410                // retrieve the file using the stream
411                log.trace("Retrieving file: {} from: {}", name, endpoint);
412    
413                // retrieve the file and check it was a success
414                boolean retrieved;
415                Exception cause = null;
416                try {
417                    retrieved = operations.retrieveFile(name, exchange);
418                } catch (Exception e) {
419                    retrieved = false;
420                    cause = e;
421                }
422
423                if (!retrieved) {
424                    if (ignoreCannotRetrieveFile(name, exchange, cause)) {
425                        log.trace("Cannot retrieve file {} maybe it does not exists. Ignoring.", name);
426                        // remove file from the in progress list as we could not retrieve it, but should ignore
427                        endpoint.getInProgressRepository().remove(absoluteFileName);
428                        return false;
429                    } else {
430                        // throw exception to handle the problem with retrieving the file
431                        // then if the method return false or throws an exception is handled the same in here
432                        // as in both cases an exception is being thrown
433                        if (cause != null && cause instanceof GenericFileOperationFailedException) {
434                            throw cause;
435                        } else {
436                            throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint, cause);
437                        }
438                    }
439                }
440    
441                log.trace("Retrieved file: {} from: {}", name, endpoint);                
442            } else {
443                log.trace("Skipped retrieval of file: {} from: {}", name, endpoint);
444                exchange.getIn().setBody(null);
445            }
446
447            // register on completion callback that does the completion strategies
448            // (for instance to move the file after we have processed it)
449            exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName));
450
451            log.debug("About to process file: {} using exchange: {}", target, exchange);
452
453            if (endpoint.isSynchronous()) {
454                // process synchronously
455                getProcessor().process(exchange);
456            } else {
457                // process the exchange using the async consumer to support async routing engine
458                // which can be supported by this file consumer as all the done work is
459                // provided in the GenericFileOnCompletion
460                getAsyncProcessor().process(exchange, new AsyncCallback() {
461                    public void done(boolean doneSync) {
462                        // noop
463                        if (log.isTraceEnabled()) {
464                            log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously");
465                        }
466                    }
467                });
468            }
469
470        } catch (Exception e) {
471            // remove file from the in progress list due to failure
472            // (cannot be in finally block due to GenericFileOnCompletion will remove it
473            // from in progress when it takes over and processes the file, which may happen
474            // by another thread at a later time. So its only safe to remove it if there was an exception)
475            endpoint.getInProgressRepository().remove(absoluteFileName);
476
477            String msg = "Error processing file " + file + " due to " + e.getMessage();
478            handleException(msg, e);
479        }
480
481        return true;
482    }
483
484    /**
485     * Updates the information on {@link Message} after we have acquired read-lock and
486     * can begin process the file.
487     *
488     * @param file    the file
489     * @param message the Camel message to update its headers
490     */
491    protected abstract void updateFileHeaders(GenericFile<T> file, Message message);
492
493    /**
494     * Override if required.  Files are retrieved / returns true by default
495     *
496     * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files.
497     */
498    protected boolean isRetrieveFile() {
499        return true;
500    }
501
502    /**
503     * Processes the exchange using a custom processor.
504     *
505     * @param exchange the exchange
506     * @param processor the custom processor
507     */
508    protected boolean customProcessExchange(final Exchange exchange, final Processor processor) {
509        GenericFile<T> file = getExchangeFileProperty(exchange);
510        log.trace("Custom processing file: {}", file);
511
512        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
513        // and then the file name would be changed
514        String absoluteFileName = file.getAbsoluteFilePath();
515
516        try {
517            // process using the custom processor
518            processor.process(exchange);
519        } catch (Exception e) {
520            if (log.isDebugEnabled()) {
521                log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e);
522            }
523            handleException(e);
524        } finally {
525            // always remove file from the in progress list as its no longer in progress
526            // use the original file name that was used to add it to the repository
527            // as the name can be different when using preMove option
528            endpoint.getInProgressRepository().remove(absoluteFileName);
529        }
530
531        return true;
532    }
533
534    /**
535     * Strategy for validating if the given remote file should be included or not
536     *
537     * @param file        the file
538     * @param isDirectory whether the file is a directory or a file
539     * @param files       files in the directory
540     * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
541     */
542    protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) {
543        String absoluteFilePath = file.getAbsoluteFilePath();
544
545        if (!isMatched(file, isDirectory, files)) {
546            log.trace("File did not match. Will skip this file: {}", file);
547            return false;
548        }
549
550        // directory is always valid
551        if (isDirectory) {
552            return true;
553        }
554
555        // check if file is already in progress
556        if (endpoint.getInProgressRepository().contains(absoluteFilePath)) {
557            if (log.isTraceEnabled()) {
558                log.trace("Skipping as file is already in progress: {}", file.getFileName());
559            }
560            return false;
561        }
562
563        // if its a file then check we have the file in the idempotent registry already
564        if (endpoint.isIdempotent()) {
565            // use absolute file path as default key, but evaluate if an expression key was configured
566            String key = file.getAbsoluteFilePath();
567            if (endpoint.getIdempotentKey() != null) {
568                Exchange dummy = endpoint.createExchange(file);
569                key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
570            }
571            if (key != null && endpoint.getIdempotentRepository().contains(key)) {
572                log.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", key, file);
573                return false;
574            }
575        }
576
577        // okay so final step is to be able to add atomic as in-progress, so we are the
578        // only thread processing this file
579        return endpoint.getInProgressRepository().add(absoluteFilePath);
580    }
581
582    /**
583     * Strategy to perform file matching based on endpoint configuration.
584     * <p/>
585     * Will always return <tt>false</tt> for certain files/folders:
586     * <ul>
587     * <li>Starting with a dot</li>
588     * <li>lock files</li>
589     * </ul>
590     * And then <tt>true</tt> for directories.
591     *
592     * @param file        the file
593     * @param isDirectory whether the file is a directory or a file
594     * @param files       files in the directory
595     * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
596     */
597    protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) {
598        String name = file.getFileNameOnly();
599
600        // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
601        if (name.startsWith(".")) {
602            return false;
603        }
604
605        // lock files should be skipped
606        if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
607            return false;
608        }
609
610        if (endpoint.getFilter() != null) {
611            if (!endpoint.getFilter().accept(file)) {
612                return false;
613            }
614        }
615
616        if (endpoint.getAntFilter() != null) {
617            if (!endpoint.getAntFilter().accept(file)) {
618                return false;
619            }
620        }
621
622        if (isDirectory && endpoint.getFilterDirectory() != null) {
623            // create a dummy exchange as Exchange is needed for expression evaluation
624            Exchange dummy = endpoint.createExchange(file);
625            boolean matches = endpoint.getFilterDirectory().matches(dummy);
626            if (!matches) {
627                return false;
628            }
629        }
630
631        // directories are regarded as matched if filter accepted them
632        if (isDirectory) {
633            return true;
634        }
635
636        // exclude take precedence over include
637        if (excludePattern != null)  {
638            if (excludePattern.matcher(name).matches()) {
639                return false;
640            }
641        }
642        if (includePattern != null)  {
643            if (!includePattern.matcher(name).matches()) {
644                return false;
645            }
646        }
647
648        // use file expression for a simple dynamic file filter
649        if (endpoint.getFileName() != null) {
650            fileExpressionResult = evaluateFileExpression();
651            if (fileExpressionResult != null) {
652                if (!name.equals(fileExpressionResult)) {
653                    return false;
654                }
655            }
656        }
657
658        if (endpoint.getFilterFile() != null) {
659            // create a dummy exchange as Exchange is needed for expression evaluation
660            Exchange dummy = endpoint.createExchange(file);
661            boolean matches = endpoint.getFilterFile().matches(dummy);
662            if (!matches) {
663                return false;
664            }
665        }
666
667        // if done file name is enabled, then the file is only valid if a done file exists
668        if (endpoint.getDoneFileName() != null) {
669            // done file must be in same path as the file
670            String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath());
671            ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
672
673            // is it a done file name?
674            if (endpoint.isDoneFile(file.getFileNameOnly())) {
675                log.trace("Skipping done file: {}", file);
676                return false;
677            }
678
679            if (!isMatched(file, doneFileName, files)) {
680                return false;
681            }
682        }
683
684        return true;
685    }
686
687    /**
688     * Strategy to perform file matching based on endpoint configuration in terms of done file name.
689     *
690     * @param file         the file
691     * @param doneFileName the done file name (without any paths)
692     * @param files        files in the directory
693     * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
694     */
695    protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files);
696
697    /**
698     * Is the given file already in progress.
699     *
700     * @param file the file
701     * @return <tt>true</tt> if the file is already in progress
702     * @deprecated no longer in use, use {@link org.apache.camel.component.file.GenericFileEndpoint#getInProgressRepository()} instead.
703     */
704    @Deprecated
705    protected boolean isInProgress(GenericFile<T> file) {
706        String key = file.getAbsoluteFilePath();
707        // must use add, to have operation as atomic
708        return !endpoint.getInProgressRepository().add(key);
709    }
710
711    protected String evaluateFileExpression() {
712        if (fileExpressionResult == null && endpoint.getFileName() != null) {
713            // create a dummy exchange as Exchange is needed for expression evaluation
714            Exchange dummy = endpoint.createExchange();
715            fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
716        }
717        return fileExpressionResult;
718    }
719
720    @SuppressWarnings("unchecked")
721    private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
722        return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
723    }
724
725    @Override
726    protected void doStart() throws Exception {
727        super.doStart();
728    }
729
730    @Override
731    protected void doStop() throws Exception {
732        prepareOnStartup = false;
733        super.doStop();
734    }
735}