001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.file; 018 019import java.util.ArrayList; 020import java.util.Collections; 021import java.util.Deque; 022import java.util.LinkedList; 023import java.util.List; 024import java.util.Queue; 025import java.util.regex.Pattern; 026 027import org.apache.camel.CamelContextAware; 028import org.apache.camel.Exchange; 029import org.apache.camel.Message; 030import org.apache.camel.Processor; 031import org.apache.camel.ShutdownRunningTask; 032import org.apache.camel.impl.ScheduledBatchPollingConsumer; 033import org.apache.camel.support.EmptyAsyncCallback; 034import org.apache.camel.util.CastUtils; 035import org.apache.camel.util.ObjectHelper; 036import org.apache.camel.util.ServiceHelper; 037import org.apache.camel.util.StopWatch; 038import org.apache.camel.util.StringHelper; 039import org.apache.camel.util.TimeUtils; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * Base class for file consumers. 045 */ 046public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer { 047 protected final Logger log = LoggerFactory.getLogger(getClass()); 048 protected GenericFileEndpoint<T> endpoint; 049 protected GenericFileOperations<T> operations; 050 protected GenericFileProcessStrategy<T> processStrategy; 051 protected String fileExpressionResult; 052 protected volatile ShutdownRunningTask shutdownRunningTask; 053 protected volatile int pendingExchanges; 054 protected Processor customProcessor; 055 protected boolean eagerLimitMaxMessagesPerPoll = true; 056 protected volatile boolean prepareOnStartup; 057 private final Pattern includePattern; 058 private final Pattern excludePattern; 059 060 public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations, GenericFileProcessStrategy<T> processStrategy) { 061 super(endpoint, processor); 062 this.endpoint = endpoint; 063 this.operations = operations; 064 this.processStrategy = processStrategy; 065 066 this.includePattern = endpoint.getIncludePattern(); 067 this.excludePattern = endpoint.getExcludePattern(); 068 } 069 070 public Processor getCustomProcessor() { 071 return customProcessor; 072 } 073 074 /** 075 * Use a custom processor to process the exchange. 076 * <p/> 077 * Only set this if you need to do custom processing, instead of the regular processing. 078 * <p/> 079 * This is for example used to browse file endpoints by leveraging the file consumer to poll 080 * the directory to gather the list of exchanges. But to avoid processing the files regularly 081 * we can use a custom processor. 082 * 083 * @param processor a custom processor 084 */ 085 public void setCustomProcessor(Processor processor) { 086 this.customProcessor = processor; 087 } 088 089 public boolean isEagerLimitMaxMessagesPerPoll() { 090 return eagerLimitMaxMessagesPerPoll; 091 } 092 093 public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) { 094 this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll; 095 } 096 097 /** 098 * Poll for files 099 */ 100 public int poll() throws Exception { 101 // must prepare on startup the very first time 102 if (!prepareOnStartup) { 103 // prepare on startup 104 processStrategy.prepareOnStartup(operations, endpoint); 105 prepareOnStartup = true; 106 } 107 108 // must reset for each poll 109 fileExpressionResult = null; 110 shutdownRunningTask = null; 111 pendingExchanges = 0; 112 113 // before we poll is there anything we need to check? 114 // such as are we connected to the FTP Server still? 115 if (!prePollCheck()) { 116 log.debug("Skipping poll as pre poll check returned false"); 117 return 0; 118 } 119 120 // gather list of files to process 121 List<GenericFile<T>> files = new ArrayList<>(); 122 String name = endpoint.getConfiguration().getDirectory(); 123 124 // time how long it takes to poll 125 StopWatch stop = new StopWatch(); 126 boolean limitHit; 127 try { 128 limitHit = !pollDirectory(name, files, 0); 129 } catch (Exception e) { 130 // during poll directory we add files to the in progress repository, in case of any exception thrown after this work 131 // we must then drain the in progress files before rethrowing the exception 132 log.debug("Error occurred during poll directory: {} due {}. Removing {} files marked as in-progress.", name, e.getMessage(), files.size()); 133 removeExcessiveInProgressFiles(files); 134 throw e; 135 } 136 137 long delta = stop.taken(); 138 if (log.isDebugEnabled()) { 139 log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name); 140 } 141 142 // log if we hit the limit 143 if (limitHit) { 144 log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll); 145 } 146 147 // sort files using file comparator if provided 148 if (endpoint.getSorter() != null) { 149 files.sort(endpoint.getSorter()); 150 } 151 152 // sort using build in sorters so we can use expressions 153 // use a linked list so we can dequeue the exchanges 154 LinkedList<Exchange> exchanges = new LinkedList<>(); 155 for (GenericFile<T> file : files) { 156 Exchange exchange = endpoint.createExchange(file); 157 endpoint.configureExchange(exchange); 158 endpoint.configureMessage(file, exchange.getIn()); 159 exchanges.add(exchange); 160 } 161 // sort files using exchange comparator if provided 162 if (endpoint.getSortBy() != null) { 163 exchanges.sort(endpoint.getSortBy()); 164 } 165 if (endpoint.isShuffle()) { 166 Collections.shuffle(exchanges); 167 } 168 169 // use a queue for the exchanges 170 Deque<Exchange> q = exchanges; 171 172 // we are not eager limiting, but we have configured a limit, so cut the list of files 173 if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) { 174 if (files.size() > maxMessagesPerPoll) { 175 log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll); 176 // must first remove excessive files from the in progress repository 177 removeExcessiveInProgressFiles(q, maxMessagesPerPoll); 178 } 179 } 180 181 // consume files one by one 182 int total = exchanges.size(); 183 if (total > 0) { 184 log.debug("Total {} files to consume", total); 185 } 186 187 int polledMessages = processBatch(CastUtils.cast(q)); 188 189 postPollCheck(polledMessages); 190 191 return polledMessages; 192 } 193 194 public int processBatch(Queue<Object> exchanges) { 195 int total = exchanges.size(); 196 int answer = total; 197 198 // limit if needed 199 if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) { 200 log.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", maxMessagesPerPoll, total); 201 total = maxMessagesPerPoll; 202 } 203 204 for (int index = 0; index < total && isBatchAllowed(); index++) { 205 // only loop if we are started (allowed to run) 206 // use poll to remove the head so it does not consume memory even after we have processed it 207 Exchange exchange = (Exchange) exchanges.poll(); 208 // add current index and total as properties 209 exchange.setProperty(Exchange.BATCH_INDEX, index); 210 exchange.setProperty(Exchange.BATCH_SIZE, total); 211 exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1); 212 213 // update pending number of exchanges 214 pendingExchanges = total - index - 1; 215 216 // process the current exchange 217 boolean started; 218 if (customProcessor != null) { 219 // use a custom processor 220 started = customProcessExchange(exchange, customProcessor); 221 } else { 222 // process the exchange regular 223 started = processExchange(exchange); 224 } 225 226 // if we did not start process the file then decrement the counter 227 if (!started) { 228 answer--; 229 } 230 } 231 232 // drain any in progress files as we are done with this batch 233 removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0); 234 235 return answer; 236 } 237 238 /** 239 * Drain any in progress files as we are done with this batch 240 * 241 * @param exchanges the exchanges 242 * @param limit the limit 243 */ 244 protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) { 245 // remove the file from the in progress list in case the batch was limited by max messages per poll 246 while (exchanges.size() > limit) { 247 // must remove last 248 Exchange exchange = exchanges.removeLast(); 249 GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class); 250 String key = file.getAbsoluteFilePath(); 251 endpoint.getInProgressRepository().remove(key); 252 } 253 } 254 255 /** 256 * Drain any in progress files as we are done with the files 257 * 258 * @param files the files 259 */ 260 protected void removeExcessiveInProgressFiles(List<GenericFile<T>> files) { 261 for (GenericFile file : files) { 262 String key = file.getAbsoluteFilePath(); 263 endpoint.getInProgressRepository().remove(key); 264 } 265 } 266 267 /** 268 * Whether or not we can continue polling for more files 269 * 270 * @param fileList the current list of gathered files 271 * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit 272 */ 273 public boolean canPollMoreFiles(List<?> fileList) { 274 // at this point we should not limit if we are not eager 275 if (!eagerLimitMaxMessagesPerPoll) { 276 return true; 277 } 278 279 if (maxMessagesPerPoll <= 0) { 280 // no limitation 281 return true; 282 } 283 284 // then only poll if we haven't reached the max limit 285 return fileList.size() < maxMessagesPerPoll; 286 } 287 288 /** 289 * Override if required. Perform some checks (and perhaps actions) before we poll. 290 * 291 * @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll. 292 */ 293 protected boolean prePollCheck() throws Exception { 294 return true; 295 } 296 297 /** 298 * Override if required. Perform some checks (and perhaps actions) after we have polled. 299 * 300 * @param polledMessages number of polled messages 301 */ 302 protected void postPollCheck(int polledMessages) { 303 // noop 304 } 305 306 /** 307 * Polls the given directory for files to process 308 * 309 * @param fileName current directory or file 310 * @param fileList current list of files gathered 311 * @param depth the current depth of the directory (will start from 0) 312 * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit 313 */ 314 protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth); 315 316 /** 317 * Sets the operations to be used. 318 * <p/> 319 * Can be used to set a fresh operations in case of recovery attempts 320 * 321 * @param operations the operations 322 */ 323 public void setOperations(GenericFileOperations<T> operations) { 324 this.operations = operations; 325 } 326 327 /** 328 * Whether to ignore if the file cannot be retrieved. 329 * <p/> 330 * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved. 331 * <p/> 332 * This method allows to suppress this and just ignore that. 333 * 334 * @param name the file name 335 * @param exchange the exchange 336 * @param cause optional exception occurred during retrieving file 337 * @return <tt>true</tt> to ignore, <tt>false</tt> is the default. 338 */ 339 protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) { 340 return false; 341 } 342 343 /** 344 * Processes the exchange 345 * 346 * @param exchange the exchange 347 * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started 348 * to be processed, for some reason (not found, or aborted etc) 349 */ 350 protected boolean processExchange(final Exchange exchange) { 351 GenericFile<T> file = getExchangeFileProperty(exchange); 352 log.trace("Processing file: {}", file); 353 354 // must extract the absolute name before the begin strategy as the file could potentially be pre moved 355 // and then the file name would be changed 356 String absoluteFileName = file.getAbsoluteFilePath(); 357 358 // check if we can begin processing the file 359 Exception beginCause = null; 360 boolean begin = false; 361 try { 362 begin = processStrategy.begin(operations, endpoint, exchange, file); 363 } catch (Exception e) { 364 beginCause = e; 365 } 366 367 if (!begin) { 368 // no something was wrong, so we need to abort and remove the file from the in progress list 369 Exception abortCause = null; 370 log.debug("{} cannot begin processing file: {}", endpoint, file); 371 try { 372 // abort 373 processStrategy.abort(operations, endpoint, exchange, file); 374 } catch (Exception e) { 375 abortCause = e; 376 } finally { 377 // begin returned false, so remove file from the in progress list as its no longer in progress 378 endpoint.getInProgressRepository().remove(absoluteFileName); 379 } 380 if (beginCause != null) { 381 String msg = endpoint + " cannot begin processing file: " + file + " due to: " + beginCause.getMessage(); 382 handleException(msg, beginCause); 383 } 384 if (abortCause != null) { 385 String msg2 = endpoint + " cannot abort processing file: " + file + " due to: " + abortCause.getMessage(); 386 handleException(msg2, abortCause); 387 } 388 return false; 389 } 390 391 // must use file from exchange as it can be updated due the 392 // preMoveNamePrefix/preMoveNamePostfix options 393 final GenericFile<T> target = getExchangeFileProperty(exchange); 394 395 // we can begin processing the file so update file headers on the Camel message 396 // in case it took some time to acquire read lock, and file size/timestamp has been updated since etc 397 updateFileHeaders(target, exchange.getIn()); 398 399 // must use full name when downloading so we have the correct path 400 final String name = target.getAbsoluteFilePath(); 401 try { 402 403 if (isRetrieveFile()) { 404 // retrieve the file using the stream 405 log.trace("Retrieving file: {} from: {}", name, endpoint); 406 407 // retrieve the file and check it was a success 408 boolean retrieved; 409 Exception cause = null; 410 try { 411 retrieved = operations.retrieveFile(name, exchange, target.getFileLength()); 412 } catch (Exception e) { 413 retrieved = false; 414 cause = e; 415 } 416 417 if (!retrieved) { 418 if (ignoreCannotRetrieveFile(name, exchange, cause)) { 419 log.trace("Cannot retrieve file {} maybe it does not exists. Ignoring.", name); 420 // remove file from the in progress list as we could not retrieve it, but should ignore 421 endpoint.getInProgressRepository().remove(absoluteFileName); 422 return false; 423 } else { 424 // throw exception to handle the problem with retrieving the file 425 // then if the method return false or throws an exception is handled the same in here 426 // as in both cases an exception is being thrown 427 if (cause instanceof GenericFileOperationFailedException) { 428 throw cause; 429 } else { 430 throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint, cause); 431 } 432 } 433 } 434 435 log.trace("Retrieved file: {} from: {}", name, endpoint); 436 } else { 437 log.trace("Skipped retrieval of file: {} from: {}", name, endpoint); 438 exchange.getIn().setBody(null); 439 } 440 441 // register on completion callback that does the completion strategies 442 // (for instance to move the file after we have processed it) 443 exchange.addOnCompletion(new GenericFileOnCompletion<>(endpoint, operations, processStrategy, target, absoluteFileName)); 444 445 log.debug("About to process file: {} using exchange: {}", target, exchange); 446 447 if (endpoint.isSynchronous()) { 448 // process synchronously 449 getProcessor().process(exchange); 450 } else { 451 // process the exchange using the async consumer to support async routing engine 452 // which can be supported by this file consumer as all the done work is 453 // provided in the GenericFileOnCompletion 454 getAsyncProcessor().process(exchange, EmptyAsyncCallback.get()); 455 } 456 457 } catch (Exception e) { 458 // remove file from the in progress list due to failure 459 // (cannot be in finally block due to GenericFileOnCompletion will remove it 460 // from in progress when it takes over and processes the file, which may happen 461 // by another thread at a later time. So its only safe to remove it if there was an exception) 462 endpoint.getInProgressRepository().remove(absoluteFileName); 463 464 String msg = "Error processing file " + file + " due to " + e.getMessage(); 465 handleException(msg, e); 466 } 467 468 return true; 469 } 470 471 /** 472 * Updates the information on {@link Message} after we have acquired read-lock and 473 * can begin process the file. 474 * 475 * @param file the file 476 * @param message the Camel message to update its headers 477 */ 478 protected abstract void updateFileHeaders(GenericFile<T> file, Message message); 479 480 /** 481 * Override if required. Files are retrieved / returns true by default 482 * 483 * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files. 484 */ 485 protected boolean isRetrieveFile() { 486 return true; 487 } 488 489 /** 490 * Processes the exchange using a custom processor. 491 * 492 * @param exchange the exchange 493 * @param processor the custom processor 494 */ 495 protected boolean customProcessExchange(final Exchange exchange, final Processor processor) { 496 GenericFile<T> file = getExchangeFileProperty(exchange); 497 log.trace("Custom processing file: {}", file); 498 499 // must extract the absolute name before the begin strategy as the file could potentially be pre moved 500 // and then the file name would be changed 501 String absoluteFileName = file.getAbsoluteFilePath(); 502 503 try { 504 // process using the custom processor 505 processor.process(exchange); 506 } catch (Exception e) { 507 if (log.isDebugEnabled()) { 508 log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e); 509 } 510 handleException(e); 511 } finally { 512 // always remove file from the in progress list as its no longer in progress 513 // use the original file name that was used to add it to the repository 514 // as the name can be different when using preMove option 515 endpoint.getInProgressRepository().remove(absoluteFileName); 516 } 517 518 return true; 519 } 520 521 /** 522 * Strategy for validating if the given remote file should be included or not 523 * 524 * @param file the file 525 * @param isDirectory whether the file is a directory or a file 526 * @param files files in the directory 527 * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it 528 */ 529 protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) { 530 String absoluteFilePath = file.getAbsoluteFilePath(); 531 532 if (!isMatched(file, isDirectory, files)) { 533 log.trace("File did not match. Will skip this file: {}", file); 534 return false; 535 } 536 537 // directory is always valid 538 if (isDirectory) { 539 return true; 540 } 541 542 // check if file is already in progress 543 if (endpoint.getInProgressRepository().contains(absoluteFilePath)) { 544 if (log.isTraceEnabled()) { 545 log.trace("Skipping as file is already in progress: {}", file.getFileName()); 546 } 547 return false; 548 } 549 550 // if its a file then check we have the file in the idempotent registry already 551 if (endpoint.isIdempotent()) { 552 // use absolute file path as default key, but evaluate if an expression key was configured 553 String key = file.getAbsoluteFilePath(); 554 if (endpoint.getIdempotentKey() != null) { 555 Exchange dummy = endpoint.createExchange(file); 556 key = endpoint.getIdempotentKey().evaluate(dummy, String.class); 557 } 558 if (key != null && endpoint.getIdempotentRepository().contains(key)) { 559 log.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", key, file); 560 return false; 561 } 562 } 563 564 // okay so final step is to be able to add atomic as in-progress, so we are the 565 // only thread processing this file 566 return endpoint.getInProgressRepository().add(absoluteFilePath); 567 } 568 569 /** 570 * Strategy to perform file matching based on endpoint configuration. 571 * <p/> 572 * Will always return <tt>false</tt> for certain files/folders: 573 * <ul> 574 * <li>Starting with a dot</li> 575 * <li>lock files</li> 576 * </ul> 577 * And then <tt>true</tt> for directories. 578 * 579 * @param file the file 580 * @param isDirectory whether the file is a directory or a file 581 * @param files files in the directory 582 * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not 583 */ 584 protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) { 585 String name = file.getFileNameOnly(); 586 587 // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock") 588 if (name.startsWith(".")) { 589 return false; 590 } 591 592 // lock files should be skipped 593 if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) { 594 return false; 595 } 596 597 if (endpoint.getFilter() != null) { 598 if (!endpoint.getFilter().accept(file)) { 599 return false; 600 } 601 } 602 603 if (endpoint.getAntFilter() != null) { 604 if (!endpoint.getAntFilter().accept(file)) { 605 return false; 606 } 607 } 608 609 if (isDirectory && endpoint.getFilterDirectory() != null) { 610 // create a dummy exchange as Exchange is needed for expression evaluation 611 Exchange dummy = endpoint.createExchange(file); 612 boolean matches = endpoint.getFilterDirectory().matches(dummy); 613 if (!matches) { 614 return false; 615 } 616 } 617 618 // directories are regarded as matched if filter accepted them 619 if (isDirectory) { 620 return true; 621 } 622 623 // exclude take precedence over include 624 if (excludePattern != null) { 625 if (excludePattern.matcher(name).matches()) { 626 return false; 627 } 628 } 629 if (includePattern != null) { 630 if (!includePattern.matcher(name).matches()) { 631 return false; 632 } 633 } 634 635 // use file expression for a simple dynamic file filter 636 if (endpoint.getFileName() != null) { 637 fileExpressionResult = evaluateFileExpression(); 638 if (fileExpressionResult != null) { 639 if (!name.equals(fileExpressionResult)) { 640 return false; 641 } 642 } 643 } 644 645 if (endpoint.getFilterFile() != null) { 646 // create a dummy exchange as Exchange is needed for expression evaluation 647 Exchange dummy = endpoint.createExchange(file); 648 boolean matches = endpoint.getFilterFile().matches(dummy); 649 if (!matches) { 650 return false; 651 } 652 } 653 654 // if done file name is enabled, then the file is only valid if a done file exists 655 if (endpoint.getDoneFileName() != null) { 656 // done file must be in same path as the file 657 String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath()); 658 StringHelper.notEmpty(doneFileName, "doneFileName", endpoint); 659 660 // is it a done file name? 661 if (endpoint.isDoneFile(file.getFileNameOnly())) { 662 log.trace("Skipping done file: {}", file); 663 return false; 664 } 665 666 if (!isMatched(file, doneFileName, files)) { 667 return false; 668 } 669 } 670 671 return true; 672 } 673 674 /** 675 * Strategy to perform file matching based on endpoint configuration in terms of done file name. 676 * 677 * @param file the file 678 * @param doneFileName the done file name (without any paths) 679 * @param files files in the directory 680 * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not 681 */ 682 protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files); 683 684 /** 685 * Is the given file already in progress. 686 * 687 * @param file the file 688 * @return <tt>true</tt> if the file is already in progress 689 * @deprecated no longer in use, use {@link org.apache.camel.component.file.GenericFileEndpoint#getInProgressRepository()} instead. 690 */ 691 @Deprecated 692 protected boolean isInProgress(GenericFile<T> file) { 693 String key = file.getAbsoluteFilePath(); 694 // must use add, to have operation as atomic 695 return !endpoint.getInProgressRepository().add(key); 696 } 697 698 protected String evaluateFileExpression() { 699 if (fileExpressionResult == null && endpoint.getFileName() != null) { 700 // create a dummy exchange as Exchange is needed for expression evaluation 701 Exchange dummy = endpoint.createExchange(); 702 fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class); 703 if (dummy.getException() != null) { 704 throw ObjectHelper.wrapRuntimeCamelException(dummy.getException()); 705 } 706 } 707 return fileExpressionResult; 708 } 709 710 @SuppressWarnings("unchecked") 711 private GenericFile<T> getExchangeFileProperty(Exchange exchange) { 712 return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE); 713 } 714 715 @Override 716 protected void doStart() throws Exception { 717 // inject CamelContext before starting as it may be needed 718 if (processStrategy instanceof CamelContextAware) { 719 ((CamelContextAware) processStrategy).setCamelContext(getEndpoint().getCamelContext()); 720 } 721 ServiceHelper.startService(processStrategy); 722 super.doStart(); 723 } 724 725 @Override 726 protected void doStop() throws Exception { 727 prepareOnStartup = false; 728 super.doStop(); 729 ServiceHelper.stopService(processStrategy); 730 } 731 732 @Override 733 public void onInit() throws Exception { 734 // noop as we do a manual on-demand poll with GenericFilePolllingConsumer 735 } 736 737 @Override 738 public long beforePoll(long timeout) throws Exception { 739 // noop as we do a manual on-demand poll with GenericFilePolllingConsumer 740 return timeout; 741 } 742 743 @Override 744 public void afterPoll() throws Exception { 745 // noop as we do a manual on-demand poll with GenericFilePolllingConsumer 746 } 747 748}