001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.component.file.strategy; 018 019import java.io.File; 020import java.util.concurrent.ScheduledExecutorService; 021import java.util.concurrent.TimeUnit; 022 023import org.apache.camel.CamelContext; 024import org.apache.camel.CamelContextAware; 025import org.apache.camel.Exchange; 026import org.apache.camel.LoggingLevel; 027import org.apache.camel.component.file.GenericFile; 028import org.apache.camel.component.file.GenericFileEndpoint; 029import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy; 030import org.apache.camel.component.file.GenericFileOperations; 031import org.apache.camel.spi.IdempotentRepository; 032import org.apache.camel.support.ServiceSupport; 033import org.apache.camel.util.CamelLogger; 034import org.apache.camel.util.ObjectHelper; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * A file read lock that uses an {@link org.apache.camel.spi.IdempotentRepository} as the lock strategy. This allows to plugin and use existing 040 * idempotent repositories that for example supports clustering. The other read lock strategies that are using marker files or file locks, 041 * are not guaranteed to work in clustered setup with various platform and file systems. 042 */ 043public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport implements GenericFileExclusiveReadLockStrategy<File>, CamelContextAware { 044 045 private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentRepositoryReadLockStrategy.class); 046 047 private GenericFileEndpoint<File> endpoint; 048 private LoggingLevel readLockLoggingLevel = LoggingLevel.DEBUG; 049 private CamelContext camelContext; 050 private IdempotentRepository<String> idempotentRepository; 051 private boolean removeOnRollback = true; 052 private boolean removeOnCommit; 053 private int readLockIdempotentReleaseDelay; 054 private boolean readLockIdempotentReleaseAsync; 055 private int readLockIdempotentReleaseAsyncPoolSize; 056 private ScheduledExecutorService readLockIdempotentReleaseExecutorService; 057 private boolean shutdownExecutorService; 058 059 @Override 060 public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception { 061 this.endpoint = endpoint; 062 LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository, endpoint); 063 } 064 065 @Override 066 public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { 067 // in clustered mode then another node may have processed the file so we must check here again if the file exists 068 File path = file.getFile(); 069 if (!path.exists()) { 070 return false; 071 } 072 073 // check if we can begin on this file 074 String key = asKey(file); 075 boolean answer = idempotentRepository.add(key); 076 if (!answer) { 077 // another node is processing the file so skip 078 CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock. Will skip the file: " + file); 079 } 080 return answer; 081 } 082 083 @Override 084 public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { 085 // noop 086 } 087 088 @Override 089 public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { 090 String key = asKey(file); 091 Runnable r = () -> { 092 if (removeOnRollback) { 093 idempotentRepository.remove(key); 094 } else { 095 // okay we should not remove then confirm it instead 096 idempotentRepository.confirm(key); 097 } 098 }; 099 100 if (readLockIdempotentReleaseDelay > 0 && readLockIdempotentReleaseExecutorService != null) { 101 LOG.debug("Scheduling readlock release task to run asynchronous delayed after {} millis", readLockIdempotentReleaseDelay); 102 readLockIdempotentReleaseExecutorService.schedule(r, readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS); 103 } else if (readLockIdempotentReleaseDelay > 0) { 104 LOG.debug("Delaying readlock release task {} millis", readLockIdempotentReleaseDelay); 105 Thread.sleep(readLockIdempotentReleaseDelay); 106 r.run(); 107 } else { 108 r.run(); 109 } 110 } 111 112 @Override 113 public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception { 114 String key = asKey(file); 115 Runnable r = () -> { 116 if (removeOnCommit) { 117 idempotentRepository.remove(key); 118 } else { 119 // confirm on commit 120 idempotentRepository.confirm(key); 121 } 122 }; 123 124 if (readLockIdempotentReleaseDelay > 0 && readLockIdempotentReleaseExecutorService != null) { 125 LOG.debug("Scheduling readlock release task to run asynchronous delayed after {} millis", readLockIdempotentReleaseDelay); 126 readLockIdempotentReleaseExecutorService.schedule(r, readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS); 127 } else if (readLockIdempotentReleaseDelay > 0) { 128 LOG.debug("Delaying readlock release task {} millis", readLockIdempotentReleaseDelay); 129 Thread.sleep(readLockIdempotentReleaseDelay); 130 r.run(); 131 } else { 132 r.run(); 133 } 134 } 135 136 public void setTimeout(long timeout) { 137 // noop 138 } 139 140 public void setCheckInterval(long checkInterval) { 141 // noop 142 } 143 144 public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) { 145 this.readLockLoggingLevel = readLockLoggingLevel; 146 } 147 148 public void setMarkerFiler(boolean markerFile) { 149 // noop 150 } 151 152 public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) { 153 // noop 154 } 155 156 public CamelContext getCamelContext() { 157 return camelContext; 158 } 159 160 public void setCamelContext(CamelContext camelContext) { 161 this.camelContext = camelContext; 162 } 163 164 /** 165 * The idempotent repository to use as the store for the read locks. 166 */ 167 public IdempotentRepository<String> getIdempotentRepository() { 168 return idempotentRepository; 169 } 170 171 /** 172 * The idempotent repository to use as the store for the read locks. 173 */ 174 public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) { 175 this.idempotentRepository = idempotentRepository; 176 } 177 178 /** 179 * Whether to remove the file from the idempotent repository when doing a rollback. 180 * <p/> 181 * By default this is true. 182 */ 183 public boolean isRemoveOnRollback() { 184 return removeOnRollback; 185 } 186 187 /** 188 * Whether to remove the file from the idempotent repository when doing a rollback. 189 * <p/> 190 * By default this is true. 191 */ 192 public void setRemoveOnRollback(boolean removeOnRollback) { 193 this.removeOnRollback = removeOnRollback; 194 } 195 196 /** 197 * Whether to remove the file from the idempotent repository when doing a commit. 198 * <p/> 199 * By default this is false. 200 */ 201 public boolean isRemoveOnCommit() { 202 return removeOnCommit; 203 } 204 205 /** 206 * Whether to remove the file from the idempotent repository when doing a commit. 207 * <p/> 208 * By default this is false. 209 */ 210 public void setRemoveOnCommit(boolean removeOnCommit) { 211 this.removeOnCommit = removeOnCommit; 212 } 213 214 public int getReadLockIdempotentReleaseDelay() { 215 return readLockIdempotentReleaseDelay; 216 } 217 218 /** 219 * Whether to delay the release task for a period of millis. 220 */ 221 public void setReadLockIdempotentReleaseDelay(int readLockIdempotentReleaseDelay) { 222 this.readLockIdempotentReleaseDelay = readLockIdempotentReleaseDelay; 223 } 224 225 public boolean isReadLockIdempotentReleaseAsync() { 226 return readLockIdempotentReleaseAsync; 227 } 228 229 /** 230 * Whether the delayed release task should be synchronous or asynchronous. 231 */ 232 public void setReadLockIdempotentReleaseAsync(boolean readLockIdempotentReleaseAsync) { 233 this.readLockIdempotentReleaseAsync = readLockIdempotentReleaseAsync; 234 } 235 236 public int getReadLockIdempotentReleaseAsyncPoolSize() { 237 return readLockIdempotentReleaseAsyncPoolSize; 238 } 239 240 /** 241 * The number of threads in the scheduled thread pool when using asynchronous release tasks. 242 */ 243 public void setReadLockIdempotentReleaseAsyncPoolSize(int readLockIdempotentReleaseAsyncPoolSize) { 244 this.readLockIdempotentReleaseAsyncPoolSize = readLockIdempotentReleaseAsyncPoolSize; 245 } 246 247 public ScheduledExecutorService getReadLockIdempotentReleaseExecutorService() { 248 return readLockIdempotentReleaseExecutorService; 249 } 250 251 /** 252 * To use a custom and shared thread pool for asynchronous release tasks. 253 */ 254 public void setReadLockIdempotentReleaseExecutorService(ScheduledExecutorService readLockIdempotentReleaseExecutorService) { 255 this.readLockIdempotentReleaseExecutorService = readLockIdempotentReleaseExecutorService; 256 } 257 258 protected String asKey(GenericFile<File> file) { 259 // use absolute file path as default key, but evaluate if an expression key was configured 260 String key = file.getAbsoluteFilePath(); 261 if (endpoint.getIdempotentKey() != null) { 262 Exchange dummy = endpoint.createExchange(file); 263 key = endpoint.getIdempotentKey().evaluate(dummy, String.class); 264 } 265 return key; 266 } 267 268 @Override 269 protected void doStart() throws Exception { 270 ObjectHelper.notNull(camelContext, "camelContext", this); 271 ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this); 272 273 if (readLockIdempotentReleaseAsync && readLockIdempotentReleaseExecutorService == null) { 274 readLockIdempotentReleaseExecutorService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "ReadLockIdempotentReleaseTask", readLockIdempotentReleaseAsyncPoolSize); 275 shutdownExecutorService = true; 276 } 277 } 278 279 @Override 280 protected void doStop() throws Exception { 281 if (shutdownExecutorService && readLockIdempotentReleaseExecutorService != null) { 282 camelContext.getExecutorServiceManager().shutdownGraceful(readLockIdempotentReleaseExecutorService, 30000); 283 readLockIdempotentReleaseExecutorService = null; 284 } 285 } 286 287}