001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.file; 018 019import java.util.ArrayList; 020import java.util.Collections; 021import java.util.Deque; 022import java.util.LinkedList; 023import java.util.List; 024import java.util.Queue; 025import java.util.regex.Pattern; 026 027import org.apache.camel.AsyncCallback; 028import org.apache.camel.Exchange; 029import org.apache.camel.Message; 030import org.apache.camel.Processor; 031import org.apache.camel.ShutdownRunningTask; 032import org.apache.camel.impl.ScheduledBatchPollingConsumer; 033import org.apache.camel.util.CastUtils; 034import org.apache.camel.util.ObjectHelper; 035import org.apache.camel.util.StopWatch; 036import org.apache.camel.util.TimeUtils; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * Base class for file consumers. 042 */ 043public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer { 044 protected final Logger log = LoggerFactory.getLogger(getClass()); 045 protected GenericFileEndpoint<T> endpoint; 046 protected GenericFileOperations<T> operations; 047 protected volatile boolean loggedIn; 048 protected String fileExpressionResult; 049 protected volatile ShutdownRunningTask shutdownRunningTask; 050 protected volatile int pendingExchanges; 051 protected Processor customProcessor; 052 protected boolean eagerLimitMaxMessagesPerPoll = true; 053 protected volatile boolean prepareOnStartup; 054 private final Pattern includePattern; 055 private final Pattern excludePattern; 056 057 public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) { 058 super(endpoint, processor); 059 this.endpoint = endpoint; 060 this.operations = operations; 061 062 if (endpoint.getInclude() != null) { 063 this.includePattern = Pattern.compile(endpoint.getInclude(), Pattern.CASE_INSENSITIVE); 064 } else { 065 this.includePattern = null; 066 } 067 if (endpoint.getExclude() != null) { 068 this.excludePattern = Pattern.compile(endpoint.getExclude(), Pattern.CASE_INSENSITIVE); 069 } else { 070 this.excludePattern = null; 071 } 072 } 073 074 public Processor getCustomProcessor() { 075 return customProcessor; 076 } 077 078 /** 079 * Use a custom processor to process the exchange. 080 * <p/> 081 * Only set this if you need to do custom processing, instead of the regular processing. 082 * <p/> 083 * This is for example used to browse file endpoints by leveraging the file consumer to poll 084 * the directory to gather the list of exchanges. But to avoid processing the files regularly 085 * we can use a custom processor. 086 * 087 * @param processor a custom processor 088 */ 089 public void setCustomProcessor(Processor processor) { 090 this.customProcessor = processor; 091 } 092 093 public boolean isEagerLimitMaxMessagesPerPoll() { 094 return eagerLimitMaxMessagesPerPoll; 095 } 096 097 public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) { 098 this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll; 099 } 100 101 /** 102 * Poll for files 103 */ 104 protected int poll() throws Exception { 105 // must prepare on startup the very first time 106 if (!prepareOnStartup) { 107 // prepare on startup 108 endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint); 109 prepareOnStartup = true; 110 } 111 112 // must reset for each poll 113 fileExpressionResult = null; 114 shutdownRunningTask = null; 115 pendingExchanges = 0; 116 117 // before we poll is there anything we need to check? 118 // such as are we connected to the FTP Server still? 119 if (!prePollCheck()) { 120 log.debug("Skipping poll as pre poll check returned false"); 121 return 0; 122 } 123 124 // gather list of files to process 125 List<GenericFile<T>> files = new ArrayList<GenericFile<T>>(); 126 String name = endpoint.getConfiguration().getDirectory(); 127 128 // time how long it takes to poll 129 StopWatch stop = new StopWatch(); 130 boolean limitHit; 131 try { 132 limitHit = !pollDirectory(name, files, 0); 133 } catch (Exception e) { 134 // during poll directory we add files to the in progress repository, in case of any exception thrown after this work 135 // we must then drain the in progress files before rethrowing the exception 136 log.debug("Error occurred during poll directory: " + name + " due " + e.getMessage() + ". Removing " + files.size() + " files marked as in-progress."); 137 removeExcessiveInProgressFiles(files); 138 throw e; 139 } 140 141 long delta = stop.stop(); 142 if (log.isDebugEnabled()) { 143 log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name); 144 } 145 146 // log if we hit the limit 147 if (limitHit) { 148 log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll); 149 } 150 151 // sort files using file comparator if provided 152 if (endpoint.getSorter() != null) { 153 Collections.sort(files, endpoint.getSorter()); 154 } 155 156 // sort using build in sorters so we can use expressions 157 // use a linked list so we can dequeue the exchanges 158 LinkedList<Exchange> exchanges = new LinkedList<Exchange>(); 159 for (GenericFile<T> file : files) { 160 Exchange exchange = endpoint.createExchange(file); 161 endpoint.configureExchange(exchange); 162 endpoint.configureMessage(file, exchange.getIn()); 163 exchanges.add(exchange); 164 } 165 // sort files using exchange comparator if provided 166 if (endpoint.getSortBy() != null) { 167 Collections.sort(exchanges, endpoint.getSortBy()); 168 } 169 if (endpoint.isShuffle()) { 170 Collections.shuffle(exchanges); 171 } 172 173 // use a queue for the exchanges 174 Deque<Exchange> q = exchanges; 175 176 // we are not eager limiting, but we have configured a limit, so cut the list of files 177 if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) { 178 if (files.size() > maxMessagesPerPoll) { 179 log.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", maxMessagesPerPoll); 180 // must first remove excessive files from the in progress repository 181 removeExcessiveInProgressFiles(q, maxMessagesPerPoll); 182 } 183 } 184 185 // consume files one by one 186 int total = exchanges.size(); 187 if (total > 0) { 188 log.debug("Total {} files to consume", total); 189 } 190 191 int polledMessages = processBatch(CastUtils.cast(q)); 192 193 postPollCheck(polledMessages); 194 195 return polledMessages; 196 } 197 198 public int processBatch(Queue<Object> exchanges) { 199 int total = exchanges.size(); 200 int answer = total; 201 202 // limit if needed 203 if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) { 204 log.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", maxMessagesPerPoll, total); 205 total = maxMessagesPerPoll; 206 } 207 208 for (int index = 0; index < total && isBatchAllowed(); index++) { 209 // only loop if we are started (allowed to run) 210 // use poll to remove the head so it does not consume memory even after we have processed it 211 Exchange exchange = (Exchange) exchanges.poll(); 212 // add current index and total as properties 213 exchange.setProperty(Exchange.BATCH_INDEX, index); 214 exchange.setProperty(Exchange.BATCH_SIZE, total); 215 exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1); 216 217 // update pending number of exchanges 218 pendingExchanges = total - index - 1; 219 220 // process the current exchange 221 boolean started; 222 if (customProcessor != null) { 223 // use a custom processor 224 started = customProcessExchange(exchange, customProcessor); 225 } else { 226 // process the exchange regular 227 started = processExchange(exchange); 228 } 229 230 // if we did not start process the file then decrement the counter 231 if (!started) { 232 answer--; 233 } 234 } 235 236 // drain any in progress files as we are done with this batch 237 removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0); 238 239 return answer; 240 } 241 242 /** 243 * Drain any in progress files as we are done with this batch 244 * 245 * @param exchanges the exchanges 246 * @param limit the limit 247 */ 248 protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) { 249 // remove the file from the in progress list in case the batch was limited by max messages per poll 250 while (exchanges.size() > limit) { 251 // must remove last 252 Exchange exchange = exchanges.removeLast(); 253 GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class); 254 String key = file.getAbsoluteFilePath(); 255 endpoint.getInProgressRepository().remove(key); 256 } 257 } 258 259 /** 260 * Drain any in progress files as we are done with the files 261 * 262 * @param files the files 263 */ 264 protected void removeExcessiveInProgressFiles(List<GenericFile<T>> files) { 265 for (GenericFile file : files) { 266 String key = file.getAbsoluteFilePath(); 267 endpoint.getInProgressRepository().remove(key); 268 } 269 } 270 271 /** 272 * Whether or not we can continue polling for more files 273 * 274 * @param fileList the current list of gathered files 275 * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit 276 */ 277 public boolean canPollMoreFiles(List<?> fileList) { 278 // at this point we should not limit if we are not eager 279 if (!eagerLimitMaxMessagesPerPoll) { 280 return true; 281 } 282 283 if (maxMessagesPerPoll <= 0) { 284 // no limitation 285 return true; 286 } 287 288 // then only poll if we haven't reached the max limit 289 return fileList.size() < maxMessagesPerPoll; 290 } 291 292 /** 293 * Override if required. Perform some checks (and perhaps actions) before we poll. 294 * 295 * @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll. 296 */ 297 protected boolean prePollCheck() throws Exception { 298 return true; 299 } 300 301 /** 302 * Override if required. Perform some checks (and perhaps actions) after we have polled. 303 * 304 * @param polledMessages number of polled messages 305 */ 306 protected void postPollCheck(int polledMessages) { 307 // noop 308 } 309 310 /** 311 * Polls the given directory for files to process 312 * 313 * @param fileName current directory or file 314 * @param fileList current list of files gathered 315 * @param depth the current depth of the directory (will start from 0) 316 * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit 317 */ 318 protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth); 319 320 /** 321 * Sets the operations to be used. 322 * <p/> 323 * Can be used to set a fresh operations in case of recovery attempts 324 * 325 * @param operations the operations 326 */ 327 public void setOperations(GenericFileOperations<T> operations) { 328 this.operations = operations; 329 } 330 331 /** 332 * Whether to ignore if the file cannot be retrieved. 333 * <p/> 334 * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved. 335 * <p/> 336 * This method allows to suppress this and just ignore that. 337 * 338 * @param name the file name 339 * @param exchange the exchange 340 * @param cause optional exception occurred during retrieving file 341 * @return <tt>true</tt> to ignore, <tt>false</tt> is the default. 342 */ 343 protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) { 344 return false; 345 } 346 347 /** 348 * Processes the exchange 349 * 350 * @param exchange the exchange 351 * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started 352 * to be processed, for some reason (not found, or aborted etc) 353 */ 354 protected boolean processExchange(final Exchange exchange) { 355 GenericFile<T> file = getExchangeFileProperty(exchange); 356 log.trace("Processing file: {}", file); 357 358 // must extract the absolute name before the begin strategy as the file could potentially be pre moved 359 // and then the file name would be changed 360 String absoluteFileName = file.getAbsoluteFilePath(); 361 362 // check if we can begin processing the file 363 final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy(); 364 365 Exception beginCause = null; 366 boolean begin = false; 367 try { 368 begin = processStrategy.begin(operations, endpoint, exchange, file); 369 } catch (Exception e) { 370 beginCause = e; 371 } 372 373 if (!begin) { 374 // no something was wrong, so we need to abort and remove the file from the in progress list 375 Exception abortCause = null; 376 log.debug("{} cannot begin processing file: {}", endpoint, file); 377 try { 378 // abort 379 processStrategy.abort(operations, endpoint, exchange, file); 380 } catch (Exception e) { 381 abortCause = e; 382 } finally { 383 // begin returned false, so remove file from the in progress list as its no longer in progress 384 endpoint.getInProgressRepository().remove(absoluteFileName); 385 } 386 if (beginCause != null) { 387 String msg = endpoint + " cannot begin processing file: " + file + " due to: " + beginCause.getMessage(); 388 handleException(msg, beginCause); 389 } 390 if (abortCause != null) { 391 String msg2 = endpoint + " cannot abort processing file: " + file + " due to: " + abortCause.getMessage(); 392 handleException(msg2, abortCause); 393 } 394 return false; 395 } 396 397 // must use file from exchange as it can be updated due the 398 // preMoveNamePrefix/preMoveNamePostfix options 399 final GenericFile<T> target = getExchangeFileProperty(exchange); 400 401 // we can begin processing the file so update file headers on the Camel message 402 // in case it took some time to acquire read lock, and file size/timestamp has been updated since etc 403 updateFileHeaders(target, exchange.getIn()); 404 405 // must use full name when downloading so we have the correct path 406 final String name = target.getAbsoluteFilePath(); 407 try { 408 409 if (isRetrieveFile()) { 410 // retrieve the file using the stream 411 log.trace("Retrieving file: {} from: {}", name, endpoint); 412 413 // retrieve the file and check it was a success 414 boolean retrieved; 415 Exception cause = null; 416 try { 417 retrieved = operations.retrieveFile(name, exchange); 418 } catch (Exception e) { 419 retrieved = false; 420 cause = e; 421 } 422 423 if (!retrieved) { 424 if (ignoreCannotRetrieveFile(name, exchange, cause)) { 425 log.trace("Cannot retrieve file {} maybe it does not exists. Ignoring.", name); 426 // remove file from the in progress list as we could not retrieve it, but should ignore 427 endpoint.getInProgressRepository().remove(absoluteFileName); 428 return false; 429 } else { 430 // throw exception to handle the problem with retrieving the file 431 // then if the method return false or throws an exception is handled the same in here 432 // as in both cases an exception is being thrown 433 if (cause != null && cause instanceof GenericFileOperationFailedException) { 434 throw cause; 435 } else { 436 throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint, cause); 437 } 438 } 439 } 440 441 log.trace("Retrieved file: {} from: {}", name, endpoint); 442 } else { 443 log.trace("Skipped retrieval of file: {} from: {}", name, endpoint); 444 exchange.getIn().setBody(null); 445 } 446 447 // register on completion callback that does the completion strategies 448 // (for instance to move the file after we have processed it) 449 exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName)); 450 451 log.debug("About to process file: {} using exchange: {}", target, exchange); 452 453 if (endpoint.isSynchronous()) { 454 // process synchronously 455 getProcessor().process(exchange); 456 } else { 457 // process the exchange using the async consumer to support async routing engine 458 // which can be supported by this file consumer as all the done work is 459 // provided in the GenericFileOnCompletion 460 getAsyncProcessor().process(exchange, new AsyncCallback() { 461 public void done(boolean doneSync) { 462 // noop 463 if (log.isTraceEnabled()) { 464 log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously"); 465 } 466 } 467 }); 468 } 469 470 } catch (Exception e) { 471 // remove file from the in progress list due to failure 472 // (cannot be in finally block due to GenericFileOnCompletion will remove it 473 // from in progress when it takes over and processes the file, which may happen 474 // by another thread at a later time. So its only safe to remove it if there was an exception) 475 endpoint.getInProgressRepository().remove(absoluteFileName); 476 477 String msg = "Error processing file " + file + " due to " + e.getMessage(); 478 handleException(msg, e); 479 } 480 481 return true; 482 } 483 484 /** 485 * Updates the information on {@link Message} after we have acquired read-lock and 486 * can begin process the file. 487 * 488 * @param file the file 489 * @param message the Camel message to update its headers 490 */ 491 protected abstract void updateFileHeaders(GenericFile<T> file, Message message); 492 493 /** 494 * Override if required. Files are retrieved / returns true by default 495 * 496 * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files. 497 */ 498 protected boolean isRetrieveFile() { 499 return true; 500 } 501 502 /** 503 * Processes the exchange using a custom processor. 504 * 505 * @param exchange the exchange 506 * @param processor the custom processor 507 */ 508 protected boolean customProcessExchange(final Exchange exchange, final Processor processor) { 509 GenericFile<T> file = getExchangeFileProperty(exchange); 510 log.trace("Custom processing file: {}", file); 511 512 // must extract the absolute name before the begin strategy as the file could potentially be pre moved 513 // and then the file name would be changed 514 String absoluteFileName = file.getAbsoluteFilePath(); 515 516 try { 517 // process using the custom processor 518 processor.process(exchange); 519 } catch (Exception e) { 520 if (log.isDebugEnabled()) { 521 log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e); 522 } 523 handleException(e); 524 } finally { 525 // always remove file from the in progress list as its no longer in progress 526 // use the original file name that was used to add it to the repository 527 // as the name can be different when using preMove option 528 endpoint.getInProgressRepository().remove(absoluteFileName); 529 } 530 531 return true; 532 } 533 534 /** 535 * Strategy for validating if the given remote file should be included or not 536 * 537 * @param file the file 538 * @param isDirectory whether the file is a directory or a file 539 * @param files files in the directory 540 * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it 541 */ 542 protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) { 543 String absoluteFilePath = file.getAbsoluteFilePath(); 544 545 if (!isMatched(file, isDirectory, files)) { 546 log.trace("File did not match. Will skip this file: {}", file); 547 return false; 548 } 549 550 // directory is always valid 551 if (isDirectory) { 552 return true; 553 } 554 555 // check if file is already in progress 556 if (endpoint.getInProgressRepository().contains(absoluteFilePath)) { 557 if (log.isTraceEnabled()) { 558 log.trace("Skipping as file is already in progress: {}", file.getFileName()); 559 } 560 return false; 561 } 562 563 // if its a file then check we have the file in the idempotent registry already 564 if (endpoint.isIdempotent()) { 565 // use absolute file path as default key, but evaluate if an expression key was configured 566 String key = file.getAbsoluteFilePath(); 567 if (endpoint.getIdempotentKey() != null) { 568 Exchange dummy = endpoint.createExchange(file); 569 key = endpoint.getIdempotentKey().evaluate(dummy, String.class); 570 } 571 if (key != null && endpoint.getIdempotentRepository().contains(key)) { 572 log.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", key, file); 573 return false; 574 } 575 } 576 577 // okay so final step is to be able to add atomic as in-progress, so we are the 578 // only thread processing this file 579 return endpoint.getInProgressRepository().add(absoluteFilePath); 580 } 581 582 /** 583 * Strategy to perform file matching based on endpoint configuration. 584 * <p/> 585 * Will always return <tt>false</tt> for certain files/folders: 586 * <ul> 587 * <li>Starting with a dot</li> 588 * <li>lock files</li> 589 * </ul> 590 * And then <tt>true</tt> for directories. 591 * 592 * @param file the file 593 * @param isDirectory whether the file is a directory or a file 594 * @param files files in the directory 595 * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not 596 */ 597 protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) { 598 String name = file.getFileNameOnly(); 599 600 // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock") 601 if (name.startsWith(".")) { 602 return false; 603 } 604 605 // lock files should be skipped 606 if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) { 607 return false; 608 } 609 610 if (endpoint.getFilter() != null) { 611 if (!endpoint.getFilter().accept(file)) { 612 return false; 613 } 614 } 615 616 if (endpoint.getAntFilter() != null) { 617 if (!endpoint.getAntFilter().accept(file)) { 618 return false; 619 } 620 } 621 622 if (isDirectory && endpoint.getFilterDirectory() != null) { 623 // create a dummy exchange as Exchange is needed for expression evaluation 624 Exchange dummy = endpoint.createExchange(file); 625 boolean matches = endpoint.getFilterDirectory().matches(dummy); 626 if (!matches) { 627 return false; 628 } 629 } 630 631 // directories are regarded as matched if filter accepted them 632 if (isDirectory) { 633 return true; 634 } 635 636 // exclude take precedence over include 637 if (excludePattern != null) { 638 if (excludePattern.matcher(name).matches()) { 639 return false; 640 } 641 } 642 if (includePattern != null) { 643 if (!includePattern.matcher(name).matches()) { 644 return false; 645 } 646 } 647 648 // use file expression for a simple dynamic file filter 649 if (endpoint.getFileName() != null) { 650 fileExpressionResult = evaluateFileExpression(); 651 if (fileExpressionResult != null) { 652 if (!name.equals(fileExpressionResult)) { 653 return false; 654 } 655 } 656 } 657 658 if (endpoint.getFilterFile() != null) { 659 // create a dummy exchange as Exchange is needed for expression evaluation 660 Exchange dummy = endpoint.createExchange(file); 661 boolean matches = endpoint.getFilterFile().matches(dummy); 662 if (!matches) { 663 return false; 664 } 665 } 666 667 // if done file name is enabled, then the file is only valid if a done file exists 668 if (endpoint.getDoneFileName() != null) { 669 // done file must be in same path as the file 670 String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath()); 671 ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint); 672 673 // is it a done file name? 674 if (endpoint.isDoneFile(file.getFileNameOnly())) { 675 log.trace("Skipping done file: {}", file); 676 return false; 677 } 678 679 if (!isMatched(file, doneFileName, files)) { 680 return false; 681 } 682 } 683 684 return true; 685 } 686 687 /** 688 * Strategy to perform file matching based on endpoint configuration in terms of done file name. 689 * 690 * @param file the file 691 * @param doneFileName the done file name (without any paths) 692 * @param files files in the directory 693 * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not 694 */ 695 protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files); 696 697 /** 698 * Is the given file already in progress. 699 * 700 * @param file the file 701 * @return <tt>true</tt> if the file is already in progress 702 * @deprecated no longer in use, use {@link org.apache.camel.component.file.GenericFileEndpoint#getInProgressRepository()} instead. 703 */ 704 @Deprecated 705 protected boolean isInProgress(GenericFile<T> file) { 706 String key = file.getAbsoluteFilePath(); 707 // must use add, to have operation as atomic 708 return !endpoint.getInProgressRepository().add(key); 709 } 710 711 protected String evaluateFileExpression() { 712 if (fileExpressionResult == null && endpoint.getFileName() != null) { 713 // create a dummy exchange as Exchange is needed for expression evaluation 714 Exchange dummy = endpoint.createExchange(); 715 fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class); 716 } 717 return fileExpressionResult; 718 } 719 720 @SuppressWarnings("unchecked") 721 private GenericFile<T> getExchangeFileProperty(Exchange exchange) { 722 return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE); 723 } 724 725 @Override 726 protected void doStart() throws Exception { 727 super.doStart(); 728 } 729 730 @Override 731 protected void doStop() throws Exception { 732 prepareOnStartup = false; 733 super.doStop(); 734 } 735}