001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file;
018
019import java.util.ArrayList;
020import java.util.Collections;
021import java.util.Deque;
022import java.util.LinkedList;
023import java.util.List;
024import java.util.Queue;
025import java.util.regex.Pattern;
026
027import org.apache.camel.CamelContextAware;
028import org.apache.camel.Exchange;
029import org.apache.camel.Message;
030import org.apache.camel.Processor;
031import org.apache.camel.ShutdownRunningTask;
032import org.apache.camel.impl.ScheduledBatchPollingConsumer;
033import org.apache.camel.support.EmptyAsyncCallback;
034import org.apache.camel.util.CastUtils;
035import org.apache.camel.util.ObjectHelper;
036import org.apache.camel.util.ServiceHelper;
037import org.apache.camel.util.StopWatch;
038import org.apache.camel.util.StringHelper;
039import org.apache.camel.util.TimeUtils;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * Base class for file consumers.
045 */
046public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer {
047    protected final Logger log = LoggerFactory.getLogger(getClass());
048    protected GenericFileEndpoint<T> endpoint;
049    protected GenericFileOperations<T> operations;
050    protected GenericFileProcessStrategy<T> processStrategy;
051    protected String fileExpressionResult;
052    protected volatile ShutdownRunningTask shutdownRunningTask;
053    protected volatile int pendingExchanges;
054    protected Processor customProcessor;
055    protected boolean eagerLimitMaxMessagesPerPoll = true;
056    protected volatile boolean prepareOnStartup;
057    private final Pattern includePattern;
058    private final Pattern excludePattern;
059
060    public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations, GenericFileProcessStrategy<T> processStrategy) {
061        super(endpoint, processor);
062        this.endpoint = endpoint;
063        this.operations = operations;
064        this.processStrategy = processStrategy;
065
066        this.includePattern = endpoint.getIncludePattern();
067        this.excludePattern = endpoint.getExcludePattern();
068    }
069
070    public Processor getCustomProcessor() {
071        return customProcessor;
072    }
073
074    /**
075     * Use a custom processor to process the exchange.
076     * <p/>
077     * Only set this if you need to do custom processing, instead of the regular processing.
078     * <p/>
079     * This is for example used to browse file endpoints by leveraging the file consumer to poll
080     * the directory to gather the list of exchanges. But to avoid processing the files regularly
081     * we can use a custom processor.
082     *
083     * @param processor a custom processor
084     */
085    public void setCustomProcessor(Processor processor) {
086        this.customProcessor = processor;
087    }
088
089    public boolean isEagerLimitMaxMessagesPerPoll() {
090        return eagerLimitMaxMessagesPerPoll;
091    }
092
093    public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) {
094        this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
095    }
096
097    /**
098     * Poll for files
099     */
100    public int poll() throws Exception {
101        // must prepare on startup the very first time
102        if (!prepareOnStartup) {
103            // prepare on startup
104            processStrategy.prepareOnStartup(operations, endpoint);
105            prepareOnStartup = true;
106        }
107
108        // must reset for each poll
109        fileExpressionResult = null;
110        shutdownRunningTask = null;
111        pendingExchanges = 0;
112
113        // before we poll is there anything we need to check?
114        // such as are we connected to the FTP Server still?
115        if (!prePollCheck()) {
116            log.debug("Skipping poll as pre poll check returned false");
117            return 0;
118        }
119
120        // gather list of files to process
121        List<GenericFile<T>> files = new ArrayList<>();
122        String name = endpoint.getConfiguration().getDirectory();
123
124        // time how long it takes to poll
125        StopWatch stop = new StopWatch();
126        boolean limitHit;
127        try {
128            limitHit = !pollDirectory(name, files, 0);
129        } catch (Exception e) {
130            // during poll directory we add files to the in progress repository, in case of any exception thrown after this work
131            // we must then drain the in progress files before rethrowing the exception
132            log.debug("Error occurred during poll directory: {} due {}. Removing {} files marked as in-progress.", name, e.getMessage(), files.size());
133            removeExcessiveInProgressFiles(files);
134            throw e;
135        }
136
137        long delta = stop.taken();
138        if (log.isDebugEnabled()) {
139            log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name);
140        }
141
142        // log if we hit the limit
143        if (limitHit) {
144            log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll);
145        }
146
147        // sort files using file comparator if provided
148        if (endpoint.getSorter() != null) {
149            files.sort(endpoint.getSorter());
150        }
151
152        // sort using build in sorters so we can use expressions
153        // use a linked list so we can dequeue the exchanges
154        LinkedList<Exchange> exchanges = new LinkedList<>();
155        for (GenericFile<T> file : files) {
156            Exchange exchange = endpoint.createExchange(file);
157            endpoint.configureExchange(exchange);
158            endpoint.configureMessage(file, exchange.getIn());
159            exchanges.add(exchange);
160        }
161        // sort files using exchange comparator if provided
162        if (endpoint.getSortBy() != null) {
163            exchanges.sort(endpoint.getSortBy());
164        }
165        if (endpoint.isShuffle()) {
166            Collections.shuffle(exchanges);
167        }
168
169        // use a queue for the exchanges
170        Deque<Exchange> q = exchanges;
171
172        // we are not eager limiting, but we have configured a limit, so cut the list of files
173        if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) {
174            if (files.size() > maxMessagesPerPoll) {
175                log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll);
176                // must first remove excessive files from the in progress repository
177                removeExcessiveInProgressFiles(q, maxMessagesPerPoll);
178            }
179        }
180
181        // consume files one by one
182        int total = exchanges.size();
183        if (total > 0) {
184            log.debug("Total {} files to consume", total);
185        }
186
187        int polledMessages = processBatch(CastUtils.cast(q));
188
189        postPollCheck(polledMessages);
190
191        return polledMessages;
192    }
193
194    public int processBatch(Queue<Object> exchanges) {
195        int total = exchanges.size();
196        int answer = total;
197
198        // limit if needed
199        if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) {
200            log.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", maxMessagesPerPoll, total);
201            total = maxMessagesPerPoll;
202        }
203
204        for (int index = 0; index < total && isBatchAllowed(); index++) {
205            // only loop if we are started (allowed to run)
206            // use poll to remove the head so it does not consume memory even after we have processed it
207            Exchange exchange = (Exchange) exchanges.poll();
208            // add current index and total as properties
209            exchange.setProperty(Exchange.BATCH_INDEX, index);
210            exchange.setProperty(Exchange.BATCH_SIZE, total);
211            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
212
213            // update pending number of exchanges
214            pendingExchanges = total - index - 1;
215
216            // process the current exchange
217            boolean started;
218            if (customProcessor != null) {
219                // use a custom processor
220                started = customProcessExchange(exchange, customProcessor);
221            } else {
222                // process the exchange regular
223                started = processExchange(exchange);
224            }
225
226            // if we did not start process the file then decrement the counter
227            if (!started) {
228                answer--;
229            }
230        }
231
232        // drain any in progress files as we are done with this batch
233        removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0);
234
235        return answer;
236    }
237
238    /**
239     * Drain any in progress files as we are done with this batch
240     *
241     * @param exchanges  the exchanges
242     * @param limit      the limit
243     */
244    protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
245        // remove the file from the in progress list in case the batch was limited by max messages per poll
246        while (exchanges.size() > limit) {
247            // must remove last
248            Exchange exchange = exchanges.removeLast();
249            GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class);
250            String key = file.getAbsoluteFilePath();
251            endpoint.getInProgressRepository().remove(key);
252        }
253    }
254
255    /**
256     * Drain any in progress files as we are done with the files
257     *
258     * @param files  the files
259     */
260    protected void removeExcessiveInProgressFiles(List<GenericFile<T>> files) {
261        for (GenericFile file : files) {
262            String key = file.getAbsoluteFilePath();
263            endpoint.getInProgressRepository().remove(key);
264        }
265    }
266
267    /**
268     * Whether or not we can continue polling for more files
269     *
270     * @param fileList  the current list of gathered files
271     * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit
272     */
273    public boolean canPollMoreFiles(List<?> fileList) {
274        // at this point we should not limit if we are not eager
275        if (!eagerLimitMaxMessagesPerPoll) {
276            return true;
277        }
278
279        if (maxMessagesPerPoll <= 0) {
280            // no limitation
281            return true;
282        }
283
284        // then only poll if we haven't reached the max limit
285        return fileList.size() < maxMessagesPerPoll;
286    }
287
288    /**
289     * Override if required. Perform some checks (and perhaps actions) before we poll.
290     *
291     * @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll.
292     */
293    protected boolean prePollCheck() throws Exception {
294        return true;
295    }
296
297    /**
298     * Override if required. Perform some checks (and perhaps actions) after we have polled.
299     *
300     * @param polledMessages number of polled messages
301     */
302    protected void postPollCheck(int polledMessages) {
303        // noop
304    }
305
306    /**
307     * Polls the given directory for files to process
308     *
309     * @param fileName current directory or file
310     * @param fileList current list of files gathered
311     * @param depth the current depth of the directory (will start from 0)
312     * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit
313     */
314    protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth);
315
316    /**
317     * Sets the operations to be used.
318     * <p/>
319     * Can be used to set a fresh operations in case of recovery attempts
320     *
321     * @param operations the operations
322     */
323    public void setOperations(GenericFileOperations<T> operations) {
324        this.operations = operations;
325    }
326
327    /**
328     * Whether to ignore if the file cannot be retrieved.
329     * <p/>
330     * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved.
331     * <p/>
332     * This method allows to suppress this and just ignore that.
333     *
334     * @param name        the file name
335     * @param exchange    the exchange
336     * @param cause       optional exception occurred during retrieving file
337     * @return <tt>true</tt> to ignore, <tt>false</tt> is the default.
338     */
339    protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) {
340        return false;
341    }
342
343    /**
344     * Processes the exchange
345     *
346     * @param exchange the exchange
347     * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started
348     * to be processed, for some reason (not found, or aborted etc)
349     */
350    protected boolean processExchange(final Exchange exchange) {
351        GenericFile<T> file = getExchangeFileProperty(exchange);
352        log.trace("Processing file: {}", file);
353
354        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
355        // and then the file name would be changed
356        String absoluteFileName = file.getAbsoluteFilePath();
357
358        // check if we can begin processing the file
359        Exception beginCause = null;
360        boolean begin = false;
361        try {
362            begin = processStrategy.begin(operations, endpoint, exchange, file);
363        } catch (Exception e) {
364            beginCause = e;
365        }
366
367        if (!begin) {
368            // no something was wrong, so we need to abort and remove the file from the in progress list
369            Exception abortCause = null;
370            log.debug("{} cannot begin processing file: {}", endpoint, file);
371            try {
372                // abort
373                processStrategy.abort(operations, endpoint, exchange, file);
374            } catch (Exception e) {
375                abortCause = e;
376            } finally {
377                // begin returned false, so remove file from the in progress list as its no longer in progress
378                endpoint.getInProgressRepository().remove(absoluteFileName);
379            }
380            if (beginCause != null) {
381                String msg = endpoint + " cannot begin processing file: " + file + " due to: " + beginCause.getMessage();
382                handleException(msg, beginCause);
383            }
384            if (abortCause != null) {
385                String msg2 = endpoint + " cannot abort processing file: " + file + " due to: " + abortCause.getMessage();
386                handleException(msg2, abortCause);
387            }
388            return false;
389        }
390
391        // must use file from exchange as it can be updated due the
392        // preMoveNamePrefix/preMoveNamePostfix options
393        final GenericFile<T> target = getExchangeFileProperty(exchange);
394
395        // we can begin processing the file so update file headers on the Camel message
396        // in case it took some time to acquire read lock, and file size/timestamp has been updated since etc
397        updateFileHeaders(target, exchange.getIn());
398
399        // must use full name when downloading so we have the correct path
400        final String name = target.getAbsoluteFilePath();
401        try {
402            
403            if (isRetrieveFile()) {
404                // retrieve the file using the stream
405                log.trace("Retrieving file: {} from: {}", name, endpoint);
406    
407                // retrieve the file and check it was a success
408                boolean retrieved;
409                Exception cause = null;
410                try {
411                    retrieved = operations.retrieveFile(name, exchange, target.getFileLength());
412                } catch (Exception e) {
413                    retrieved = false;
414                    cause = e;
415                }
416
417                if (!retrieved) {
418                    if (ignoreCannotRetrieveFile(name, exchange, cause)) {
419                        log.trace("Cannot retrieve file {} maybe it does not exists. Ignoring.", name);
420                        // remove file from the in progress list as we could not retrieve it, but should ignore
421                        endpoint.getInProgressRepository().remove(absoluteFileName);
422                        return false;
423                    } else {
424                        // throw exception to handle the problem with retrieving the file
425                        // then if the method return false or throws an exception is handled the same in here
426                        // as in both cases an exception is being thrown
427                        if (cause instanceof GenericFileOperationFailedException) {
428                            throw cause;
429                        } else {
430                            throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint, cause);
431                        }
432                    }
433                }
434    
435                log.trace("Retrieved file: {} from: {}", name, endpoint);                
436            } else {
437                log.trace("Skipped retrieval of file: {} from: {}", name, endpoint);
438                exchange.getIn().setBody(null);
439            }
440
441            // register on completion callback that does the completion strategies
442            // (for instance to move the file after we have processed it)
443            exchange.addOnCompletion(new GenericFileOnCompletion<>(endpoint, operations, processStrategy, target, absoluteFileName));
444
445            log.debug("About to process file: {} using exchange: {}", target, exchange);
446
447            if (endpoint.isSynchronous()) {
448                // process synchronously
449                getProcessor().process(exchange);
450            } else {
451                // process the exchange using the async consumer to support async routing engine
452                // which can be supported by this file consumer as all the done work is
453                // provided in the GenericFileOnCompletion
454                getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
455            }
456
457        } catch (Exception e) {
458            // remove file from the in progress list due to failure
459            // (cannot be in finally block due to GenericFileOnCompletion will remove it
460            // from in progress when it takes over and processes the file, which may happen
461            // by another thread at a later time. So its only safe to remove it if there was an exception)
462            endpoint.getInProgressRepository().remove(absoluteFileName);
463
464            String msg = "Error processing file " + file + " due to " + e.getMessage();
465            handleException(msg, e);
466        }
467
468        return true;
469    }
470
471    /**
472     * Updates the information on {@link Message} after we have acquired read-lock and
473     * can begin process the file.
474     *
475     * @param file    the file
476     * @param message the Camel message to update its headers
477     */
478    protected abstract void updateFileHeaders(GenericFile<T> file, Message message);
479
480    /**
481     * Override if required.  Files are retrieved / returns true by default
482     *
483     * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files.
484     */
485    protected boolean isRetrieveFile() {
486        return true;
487    }
488
489    /**
490     * Processes the exchange using a custom processor.
491     *
492     * @param exchange the exchange
493     * @param processor the custom processor
494     */
495    protected boolean customProcessExchange(final Exchange exchange, final Processor processor) {
496        GenericFile<T> file = getExchangeFileProperty(exchange);
497        log.trace("Custom processing file: {}", file);
498
499        // must extract the absolute name before the begin strategy as the file could potentially be pre moved
500        // and then the file name would be changed
501        String absoluteFileName = file.getAbsoluteFilePath();
502
503        try {
504            // process using the custom processor
505            processor.process(exchange);
506        } catch (Exception e) {
507            if (log.isDebugEnabled()) {
508                log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e);
509            }
510            handleException(e);
511        } finally {
512            // always remove file from the in progress list as its no longer in progress
513            // use the original file name that was used to add it to the repository
514            // as the name can be different when using preMove option
515            endpoint.getInProgressRepository().remove(absoluteFileName);
516        }
517
518        return true;
519    }
520
521    /**
522     * Strategy for validating if the given remote file should be included or not
523     *
524     * @param file        the file
525     * @param isDirectory whether the file is a directory or a file
526     * @param files       files in the directory
527     * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it
528     */
529    protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) {
530        String absoluteFilePath = file.getAbsoluteFilePath();
531
532        if (!isMatched(file, isDirectory, files)) {
533            log.trace("File did not match. Will skip this file: {}", file);
534            return false;
535        }
536
537        // directory is always valid
538        if (isDirectory) {
539            return true;
540        }
541
542        // check if file is already in progress
543        if (endpoint.getInProgressRepository().contains(absoluteFilePath)) {
544            if (log.isTraceEnabled()) {
545                log.trace("Skipping as file is already in progress: {}", file.getFileName());
546            }
547            return false;
548        }
549
550        // if its a file then check we have the file in the idempotent registry already
551        if (endpoint.isIdempotent()) {
552            // use absolute file path as default key, but evaluate if an expression key was configured
553            String key = file.getAbsoluteFilePath();
554            if (endpoint.getIdempotentKey() != null) {
555                Exchange dummy = endpoint.createExchange(file);
556                key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
557            }
558            if (key != null && endpoint.getIdempotentRepository().contains(key)) {
559                log.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", key, file);
560                return false;
561            }
562        }
563
564        // okay so final step is to be able to add atomic as in-progress, so we are the
565        // only thread processing this file
566        return endpoint.getInProgressRepository().add(absoluteFilePath);
567    }
568
569    /**
570     * Strategy to perform file matching based on endpoint configuration.
571     * <p/>
572     * Will always return <tt>false</tt> for certain files/folders:
573     * <ul>
574     * <li>Starting with a dot</li>
575     * <li>lock files</li>
576     * </ul>
577     * And then <tt>true</tt> for directories.
578     *
579     * @param file        the file
580     * @param isDirectory whether the file is a directory or a file
581     * @param files       files in the directory
582     * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
583     */
584    protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) {
585        String name = file.getFileNameOnly();
586
587        // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
588        if (name.startsWith(".")) {
589            return false;
590        }
591
592        // lock files should be skipped
593        if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) {
594            return false;
595        }
596
597        if (endpoint.getFilter() != null) {
598            if (!endpoint.getFilter().accept(file)) {
599                return false;
600            }
601        }
602
603        if (endpoint.getAntFilter() != null) {
604            if (!endpoint.getAntFilter().accept(file)) {
605                return false;
606            }
607        }
608
609        if (isDirectory && endpoint.getFilterDirectory() != null) {
610            // create a dummy exchange as Exchange is needed for expression evaluation
611            Exchange dummy = endpoint.createExchange(file);
612            boolean matches = endpoint.getFilterDirectory().matches(dummy);
613            if (!matches) {
614                return false;
615            }
616        }
617
618        // directories are regarded as matched if filter accepted them
619        if (isDirectory) {
620            return true;
621        }
622
623        // exclude take precedence over include
624        if (excludePattern != null)  {
625            if (excludePattern.matcher(name).matches()) {
626                return false;
627            }
628        }
629        if (includePattern != null)  {
630            if (!includePattern.matcher(name).matches()) {
631                return false;
632            }
633        }
634
635        // use file expression for a simple dynamic file filter
636        if (endpoint.getFileName() != null) {
637            fileExpressionResult = evaluateFileExpression();
638            if (fileExpressionResult != null) {
639                if (!name.equals(fileExpressionResult)) {
640                    return false;
641                }
642            }
643        }
644
645        if (endpoint.getFilterFile() != null) {
646            // create a dummy exchange as Exchange is needed for expression evaluation
647            Exchange dummy = endpoint.createExchange(file);
648            boolean matches = endpoint.getFilterFile().matches(dummy);
649            if (!matches) {
650                return false;
651            }
652        }
653
654        // if done file name is enabled, then the file is only valid if a done file exists
655        if (endpoint.getDoneFileName() != null) {
656            // done file must be in same path as the file
657            String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath());
658            StringHelper.notEmpty(doneFileName, "doneFileName", endpoint);
659
660            // is it a done file name?
661            if (endpoint.isDoneFile(file.getFileNameOnly())) {
662                log.trace("Skipping done file: {}", file);
663                return false;
664            }
665
666            if (!isMatched(file, doneFileName, files)) {
667                return false;
668            }
669        }
670
671        return true;
672    }
673
674    /**
675     * Strategy to perform file matching based on endpoint configuration in terms of done file name.
676     *
677     * @param file         the file
678     * @param doneFileName the done file name (without any paths)
679     * @param files        files in the directory
680     * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not
681     */
682    protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files);
683
684    /**
685     * Is the given file already in progress.
686     *
687     * @param file the file
688     * @return <tt>true</tt> if the file is already in progress
689     * @deprecated no longer in use, use {@link org.apache.camel.component.file.GenericFileEndpoint#getInProgressRepository()} instead.
690     */
691    @Deprecated
692    protected boolean isInProgress(GenericFile<T> file) {
693        String key = file.getAbsoluteFilePath();
694        // must use add, to have operation as atomic
695        return !endpoint.getInProgressRepository().add(key);
696    }
697
698    protected String evaluateFileExpression() {
699        if (fileExpressionResult == null && endpoint.getFileName() != null) {
700            // create a dummy exchange as Exchange is needed for expression evaluation
701            Exchange dummy = endpoint.createExchange();
702            fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class);
703            if (dummy.getException() != null) {
704                throw ObjectHelper.wrapRuntimeCamelException(dummy.getException());
705            }
706        }
707        return fileExpressionResult;
708    }
709
710    @SuppressWarnings("unchecked")
711    private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
712        return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE);
713    }
714
715    @Override
716    protected void doStart() throws Exception {
717        // inject CamelContext before starting as it may be needed
718        if (processStrategy instanceof CamelContextAware) {
719            ((CamelContextAware) processStrategy).setCamelContext(getEndpoint().getCamelContext());
720        }
721        ServiceHelper.startService(processStrategy);
722        super.doStart();
723    }
724
725    @Override
726    protected void doStop() throws Exception {
727        prepareOnStartup = false;
728        super.doStop();
729        ServiceHelper.stopService(processStrategy);
730    }
731
732    @Override
733    public void onInit() throws Exception {
734        // noop as we do a manual on-demand poll with GenericFilePolllingConsumer
735    }
736
737    @Override
738    public long beforePoll(long timeout) throws Exception {
739        // noop as we do a manual on-demand poll with GenericFilePolllingConsumer
740        return timeout;
741    }
742
743    @Override
744    public void afterPoll() throws Exception {
745        // noop as we do a manual on-demand poll with GenericFilePolllingConsumer
746    }
747
748}