001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.file; 018 019import java.util.ArrayList; 020import java.util.Collections; 021import java.util.Deque; 022import java.util.LinkedList; 023import java.util.List; 024import java.util.Queue; 025import java.util.regex.Pattern; 026 027import org.apache.camel.CamelContextAware; 028import org.apache.camel.Exchange; 029import org.apache.camel.Message; 030import org.apache.camel.Processor; 031import org.apache.camel.ShutdownRunningTask; 032import org.apache.camel.impl.ScheduledBatchPollingConsumer; 033import org.apache.camel.support.EmptyAsyncCallback; 034import org.apache.camel.util.CastUtils; 035import org.apache.camel.util.ObjectHelper; 036import org.apache.camel.util.ServiceHelper; 037import org.apache.camel.util.StopWatch; 038import org.apache.camel.util.StringHelper; 039import org.apache.camel.util.TimeUtils; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * Base class for file consumers. 045 */ 046public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer { 047 protected final Logger log = LoggerFactory.getLogger(getClass()); 048 protected GenericFileEndpoint<T> endpoint; 049 protected GenericFileOperations<T> operations; 050 protected GenericFileProcessStrategy<T> processStrategy; 051 protected volatile ShutdownRunningTask shutdownRunningTask; 052 protected volatile int pendingExchanges; 053 protected Processor customProcessor; 054 protected boolean eagerLimitMaxMessagesPerPoll = true; 055 protected volatile boolean prepareOnStartup; 056 private final Pattern includePattern; 057 private final Pattern excludePattern; 058 059 public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations, GenericFileProcessStrategy<T> processStrategy) { 060 super(endpoint, processor); 061 this.endpoint = endpoint; 062 this.operations = operations; 063 this.processStrategy = processStrategy; 064 065 this.includePattern = endpoint.getIncludePattern(); 066 this.excludePattern = endpoint.getExcludePattern(); 067 } 068 069 public Processor getCustomProcessor() { 070 return customProcessor; 071 } 072 073 /** 074 * Use a custom processor to process the exchange. 075 * <p/> 076 * Only set this if you need to do custom processing, instead of the regular processing. 077 * <p/> 078 * This is for example used to browse file endpoints by leveraging the file consumer to poll 079 * the directory to gather the list of exchanges. But to avoid processing the files regularly 080 * we can use a custom processor. 081 * 082 * @param processor a custom processor 083 */ 084 public void setCustomProcessor(Processor processor) { 085 this.customProcessor = processor; 086 } 087 088 public boolean isEagerLimitMaxMessagesPerPoll() { 089 return eagerLimitMaxMessagesPerPoll; 090 } 091 092 public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) { 093 this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll; 094 } 095 096 /** 097 * Poll for files 098 */ 099 public int poll() throws Exception { 100 // must prepare on startup the very first time 101 if (!prepareOnStartup) { 102 // prepare on startup 103 processStrategy.prepareOnStartup(operations, endpoint); 104 prepareOnStartup = true; 105 } 106 107 // must reset for each poll 108 shutdownRunningTask = null; 109 pendingExchanges = 0; 110 111 // before we poll is there anything we need to check? 112 // such as are we connected to the FTP Server still? 113 if (!prePollCheck()) { 114 log.debug("Skipping poll as pre poll check returned false"); 115 return 0; 116 } 117 118 // gather list of files to process 119 List<GenericFile<T>> files = new ArrayList<>(); 120 String name = endpoint.getConfiguration().getDirectory(); 121 122 // time how long it takes to poll 123 StopWatch stop = new StopWatch(); 124 boolean limitHit; 125 try { 126 limitHit = !pollDirectory(name, files, 0); 127 } catch (Exception e) { 128 // during poll directory we add files to the in progress repository, in case of any exception thrown after this work 129 // we must then drain the in progress files before rethrowing the exception 130 log.debug("Error occurred during poll directory: {} due {}. Removing {} files marked as in-progress.", name, e.getMessage(), files.size()); 131 removeExcessiveInProgressFiles(files); 132 throw e; 133 } 134 135 long delta = stop.taken(); 136 if (log.isDebugEnabled()) { 137 log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name); 138 } 139 140 // log if we hit the limit 141 if (limitHit) { 142 log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll); 143 } 144 145 // sort files using file comparator if provided 146 if (endpoint.getSorter() != null) { 147 files.sort(endpoint.getSorter()); 148 } 149 150 // sort using build in sorters so we can use expressions 151 // use a linked list so we can dequeue the exchanges 152 LinkedList<Exchange> exchanges = new LinkedList<>(); 153 for (GenericFile<T> file : files) { 154 Exchange exchange = endpoint.createExchange(file); 155 endpoint.configureExchange(exchange); 156 endpoint.configureMessage(file, exchange.getIn()); 157 exchanges.add(exchange); 158 } 159 // sort files using exchange comparator if provided 160 if (endpoint.getSortBy() != null) { 161 exchanges.sort(endpoint.getSortBy()); 162 } 163 if (endpoint.isShuffle()) { 164 Collections.shuffle(exchanges); 165 } 166 167 // use a queue for the exchanges 168 Deque<Exchange> q = exchanges; 169 170 // we are not eager limiting, but we have configured a limit, so cut the list of files 171 if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) { 172 if (files.size() > maxMessagesPerPoll) { 173 log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll); 174 // must first remove excessive files from the in progress repository 175 removeExcessiveInProgressFiles(q, maxMessagesPerPoll); 176 } 177 } 178 179 // consume files one by one 180 int total = exchanges.size(); 181 if (total > 0) { 182 log.debug("Total {} files to consume", total); 183 } 184 185 int polledMessages = processBatch(CastUtils.cast(q)); 186 187 postPollCheck(polledMessages); 188 189 return polledMessages; 190 } 191 192 public int processBatch(Queue<Object> exchanges) { 193 int total = exchanges.size(); 194 int answer = total; 195 196 // limit if needed 197 if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) { 198 log.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", maxMessagesPerPoll, total); 199 total = maxMessagesPerPoll; 200 } 201 202 for (int index = 0; index < total && isBatchAllowed(); index++) { 203 // only loop if we are started (allowed to run) 204 // use poll to remove the head so it does not consume memory even after we have processed it 205 Exchange exchange = (Exchange) exchanges.poll(); 206 // add current index and total as properties 207 exchange.setProperty(Exchange.BATCH_INDEX, index); 208 exchange.setProperty(Exchange.BATCH_SIZE, total); 209 exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1); 210 211 // update pending number of exchanges 212 pendingExchanges = total - index - 1; 213 214 // process the current exchange 215 boolean started; 216 if (customProcessor != null) { 217 // use a custom processor 218 started = customProcessExchange(exchange, customProcessor); 219 } else { 220 // process the exchange regular 221 started = processExchange(exchange); 222 } 223 224 // if we did not start process the file then decrement the counter 225 if (!started) { 226 answer--; 227 } 228 } 229 230 // drain any in progress files as we are done with this batch 231 removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0); 232 233 return answer; 234 } 235 236 /** 237 * Drain any in progress files as we are done with this batch 238 * 239 * @param exchanges the exchanges 240 * @param limit the limit 241 */ 242 protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) { 243 // remove the file from the in progress list in case the batch was limited by max messages per poll 244 while (exchanges.size() > limit) { 245 // must remove last 246 Exchange exchange = exchanges.removeLast(); 247 GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class); 248 String key = file.getAbsoluteFilePath(); 249 endpoint.getInProgressRepository().remove(key); 250 } 251 } 252 253 /** 254 * Drain any in progress files as we are done with the files 255 * 256 * @param files the files 257 */ 258 protected void removeExcessiveInProgressFiles(List<GenericFile<T>> files) { 259 for (GenericFile file : files) { 260 String key = file.getAbsoluteFilePath(); 261 endpoint.getInProgressRepository().remove(key); 262 } 263 } 264 265 /** 266 * Whether or not we can continue polling for more files 267 * 268 * @param fileList the current list of gathered files 269 * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit 270 */ 271 public boolean canPollMoreFiles(List<?> fileList) { 272 // at this point we should not limit if we are not eager 273 if (!eagerLimitMaxMessagesPerPoll) { 274 return true; 275 } 276 277 if (maxMessagesPerPoll <= 0) { 278 // no limitation 279 return true; 280 } 281 282 // then only poll if we haven't reached the max limit 283 return fileList.size() < maxMessagesPerPoll; 284 } 285 286 /** 287 * Override if required. Perform some checks (and perhaps actions) before we poll. 288 * 289 * @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll. 290 */ 291 protected boolean prePollCheck() throws Exception { 292 return true; 293 } 294 295 /** 296 * Override if required. Perform some checks (and perhaps actions) after we have polled. 297 * 298 * @param polledMessages number of polled messages 299 */ 300 protected void postPollCheck(int polledMessages) { 301 // noop 302 } 303 304 /** 305 * Polls the given directory for files to process 306 * 307 * @param fileName current directory or file 308 * @param fileList current list of files gathered 309 * @param depth the current depth of the directory (will start from 0) 310 * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit 311 */ 312 protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth); 313 314 /** 315 * Sets the operations to be used. 316 * <p/> 317 * Can be used to set a fresh operations in case of recovery attempts 318 * 319 * @param operations the operations 320 */ 321 public void setOperations(GenericFileOperations<T> operations) { 322 this.operations = operations; 323 } 324 325 /** 326 * Whether to ignore if the file cannot be retrieved. 327 * <p/> 328 * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved. 329 * <p/> 330 * This method allows to suppress this and just ignore that. 331 * 332 * @param name the file name 333 * @param exchange the exchange 334 * @param cause optional exception occurred during retrieving file 335 * @return <tt>true</tt> to ignore, <tt>false</tt> is the default. 336 */ 337 protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) { 338 return false; 339 } 340 341 /** 342 * Processes the exchange 343 * 344 * @param exchange the exchange 345 * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started 346 * to be processed, for some reason (not found, or aborted etc) 347 */ 348 protected boolean processExchange(final Exchange exchange) { 349 GenericFile<T> file = getExchangeFileProperty(exchange); 350 log.trace("Processing file: {}", file); 351 352 // must extract the absolute name before the begin strategy as the file could potentially be pre moved 353 // and then the file name would be changed 354 String absoluteFileName = file.getAbsoluteFilePath(); 355 356 // check if we can begin processing the file 357 Exception beginCause = null; 358 boolean begin = false; 359 try { 360 begin = processStrategy.begin(operations, endpoint, exchange, file); 361 } catch (Exception e) { 362 beginCause = e; 363 } 364 365 if (!begin) { 366 // no something was wrong, so we need to abort and remove the file from the in progress list 367 Exception abortCause = null; 368 log.debug("{} cannot begin processing file: {}", endpoint, file); 369 try { 370 // abort 371 processStrategy.abort(operations, endpoint, exchange, file); 372 } catch (Exception e) { 373 abortCause = e; 374 } finally { 375 // begin returned false, so remove file from the in progress list as its no longer in progress 376 endpoint.getInProgressRepository().remove(absoluteFileName); 377 } 378 if (beginCause != null) { 379 String msg = endpoint + " cannot begin processing file: " + file + " due to: " + beginCause.getMessage(); 380 handleException(msg, beginCause); 381 } 382 if (abortCause != null) { 383 String msg2 = endpoint + " cannot abort processing file: " + file + " due to: " + abortCause.getMessage(); 384 handleException(msg2, abortCause); 385 } 386 return false; 387 } 388 389 // must use file from exchange as it can be updated due the 390 // preMoveNamePrefix/preMoveNamePostfix options 391 final GenericFile<T> target = getExchangeFileProperty(exchange); 392 393 // we can begin processing the file so update file headers on the Camel message 394 // in case it took some time to acquire read lock, and file size/timestamp has been updated since etc 395 updateFileHeaders(target, exchange.getIn()); 396 397 // must use full name when downloading so we have the correct path 398 final String name = target.getAbsoluteFilePath(); 399 try { 400 401 if (isRetrieveFile()) { 402 // retrieve the file using the stream 403 log.trace("Retrieving file: {} from: {}", name, endpoint); 404 405 // retrieve the file and check it was a success 406 boolean retrieved; 407 Exception cause = null; 408 try { 409 retrieved = operations.retrieveFile(name, exchange, target.getFileLength()); 410 } catch (Exception e) { 411 retrieved = false; 412 cause = e; 413 } 414 415 if (!retrieved) { 416 if (ignoreCannotRetrieveFile(name, exchange, cause)) { 417 log.trace("Cannot retrieve file {} maybe it does not exists. Ignoring.", name); 418 // remove file from the in progress list as we could not retrieve it, but should ignore 419 endpoint.getInProgressRepository().remove(absoluteFileName); 420 return false; 421 } else { 422 // throw exception to handle the problem with retrieving the file 423 // then if the method return false or throws an exception is handled the same in here 424 // as in both cases an exception is being thrown 425 if (cause instanceof GenericFileOperationFailedException) { 426 throw cause; 427 } else { 428 throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint, cause); 429 } 430 } 431 } 432 433 log.trace("Retrieved file: {} from: {}", name, endpoint); 434 } else { 435 log.trace("Skipped retrieval of file: {} from: {}", name, endpoint); 436 exchange.getIn().setBody(null); 437 } 438 439 // register on completion callback that does the completion strategies 440 // (for instance to move the file after we have processed it) 441 exchange.addOnCompletion(new GenericFileOnCompletion<>(endpoint, operations, processStrategy, target, absoluteFileName)); 442 443 log.debug("About to process file: {} using exchange: {}", target, exchange); 444 445 if (endpoint.isSynchronous()) { 446 // process synchronously 447 getProcessor().process(exchange); 448 } else { 449 // process the exchange using the async consumer to support async routing engine 450 // which can be supported by this file consumer as all the done work is 451 // provided in the GenericFileOnCompletion 452 getAsyncProcessor().process(exchange, EmptyAsyncCallback.get()); 453 } 454 455 } catch (Exception e) { 456 // remove file from the in progress list due to failure 457 // (cannot be in finally block due to GenericFileOnCompletion will remove it 458 // from in progress when it takes over and processes the file, which may happen 459 // by another thread at a later time. So its only safe to remove it if there was an exception) 460 endpoint.getInProgressRepository().remove(absoluteFileName); 461 462 String msg = "Error processing file " + file + " due to " + e.getMessage(); 463 handleException(msg, e); 464 } 465 466 return true; 467 } 468 469 /** 470 * Updates the information on {@link Message} after we have acquired read-lock and 471 * can begin process the file. 472 * 473 * @param file the file 474 * @param message the Camel message to update its headers 475 */ 476 protected abstract void updateFileHeaders(GenericFile<T> file, Message message); 477 478 /** 479 * Override if required. Files are retrieved / returns true by default 480 * 481 * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files. 482 */ 483 protected boolean isRetrieveFile() { 484 return true; 485 } 486 487 /** 488 * Processes the exchange using a custom processor. 489 * 490 * @param exchange the exchange 491 * @param processor the custom processor 492 */ 493 protected boolean customProcessExchange(final Exchange exchange, final Processor processor) { 494 GenericFile<T> file = getExchangeFileProperty(exchange); 495 log.trace("Custom processing file: {}", file); 496 497 // must extract the absolute name before the begin strategy as the file could potentially be pre moved 498 // and then the file name would be changed 499 String absoluteFileName = file.getAbsoluteFilePath(); 500 501 try { 502 // process using the custom processor 503 processor.process(exchange); 504 } catch (Exception e) { 505 if (log.isDebugEnabled()) { 506 log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e); 507 } 508 handleException(e); 509 } finally { 510 // always remove file from the in progress list as its no longer in progress 511 // use the original file name that was used to add it to the repository 512 // as the name can be different when using preMove option 513 endpoint.getInProgressRepository().remove(absoluteFileName); 514 } 515 516 return true; 517 } 518 519 /** 520 * Strategy for validating if the given remote file should be included or not 521 * 522 * @param file the file 523 * @param isDirectory whether the file is a directory or a file 524 * @param files files in the directory 525 * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it 526 */ 527 protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) { 528 String absoluteFilePath = file.getAbsoluteFilePath(); 529 530 if (!isMatched(file, isDirectory, files)) { 531 log.trace("File did not match. Will skip this file: {}", file); 532 return false; 533 } 534 535 // directory is always valid 536 if (isDirectory) { 537 return true; 538 } 539 540 // check if file is already in progress 541 if (endpoint.getInProgressRepository().contains(absoluteFilePath)) { 542 if (log.isTraceEnabled()) { 543 log.trace("Skipping as file is already in progress: {}", file.getFileName()); 544 } 545 return false; 546 } 547 548 // if its a file then check we have the file in the idempotent registry already 549 if (endpoint.isIdempotent()) { 550 // use absolute file path as default key, but evaluate if an expression key was configured 551 String key = file.getAbsoluteFilePath(); 552 if (endpoint.getIdempotentKey() != null) { 553 Exchange dummy = endpoint.createExchange(file); 554 key = endpoint.getIdempotentKey().evaluate(dummy, String.class); 555 } 556 if (key != null && endpoint.getIdempotentRepository().contains(key)) { 557 log.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", key, file); 558 return false; 559 } 560 } 561 562 // okay so final step is to be able to add atomic as in-progress, so we are the 563 // only thread processing this file 564 return endpoint.getInProgressRepository().add(absoluteFilePath); 565 } 566 567 /** 568 * Strategy to perform file matching based on endpoint configuration. 569 * <p/> 570 * Will always return <tt>false</tt> for certain files/folders: 571 * <ul> 572 * <li>Starting with a dot</li> 573 * <li>lock files</li> 574 * </ul> 575 * And then <tt>true</tt> for directories. 576 * 577 * @param file the file 578 * @param isDirectory whether the file is a directory or a file 579 * @param files files in the directory 580 * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not 581 */ 582 protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) { 583 String name = file.getFileNameOnly(); 584 585 // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock") 586 if (name.startsWith(".")) { 587 return false; 588 } 589 590 // lock files should be skipped 591 if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) { 592 return false; 593 } 594 595 if (endpoint.getFilter() != null) { 596 if (!endpoint.getFilter().accept(file)) { 597 return false; 598 } 599 } 600 601 if (endpoint.getAntFilter() != null) { 602 if (!endpoint.getAntFilter().accept(file)) { 603 return false; 604 } 605 } 606 607 if (isDirectory && endpoint.getFilterDirectory() != null) { 608 // create a dummy exchange as Exchange is needed for expression evaluation 609 Exchange dummy = endpoint.createExchange(file); 610 boolean matches = endpoint.getFilterDirectory().matches(dummy); 611 if (!matches) { 612 return false; 613 } 614 } 615 616 // directories are regarded as matched if filter accepted them 617 if (isDirectory) { 618 return true; 619 } 620 621 // exclude take precedence over include 622 if (excludePattern != null) { 623 if (excludePattern.matcher(name).matches()) { 624 return false; 625 } 626 } 627 if (includePattern != null) { 628 if (!includePattern.matcher(name).matches()) { 629 return false; 630 } 631 } 632 633 // use file expression for a simple dynamic file filter 634 if (endpoint.getFileName() != null) { 635 // create a dummy exchange as Exchange is needed for expression evaluation 636 Exchange dummy = endpoint.createExchange(file); 637 String result = evaluateFileExpression(dummy); 638 if (result != null) { 639 if (!name.equals(result)) { 640 return false; 641 } 642 } 643 } 644 645 if (endpoint.getFilterFile() != null) { 646 // create a dummy exchange as Exchange is needed for expression evaluation 647 Exchange dummy = endpoint.createExchange(file); 648 boolean matches = endpoint.getFilterFile().matches(dummy); 649 if (!matches) { 650 return false; 651 } 652 } 653 654 // if done file name is enabled, then the file is only valid if a done file exists 655 if (endpoint.getDoneFileName() != null) { 656 // done file must be in same path as the file 657 String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath()); 658 StringHelper.notEmpty(doneFileName, "doneFileName", endpoint); 659 660 // is it a done file name? 661 if (endpoint.isDoneFile(file.getFileNameOnly())) { 662 log.trace("Skipping done file: {}", file); 663 return false; 664 } 665 666 if (!isMatched(file, doneFileName, files)) { 667 return false; 668 } 669 } 670 671 return true; 672 } 673 674 /** 675 * Strategy to perform file matching based on endpoint configuration in terms of done file name. 676 * 677 * @param file the file 678 * @param doneFileName the done file name (without any paths) 679 * @param files files in the directory 680 * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not 681 */ 682 protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files); 683 684 /** 685 * Is the given file already in progress. 686 * 687 * @param file the file 688 * @return <tt>true</tt> if the file is already in progress 689 * @deprecated no longer in use, use {@link org.apache.camel.component.file.GenericFileEndpoint#getInProgressRepository()} instead. 690 */ 691 @Deprecated 692 protected boolean isInProgress(GenericFile<T> file) { 693 String key = file.getAbsoluteFilePath(); 694 // must use add, to have operation as atomic 695 return !endpoint.getInProgressRepository().add(key); 696 } 697 698 protected String evaluateFileExpression(Exchange exchange) { 699 String result = endpoint.getFileName().evaluate(exchange, String.class); 700 if (exchange.getException() != null) { 701 throw ObjectHelper.wrapRuntimeCamelException(exchange.getException()); 702 } 703 return result; 704 } 705 706 @SuppressWarnings("unchecked") 707 private GenericFile<T> getExchangeFileProperty(Exchange exchange) { 708 return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE); 709 } 710 711 @Override 712 protected void doStart() throws Exception { 713 // inject CamelContext before starting as it may be needed 714 if (processStrategy instanceof CamelContextAware) { 715 ((CamelContextAware) processStrategy).setCamelContext(getEndpoint().getCamelContext()); 716 } 717 ServiceHelper.startService(processStrategy); 718 super.doStart(); 719 } 720 721 @Override 722 protected void doStop() throws Exception { 723 prepareOnStartup = false; 724 super.doStop(); 725 ServiceHelper.stopService(processStrategy); 726 } 727 728 @Override 729 public void onInit() throws Exception { 730 // noop as we do a manual on-demand poll with GenericFilePolllingConsumer 731 } 732 733 @Override 734 public long beforePoll(long timeout) throws Exception { 735 // noop as we do a manual on-demand poll with GenericFilePolllingConsumer 736 return timeout; 737 } 738 739 @Override 740 public void afterPoll() throws Exception { 741 // noop as we do a manual on-demand poll with GenericFilePolllingConsumer 742 } 743 744}