001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file.strategy;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.io.RandomAccessFile;
022    import java.nio.channels.Channel;
023    import java.nio.channels.FileChannel;
024    import java.nio.channels.FileLock;
025    
026    import org.apache.camel.Exchange;
027    import org.apache.camel.component.file.GenericFile;
028    import org.apache.camel.component.file.GenericFileEndpoint;
029    import org.apache.camel.component.file.GenericFileOperations;
030    import org.apache.camel.util.IOHelper;
031    import org.apache.camel.util.StopWatch;
032    import org.slf4j.Logger;
033    import org.slf4j.LoggerFactory;
034    
035    /**
036     * Acquires exclusive read lock to the given file. Will wait until the lock is granted.
037     * After granting the read lock it is released, we just want to make sure that when we start
038     * consuming the file its not currently in progress of being written by third party.
039     */
040    public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLockStrategy {
041        private static final transient Logger LOG = LoggerFactory.getLogger(FileLockExclusiveReadLockStrategy.class);
042        private long timeout;
043        private long checkInterval = 1000;
044        private FileLock lock;
045        private String lockFileName;
046    
047        public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
048            // noop
049        }
050    
051        public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
052            // must call super
053            if (!super.acquireExclusiveReadLock(operations, file, exchange)) {
054                return false;
055            }
056    
057            File target = new File(file.getAbsoluteFilePath());
058    
059            LOG.trace("Waiting for exclusive read lock to file: {}", target);
060    
061            try {
062                // try to acquire rw lock on the file before we can consume it
063                FileChannel channel = new RandomAccessFile(target, "rw").getChannel();
064    
065                boolean exclusive = false;
066                StopWatch watch = new StopWatch();
067    
068                while (!exclusive) {
069                    // timeout check
070                    if (timeout > 0) {
071                        long delta = watch.taken();
072                        if (delta > timeout) {
073                            LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target);
074                            // we could not get the lock within the timeout period, so return false
075                            return false;
076                        }
077                    }
078    
079                    // get the lock using either try lock or not depending on if we are using timeout or not
080                    try {
081                        lock = timeout > 0 ? channel.tryLock() : channel.lock();
082                    } catch (IllegalStateException ex) {
083                        // Also catch the OverlappingFileLockException here. Do nothing here                    
084                    }
085                    if (lock != null) {
086                        LOG.trace("Acquired exclusive read lock: {} to file: {}", lock, target);
087                        lockFileName = target.getName();
088                        exclusive = true;
089                    } else {
090                        boolean interrupted = sleep();
091                        if (interrupted) {
092                            // we were interrupted while sleeping, we are likely being shutdown so return false
093                            return false;
094                        }
095                    }
096                }
097            } catch (IOException e) {
098                // must handle IOException as some apps on Windows etc. will still somehow hold a lock to a file
099                // such as AntiVirus or MS Office that has special locks for it's supported files
100                if (timeout == 0) {
101                    // if not using timeout, then we cant retry, so rethrow
102                    throw e;
103                }
104                LOG.debug("Cannot acquire read lock. Will try again.", e);
105                boolean interrupted = sleep();
106                if (interrupted) {
107                    // we were interrupted while sleeping, we are likely being shutdown so return false
108                    return false;
109                }
110            }
111    
112            return true;
113        }
114    
115        public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
116                                             GenericFile<File> file, Exchange exchange) throws Exception {
117    
118            // must call super
119            super.releaseExclusiveReadLock(operations, file, exchange);
120    
121            if (lock != null) {
122                Channel channel = lock.channel();
123                try {
124                    lock.release();
125                } finally {
126                    // must close channel first
127                    IOHelper.close(channel, "while acquiring exclusive read lock for file: " + lockFileName, LOG);
128                }
129            }
130        }
131    
132        private boolean sleep() {
133            LOG.trace("Exclusive read lock not granted. Sleeping for {} millis.", checkInterval);
134            try {
135                Thread.sleep(checkInterval);
136                return false;
137            } catch (InterruptedException e) {
138                LOG.debug("Sleep interrupted while waiting for exclusive read lock, so breaking out");
139                return true;
140            }
141        }
142    
143        public long getTimeout() {
144            return timeout;
145        }
146    
147        public void setTimeout(long timeout) {
148            this.timeout = timeout;
149        }
150    
151        public void setCheckInterval(long checkInterval) {
152            this.checkInterval = checkInterval;
153        }
154    
155    }