001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file; 018 019 import java.util.ArrayList; 020 import java.util.Collections; 021 import java.util.Deque; 022 import java.util.LinkedList; 023 import java.util.List; 024 import java.util.Queue; 025 026 import org.apache.camel.AsyncCallback; 027 import org.apache.camel.Exchange; 028 import org.apache.camel.Processor; 029 import org.apache.camel.ShutdownRunningTask; 030 import org.apache.camel.impl.ScheduledBatchPollingConsumer; 031 import org.apache.camel.util.CastUtils; 032 import org.apache.camel.util.ObjectHelper; 033 import org.apache.camel.util.StopWatch; 034 import org.apache.camel.util.TimeUtils; 035 import org.slf4j.Logger; 036 import org.slf4j.LoggerFactory; 037 038 /** 039 * Base class for file consumers. 040 */ 041 public abstract class GenericFileConsumer<T> extends ScheduledBatchPollingConsumer { 042 protected final transient Logger log = LoggerFactory.getLogger(getClass()); 043 protected GenericFileEndpoint<T> endpoint; 044 protected GenericFileOperations<T> operations; 045 protected boolean loggedIn; 046 protected String fileExpressionResult; 047 protected volatile ShutdownRunningTask shutdownRunningTask; 048 protected volatile int pendingExchanges; 049 protected Processor customProcessor; 050 protected boolean eagerLimitMaxMessagesPerPoll = true; 051 052 public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations) { 053 super(endpoint, processor); 054 this.endpoint = endpoint; 055 this.operations = operations; 056 } 057 058 public Processor getCustomProcessor() { 059 return customProcessor; 060 } 061 062 /** 063 * Use a custom processor to process the exchange. 064 * <p/> 065 * Only set this if you need to do custom processing, instead of the regular processing. 066 * <p/> 067 * This is for example used to browse file endpoints by leveraging the file consumer to poll 068 * the directory to gather the list of exchanges. But to avoid processing the files regularly 069 * we can use a custom processor. 070 * 071 * @param processor a custom processor 072 */ 073 public void setCustomProcessor(Processor processor) { 074 this.customProcessor = processor; 075 } 076 077 public boolean isEagerLimitMaxMessagesPerPoll() { 078 return eagerLimitMaxMessagesPerPoll; 079 } 080 081 public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) { 082 this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll; 083 } 084 085 /** 086 * Poll for files 087 */ 088 protected int poll() throws Exception { 089 // must reset for each poll 090 fileExpressionResult = null; 091 shutdownRunningTask = null; 092 pendingExchanges = 0; 093 094 // before we poll is there anything we need to check? 095 // such as are we connected to the FTP Server still? 096 if (!prePollCheck()) { 097 log.debug("Skipping poll as pre poll check returned false"); 098 return 0; 099 } 100 101 // gather list of files to process 102 List<GenericFile<T>> files = new ArrayList<GenericFile<T>>(); 103 String name = endpoint.getConfiguration().getDirectory(); 104 105 // time how long time it takes to poll 106 StopWatch stop = new StopWatch(); 107 boolean limitHit = !pollDirectory(name, files, 0); 108 long delta = stop.stop(); 109 if (log.isDebugEnabled()) { 110 log.debug("Took {} to poll: {}", TimeUtils.printDuration(delta), name); 111 } 112 113 // log if we hit the limit 114 if (limitHit) { 115 log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll); 116 } 117 118 // sort files using file comparator if provided 119 if (endpoint.getSorter() != null) { 120 Collections.sort(files, endpoint.getSorter()); 121 } 122 123 // sort using build in sorters so we can use expressions 124 // use a linked list so we can deque the exchanges 125 LinkedList<Exchange> exchanges = new LinkedList<Exchange>(); 126 for (GenericFile<T> file : files) { 127 Exchange exchange = endpoint.createExchange(file); 128 endpoint.configureExchange(exchange); 129 endpoint.configureMessage(file, exchange.getIn()); 130 exchanges.add(exchange); 131 } 132 // sort files using exchange comparator if provided 133 if (endpoint.getSortBy() != null) { 134 Collections.sort(exchanges, endpoint.getSortBy()); 135 } 136 137 // use a queue for the exchanges 138 Deque<Exchange> q = exchanges; 139 140 // we are not eager limiting, but we have configured a limit, so cut the list of files 141 if (!eagerLimitMaxMessagesPerPoll && maxMessagesPerPoll > 0) { 142 if (files.size() > maxMessagesPerPoll) { 143 log.debug("Limiting maximum messages to poll at {} files as there was more messages in this poll.", maxMessagesPerPoll); 144 // must first remove excessive files from the in progress repository 145 removeExcessiveInProgressFiles(q, maxMessagesPerPoll); 146 } 147 } 148 149 // consume files one by one 150 int total = exchanges.size(); 151 if (total > 0) { 152 log.debug("Total {} files to consume", total); 153 } 154 155 int polledMessages = processBatch(CastUtils.cast(q)); 156 157 postPollCheck(); 158 159 return polledMessages; 160 } 161 162 public int processBatch(Queue<Object> exchanges) { 163 int total = exchanges.size(); 164 165 // limit if needed 166 if (maxMessagesPerPoll > 0 && total > maxMessagesPerPoll) { 167 log.debug("Limiting to maximum messages to poll {} as there was {} messages in this poll.", maxMessagesPerPoll, total); 168 total = maxMessagesPerPoll; 169 } 170 171 for (int index = 0; index < total && isBatchAllowed(); index++) { 172 // only loop if we are started (allowed to run) 173 // use poll to remove the head so it does not consume memory even after we have processed it 174 Exchange exchange = (Exchange) exchanges.poll(); 175 // add current index and total as properties 176 exchange.setProperty(Exchange.BATCH_INDEX, index); 177 exchange.setProperty(Exchange.BATCH_SIZE, total); 178 exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1); 179 180 // update pending number of exchanges 181 pendingExchanges = total - index - 1; 182 183 // process the current exchange 184 if (customProcessor != null) { 185 // use a custom processor 186 customProcessExchange(exchange, customProcessor); 187 } else { 188 // process the exchange regular 189 processExchange(exchange); 190 } 191 } 192 193 // drain any in progress files as we are done with this batch 194 removeExcessiveInProgressFiles(CastUtils.cast((Deque<?>) exchanges, Exchange.class), 0); 195 196 return total; 197 } 198 199 protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) { 200 // remove the file from the in progress list in case the batch was limited by max messages per poll 201 while (exchanges.size() > limit) { 202 // must remove last 203 Exchange exchange = exchanges.removeLast(); 204 GenericFile<?> file = exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class); 205 String key = file.getAbsoluteFilePath(); 206 endpoint.getInProgressRepository().remove(key); 207 } 208 } 209 210 211 /** 212 * Whether or not we can continue polling for more files 213 * 214 * @param fileList the current list of gathered files 215 * @return <tt>true</tt> to continue, <tt>false</tt> to stop due hitting maxMessagesPerPoll limit 216 */ 217 public boolean canPollMoreFiles(List<?> fileList) { 218 // at this point we should not limit if we are not eager 219 if (!eagerLimitMaxMessagesPerPoll) { 220 return true; 221 } 222 223 if (maxMessagesPerPoll <= 0) { 224 // no limitation 225 return true; 226 } 227 228 // then only poll if we haven't reached the max limit 229 return fileList.size() < maxMessagesPerPoll; 230 } 231 232 /** 233 * Override if required. Perform some checks (and perhaps actions) before we poll. 234 * 235 * @return <tt>true</tt> to poll, <tt>false</tt> to skip this poll. 236 */ 237 protected boolean prePollCheck() throws Exception { 238 return true; 239 } 240 241 /** 242 * Override if required. Perform some checks (and perhaps actions) after we have polled. 243 */ 244 protected void postPollCheck() { 245 // noop 246 } 247 248 /** 249 * Polls the given directory for files to process 250 * 251 * @param fileName current directory or file 252 * @param fileList current list of files gathered 253 * @param depth the current depth of the directory (will start from 0) 254 * @return whether or not to continue polling, <tt>false</tt> means the maxMessagesPerPoll limit has been hit 255 */ 256 protected abstract boolean pollDirectory(String fileName, List<GenericFile<T>> fileList, int depth); 257 258 /** 259 * Sets the operations to be used. 260 * <p/> 261 * Can be used to set a fresh operations in case of recovery attempts 262 * 263 * @param operations the operations 264 */ 265 public void setOperations(GenericFileOperations<T> operations) { 266 this.operations = operations; 267 } 268 269 /** 270 * Processes the exchange 271 * 272 * @param exchange the exchange 273 */ 274 protected void processExchange(final Exchange exchange) { 275 GenericFile<T> file = getExchangeFileProperty(exchange); 276 log.trace("Processing file: {}", file); 277 278 // must extract the absolute name before the begin strategy as the file could potentially be pre moved 279 // and then the file name would be changed 280 String absoluteFileName = file.getAbsoluteFilePath(); 281 282 // check if we can begin processing the file 283 try { 284 final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy(); 285 286 boolean begin = processStrategy.begin(operations, endpoint, exchange, file); 287 if (!begin) { 288 log.debug("{} cannot begin processing file: {}", endpoint, file); 289 try { 290 // abort 291 processStrategy.abort(operations, endpoint, exchange, file); 292 } finally { 293 // begin returned false, so remove file from the in progress list as its no longer in progress 294 endpoint.getInProgressRepository().remove(absoluteFileName); 295 } 296 return; 297 } 298 } catch (Exception e) { 299 // remove file from the in progress list due to failure 300 endpoint.getInProgressRepository().remove(absoluteFileName); 301 302 String msg = endpoint + " cannot begin processing file: " + file + " due to: " + e.getMessage(); 303 handleException(msg, e); 304 return; 305 } 306 307 // must use file from exchange as it can be updated due the 308 // preMoveNamePrefix/preMoveNamePostfix options 309 final GenericFile<T> target = getExchangeFileProperty(exchange); 310 // must use full name when downloading so we have the correct path 311 final String name = target.getAbsoluteFilePath(); 312 try { 313 // retrieve the file using the stream 314 log.trace("Retrieving file: {} from: {}", name, endpoint); 315 316 // retrieve the file and check it was a success 317 boolean retrieved = operations.retrieveFile(name, exchange); 318 if (!retrieved) { 319 // throw exception to handle the problem with retrieving the file 320 // then if the method return false or throws an exception is handled the same in here 321 // as in both cases an exception is being thrown 322 throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + endpoint); 323 } 324 325 log.trace("Retrieved file: {} from: {}", name, endpoint); 326 327 // register on completion callback that does the completion strategies 328 // (for instance to move the file after we have processed it) 329 exchange.addOnCompletion(new GenericFileOnCompletion<T>(endpoint, operations, target, absoluteFileName)); 330 331 log.debug("About to process file: {} using exchange: {}", target, exchange); 332 333 // process the exchange using the async consumer to support async routing engine 334 // which can be supported by this file consumer as all the done work is 335 // provided in the GenericFileOnCompletion 336 getAsyncProcessor().process(exchange, new AsyncCallback() { 337 public void done(boolean doneSync) { 338 // noop 339 if (log.isTraceEnabled()) { 340 log.trace("Done processing file: {} {}", target, doneSync ? "synchronously" : "asynchronously"); 341 } 342 } 343 }); 344 345 } catch (Exception e) { 346 // remove file from the in progress list due to failure 347 // (cannot be in finally block due to GenericFileOnCompletion will remove it 348 // from in progress when it takes over and processes the file, which may happen 349 // by another thread at a later time. So its only safe to remove it if there was an exception) 350 endpoint.getInProgressRepository().remove(absoluteFileName); 351 352 String msg = "Error processing file " + file + " due to " + e.getMessage(); 353 handleException(msg, e); 354 } 355 } 356 357 /** 358 * Processes the exchange using a custom processor. 359 * 360 * @param exchange the exchange 361 * @param processor the custom processor 362 */ 363 protected void customProcessExchange(final Exchange exchange, final Processor processor) { 364 GenericFile<T> file = getExchangeFileProperty(exchange); 365 log.trace("Custom processing file: {}", file); 366 367 // must extract the absolute name before the begin strategy as the file could potentially be pre moved 368 // and then the file name would be changed 369 String absoluteFileName = file.getAbsoluteFilePath(); 370 371 try { 372 // process using the custom processor 373 processor.process(exchange); 374 } catch (Exception e) { 375 if (log.isDebugEnabled()) { 376 log.debug(endpoint + " error custom processing: " + file + " due to: " + e.getMessage() + ". This exception will be ignored.", e); 377 } 378 handleException(e); 379 } finally { 380 // always remove file from the in progress list as its no longer in progress 381 // use the original file name that was used to add it to the repository 382 // as the name can be different when using preMove option 383 endpoint.getInProgressRepository().remove(absoluteFileName); 384 } 385 } 386 387 /** 388 * Strategy for validating if the given remote file should be included or not 389 * 390 * @param file the file 391 * @param isDirectory whether the file is a directory or a file 392 * @param files files in the directory 393 * @return <tt>true</tt> to include the file, <tt>false</tt> to skip it 394 */ 395 protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) { 396 if (!isMatched(file, isDirectory, files)) { 397 log.trace("File did not match. Will skip this file: {}", file); 398 return false; 399 } else if (endpoint.isIdempotent() && endpoint.getIdempotentRepository().contains(file.getAbsoluteFilePath())) { 400 log.trace("This consumer is idempotent and the file has been consumed before. Will skip this file: {}", file); 401 return false; 402 } 403 404 // file matched 405 return true; 406 } 407 408 /** 409 * Strategy to perform file matching based on endpoint configuration. 410 * <p/> 411 * Will always return <tt>false</tt> for certain files/folders: 412 * <ul> 413 * <li>Starting with a dot</li> 414 * <li>lock files</li> 415 * </ul> 416 * And then <tt>true</tt> for directories. 417 * 418 * @param file the file 419 * @param isDirectory whether the file is a directory or a file 420 * @param files files in the directory 421 * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not 422 */ 423 protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) { 424 String name = file.getFileNameOnly(); 425 426 // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock") 427 if (name.startsWith(".")) { 428 return false; 429 } 430 431 // lock files should be skipped 432 if (name.endsWith(FileComponent.DEFAULT_LOCK_FILE_POSTFIX)) { 433 return false; 434 } 435 436 if (endpoint.getFilter() != null) { 437 if (!endpoint.getFilter().accept(file)) { 438 return false; 439 } 440 } 441 442 if (endpoint.getAntFilter() != null) { 443 if (!endpoint.getAntFilter().accept(file)) { 444 return false; 445 } 446 } 447 448 // directories are regarded as matched if filter accepted them 449 if (isDirectory) { 450 return true; 451 } 452 453 if (ObjectHelper.isNotEmpty(endpoint.getExclude())) { 454 if (name.matches(endpoint.getExclude())) { 455 return false; 456 } 457 } 458 459 if (ObjectHelper.isNotEmpty(endpoint.getInclude())) { 460 if (!name.matches(endpoint.getInclude())) { 461 return false; 462 } 463 } 464 465 // use file expression for a simple dynamic file filter 466 if (endpoint.getFileName() != null) { 467 evaluateFileExpression(); 468 if (fileExpressionResult != null) { 469 if (!name.equals(fileExpressionResult)) { 470 return false; 471 } 472 } 473 } 474 475 // if done file name is enabled, then the file is only valid if a done file exists 476 if (endpoint.getDoneFileName() != null) { 477 // done file must be in same path as the file 478 String doneFileName = endpoint.createDoneFileName(file.getAbsoluteFilePath()); 479 ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint); 480 481 // is it a done file name? 482 if (endpoint.isDoneFile(file.getFileNameOnly())) { 483 log.trace("Skipping done file: {}", file); 484 return false; 485 } 486 487 if (!isMatched(file, doneFileName, files)) { 488 return false; 489 } 490 } 491 492 return true; 493 } 494 495 /** 496 * Strategy to perform file matching based on endpoint configuration in terms of done file name. 497 * 498 * @param file the file 499 * @param doneFileName the done file name (without any paths) 500 * @param files files in the directory 501 * @return <tt>true</tt> if the file is matched, <tt>false</tt> if not 502 */ 503 protected abstract boolean isMatched(GenericFile<T> file, String doneFileName, List<T> files); 504 505 /** 506 * Is the given file already in progress. 507 * 508 * @param file the file 509 * @return <tt>true</tt> if the file is already in progress 510 */ 511 protected boolean isInProgress(GenericFile<T> file) { 512 String key = file.getAbsoluteFilePath(); 513 return !endpoint.getInProgressRepository().add(key); 514 } 515 516 private void evaluateFileExpression() { 517 if (fileExpressionResult == null) { 518 // create a dummy exchange as Exchange is needed for expression evaluation 519 Exchange dummy = endpoint.createExchange(); 520 fileExpressionResult = endpoint.getFileName().evaluate(dummy, String.class); 521 } 522 } 523 524 @SuppressWarnings("unchecked") 525 private GenericFile<T> getExchangeFileProperty(Exchange exchange) { 526 return (GenericFile<T>) exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE); 527 } 528 529 @Override 530 protected void doStart() throws Exception { 531 super.doStart(); 532 533 // prepare on startup 534 endpoint.getGenericFileProcessStrategy().prepareOnStartup(operations, endpoint); 535 } 536 }