001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.file.cluster; 018 019import java.io.RandomAccessFile; 020import java.nio.channels.FileChannel; 021import java.nio.channels.FileLock; 022import java.nio.channels.OverlappingFileLockException; 023import java.nio.file.Files; 024import java.nio.file.Path; 025import java.nio.file.Paths; 026import java.util.Collections; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.ScheduledExecutorService; 030import java.util.concurrent.ScheduledFuture; 031import java.util.concurrent.TimeUnit; 032 033import org.apache.camel.cluster.CamelClusterMember; 034import org.apache.camel.impl.cluster.AbstractCamelClusterView; 035import org.apache.camel.util.IOHelper; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039public class FileLockClusterView extends AbstractCamelClusterView { 040 private static final Logger LOGGER = LoggerFactory.getLogger(FileLockClusterView.class); 041 042 private final ClusterMember localMember; 043 private final Path path; 044 private RandomAccessFile file; 045 private FileChannel channel; 046 private FileLock lock; 047 private ScheduledFuture<?> task; 048 049 FileLockClusterView(FileLockClusterService cluster, String namespace) { 050 super(cluster, namespace); 051 052 this.localMember = new ClusterMember(); 053 this.path = Paths.get(cluster.getRoot(), namespace); 054 055 } 056 057 @Override 058 public Optional<CamelClusterMember> getLeader() { 059 return this.localMember.isLeader() 060 ? Optional.of(this.localMember) 061 : Optional.empty(); 062 } 063 064 @Override 065 public CamelClusterMember getLocalMember() { 066 return this.localMember; 067 } 068 069 @Override 070 public List<CamelClusterMember> getMembers() { 071 // It may be useful to lock only a region of the file an then have views 072 // appending their id to the file on different regions so we can 073 // have a list of members. Root/Header region that is used for locking 074 // purpose may also contains the lock holder. 075 return Collections.emptyList(); 076 } 077 078 @Override 079 protected void doStart() throws Exception { 080 if (file != null) { 081 close(); 082 083 fireLeadershipChangedEvent(Optional.empty()); 084 } 085 086 if (!Files.exists(path.getParent())) { 087 Files.createDirectories(path.getParent()); 088 } 089 090 file = new RandomAccessFile(path.toFile(), "rw"); 091 channel = file.getChannel(); 092 093 FileLockClusterService service = getClusterService().unwrap(FileLockClusterService.class); 094 ScheduledExecutorService executor = service.getExecutor(); 095 096 task = executor.scheduleAtFixedRate( 097 this::tryLock, 098 TimeUnit.MILLISECONDS.convert(service.getAcquireLockDelay(), service.getAcquireLockDelayUnit()), 099 TimeUnit.MILLISECONDS.convert(service.getAcquireLockInterval(), service.getAcquireLockIntervalUnit()), 100 TimeUnit.MILLISECONDS 101 ); 102 } 103 104 @Override 105 protected void doStop() throws Exception { 106 close(); 107 } 108 109 // ********************************* 110 // 111 // ********************************* 112 113 private void close() throws Exception { 114 if (task != null) { 115 task.cancel(true); 116 } 117 118 if (lock != null) { 119 lock.release(); 120 } 121 122 if (file != null) { 123 IOHelper.close(channel); 124 IOHelper.close(file); 125 126 channel = null; 127 file = null; 128 } 129 } 130 131 private void tryLock() { 132 if (isStarting() || isStarted()) { 133 try { 134 if (localMember.isLeader()) { 135 LOGGER.trace("Holding the lock on file {} (lock={})", path, lock); 136 return; 137 } 138 139 synchronized (FileLockClusterView.this) { 140 if (lock != null) { 141 LOGGER.info("Lock on file {} lost (lock={})", path, lock); 142 fireLeadershipChangedEvent(Optional.empty()); 143 } 144 145 LOGGER.debug("Try to acquire a lock on {}", path); 146 147 lock = null; 148 lock = channel.tryLock(); 149 150 if (lock != null) { 151 LOGGER.info("Lock on file {} acquired (lock={})", path, lock); 152 fireLeadershipChangedEvent(Optional.of(localMember)); 153 } else { 154 LOGGER.debug("Lock on file {} not acquired ", path); 155 } 156 } 157 } catch (OverlappingFileLockException e) { 158 LOGGER.debug("Lock on file {} not acquired ", path); 159 } catch (Exception e) { 160 throw new RuntimeException(e); 161 } 162 } 163 } 164 165 private final class ClusterMember implements CamelClusterMember { 166 @Override 167 public boolean isLeader() { 168 synchronized (FileLockClusterView.this) { 169 return lock != null && lock.isValid(); 170 } 171 } 172 173 @Override 174 public boolean isLocal() { 175 return true; 176 } 177 178 @Override 179 public String getId() { 180 return getClusterService().getId(); 181 } 182 } 183}