001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file.strategy;
018
019import java.io.File;
020import java.util.concurrent.ScheduledExecutorService;
021import java.util.concurrent.TimeUnit;
022
023import org.apache.camel.CamelContext;
024import org.apache.camel.CamelContextAware;
025import org.apache.camel.Exchange;
026import org.apache.camel.LoggingLevel;
027import org.apache.camel.component.file.GenericFile;
028import org.apache.camel.component.file.GenericFileEndpoint;
029import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
030import org.apache.camel.component.file.GenericFileOperations;
031import org.apache.camel.spi.IdempotentRepository;
032import org.apache.camel.support.ServiceSupport;
033import org.apache.camel.util.CamelLogger;
034import org.apache.camel.util.ObjectHelper;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * A file read lock that uses an {@link org.apache.camel.spi.IdempotentRepository} as the lock strategy. This allows to plugin and use existing
040 * idempotent repositories that for example supports clustering. The other read lock strategies that are using marker files or file locks,
041 * are not guaranteed to work in clustered setup with various platform and file systems.
042 */
043public class FileIdempotentRepositoryReadLockStrategy extends ServiceSupport implements GenericFileExclusiveReadLockStrategy<File>, CamelContextAware {
044
045    private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentRepositoryReadLockStrategy.class);
046
047    private GenericFileEndpoint<File> endpoint;
048    private LoggingLevel readLockLoggingLevel = LoggingLevel.DEBUG;
049    private CamelContext camelContext;
050    private IdempotentRepository<String> idempotentRepository;
051    private boolean removeOnRollback = true;
052    private boolean removeOnCommit;
053    private int readLockIdempotentReleaseDelay;
054    private boolean readLockIdempotentReleaseAsync;
055    private int readLockIdempotentReleaseAsyncPoolSize;
056    private ScheduledExecutorService readLockIdempotentReleaseExecutorService;
057    private boolean shutdownExecutorService;
058
059    @Override
060    public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception {
061        this.endpoint = endpoint;
062        LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository, endpoint);
063    }
064
065    @Override
066    public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
067        // in clustered mode then another node may have processed the file so we must check here again if the file exists
068        File path = file.getFile();
069        if (!path.exists()) {
070            return false;
071        }
072
073        // check if we can begin on this file
074        String key = asKey(file);
075        boolean answer = idempotentRepository.add(key);
076        if (!answer) {
077            // another node is processing the file so skip
078            CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock. Will skip the file: " + file);
079        }
080        return answer;
081    }
082
083    @Override
084    public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
085        // noop
086    }
087
088    @Override
089    public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
090        String key = asKey(file);
091        Runnable r = () -> {
092            if (removeOnRollback) {
093                idempotentRepository.remove(key);
094            } else {
095                // okay we should not remove then confirm it instead
096                idempotentRepository.confirm(key);
097            }
098        };
099
100        if (readLockIdempotentReleaseDelay > 0 && readLockIdempotentReleaseExecutorService != null) {
101            LOG.debug("Scheduling readlock release task to run asynchronous delayed after {} millis", readLockIdempotentReleaseDelay);
102            readLockIdempotentReleaseExecutorService.schedule(r, readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS);
103        } else if (readLockIdempotentReleaseDelay > 0) {
104            LOG.debug("Delaying readlock release task {} millis", readLockIdempotentReleaseDelay);
105            Thread.sleep(readLockIdempotentReleaseDelay);
106            r.run();
107        } else {
108            r.run();
109        }
110    }
111
112    @Override
113    public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
114        String key = asKey(file);
115        Runnable r = () -> {
116            if (removeOnCommit) {
117                idempotentRepository.remove(key);
118            } else {
119                // confirm on commit
120                idempotentRepository.confirm(key);
121            }
122        };
123
124        if (readLockIdempotentReleaseDelay > 0 && readLockIdempotentReleaseExecutorService != null) {
125            LOG.debug("Scheduling readlock release task to run asynchronous delayed after {} millis", readLockIdempotentReleaseDelay);
126            readLockIdempotentReleaseExecutorService.schedule(r, readLockIdempotentReleaseDelay, TimeUnit.MILLISECONDS);
127        } else if (readLockIdempotentReleaseDelay > 0) {
128            LOG.debug("Delaying readlock release task {} millis", readLockIdempotentReleaseDelay);
129            Thread.sleep(readLockIdempotentReleaseDelay);
130            r.run();
131        } else {
132            r.run();
133        }
134    }
135
136    public void setTimeout(long timeout) {
137        // noop
138    }
139
140    public void setCheckInterval(long checkInterval) {
141        // noop
142    }
143
144    public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
145        this.readLockLoggingLevel = readLockLoggingLevel;
146    }
147
148    public void setMarkerFiler(boolean markerFile) {
149        // noop
150    }
151
152    public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
153        // noop
154    }
155
156    public CamelContext getCamelContext() {
157        return camelContext;
158    }
159
160    public void setCamelContext(CamelContext camelContext) {
161        this.camelContext = camelContext;
162    }
163
164    /**
165     * The idempotent repository to use as the store for the read locks.
166     */
167    public IdempotentRepository<String> getIdempotentRepository() {
168        return idempotentRepository;
169    }
170
171    /**
172     * The idempotent repository to use as the store for the read locks.
173     */
174    public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) {
175        this.idempotentRepository = idempotentRepository;
176    }
177
178    /**
179     * Whether to remove the file from the idempotent repository when doing a rollback.
180     * <p/>
181     * By default this is true.
182     */
183    public boolean isRemoveOnRollback() {
184        return removeOnRollback;
185    }
186
187    /**
188     * Whether to remove the file from the idempotent repository when doing a rollback.
189     * <p/>
190     * By default this is true.
191     */
192    public void setRemoveOnRollback(boolean removeOnRollback) {
193        this.removeOnRollback = removeOnRollback;
194    }
195
196    /**
197     * Whether to remove the file from the idempotent repository when doing a commit.
198     * <p/>
199     * By default this is false.
200     */
201    public boolean isRemoveOnCommit() {
202        return removeOnCommit;
203    }
204
205    /**
206     * Whether to remove the file from the idempotent repository when doing a commit.
207     * <p/>
208     * By default this is false.
209     */
210    public void setRemoveOnCommit(boolean removeOnCommit) {
211        this.removeOnCommit = removeOnCommit;
212    }
213
214    public int getReadLockIdempotentReleaseDelay() {
215        return readLockIdempotentReleaseDelay;
216    }
217
218    /**
219     * Whether to delay the release task for a period of millis.
220     */
221    public void setReadLockIdempotentReleaseDelay(int readLockIdempotentReleaseDelay) {
222        this.readLockIdempotentReleaseDelay = readLockIdempotentReleaseDelay;
223    }
224
225    public boolean isReadLockIdempotentReleaseAsync() {
226        return readLockIdempotentReleaseAsync;
227    }
228
229    /**
230     * Whether the delayed release task should be synchronous or asynchronous.
231     */
232    public void setReadLockIdempotentReleaseAsync(boolean readLockIdempotentReleaseAsync) {
233        this.readLockIdempotentReleaseAsync = readLockIdempotentReleaseAsync;
234    }
235
236    public int getReadLockIdempotentReleaseAsyncPoolSize() {
237        return readLockIdempotentReleaseAsyncPoolSize;
238    }
239
240    /**
241     * The number of threads in the scheduled thread pool when using asynchronous release tasks.
242     */
243    public void setReadLockIdempotentReleaseAsyncPoolSize(int readLockIdempotentReleaseAsyncPoolSize) {
244        this.readLockIdempotentReleaseAsyncPoolSize = readLockIdempotentReleaseAsyncPoolSize;
245    }
246
247    public ScheduledExecutorService getReadLockIdempotentReleaseExecutorService() {
248        return readLockIdempotentReleaseExecutorService;
249    }
250
251    /**
252     * To use a custom and shared thread pool for asynchronous release tasks.
253     */
254    public void setReadLockIdempotentReleaseExecutorService(ScheduledExecutorService readLockIdempotentReleaseExecutorService) {
255        this.readLockIdempotentReleaseExecutorService = readLockIdempotentReleaseExecutorService;
256    }
257
258    protected String asKey(GenericFile<File> file) {
259        // use absolute file path as default key, but evaluate if an expression key was configured
260        String key = file.getAbsoluteFilePath();
261        if (endpoint.getIdempotentKey() != null) {
262            Exchange dummy = endpoint.createExchange(file);
263            key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
264        }
265        return key;
266    }
267
268    @Override
269    protected void doStart() throws Exception {
270        ObjectHelper.notNull(camelContext, "camelContext", this);
271        ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
272
273        if (readLockIdempotentReleaseAsync && readLockIdempotentReleaseExecutorService == null) {
274            readLockIdempotentReleaseExecutorService = camelContext.getExecutorServiceManager().newScheduledThreadPool(this, "ReadLockIdempotentReleaseTask", readLockIdempotentReleaseAsyncPoolSize);
275            shutdownExecutorService = true;
276        }
277    }
278
279    @Override
280    protected void doStop() throws Exception {
281        if (shutdownExecutorService && readLockIdempotentReleaseExecutorService != null) {
282            camelContext.getExecutorServiceManager().shutdownGraceful(readLockIdempotentReleaseExecutorService, 30000);
283            readLockIdempotentReleaseExecutorService = null;
284        }
285    }
286
287}