001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.component.file.strategy;
018
019import java.io.File;
020
021import org.apache.camel.CamelContext;
022import org.apache.camel.CamelContextAware;
023import org.apache.camel.Exchange;
024import org.apache.camel.LoggingLevel;
025import org.apache.camel.component.file.GenericFile;
026import org.apache.camel.component.file.GenericFileEndpoint;
027import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
028import org.apache.camel.component.file.GenericFileOperations;
029import org.apache.camel.spi.IdempotentRepository;
030import org.apache.camel.support.ServiceSupport;
031import org.apache.camel.util.CamelLogger;
032import org.apache.camel.util.ObjectHelper;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * A file read lock that uses an {@link IdempotentRepository} and {@link FileChangedExclusiveReadLockStrategy changed} as the lock strategy.
038 * This allows to plugin and use existing idempotent repositories that for example supports clustering.
039 * The other read lock strategies that are using marker files or file locks, are not guaranteed to work in clustered setup with various platform and file systems.
040 */
041public class FileIdempotentChangedRepositoryReadLockStrategy extends ServiceSupport implements GenericFileExclusiveReadLockStrategy<File>, CamelContextAware {
042
043    private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentChangedRepositoryReadLockStrategy.class);
044
045    private final FileChangedExclusiveReadLockStrategy changed;
046    private GenericFileEndpoint<File> endpoint;
047    private LoggingLevel readLockLoggingLevel = LoggingLevel.DEBUG;
048    private CamelContext camelContext;
049    private IdempotentRepository<String> idempotentRepository;
050    private boolean removeOnRollback = true;
051    private boolean removeOnCommit;
052
053    public FileIdempotentChangedRepositoryReadLockStrategy() {
054        this.changed = new FileChangedExclusiveReadLockStrategy();
055        // no need to use marker file as idempotent ensures exclusive read-lock
056        this.changed.setMarkerFiler(false);
057        this.changed.setDeleteOrphanLockFiles(false);
058    }
059
060    @Override
061    public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) throws Exception {
062        this.endpoint = endpoint;
063        LOG.info("Using FileIdempotentRepositoryReadLockStrategy: {} on endpoint: {}", idempotentRepository, endpoint);
064
065        changed.prepareOnStartup(operations, endpoint);
066    }
067
068    @Override
069    public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
070        // in clustered mode then another node may have processed the file so we must check here again if the file exists
071        File path = file.getFile();
072        if (!path.exists()) {
073            return false;
074        }
075
076        // check if we can begin on this file
077        String key = asKey(file);
078        boolean answer = idempotentRepository.add(key);
079        if (!answer) {
080            // another node is processing the file so skip
081            CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock. Will skip the file: " + file);
082        }
083
084        if (answer) {
085            // if we acquired during idempotent then check changed also
086            answer = changed.acquireExclusiveReadLock(operations, file, exchange);
087            if (!answer) {
088                // remove from idempontent as we did not acquire it from changed
089                idempotentRepository.remove(key);
090            }
091        }
092        return answer;
093    }
094
095    @Override
096    public void releaseExclusiveReadLockOnAbort(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
097        changed.releaseExclusiveReadLockOnAbort(operations, file, exchange);
098    }
099
100    @Override
101    public void releaseExclusiveReadLockOnRollback(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
102        String key = asKey(file);
103        if (removeOnRollback) {
104            idempotentRepository.remove(key);
105        } else {
106            // okay we should not remove then confirm it instead
107            idempotentRepository.confirm(key);
108        }
109
110        changed.releaseExclusiveReadLockOnRollback(operations, file, exchange);
111    }
112
113    @Override
114    public void releaseExclusiveReadLockOnCommit(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
115        String key = asKey(file);
116        if (removeOnCommit) {
117            idempotentRepository.remove(key);
118        } else {
119            // confirm on commit
120            idempotentRepository.confirm(key);
121        }
122
123        changed.releaseExclusiveReadLockOnCommit(operations, file, exchange);
124    }
125
126    public void setTimeout(long timeout) {
127        changed.setTimeout(timeout);
128    }
129
130    public void setCheckInterval(long checkInterval) {
131        changed.setCheckInterval(checkInterval);
132    }
133
134    public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
135        this.readLockLoggingLevel = readLockLoggingLevel;
136        changed.setReadLockLoggingLevel(readLockLoggingLevel);
137    }
138
139    public void setMarkerFiler(boolean markerFile) {
140        // we do not use marker files
141    }
142
143    public void setDeleteOrphanLockFiles(boolean deleteOrphanLockFiles) {
144        // we do not use marker files
145    }
146
147    public void setMinLength(long minLength) {
148        changed.setMinLength(minLength);
149    }
150
151    public void setMinAge(long minAge) {
152        changed.setMinAge(minAge);
153    }
154
155    public CamelContext getCamelContext() {
156        return camelContext;
157    }
158
159    public void setCamelContext(CamelContext camelContext) {
160        this.camelContext = camelContext;
161    }
162
163    /**
164     * The idempotent repository to use as the store for the read locks.
165     */
166    public IdempotentRepository<String> getIdempotentRepository() {
167        return idempotentRepository;
168    }
169
170    /**
171     * The idempotent repository to use as the store for the read locks.
172     */
173    public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) {
174        this.idempotentRepository = idempotentRepository;
175    }
176
177    /**
178     * Whether to remove the file from the idempotent repository when doing a rollback.
179     * <p/>
180     * By default this is true.
181     */
182    public boolean isRemoveOnRollback() {
183        return removeOnRollback;
184    }
185
186    /**
187     * Whether to remove the file from the idempotent repository when doing a rollback.
188     * <p/>
189     * By default this is true.
190     */
191    public void setRemoveOnRollback(boolean removeOnRollback) {
192        this.removeOnRollback = removeOnRollback;
193    }
194
195    /**
196     * Whether to remove the file from the idempotent repository when doing a commit.
197     * <p/>
198     * By default this is false.
199     */
200    public boolean isRemoveOnCommit() {
201        return removeOnCommit;
202    }
203
204    /**
205     * Whether to remove the file from the idempotent repository when doing a commit.
206     * <p/>
207     * By default this is false.
208     */
209    public void setRemoveOnCommit(boolean removeOnCommit) {
210        this.removeOnCommit = removeOnCommit;
211    }
212
213    protected String asKey(GenericFile<File> file) {
214        // use absolute file path as default key, but evaluate if an expression key was configured
215        String key = file.getAbsoluteFilePath();
216        if (endpoint.getIdempotentKey() != null) {
217            Exchange dummy = endpoint.createExchange(file);
218            key = endpoint.getIdempotentKey().evaluate(dummy, String.class);
219        }
220        return key;
221    }
222
223    @Override
224    protected void doStart() throws Exception {
225        ObjectHelper.notNull(camelContext, "camelContext", this);
226        ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this);
227
228        // ensure the idempotent repository is added as a service so CamelContext will stop the repo when it shutdown itself
229        camelContext.addService(idempotentRepository, true);
230    }
231
232    @Override
233    protected void doStop() throws Exception {
234        // noop
235    }
236
237}