001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.component.file.strategy; 018 019 import java.io.File; 020 import java.io.IOException; 021 import java.io.RandomAccessFile; 022 import java.nio.channels.Channel; 023 import java.nio.channels.FileChannel; 024 import java.nio.channels.FileLock; 025 026 import org.apache.camel.Exchange; 027 import org.apache.camel.component.file.GenericFile; 028 import org.apache.camel.component.file.GenericFileEndpoint; 029 import org.apache.camel.component.file.GenericFileOperations; 030 import org.apache.camel.util.IOHelper; 031 import org.apache.camel.util.StopWatch; 032 import org.slf4j.Logger; 033 import org.slf4j.LoggerFactory; 034 035 /** 036 * Acquires exclusive read lock to the given file. Will wait until the lock is granted. 037 * After granting the read lock it is released, we just want to make sure that when we start 038 * consuming the file its not currently in progress of being written by third party. 039 */ 040 public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLockStrategy { 041 private static final transient Logger LOG = LoggerFactory.getLogger(FileLockExclusiveReadLockStrategy.class); 042 private long timeout; 043 private long checkInterval = 1000; 044 private FileLock lock; 045 private String lockFileName; 046 047 public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) { 048 // noop 049 } 050 051 public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { 052 // must call super 053 if (!super.acquireExclusiveReadLock(operations, file, exchange)) { 054 return false; 055 } 056 057 File target = new File(file.getAbsoluteFilePath()); 058 059 LOG.trace("Waiting for exclusive read lock to file: {}", target); 060 061 try { 062 // try to acquire rw lock on the file before we can consume it 063 FileChannel channel = new RandomAccessFile(target, "rw").getChannel(); 064 065 boolean exclusive = false; 066 StopWatch watch = new StopWatch(); 067 068 while (!exclusive) { 069 // timeout check 070 if (timeout > 0) { 071 long delta = watch.taken(); 072 if (delta > timeout) { 073 LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target); 074 // we could not get the lock within the timeout period, so return false 075 return false; 076 } 077 } 078 079 // get the lock using either try lock or not depending on if we are using timeout or not 080 try { 081 lock = timeout > 0 ? channel.tryLock() : channel.lock(); 082 } catch (IllegalStateException ex) { 083 // Also catch the OverlappingFileLockException here. Do nothing here 084 } 085 if (lock != null) { 086 LOG.trace("Acquired exclusive read lock: {} to file: {}", lock, target); 087 lockFileName = target.getName(); 088 exclusive = true; 089 } else { 090 boolean interrupted = sleep(); 091 if (interrupted) { 092 // we were interrupted while sleeping, we are likely being shutdown so return false 093 return false; 094 } 095 } 096 } 097 } catch (IOException e) { 098 // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file 099 // such as AntiVirus or MS Office that has special locks for it's supported files 100 if (timeout == 0) { 101 // if not using timeout, then we cant retry, so rethrow 102 throw e; 103 } 104 LOG.debug("Cannot acquire read lock. Will try again.", e); 105 boolean interrupted = sleep(); 106 if (interrupted) { 107 // we were interrupted while sleeping, we are likely being shutdown so return false 108 return false; 109 } 110 } 111 112 return true; 113 } 114 115 public void releaseExclusiveReadLock(GenericFileOperations<File> operations, 116 GenericFile<File> file, Exchange exchange) throws Exception { 117 118 // must call super 119 super.releaseExclusiveReadLock(operations, file, exchange); 120 121 if (lock != null) { 122 Channel channel = lock.channel(); 123 try { 124 lock.release(); 125 } finally { 126 // must close channel first 127 IOHelper.close(channel, "while acquiring exclusive read lock for file: " + lockFileName, LOG); 128 } 129 } 130 } 131 132 private boolean sleep() { 133 LOG.trace("Exclusive read lock not granted. Sleeping for {} millis.", checkInterval); 134 try { 135 Thread.sleep(checkInterval); 136 return false; 137 } catch (InterruptedException e) { 138 LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out"); 139 return true; 140 } 141 } 142 143 public long getTimeout() { 144 return timeout; 145 } 146 147 public void setTimeout(long timeout) { 148 this.timeout = timeout; 149 } 150 151 public void setCheckInterval(long checkInterval) { 152 this.checkInterval = checkInterval; 153 } 154 155 }