001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file.strategy;
018
019import java.io.File;
020import java.io.IOException;
021import java.io.RandomAccessFile;
022import java.nio.channels.Channel;
023import java.nio.channels.FileChannel;
024import java.nio.channels.FileLock;
025
026import org.apache.camel.Exchange;
027import org.apache.camel.LoggingLevel;
028import org.apache.camel.component.file.GenericFile;
029import org.apache.camel.component.file.GenericFileEndpoint;
030import org.apache.camel.component.file.GenericFileOperations;
031import org.apache.camel.util.CamelLogger;
032import org.apache.camel.util.IOHelper;
033import org.apache.camel.util.StopWatch;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Acquires exclusive read lock to the given file. Will wait until the lock is granted.
039 * After granting the read lock it is released, we just want to make sure that when we start
040 * consuming the file its not currently in progress of being written by third party.
041 */
042public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLockStrategy {
043    private static final Logger LOG = LoggerFactory.getLogger(FileLockExclusiveReadLockStrategy.class);
044    private long timeout;
045    private long checkInterval = 1000;
046    private LoggingLevel readLockLoggingLevel = LoggingLevel.DEBUG;
047
048    @Override
049    public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
050        // noop
051    }
052
053    @Override
054    public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
055        // must call super
056        if (!super.acquireExclusiveReadLock(operations, file, exchange)) {
057            return false;
058        }
059
060        File target = new File(file.getAbsoluteFilePath());
061
062        LOG.trace("Waiting for exclusive read lock to file: {}", target);
063
064        FileChannel channel = null;
065        RandomAccessFile randomAccessFile = null;
066
067        boolean exclusive = false;
068        FileLock lock = null;
069
070        try {
071            randomAccessFile = new RandomAccessFile(target, "rw");
072            // try to acquire rw lock on the file before we can consume it
073            channel = randomAccessFile.getChannel();
074
075            StopWatch watch = new StopWatch();
076
077            while (!exclusive) {
078                // timeout check
079                if (timeout > 0) {
080                    long delta = watch.taken();
081                    if (delta > timeout) {
082                        CamelLogger.log(LOG, readLockLoggingLevel,
083                                "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target);
084                        // we could not get the lock within the timeout period, so return false
085                        return false;
086                    }
087                }
088
089                if (!target.exists()) {
090                    CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock as file no longer exists. Will skip the file: " + file);
091                    return false;
092                }
093
094                // get the lock using either try lock or not depending on if we are using timeout or not
095                try {
096                    lock = timeout > 0 ? channel.tryLock() : channel.lock();
097                } catch (IllegalStateException ex) {
098                    // Also catch the OverlappingFileLockException here. Do nothing here
099                }
100                if (lock != null) {
101                    LOG.trace("Acquired exclusive read lock: {} to file: {}", lock, target);
102                    exclusive = true;
103                } else {
104                    boolean interrupted = sleep();
105                    if (interrupted) {
106                        // we were interrupted while sleeping, we are likely being shutdown so return false
107                        return false;
108                    }
109                }
110            }
111        } catch (IOException e) {
112            // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file
113            // such as AntiVirus or MS Office that has special locks for it's supported files
114            if (timeout == 0) {
115                // if not using timeout, then we cant retry, so return false
116                return false;
117            }
118            LOG.debug("Cannot acquire read lock. Will try again.", e);
119            boolean interrupted = sleep();
120            if (interrupted) {
121                // we were interrupted while sleeping, we are likely being shutdown so return false
122                return false;
123            }
124        } finally {
125            // close channels if we did not grab the lock
126            if (!exclusive) {
127                IOHelper.close(channel, "while acquiring exclusive read lock for file: " + target, LOG);
128                IOHelper.close(randomAccessFile, "while acquiring exclusive read lock for file: " + target, LOG);
129
130                // and also must release super lock
131                super.releaseExclusiveReadLockOnAbort(operations, file, exchange);
132            }
133        }
134
135        // store read-lock state
136        exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), lock);
137        exchange.setProperty(asReadLockKey(file, Exchange.FILE_LOCK_RANDOM_ACCESS_FILE), randomAccessFile);
138
139        // we grabbed the lock
140        return true;
141    }
142
143    @Override
144    protected void doReleaseExclusiveReadLock(GenericFileOperations<File> operations,
145                                              GenericFile<File> file, Exchange exchange) throws Exception {
146        // must call super
147        super.doReleaseExclusiveReadLock(operations, file, exchange);
148
149        FileLock lock = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), FileLock.class);
150        RandomAccessFile rac = exchange.getProperty(asReadLockKey(file, Exchange.FILE_LOCK_EXCLUSIVE_LOCK), RandomAccessFile.class);
151
152        String target = file.getFileName();
153        if (lock != null) {
154            Channel channel = lock.acquiredBy();
155            try {
156                lock.release();
157            } finally {
158                // close channel as well
159                IOHelper.close(channel, "while releasing exclusive read lock for file: " + target, LOG);
160                IOHelper.close(rac, "while releasing exclusive read lock for file: " + target, LOG);
161            }
162        }
163    }
164
165    private boolean sleep() {
166        LOG.trace("Exclusive read lock not granted. Sleeping for {} millis.", checkInterval);
167        try {
168            Thread.sleep(checkInterval);
169            return false;
170        } catch (InterruptedException e) {
171            LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out");
172            return true;
173        }
174    }
175
176    public long getTimeout() {
177        return timeout;
178    }
179
180    @Override
181    public void setTimeout(long timeout) {
182        this.timeout = timeout;
183    }
184
185    @Override
186    public void setCheckInterval(long checkInterval) {
187        this.checkInterval = checkInterval;
188    }
189
190    @Override
191    public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
192        this.readLockLoggingLevel = readLockLoggingLevel;
193    }
194
195    private static String asReadLockKey(GenericFile file, String key) {
196        // use the copy from absolute path as that was the original path of the file when the lock was acquired
197        // for example if the file consumer uses preMove then the file is moved and therefore has another name
198        // that would no longer match
199        String path = file.getCopyFromAbsoluteFilePath() != null ? file.getCopyFromAbsoluteFilePath() : file.getAbsoluteFilePath();
200        return path + "-" + key;
201    }
202
203}