/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.component.file.strategy;

import java.io.File;
import java.util.Date;

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.util.CamelLogger;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Acquires exclusive read lock to the given file by checking whether the file is being
 * changed by scanning the file at different intervals (to detect changes).
 * <p/>
 * Setting the option {@link #setMarkerFiler(boolean)} to <tt>false</tt> allows to turn off using marker files.
 */
public class FileChangedExclusiveReadLockStrategy extends MarkerFileExclusiveReadLockStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(FileChangedExclusiveReadLockStrategy.class);

    /** Maximum time in millis to wait for the lock; the timeout check is skipped unless this is greater than zero. */
    private long timeout;
    /** Time in millis to sleep between two consecutive inspections of the file. */
    private long checkInterval = 1000;
    /** Minimum length (as reported by {@link File#length()}) the file must have before the lock can be granted. */
    private long minLength = 1;
    /** Minimum age in millis; when non-zero the last-modified timestamp is compared against "now - minAge". */
    private long minAge;
    /** Level used when logging that the lock could not be acquired. */
    private LoggingLevel readLockLoggingLevel = LoggingLevel.DEBUG;

    /**
     * Polls the target file until it is considered stable, and then grants the read lock.
     * <p/>
     * The lock is granted once the file length is at least {@code minLength} and either
     * <ul>
     *   <li>{@code minAge} is zero and both last-modified timestamp and length are identical to the
     *       values observed in the previous iteration, or</li>
     *   <li>{@code minAge} is non-zero and the last-modified timestamp is older than the current
     *       time minus {@code minAge}.</li>
     * </ul>
     *
     * @param operations the file operations, passed on to the superclass
     * @param file       the file to acquire the read lock for
     * @param exchange   the current exchange, passed on to the superclass
     * @return <tt>true</tt> if the lock was acquired; <tt>false</tt> if the superclass did not grant its lock,
     *         the timeout elapsed, the file no longer exists, or the thread was interrupted while waiting
     * @throws Exception declared by the overridden method; may be propagated from the superclass call
     */
    @Override
    public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
        // must call super
        if (!super.acquireExclusiveReadLock(operations, file, exchange)) {
            return false;
        }

        File target = new File(file.getAbsoluteFilePath());
        boolean exclusive = false;

        LOG.trace("Waiting for exclusive read lock to file: {}", file);

        // sentinel values guarantee that the very first iteration never matches when minAge == 0
        long lastModified = Long.MIN_VALUE;
        long length = Long.MIN_VALUE;
        StopWatch watch = new StopWatch();
        long startTime = (new Date()).getTime();

        while (!exclusive) {
            // timeout check
            if (timeout > 0) {
                long delta = watch.taken();
                if (delta > timeout) {
                    CamelLogger.log(LOG, readLockLoggingLevel,
                            "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
                    // we could not get the lock within the timeout period, so return false
                    return false;
                }
            }

            if (!target.exists()) {
                CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock as file no longer exists. Will skip the file: " + file);
                return false;
            }

            long newLastModified = target.lastModified();
            long newLength = target.length();
            // start time plus elapsed time approximates "now"; files modified before this threshold are old enough
            long newOlderThan = startTime + watch.taken() - minAge;

            LOG.trace("Previous last modified: {}, new last modified: {}", lastModified, newLastModified);
            LOG.trace("Previous length: {}, new length: {}", length, newLength);
            LOG.trace("New older than threshold: {}", newOlderThan);

            if (newLength >= minLength && ((minAge == 0 && newLastModified == lastModified && newLength == length) || (minAge != 0 && newLastModified < newOlderThan))) {
                LOG.trace("Read lock acquired.");
                exclusive = true;
            } else {
                // set new base file change information
                lastModified = newLastModified;
                length = newLength;

                boolean interrupted = sleep();
                if (interrupted) {
                    // we were interrupted while sleeping, we are likely being shutdown so return false
                    return false;
                }
            }
        }

        return exclusive;
    }

    /**
     * Sleeps for {@code checkInterval} millis before the next inspection of the file.
     *
     * @return <tt>true</tt> if the sleep was interrupted, <tt>false</tt> if it completed normally
     */
    private boolean sleep() {
        LOG.trace("Exclusive read lock not granted. Sleeping for {} millis.", checkInterval);
        try {
            Thread.sleep(checkInterval);
            return false;
        } catch (InterruptedException e) {
            // restore the interrupt status so code further up the call stack can still observe the interruption
            Thread.currentThread().interrupt();
            LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out");
            return true;
        }
    }

    /**
     * Returns the timeout in millis.
     */
    public long getTimeout() {
        return timeout;
    }

    /**
     * Sets the timeout in millis. The timeout check is only performed when the value is greater than zero.
     */
    @Override
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Returns the interval in millis slept between two inspections of the file.
     */
    public long getCheckInterval() {
        return checkInterval;
    }

    /**
     * Sets the interval in millis to sleep between two inspections of the file.
     */
    @Override
    public void setCheckInterval(long checkInterval) {
        this.checkInterval = checkInterval;
    }

    /**
     * Sets the logging level used when reporting that the read lock could not be acquired.
     */
    @Override
    public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
        this.readLockLoggingLevel = readLockLoggingLevel;
    }

    /**
     * Returns the minimum file length required before the lock can be granted.
     */
    public long getMinLength() {
        return minLength;
    }

    /**
     * Sets the minimum file length required before the lock can be granted.
     */
    public void setMinLength(long minLength) {
        this.minLength = minLength;
    }

    /**
     * Returns the minimum age in millis.
     */
    public long getMinAge() {
        return minAge;
    }

    /**
     * Sets the minimum age in millis. When zero, the lock is instead granted once last-modified
     * timestamp and length are unchanged between two consecutive inspections.
     */
    public void setMinAge(long minAge) {
        this.minAge = minAge;
    }
}