001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.Deque;
022import java.util.LinkedList;
023import java.util.List;
024import java.util.Queue;
025import java.util.regex.Pattern;
026
027import org.apache.camel.CamelContextAware;
028import org.apache.camel.Exchange;
029import org.apache.camel.Message;
030import org.apache.camel.Processor;
031import org.apache.camel.ShutdownRunningTask;
032import org.apache.camel.impl.ScheduledBatchPollingConsumer;
033import org.apache.camel.support.EmptyAsyncCallback;
034import org.apache.camel.util.CastUtils;
035import org.apache.camel.util.ObjectHelper;
036import org.apache.camel.util.ServiceHelper;
037import org.apache.camel.util.StopWatch;
038import org.apache.camel.util.StringHelper;
039import org.apache.camel.util.TimeUtils;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Base class for file consumers.
045 */
046public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer {
    // per-instance logger, named after the concrete consumer class
    protected final Logger log = LoggerFactory.getLogger(getClass());
    // the endpoint this consumer polls on behalf of
    protected GenericFileEndpoint<T> endpoint;
    // the file operations in use; can be replaced through setOperations
    protected GenericFileOperations<T> operations;
    // strategy used to prepare on startup and to begin/abort processing of each file
    protected GenericFileProcessStrategy<T> processStrategy;
    // reset to null at the start of every poll
    protected volatile ShutdownRunningTask shutdownRunningTask;
    // number of exchanges still waiting in the current batch; reset to 0 at the start of every poll
    protected volatile int pendingExchanges;
    // optional processor that replaces the regular file processing when set
    protected Processor customProcessor;
    // true: maxMessagesPerPoll is enforced while gathering files; false: it is enforced after sorting
    protected boolean eagerLimitMaxMessagesPerPoll = true;
    // true once the process strategy has been prepared; cleared again when the consumer stops
    protected volatile boolean prepareOnStartup;
    // include/exclude file name patterns, read from the endpoint at construction time
    private final Pattern includePattern;
    private final Pattern excludePattern;
058
059    public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations, GenericFileProcessStrategy<T> processStrategy) {
060        super(endpoint, processor);
061        this.endpoint = endpoint;
062        this.operations = operations;
063        this.processStrategy = processStrategy;
064
065        this.includePattern = endpoint.getIncludePattern();
066        this.excludePattern = endpoint.getExcludePattern();
067    }
068
069    public Processor getCustomProcessor() {
070        return customProcessor;
071    }
072
073    /**
074     * Use a custom processor to process the exchange.
075     * <p/>
076     * Only set this if you need to do custom processing, instead of the regular processing.
077     * <p/>
078     * This is for example used to browse file endpoints by leveraging the file consumer to poll
079     * the directory to gather the list of exchanges. But to avoid processing the files regularly
080     * we can use a custom processor.
081     *
082     * @param processor a custom processor
083     */
084    public void setCustomProcessor(Processor processor) {
085        this.customProcessor = processor;
086    }
087
088    public boolean isEagerLimitMaxMessagesPerPoll() {
089        return eagerLimitMaxMessagesPerPoll;
090    }
091
092    public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) {
093        this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
094    }
095
096    /**
097     * Poll for files
098     */
099    public int poll() throws Exception {
100        // must prepare on startup the very first time
101        if (!prepareOnStartup) {
102            // prepare on startup
103            processStrategy.prepareOnStartup(operations, endpoint);
104            prepareOnStartup = true;
105        }
106
107        // must reset for each poll
108        shutdownRunningTask = null;
109        pendingExchanges = 0;
110
111        // before we poll is there anything we need to check?
112        // such as are we connected to the FTP Server still?
113        if (!prePollCheck()) {
114            log.debug("Skipping poll as pre poll check returned false");
115            return 0;
116        }
117
118        // gather list of files to process
119        List<GenericFile<T>> files = new ArrayList<>();
120        String name = endpoint.getConfiguration().getDirectory();
121
122        // time how long it takes to poll
123        StopWatch stop = new StopWatch();
124        boolean limitHit;
125        try {
126            limitHit = !pollDirectory(name, files, 0);
127        } catch (Exception e) {
128            // during poll directory we add files to the in progress repository, in case of any exception thrown after this work
129            // we must then drain the in progress files before rethrowing the exception
130            log.debug("Error occurred during poll directory: {} due {}. Removing {} files marked as in-progress.", name, e.getMessage(), files.size());
131            removeExcessiveInProgressFiles(files);
132            throw e;
133        }
134
135        long delta = stop.taken();
136        if (log.isDebugEnabled()) {
137            log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name);
138        }
139
140        // log if we hit the limit
141        if (limitHit) {
142            log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll);
143        }
144
145        // sort files using file comparator if provided
146        if (endpoint.getSorter() != null) {
147            files.sort(endpoint.getSorter());
148        }
149
150        // sort using build in sorters so we can use expressions
151        // use a linked list so we can dequeue the exchanges
152        LinkedList<Exchange> exchanges = new LinkedList<>();
153        for (GenericFile<T> file : files) {
154            Exchange exchange = endpoint.createExchange(file);
155            endpoint.configureExchange(exchange);
156            endpoint.configureMessage(file, exchange.getIn());
157            exchanges.add(exchange);
158        }
159        // sort files using exchange comparator if provided
160        if (endpoint.getSortBy() != null) {
161            exchanges.sort(endpoint.getSortBy());
162        }
163        if (endpoint.isShuffle()) {
164            Collections.shuffle(exchanges);
165        }
166
167        // use a queue for the exchanges
168        Deque<Exchange> q = exchanges;
169
170        // we are not eager limiting, but we have configured a limit, so cut the list of files
171        if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) {
172            if (files.size() > maxMessagesPerPoll) {
173                log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll);
174                // must first remove excessive files from the in progress repository
175                removeExcessiveInProgressFiles(q, maxMessagesPerPoll);
176            }
177        }
178
179        // consume files one by one
180        int total = exchanges.size();
181        if (total > 0) {
182            log.debug("Total {} files to consume", total);
183        }
184
185        int polledMessages = processBatch(CastUtils.cast(q));
186
187        postPollCheck(polledMessages);
188
189        return polledMessages;
190    }
191
192    public int processBatch(Queue<Object> exchanges) {
193        int total = exchanges.size();
194        int answer = total;
195
196        // limit if needed
197        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
198            log.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", maxMessagesPerPoll, total);
199            total = maxMessagesPerPoll;
200        }
201
202        for (int index = 0; index < total && isBatchAllowed(); index++) {
203            // only loop if we are started (allowed to run)
204            // use poll to remove the head so it does not consume memory even after we have processed it
205            Exchange exchange = (Exchange) exchanges.poll();
206            // add current index and total as properties
207            exchange.setProperty(Exchange.BATCH_INDEX, index);
208            exchange.setProperty(Exchange.BATCH_SIZE, total);
209            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
210
211            // update pending number of exchanges
212            pendingExchanges = total - index - 1;
213
214            // process the current exchange
215            boolean started;
216            if (customProcessor != null) {
217                // use a custom processor
218                started = customProcessExchange(exchange, customProcessor);
219            } else {
220                // process the exchange regular
221                started = processExchange(exchange);
222            }
223
224            // if we did not start process the file then decrement the counter
225            if (!started) {
226                answer--;
227            }
228        }
229
230        // drain any in progress files as we are done with this batch
231        removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0);
232
233        return answer;
234    }
235
236    /**
237     * Drain any in progress files as we are done with this batch
238     *
239     * @param exchanges  the exchanges
240     * @param limit      the limit
241     */
242    protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
243        // remove the file from the in progress list in case the batch was limited by max messages per poll
244        while (exchanges.size() > limit) {
245            // must remove last
246            Exchange exchange = exchanges.removeLast();
247            GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class);
248            String key = file.getAbsoluteFilePath();
249            endpoint.getInProgressRepository().remove(key);
250        }
251    }
252
253    /**
254     * Drain any in progress files as we are done with the files
255     *
256     * @param files  the files
257     */
258    protected void removeExcessiveInProgressFiles(List<GenericFile<T>> files) {
259        for (GenericFile file : files) {
260            String key = file.getAbsoluteFilePath();
261            endpoint.getInProgressRepository().remove(key);
262        }
263    }
264
265    /**
266     * Whether or not we can continue polling for more files
267     *
268     * @param fileList  the current list of gathered files
269     * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit
270     */
271    public boolean canPollMoreFiles(List<?> fileList) {
272        // at this point we should not limit if we are not eager
273        if (!eagerLimitMaxMessagesPerPoll) {
274            return true;
275        }
276
277        if (maxMessagesPerPoll <= 0) {
278            // no limitation
279            return true;
280        }
281
282        // then only poll if we haven't reached the max limit
283        return fileList.size() < maxMessagesPerPoll;
284    }
285
286    /**
287     * Override if required. Perform some checks (and perhaps actions) before we poll.
288     *
289     * @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll.
290     */
291    protected boolean prePollCheck() throws Exception {
292        return true;
293    }
294
295    /**
296     * Override if required. Perform some checks (and perhaps actions) after we have polled.
297     *
298     * @param polledMessages number of polled messages
299     */
300    protected void postPollCheck(int polledMessages) {
301        // noop
302    }
303
304    /**
305     * Polls the given directory for files to process
306     *
307     * @param fileName current directory or file
308     * @param fileList current list of files gathered
309     * @param depth the current depth of the directory (will start from 0)
310     * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit
311     */
312    protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth);
313
314    /**
315     * Sets the operations to be used.
316     * <p/>
317     * Can be used to set a fresh operations in case of recovery attempts
318     *
319     * @param operations the operations
320     */
321    public void setOperations(GenericFileOperations<T> operations) {
322        this.operations = operations;
323    }
324
325    /**
326     * Whether to ignore if the file cannot be retrieved.
327     * <p/>
328     * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved.
329     * <p/>
330     * This method allows to suppress this and just ignore that.
331     *
332     * @param name        the file name
333     * @param exchange    the exchange
334     * @param cause       optional exception occurred during retrieving file
335     * @return <tt>true</tt> to ignore, <tt>false</tt> is the default.
336     */
337    protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) {
338        return false;
339    }
340
341    /**
342     * Processes the exchange
343     *
344     * @param exchange the exchange
345     * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started
346     * to be processed, for some reason (not found, or aborted etc)
347     */
348    protected boolean processExchange(final Exchange exchange) {
349        GenericFile<T> file = getExchangeFileProperty(exchange);
350        log.trace("Processing file: {}", file);
351
352        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
353        // and then the file name would be changed
354        String absoluteFileName = file.getAbsoluteFilePath();
355
356        // check if we can begin processing the file
357        Exception beginCause = null;
358        boolean begin = false;
359        try {
360            begin = processStrategy.begin(operations, endpoint, exchange, file);
361        } catch (Exception e) {
362            beginCause = e;
363        }
364
365        if (!begin) {
366            // no something was wrong, so we need to abort and remove the file from the in progress list
367            Exception abortCause = null;
368            log.debug("{} cannot begin processing file: {}", endpoint, file);
369            try {
370                // abort
371                processStrategy.abort(operations, endpoint, exchange, file);
372            } catch (Exception e) {
373                abortCause = e;
374            } finally {
375                // begin returned false, so remove file from the in progress list as its no longer in progress
376                endpoint.getInProgressRepository().remove(absoluteFileName);
377            }
378            if (beginCause != null) {
379                String msg = endpoint + " cannot begin processing file: " + file + " due to: " + beginCause.getMessage();
380                handleException(msg, beginCause);
381            }
382            if (abortCause != null) {
383                String msg2 = endpoint + " cannot abort processing file: " + file + " due to: " + abortCause.getMessage();
384                handleException(msg2, abortCause);
385            }
386            return false;
387        }
388
389        // must use file from exchange as it can be updated due the
390        // preMoveNamePrefix/preMoveNamePostfix options
391        final GenericFile<T> target = getExchangeFileProperty(exchange);
392
393        // we can begin processing the file so update file headers on the Camel message
394        // in case it took some time to acquire read lock, and file size/timestamp has been updated since etc
395        updateFileHeaders(target, exchange.getIn());
396
397        // must use full name when downloading so we have the correct path
398        final String name = target.getAbsoluteFilePath();
399        try {
400            
401            if (isRetrieveFile()) {
402                // retrieve the file using the stream
403                log.trace("Retrieving file: {} from: {}", name, endpoint);
404    
405                // retrieve the file and check it was a success
406                boolean retrieved;
407                Exception cause = null;
408                try {
409                    retrieved = operations.retrieveFile(name, exchange, target.getFileLength());
410                } catch (Exception e) {
411                    retrieved = false;
412                    cause = e;
413                }
414
415                if (!retrieved) {
416                    if (ignoreCannotRetrieveFile(name, exchange, cause)) {
417                        log.trace("Cannot retrieve file {} maybe it does not exists. Ignoring.", name);
418                        // remove file from the in progress list as we could not retrieve it, but should ignore
419                        endpoint.getInProgressRepository().remove(absoluteFileName);
420                        return false;
421                    } else {
422                        // throw exception to handle the problem with retrieving the file
423                        // then if the method return false or throws an exception is handled the same in here
424                        // as in both cases an exception is being thrown
425                        if (cause instanceof GenericFileOperationFailedException) {
426                            throw cause;
427                        } else {
428                            throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint, cause);
429                        }
430                    }
431                }
432    
433                log.trace("Retrieved file: {} from: {}", name, endpoint);                
434            } else {
435                log.trace("Skipped retrieval of file: {} from: {}", name, endpoint);
436                exchange.getIn().setBody(null);
437            }
438
439            // register on completion callback that does the completion strategies
440            // (for instance to move the file after we have processed it)
441            exchange.addOnCompletion(new GenericFileOnCompletion<>(endpoint, operations, processStrategy, target, absoluteFileName));
442
443            log.debug("About to process file: {} using exchange: {}", target, exchange);
444
445            if (endpoint.isSynchronous()) {
446                // process synchronously
447                getProcessor().process(exchange);
448            } else {
449                // process the exchange using the async consumer to support async routing engine
450                // which can be supported by this file consumer as all the done work is
451                // provided in the GenericFileOnCompletion
452                getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
453            }
454
455        } catch (Exception e) {
456            // remove file from the in progress list due to failure
457            // (cannot be in finally block due to GenericFileOnCompletion will remove it
458            // from in progress when it takes over and processes the file, which may happen
459            // by another thread at a later time. So its only safe to remove it if there was an exception)
460            endpoint.getInProgressRepository().remove(absoluteFileName);
461
462            String msg = "Error processing file " + file + " due to " + e.getMessage();
463            handleException(msg, e);
464        }
465
466        return true;
467    }
468
469    /**
470     * Updates the information on {@link Message} after we have acquired read-lock and
471     * can begin process the file.
472     *
473     * @param file    the file
474     * @param message the Camel message to update its headers
475     */
476    protected abstract void updateFileHeaders(GenericFile<T> file, Message message);
477
478    /**
479     * Override if required.  Files are retrieved / returns true by default
480     *
481     * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files.
482     */
483    protected boolean isRetrieveFile() {
484        return true;
485    }
486
487    /**
488     * Processes the exchange using a custom processor.
489     *
490     * @param exchange the exchange
491     * @param processor the custom processor
492     */
493    protected boolean customProcessExchange(final Exchange exchange, final Processor processor) {
494        GenericFile<T> file = getExchangeFileProperty(exchange);
495        log.trace("Custom processing file: {}", file);
496
497        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
498        // and then the file name would be changed
499        String absoluteFileName = file.getAbsoluteFilePath();
500
501        try {
502            // process using the custom processor
503            processor.process(exchange);
504        } catch (Exception e) {
505            if (log.isDebugEnabled()) {
506                log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e);
507            }
508            handleException(e);
509        } finally {
510            // always remove file from the in progress list as its no longer in progress
511            // use the original file name that was used to add it to the repository
512            // as the name can be different when using preMove option
513            endpoint.getInProgressRepository().remove(absoluteFileName);
514        }
515
516        return true;
517    }
518
519    /**
520     * Strategy for validating if the given remote file should be included or not
521     *
522     * @param file        the file
523     * @param isDirectory whether the file is a directory or a file
524     * @param files       files in the directory
525     * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
526     */
527    protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) {
528        String absoluteFilePath = file.getAbsoluteFilePath();
529
530        if (!isMatched(file, isDirectory, files)) {
531            log.trace("File did not match. Will skip this file: {}", file);
532            return false;
533        }
534
535        // directory is always valid
536        if (isDirectory) {
537            return true;
538        }
539
540        // check if file is already in progress
541        if (endpoint.getInProgressRepository().contains(absoluteFilePath)) {
542            if (log.isTraceEnabled()) {
543                log.trace("Skipping as file is already in progress: {}", file.getFileName());
544            }
545            return false;
546        }
547
548        // if its a file then check we have the file in the idempotent registry already
549        if (endpoint.isIdempotent()) {
550            // use absolute file path as default key, but evaluate if an expression key was configured
551            String key = file.getAbsoluteFilePath();
552            if (endpoint.getIdempotentKey() != null) {
553                Exchange dummy = endpoint.createExchange(file);
554                key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
555            }
556            if (key != null && endpoint.getIdempotentRepository().contains(key)) {
557                log.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", key, file);
558                return false;
559            }
560        }
561
562        // okay so final step is to be able to add atomic as in-progress, so we are the
563        // only thread processing this file
564        return endpoint.getInProgressRepository().add(absoluteFilePath);
565    }
566
567    /**
568     * Strategy to perform file matching based on endpoint configuration.
569     * <p/>
570     * Will always return <tt>false</tt> for certain files/folders:
571     * <ul>
572     * <li>Starting with a dot</li>
573     * <li>lock files</li>
574     * </ul>
575     * And then <tt>true</tt> for directories.
576     *
577     * @param file        the file
578     * @param isDirectory whether the file is a directory or a file
579     * @param files       files in the directory
580     * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
581     */
582    protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) {
583        String name = file.getFileNameOnly();
584
585        // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
586        if (name.startsWith(".")) {
587            return false;
588        }
589
590        // lock files should be skipped
591        if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
592            return false;
593        }
594
595        if (endpoint.getFilter() != null) {
596            if (!endpoint.getFilter().accept(file)) {
597                return false;
598            }
599        }
600
601        if (endpoint.getAntFilter() != null) {
602            if (!endpoint.getAntFilter().accept(file)) {
603                return false;
604            }
605        }
606
607        if (isDirectory && endpoint.getFilterDirectory() != null) {
608            // create a dummy exchange as Exchange is needed for expression evaluation
609            Exchange dummy = endpoint.createExchange(file);
610            boolean matches = endpoint.getFilterDirectory().matches(dummy);
611            if (!matches) {
612                return false;
613            }
614        }
615
616        // directories are regarded as matched if filter accepted them
617        if (isDirectory) {
618            return true;
619        }
620
621        // exclude take precedence over include
622        if (excludePattern != null)  {
623            if (excludePattern.matcher(name).matches()) {
624                return false;
625            }
626        }
627        if (includePattern != null)  {
628            if (!includePattern.matcher(name).matches()) {
629                return false;
630            }
631        }
632
633        // use file expression for a simple dynamic file filter
634        if (endpoint.getFileName() != null) {
635            // create a dummy exchange as Exchange is needed for expression evaluation
636            Exchange dummy = endpoint.createExchange(file);
637            String result = evaluateFileExpression(dummy);
638            if (result != null) {
639                if (!name.equals(result)) {
640                    return false;
641                }
642            }
643        }
644
645        if (endpoint.getFilterFile() != null) {
646            // create a dummy exchange as Exchange is needed for expression evaluation
647            Exchange dummy = endpoint.createExchange(file);
648            boolean matches = endpoint.getFilterFile().matches(dummy);
649            if (!matches) {
650                return false;
651            }
652        }
653
654        // if done file name is enabled, then the file is only valid if a done file exists
655        if (endpoint.getDoneFileName() != null) {
656            // done file must be in same path as the file
657            String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath());
658            StringHelper.notEmpty(doneFileName, "doneFileName", endpoint);
659
660            // is it a done file name?
661            if (endpoint.isDoneFile(file.getFileNameOnly())) {
662                log.trace("Skipping done file: {}", file);
663                return false;
664            }
665
666            if (!isMatched(file, doneFileName, files)) {
667                return false;
668            }
669        }
670
671        return true;
672    }
673
674    /**
675     * Strategy to perform file matching based on endpoint configuration in terms of done file name.
676     *
677     * @param file         the file
678     * @param doneFileName the done file name (without any paths)
679     * @param files        files in the directory
680     * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
681     */
682    protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files);
683
684    /**
685     * Is the given file already in progress.
686     *
687     * @param file the file
688     * @return <tt>true</tt> if the file is already in progress
689     * @deprecated no longer in use, use {@link org.apache.camel.component.file.GenericFileEndpoint#getInProgressRepository()} instead.
690     */
691    @Deprecated
692    protected boolean isInProgress(GenericFile<T> file) {
693        String key = file.getAbsoluteFilePath();
694        // must use add, to have operation as atomic
695        return !endpoint.getInProgressRepository().add(key);
696    }
697
698    protected String evaluateFileExpression(Exchange exchange) {
699        String result = endpoint.getFileName().evaluate(exchange, String.class);
700        if (exchange.getException() != null) {
701            throw ObjectHelper.wrapRuntimeCamelException(exchange.getException());
702        }
703        return result;
704    }
705
706    @SuppressWarnings("unchecked")
707    private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
708        return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
709    }
710
711    @Override
712    protected void doStart() throws Exception {
713        // inject CamelContext before starting as it may be needed
714        if (processStrategy instanceof CamelContextAware) {
715            ((CamelContextAware) processStrategy).setCamelContext(getEndpoint().getCamelContext());
716        }
717        ServiceHelper.startService(processStrategy);
718        super.doStart();
719    }
720
721    @Override
722    protected void doStop() throws Exception {
723        prepareOnStartup = false;
724        super.doStop();
725        ServiceHelper.stopService(processStrategy);
726    }
727
728    @Override
729    public void onInit() throws Exception {
730        // noop as we do a manual on-demand poll with GenericFilePolllingConsumer
731    }
732
733    @Override
734    public long beforePoll(long timeout) throws Exception {
735        // noop as we do a manual on-demand poll with GenericFilePolllingConsumer
736        return timeout;
737    }
738
739    @Override
740    public void afterPoll() throws Exception {
741        // noop as we do a manual on-demand poll with GenericFilePolllingConsumer
742    }
743
744}