001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.file.strategy; 018 019import java.io.File; 020import java.io.IOException; 021import java.io.RandomAccessFile; 022import java.nio.channels.Channel; 023import java.nio.channels.FileChannel; 024import java.nio.channels.FileLock; 025 026import org.apache.camel.Exchange; 027import org.apache.camel.LoggingLevel; 028import org.apache.camel.component.file.GenericFile; 029import org.apache.camel.component.file.GenericFileEndpoint; 030import org.apache.camel.component.file.GenericFileOperations; 031import org.apache.camel.util.CamelLogger; 032import org.apache.camel.util.IOHelper; 033import org.apache.camel.util.StopWatch; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * Acquires exclusive read lock to the given file. Will wait until the lock is granted. 039 * After granting the read lock it is released, we just want to make sure that when we start 040 * consuming the file its not currently in progress of being written by third party. 041 */ 042public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLockStrategy { 043 private static final Logger LOG = LoggerFactory.getLogger(FileLockExclusiveReadLockStrategy.class); 044 private long timeout; 045 private long checkInterval = 1000; 046 private LoggingLevel readLockLoggingLevel = LoggingLevel.DEBUG; 047 048 @Override 049 public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) { 050 // noop 051 } 052 053 @Override 054 public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { 055 // must call super 056 if (!super.acquireExclusiveReadLock(operations, file, exchange)) { 057 return false; 058 } 059 060 File target = new File(file.getAbsoluteFilePath()); 061 062 LOG.trace("Waiting for exclusive read lock to file: {}", target); 063 064 FileChannel channel = null; 065 RandomAccessFile randomAccessFile = null; 066 067 boolean exclusive = false; 068 FileLock lock = null; 069 070 try { 071 randomAccessFile = new RandomAccessFile(target, "rw"); 072 // try to acquire rw lock on the file before we can consume it 073 channel = randomAccessFile.getChannel(); 074 075 StopWatch watch = new StopWatch(); 076 077 while (!exclusive) { 078 // timeout check 079 if (timeout > 0) { 080 long delta = watch.taken(); 081 if (delta > timeout) { 082 CamelLogger.log(LOG, readLockLoggingLevel, 083 "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target); 084 // we could not get the lock within the timeout period, so return false 085 return false; 086 } 087 } 088 089 if (!target.exists()) { 090 CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock as file no longer exists. Will skip the file: " + file); 091 return false; 092 } 093 094 // get the lock using either try lock or not depending on if we are using timeout or not 095 try { 096 lock = timeout > 0 ? channel.tryLock() : channel.lock(); 097 } catch (IllegalStateException ex) { 098 // Also catch the OverlappingFileLockException here. Do nothing here 099 } 100 if (lock != null) { 101 LOG.trace("Acquired exclusive read lock: {} to file: {}", lock, target); 102 exclusive = true; 103 } else { 104 boolean interrupted = sleep(); 105 if (interrupted) { 106 // we were interrupted while sleeping, we are likely being shutdown so return false 107 return false; 108 } 109 } 110 } 111 } catch (IOException e) { 112 // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file 113 // such as AntiVirus or MS Office that has special locks for it's supported files 114 if (timeout == 0) { 115 // if not using timeout, then we cant retry, so return false 116 return false; 117 } 118 LOG.debug("Cannot acquire read lock. Will try again.", e); 119 boolean interrupted = sleep(); 120 if (interrupted) { 121 // we were interrupted while sleeping, we are likely being shutdown so return false 122 return false; 123 } 124 } finally { 125 // close channels if we did not grab the lock 126 if (!exclusive) { 127 IOHelper.close(channel, "while acquiring exclusive read lock for file: " + target, LOG); 128 IOHelper.close(randomAccessFile, "while acquiring exclusive read lock for file: " + target, LOG); 129 130 // and also must release super lock 131 super.releaseExclusiveReadLockOnAbort(operations, file, exchange); 132 } 133 } 134 135 // store read-lock state 136 exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), lock); 137 exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_RANDOM_ACCESS_FILE), randomAccessFile); 138 139 // we grabbed the lock 140 return true; 141 } 142 143 @Override 144 protected void doReleaseExclusiveReadLock(GenericFileOperations<File> operations, 145 GenericFile<File> file, Exchange exchange) throws Exception { 146 // must call super 147 super.doReleaseExclusiveReadLock(operations, file, exchange); 148 149 FileLock lock = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), FileLock.class); 150 RandomAccessFile rac = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), RandomAccessFile.class); 151 152 String target = file.getFileName(); 153 if (lock != null) { 154 Channel channel = lock.acquiredBy(); 155 try { 156 lock.release(); 157 } finally { 158 // close channel as well 159 IOHelper.close(channel, "while releasing exclusive read lock for file: " + target, LOG); 160 IOHelper.close(rac, "while releasing exclusive read lock for file: " + target, LOG); 161 } 162 } 163 } 164 165 private boolean sleep() { 166 LOG.trace("Exclusive read lock not granted. Sleeping for {} millis.", checkInterval); 167 try { 168 Thread.sleep(checkInterval); 169 return false; 170 } catch (InterruptedException e) { 171 LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out"); 172 return true; 173 } 174 } 175 176 public long getTimeout() { 177 return timeout; 178 } 179 180 @Override 181 public void setTimeout(long timeout) { 182 this.timeout = timeout; 183 } 184 185 @Override 186 public void setCheckInterval(long checkInterval) { 187 this.checkInterval = checkInterval; 188 } 189 190 @Override 191 public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) { 192 this.readLockLoggingLevel = readLockLoggingLevel; 193 } 194 195 private static String asReadLockKey(GenericFile file, String key) { 196 // use the copy from absolute path as that was the original path of the file when the lock was acquired 197 // for example if the file consumer uses preMove then the file is moved and therefore has another name 198 // that would no longer match 199 String path = file.getCopyFromAbsoluteFilePath() != null ? file.getCopyFromAbsoluteFilePath() : file.getAbsoluteFilePath(); 200 return path + "-" + key; 201 } 202 203}