001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.file; 018 019import java.util.ArrayList; 020import java.util.Collections; 021import java.util.Deque; 022import java.util.LinkedList; 023import java.util.List; 024import java.util.Queue; 025import java.util.regex.Pattern; 026 027import org.apache.camel.AsyncCallback; 028import org.apache.camel.Exchange; 029import org.apache.camel.Processor; 030import org.apache.camel.ShutdownRunningTask; 031import org.apache.camel.impl.ScheduledBatchPollingConsumer; 032import org.apache.camel.util.CastUtils; 033import org.apache.camel.util.ObjectHelper; 034import org.apache.camel.util.StopWatch; 035import org.apache.camel.util.TimeUtils; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039/** 040 * Base class for file consumers. 041 */ 042public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer { 043 protected final Logger log = LoggerFactory.getLogger(getClass()); 044 protected GenericFileEndpoint<T> endpoint; 045 protected GenericFileOperations<T> operations; 046 protected volatile boolean loggedIn; 047 protected String fileExpressionResult; 048 protected volatile ShutdownRunningTask shutdownRunningTask; 049 protected volatile int pendingExchanges; 050 protected Processor customProcessor; 051 protected boolean eagerLimitMaxMessagesPerPoll = true; 052 protected volatile boolean prepareOnStartup; 053 private final Pattern includePattern; 054 private final Pattern excludePattern; 055 056 public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) { 057 super(endpoint, processor); 058 this.endpoint = endpoint; 059 this.operations = operations; 060 061 if (endpoint.getInclude() != null) { 062 this.includePattern = Pattern.compile(endpoint.getInclude(), Pattern.CASE_INSENSITIVE); 063 } else { 064 this.includePattern = null; 065 } 066 if (endpoint.getExclude() != null) { 067 this.excludePattern = Pattern.compile(endpoint.getExclude(), Pattern.CASE_INSENSITIVE); 068 } else { 069 this.excludePattern = null; 070 } 071 } 072 073 public Processor getCustomProcessor() { 074 return customProcessor; 075 } 076 077 /** 078 * Use a custom processor to process the exchange. 079 * <p/> 080 * Only set this if you need to do custom processing, instead of the regular processing. 081 * <p/> 082 * This is for example used to browse file endpoints by leveraging the file consumer to poll 083 * the directory to gather the list of exchanges. But to avoid processing the files regularly 084 * we can use a custom processor. 085 * 086 * @param processor a custom processor 087 */ 088 public void setCustomProcessor(Processor processor) { 089 this.customProcessor = processor; 090 } 091 092 public boolean isEagerLimitMaxMessagesPerPoll() { 093 return eagerLimitMaxMessagesPerPoll; 094 } 095 096 public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) { 097 this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll; 098 } 099 100 /** 101 * Poll for files 102 */ 103 protected int poll() throws Exception { 104 // must prepare on startup the very first time 105 if (!prepareOnStartup) { 106 // prepare on startup 107 endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint); 108 prepareOnStartup = true; 109 } 110 111 // must reset for each poll 112 fileExpressionResult = null; 113 shutdownRunningTask = null; 114 pendingExchanges = 0; 115 116 // before we poll is there anything we need to check? 117 // such as are we connected to the FTP Server still? 118 if (!prePollCheck()) { 119 log.debug("Skipping poll as pre poll check returned false"); 120 return 0; 121 } 122 123 // gather list of files to process 124 List<GenericFile<T>> files = new ArrayList<GenericFile<T>>(); 125 String name = endpoint.getConfiguration().getDirectory(); 126 127 // time how long time it takes to poll 128 StopWatch stop = new StopWatch(); 129 boolean limitHit; 130 try { 131 limitHit = !pollDirectory(name, files, 0); 132 } catch (Exception e) { 133 // during poll directory we add files to the in progress repository, in case of any exception thrown after this work 134 // we must then drain the in progress files before rethrowing the exception 135 log.debug("Error occurred during poll directory: " + name + " due " + e.getMessage() + ". Removing " + files.size() + " files marked as in-progress."); 136 removeExcessiveInProgressFiles(files); 137 throw e; 138 } 139 140 long delta = stop.stop(); 141 if (log.isDebugEnabled()) { 142 log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name); 143 } 144 145 // log if we hit the limit 146 if (limitHit) { 147 log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll); 148 } 149 150 // sort files using file comparator if provided 151 if (endpoint.getSorter() != null) { 152 Collections.sort(files, endpoint.getSorter()); 153 } 154 155 // sort using build in sorters so we can use expressions 156 // use a linked list so we can dequeue the exchanges 157 LinkedList<Exchange> exchanges = new LinkedList<Exchange>(); 158 for (GenericFile<T> file : files) { 159 Exchange exchange = endpoint.createExchange(file); 160 endpoint.configureExchange(exchange); 161 endpoint.configureMessage(file, exchange.getIn()); 162 exchanges.add(exchange); 163 } 164 // sort files using exchange comparator if provided 165 if (endpoint.getSortBy() != null) { 166 Collections.sort(exchanges, endpoint.getSortBy()); 167 } 168 if (endpoint.isShuffle()) { 169 Collections.shuffle(exchanges); 170 } 171 172 // use a queue for the exchanges 173 Deque<Exchange> q = exchanges; 174 175 // we are not eager limiting, but we have configured a limit, so cut the list of files 176 if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) { 177 if (files.size() > maxMessagesPerPoll) { 178 log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll); 179 // must first remove excessive files from the in progress repository 180 removeExcessiveInProgressFiles(q, maxMessagesPerPoll); 181 } 182 } 183 184 // consume files one by one 185 int total = exchanges.size(); 186 if (total > 0) { 187 log.debug("Total {} files to consume", total); 188 } 189 190 int polledMessages = processBatch(CastUtils.cast(q)); 191 192 postPollCheck(polledMessages); 193 194 return polledMessages; 195 } 196 197 public int processBatch(Queue<Object> exchanges) { 198 int total = exchanges.size(); 199 int answer = total; 200 201 // limit if needed 202 if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) { 203 log.debug("Limiting to maximum messages to poll {} as there was {} messages in this poll.", maxMessagesPerPoll, total); 204 total = maxMessagesPerPoll; 205 } 206 207 for (int index = 0; index < total && isBatchAllowed(); index++) { 208 // only loop if we are started (allowed to run) 209 // use poll to remove the head so it does not consume memory even after we have processed it 210 Exchange exchange = (Exchange) exchanges.poll(); 211 // add current index and total as properties 212 exchange.setProperty(Exchange.BATCH_INDEX, index); 213 exchange.setProperty(Exchange.BATCH_SIZE, total); 214 exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1); 215 216 // update pending number of exchanges 217 pendingExchanges = total - index - 1; 218 219 // process the current exchange 220 boolean started; 221 if (customProcessor != null) { 222 // use a custom processor 223 started = customProcessExchange(exchange, customProcessor); 224 } else { 225 // process the exchange regular 226 started = processExchange(exchange); 227 } 228 229 // if we did not start process the file then decrement the counter 230 if (!started) { 231 answer--; 232 } 233 } 234 235 // drain any in progress files as we are done with this batch 236 removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0); 237 238 return answer; 239 } 240 241 /** 242 * Drain any in progress files as we are done with this batch 243 * 244 * @param exchanges the exchanges 245 * @param limit the limit 246 */ 247 protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) { 248 // remove the file from the in progress list in case the batch was limited by max messages per poll 249 while (exchanges.size() > limit) { 250 // must remove last 251 Exchange exchange = exchanges.removeLast(); 252 GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class); 253 String key = file.getAbsoluteFilePath(); 254 endpoint.getInProgressRepository().remove(key); 255 } 256 } 257 258 /** 259 * Drain any in progress files as we are done with the files 260 * 261 * @param files the files 262 */ 263 protected void removeExcessiveInProgressFiles(List<GenericFile<T>> files) { 264 for (GenericFile file : files) { 265 String key = file.getAbsoluteFilePath(); 266 endpoint.getInProgressRepository().remove(key); 267 } 268 } 269 270 /** 271 * Whether or not we can continue polling for more files 272 * 273 * @param fileList the current list of gathered files 274 * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit 275 */ 276 public boolean canPollMoreFiles(List<?> fileList) { 277 // at this point we should not limit if we are not eager 278 if (!eagerLimitMaxMessagesPerPoll) { 279 return true; 280 } 281 282 if (maxMessagesPerPoll <= 0) { 283 // no limitation 284 return true; 285 } 286 287 // then only poll if we haven't reached the max limit 288 return fileList.size() < maxMessagesPerPoll; 289 } 290 291 /** 292 * Override if required. Perform some checks (and perhaps actions) before we poll. 293 * 294 * @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll. 295 */ 296 protected boolean prePollCheck() throws Exception { 297 return true; 298 } 299 300 /** 301 * Override if required. Perform some checks (and perhaps actions) after we have polled. 302 * 303 * @param polledMessages number of polled messages 304 */ 305 protected void postPollCheck(int polledMessages) { 306 // noop 307 } 308 309 /** 310 * Polls the given directory for files to process 311 * 312 * @param fileName current directory or file 313 * @param fileList current list of files gathered 314 * @param depth the current depth of the directory (will start from 0) 315 * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit 316 */ 317 protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth); 318 319 /** 320 * Sets the operations to be used. 321 * <p/> 322 * Can be used to set a fresh operations in case of recovery attempts 323 * 324 * @param operations the operations 325 */ 326 public void setOperations(GenericFileOperations<T> operations) { 327 this.operations = operations; 328 } 329 330 /** 331 * Whether to ignore if the file cannot be retrieved. 332 * <p/> 333 * By default an {@link GenericFileOperationFailedException} is thrown if the file cannot be retrieved. 334 * <p/> 335 * This method allows to suppress this and just ignore that. 336 * 337 * @param name the file name 338 * @param exchange the exchange 339 * @param cause optional exception occurred during retrieving file 340 * @return <tt>true</tt> to ignore, <tt>false</tt> is the default. 341 */ 342 protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) { 343 return false; 344 } 345 346 /** 347 * Processes the exchange 348 * 349 * @param exchange the exchange 350 * @return <tt>true</tt> if the file was started to be processed, <tt>false</tt> if the file was not started 351 * to be processed, for some reason (not found, or aborted etc) 352 */ 353 protected boolean processExchange(final Exchange exchange) { 354 GenericFile<T> file = getExchangeFileProperty(exchange); 355 log.trace("Processing file: {}", file); 356 357 // must extract the absolute name before the begin strategy as the file could potentially be pre moved 358 // and then the file name would be changed 359 String absoluteFileName = file.getAbsoluteFilePath(); 360 361 // check if we can begin processing the file 362 final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy(); 363 364 Exception beginCause = null; 365 boolean begin = false; 366 try { 367 begin = processStrategy.begin(operations, endpoint, exchange, file); 368 } catch (Exception e) { 369 beginCause = e; 370 } 371 372 if (!begin) { 373 // no something was wrong, so we need to abort and remove the file from the in progress list 374 Exception abortCause = null; 375 log.debug("{} cannot begin processing file: {}", endpoint, file); 376 try { 377 // abort 378 processStrategy.abort(operations, endpoint, exchange, file); 379 } catch (Exception e) { 380 abortCause = e; 381 } finally { 382 // begin returned false, so remove file from the in progress list as its no longer in progress 383 endpoint.getInProgressRepository().remove(absoluteFileName); 384 } 385 if (beginCause != null) { 386 String msg = endpoint + " cannot begin processing file: " + file + " due to: " + beginCause.getMessage(); 387 handleException(msg, beginCause); 388 } 389 if (abortCause != null) { 390 String msg2 = endpoint + " cannot abort processing file: " + file + " due to: " + abortCause.getMessage(); 391 handleException(msg2, abortCause); 392 } 393 return false; 394 } 395 396 // must use file from exchange as it can be updated due the 397 // preMoveNamePrefix/preMoveNamePostfix options 398 final GenericFile<T> target = getExchangeFileProperty(exchange); 399 // must use full name when downloading so we have the correct path 400 final String name = target.getAbsoluteFilePath(); 401 try { 402 403 if (isRetrieveFile()) { 404 // retrieve the file using the stream 405 log.trace("Retrieving file: {} from: {}", name, endpoint); 406 407 // retrieve the file and check it was a success 408 boolean retrieved; 409 Exception cause = null; 410 try { 411 retrieved = operations.retrieveFile(name, exchange); 412 } catch (Exception e) { 413 retrieved = false; 414 cause = e; 415 } 416 417 if (!retrieved) { 418 if (ignoreCannotRetrieveFile(name, exchange, cause)) { 419 log.trace("Cannot retrieve file {} maybe it does not exists. Ignoring.", name); 420 // remove file from the in progress list as we could not retrieve it, but should ignore 421 endpoint.getInProgressRepository().remove(absoluteFileName); 422 return false; 423 } else { 424 // throw exception to handle the problem with retrieving the file 425 // then if the method return false or throws an exception is handled the same in here 426 // as in both cases an exception is being thrown 427 if (cause != null && cause instanceof GenericFileOperationFailedException) { 428 throw cause; 429 } else { 430 throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint, cause); 431 } 432 } 433 } 434 435 log.trace("Retrieved file: {} from: {}", name, endpoint); 436 } else { 437 log.trace("Skipped retrieval of file: {} from: {}", name, endpoint); 438 exchange.getIn().setBody(null); 439 } 440 441 // register on completion callback that does the completion strategies 442 // (for instance to move the file after we have processed it) 443 exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName)); 444 445 log.debug("About to process file: {} using exchange: {}", target, exchange); 446 447 if (endpoint.isSynchronous()) { 448 // process synchronously 449 getProcessor().process(exchange); 450 } else { 451 // process the exchange using the async consumer to support async routing engine 452 // which can be supported by this file consumer as all the done work is 453 // provided in the GenericFileOnCompletion 454 getAsyncProcessor().process(exchange, new AsyncCallback() { 455 public void done(boolean doneSync) { 456 // noop 457 if (log.isTraceEnabled()) { 458 log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously"); 459 } 460 } 461 }); 462 } 463 464 } catch (Exception e) { 465 // remove file from the in progress list due to failure 466 // (cannot be in finally block due to GenericFileOnCompletion will remove it 467 // from in progress when it takes over and processes the file, which may happen 468 // by another thread at a later time. So its only safe to remove it if there was an exception) 469 endpoint.getInProgressRepository().remove(absoluteFileName); 470 471 String msg = "Error processing file " + file + " due to " + e.getMessage(); 472 handleException(msg, e); 473 } 474 475 return true; 476 } 477 478 /** 479 * Override if required. Files are retrieved / returns true by default 480 * 481 * @return <tt>true</tt> to retrieve files, <tt>false</tt> to skip retrieval of files. 482 */ 483 protected boolean isRetrieveFile() { 484 return true; 485 } 486 487 /** 488 * Processes the exchange using a custom processor. 489 * 490 * @param exchange the exchange 491 * @param processor the custom processor 492 */ 493 protected boolean customProcessExchange(final Exchange exchange, final Processor processor) { 494 GenericFile<T> file = getExchangeFileProperty(exchange); 495 log.trace("Custom processing file: {}", file); 496 497 // must extract the absolute name before the begin strategy as the file could potentially be pre moved 498 // and then the file name would be changed 499 String absoluteFileName = file.getAbsoluteFilePath(); 500 501 try { 502 // process using the custom processor 503 processor.process(exchange); 504 } catch (Exception e) { 505 if (log.isDebugEnabled()) { 506 log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e); 507 } 508 handleException(e); 509 } finally { 510 // always remove file from the in progress list as its no longer in progress 511 // use the original file name that was used to add it to the repository 512 // as the name can be different when using preMove option 513 endpoint.getInProgressRepository().remove(absoluteFileName); 514 } 515 516 return true; 517 } 518 519 /** 520 * Strategy for validating if the given remote file should be included or not 521 * 522 * @param file the file 523 * @param isDirectory whether the file is a directory or a file 524 * @param files files in the directory 525 * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it 526 */ 527 protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) { 528 String absoluteFilePath = file.getAbsoluteFilePath(); 529 530 if (!isMatched(file, isDirectory, files)) { 531 log.trace("File did not match. Will skip this file: {}", file); 532 return false; 533 } 534 535 // directory is always valid 536 if (isDirectory) { 537 return true; 538 } 539 540 // check if file is already in progress 541 if (endpoint.getInProgressRepository().contains(absoluteFilePath)) { 542 if (log.isTraceEnabled()) { 543 log.trace("Skipping as file is already in progress: {}", file.getFileName()); 544 } 545 return false; 546 } 547 548 // if its a file then check we have the file in the idempotent registry already 549 if (endpoint.isIdempotent()) { 550 // use absolute file path as default key, but evaluate if an expression key was configured 551 String key = file.getAbsoluteFilePath(); 552 if (endpoint.getIdempotentKey() != null) { 553 Exchange dummy = endpoint.createExchange(file); 554 key = endpoint.getIdempotentKey().evaluate(dummy, String.class); 555 } 556 if (key != null && endpoint.getIdempotentRepository().contains(key)) { 557 log.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", key, file); 558 return false; 559 } 560 } 561 562 // okay so final step is to be able to add atomic as in-progress, so we are the 563 // only thread processing this file 564 return endpoint.getInProgressRepository().add(absoluteFilePath); 565 } 566 567 /** 568 * Strategy to perform file matching based on endpoint configuration. 569 * <p/> 570 * Will always return <tt>false</tt> for certain files/folders: 571 * <ul> 572 * <li>Starting with a dot</li> 573 * <li>lock files</li> 574 * </ul> 575 * And then <tt>true</tt> for directories. 576 * 577 * @param file the file 578 * @param isDirectory whether the file is a directory or a file 579 * @param files files in the directory 580 * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not 581 */ 582 protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) { 583 String name = file.getFileNameOnly(); 584 585 // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock") 586 if (name.startsWith(".")) { 587 return false; 588 } 589 590 // lock files should be skipped 591 if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) { 592 return false; 593 } 594 595 if (endpoint.getFilter() != null) { 596 if (!endpoint.getFilter().accept(file)) { 597 return false; 598 } 599 } 600 601 if (endpoint.getAntFilter() != null) { 602 if (!endpoint.getAntFilter().accept(file)) { 603 return false; 604 } 605 } 606 607 // directories are regarded as matched if filter accepted them 608 if (isDirectory) { 609 return true; 610 } 611 612 // exclude take precedence over include 613 if (excludePattern != null) { 614 if (excludePattern.matcher(name).matches()) { 615 return false; 616 } 617 } 618 if (includePattern != null) { 619 if (!includePattern.matcher(name).matches()) { 620 return false; 621 } 622 } 623 624 // use file expression for a simple dynamic file filter 625 if (endpoint.getFileName() != null) { 626 fileExpressionResult = evaluateFileExpression(); 627 if (fileExpressionResult != null) { 628 if (!name.equals(fileExpressionResult)) { 629 return false; 630 } 631 } 632 } 633 634 // if done file name is enabled, then the file is only valid if a done file exists 635 if (endpoint.getDoneFileName() != null) { 636 // done file must be in same path as the file 637 String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath()); 638 ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint); 639 640 // is it a done file name? 641 if (endpoint.isDoneFile(file.getFileNameOnly())) { 642 log.trace("Skipping done file: {}", file); 643 return false; 644 } 645 646 if (!isMatched(file, doneFileName, files)) { 647 return false; 648 } 649 } 650 651 return true; 652 } 653 654 /** 655 * Strategy to perform file matching based on endpoint configuration in terms of done file name. 656 * 657 * @param file the file 658 * @param doneFileName the done file name (without any paths) 659 * @param files files in the directory 660 * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not 661 */ 662 protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files); 663 664 /** 665 * Is the given file already in progress. 666 * 667 * @param file the file 668 * @return <tt>true</tt> if the file is already in progress 669 * @deprecated no longer in use, use {@link org.apache.camel.component.file.GenericFileEndpoint#getInProgressRepository()} instead. 670 */ 671 @Deprecated 672 protected boolean isInProgress(GenericFile<T> file) { 673 String key = file.getAbsoluteFilePath(); 674 // must use add, to have operation as atomic 675 return !endpoint.getInProgressRepository().add(key); 676 } 677 678 protected String evaluateFileExpression() { 679 if (fileExpressionResult == null && endpoint.getFileName() != null) { 680 // create a dummy exchange as Exchange is needed for expression evaluation 681 Exchange dummy = endpoint.createExchange(); 682 fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class); 683 } 684 return fileExpressionResult; 685 } 686 687 @SuppressWarnings("unchecked") 688 private GenericFile<T> getExchangeFileProperty(Exchange exchange) { 689 return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE); 690 } 691 692 @Override 693 protected void doStart() throws Exception { 694 super.doStart(); 695 } 696 697 @Override 698 protected void doStop() throws Exception { 699 prepareOnStartup = false; 700 super.doStop(); 701 } 702}