001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file.cluster;
018
019import java.io.RandomAccessFile;
020import java.nio.channels.FileChannel;
021import java.nio.channels.FileLock;
022import java.nio.channels.OverlappingFileLockException;
023import java.nio.file.Files;
024import java.nio.file.Path;
025import java.nio.file.Paths;
026import java.util.Collections;
027import java.util.List;
028import java.util.Optional;
029import java.util.concurrent.ScheduledExecutorService;
030import java.util.concurrent.ScheduledFuture;
031import java.util.concurrent.TimeUnit;
032
033import org.apache.camel.cluster.CamelClusterMember;
034import org.apache.camel.impl.cluster.AbstractCamelClusterView;
035import org.apache.camel.util.IOHelper;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039public class FileLockClusterView extends AbstractCamelClusterView {
040    private static final Logger LOGGER = LoggerFactory.getLogger(FileLockClusterView.class);
041
042    private final ClusterMember localMember;
043    private final Path path;
044    private RandomAccessFile file;
045    private FileChannel channel;
046    private FileLock lock;
047    private ScheduledFuture<?> task;
048
049    FileLockClusterView(FileLockClusterService cluster, String namespace) {
050        super(cluster, namespace);
051
052        this.localMember = new ClusterMember();
053        this.path = Paths.get(cluster.getRoot(), namespace);
054
055    }
056
057    @Override
058    public Optional<CamelClusterMember> getLeader() {
059        return this.localMember.isLeader()
060            ? Optional.of(this.localMember)
061            : Optional.empty();
062    }
063
064    @Override
065    public CamelClusterMember getLocalMember() {
066        return this.localMember;
067    }
068
069    @Override
070    public List<CamelClusterMember> getMembers() {
071        // It may be useful to lock only a region of the file an then have views
072        // appending their id to the file on different regions so we can
073        // have a list of members. Root/Header region that is used for locking
074        // purpose may also contains the lock holder.
075        return Collections.emptyList();
076    }
077
078    @Override
079    protected void doStart() throws Exception {
080        if (file != null) {
081            close();
082
083            fireLeadershipChangedEvent(Optional.empty());
084        }
085
086        if (!Files.exists(path.getParent())) {
087            Files.createDirectories(path.getParent());
088        }
089
090        file = new RandomAccessFile(path.toFile(), "rw");
091        channel = file.getChannel();
092
093        FileLockClusterService service = getClusterService().unwrap(FileLockClusterService.class);
094        ScheduledExecutorService executor = service.getExecutor();
095
096        task = executor.scheduleAtFixedRate(
097            this::tryLock,
098            TimeUnit.MILLISECONDS.convert(service.getAcquireLockDelay(), service.getAcquireLockDelayUnit()),
099            TimeUnit.MILLISECONDS.convert(service.getAcquireLockInterval(), service.getAcquireLockIntervalUnit()),
100            TimeUnit.MILLISECONDS
101        );
102    }
103
104    @Override
105    protected void doStop() throws Exception {
106        close();
107    }
108
109    // *********************************
110    //
111    // *********************************
112
113    private void close() throws Exception {
114        if (task != null) {
115            task.cancel(true);
116        }
117
118        if (lock != null) {
119            lock.release();
120        }
121
122        if (file != null) {
123            IOHelper.close(channel);
124            IOHelper.close(file);
125
126            channel = null;
127            file = null;
128        }
129    }
130
131    private void tryLock() {
132        if (isStarting() || isStarted()) {
133            try {
134                if (localMember.isLeader()) {
135                    LOGGER.trace("Holding the lock on file {} (lock={})", path, lock);
136                    return;
137                }
138
139                synchronized (FileLockClusterView.this) {
140                    if (lock != null) {
141                        LOGGER.info("Lock on file {} lost (lock={})", path, lock);
142                        fireLeadershipChangedEvent(Optional.empty());
143                    }
144
145                    LOGGER.debug("Try to acquire a lock on {}", path);
146
147                    lock = null;
148                    lock = channel.tryLock();
149
150                    if (lock != null) {
151                        LOGGER.info("Lock on file {} acquired (lock={})", path, lock);
152                        fireLeadershipChangedEvent(Optional.of(localMember));
153                    } else {
154                        LOGGER.debug("Lock on file {} not acquired ", path);
155                    }
156                }
157            } catch (OverlappingFileLockException e) {
158                LOGGER.debug("Lock on file {} not acquired ", path);
159            } catch (Exception e) {
160                throw new RuntimeException(e);
161            }
162        }
163    }
164
165    private final class ClusterMember implements CamelClusterMember {
166        @Override
167        public boolean isLeader() {
168            synchronized (FileLockClusterView.this) {
169                return lock != null && lock.isValid();
170            }
171        }
172
173        @Override
174        public boolean isLocal() {
175            return true;
176        }
177
178        @Override
179        public String getId() {
180            return getClusterService().getId();
181        }
182    }
183}