001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor.idempotent;
018
019import java.io.File;
020import java.io.FileOutputStream;
021import java.io.IOException;
022import java.util.Map;
023import java.util.Scanner;
024import java.util.concurrent.atomic.AtomicBoolean;
025
026import org.apache.camel.api.management.ManagedAttribute;
027import org.apache.camel.api.management.ManagedOperation;
028import org.apache.camel.api.management.ManagedResource;
029import org.apache.camel.spi.IdempotentRepository;
030import org.apache.camel.support.ServiceSupport;
031import org.apache.camel.util.FileUtil;
032import org.apache.camel.util.IOHelper;
033import org.apache.camel.util.LRUCache;
034import org.apache.camel.util.ObjectHelper;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
040 * <p/>
041 * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
042 * memory leak.
043 *
044 * @version 
045 */
046@ManagedResource(description = "File based idempotent repository")
047public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
048    private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class);
049    private static final String STORE_DELIMITER = "\n";
050    private Map<String, Object> cache;
051    private File fileStore;
052    private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
053    private AtomicBoolean init = new AtomicBoolean();
054
055    public FileIdempotentRepository() {
056        // default use a 1st level cache 
057        this.cache = new LRUCache<String, Object>(1000);
058    }
059
060    public FileIdempotentRepository(File fileStore, Map<String, Object> set) {
061        this.fileStore = fileStore;
062        this.cache = set;
063    }
064
065    /**
066     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
067     * as 1st level cache with a default of 1000 entries in the cache.
068     *
069     * @param fileStore  the file store
070     */
071    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore) {
072        return fileIdempotentRepository(fileStore, 1000);
073    }
074
075    /**
076     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
077     * as 1st level cache.
078     *
079     * @param fileStore  the file store
080     * @param cacheSize  the cache size
081     */
082    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize) {
083        return fileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
084    }
085
086    /**
087     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
088     * as 1st level cache.
089     *
090     * @param fileStore  the file store
091     * @param cacheSize  the cache size
092     * @param maxFileStoreSize  the max size in bytes for the filestore file 
093     */
094    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize, long maxFileStoreSize) {
095        FileIdempotentRepository repository = new FileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
096        repository.setMaxFileStoreSize(maxFileStoreSize);
097        return repository;
098    }
099
100    /**
101     * Creates a new file based repository using the given {@link java.util.Map}
102     * as 1st level cache.
103     * <p/>
104     * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
105     * memory leak.
106     *
107     * @param store  the file store
108     * @param cache  the cache to use as 1st level cache
109     */
110    public static IdempotentRepository<String> fileIdempotentRepository(File store, Map<String, Object> cache) {
111        return new FileIdempotentRepository(store, cache);
112    }
113
114    @ManagedOperation(description = "Adds the key to the store")
115    public boolean add(String key) {
116        synchronized (cache) {
117            if (cache.containsKey(key)) {
118                return false;
119            } else {
120                cache.put(key, key);
121                if (fileStore.length() < maxFileStoreSize) {
122                    // just append to store
123                    appendToStore(key);
124                } else {
125                    // trunk store and flush the cache
126                    trunkStore();
127                }
128
129                return true;
130            }
131        }
132    }
133
134    @ManagedOperation(description = "Does the store contain the given key")
135    public boolean contains(String key) {
136        synchronized (cache) {
137            return cache.containsKey(key);
138        }
139    }
140
141    @ManagedOperation(description = "Remove the key from the store")
142    public boolean remove(String key) {
143        boolean answer;
144        synchronized (cache) {
145            answer = cache.remove(key) != null;
146            // trunk store and flush the cache on remove
147            trunkStore();
148        }
149        return answer;
150    }
151
152    public boolean confirm(String key) {
153        // noop
154        return true;
155    }
156    
157    @ManagedOperation(description = "Clear the store")
158    public void clear() {
159        synchronized (cache) {
160            cache.clear();
161        }        
162    }
163
164    public File getFileStore() {
165        return fileStore;
166    }
167
168    public void setFileStore(File fileStore) {
169        this.fileStore = fileStore;
170    }
171
172    @ManagedAttribute(description = "The file path for the store")
173    public String getFilePath() {
174        return fileStore.getPath();
175    }
176
177    public Map<String, Object> getCache() {
178        return cache;
179    }
180
181    public void setCache(Map<String, Object> cache) {
182        this.cache = cache;
183    }
184
185    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
186    public long getMaxFileStoreSize() {
187        return maxFileStoreSize;
188    }
189
190    /**
191     * Sets the maximum file size for the file store in bytes.
192     * <p/>
193     * The default is 1mb.
194     */
195    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
196    public void setMaxFileStoreSize(long maxFileStoreSize) {
197        this.maxFileStoreSize = maxFileStoreSize;
198    }
199
200    /**
201     * Sets the cache size
202     */
203    public void setCacheSize(int size) {
204        if (cache != null) {
205            cache.clear();
206        }
207        cache = new LRUCache<String, Object>(size);
208    }
209
210    @ManagedAttribute(description = "The current cache size")
211    public int getCacheSize() {
212        if (cache != null) {
213            return cache.size();
214        }
215        return 0;
216    }
217
218    /**
219     * Reset and clears the store to force it to reload from file
220     */
221    @ManagedOperation(description = "Reset and reloads the file store")
222    public synchronized void reset() throws IOException {
223        synchronized (cache) {
224            // trunk and clear, before we reload the store
225            trunkStore();
226            cache.clear();
227            loadStore();
228        }
229    }
230
231    /**
232     * Appends the given message id to the file store
233     *
234     * @param messageId  the message id
235     */
236    protected void appendToStore(final String messageId) {
237        LOG.debug("Appending {} to idempotent filestore: {}", messageId, fileStore);
238        FileOutputStream fos = null;
239        try {
240            // create store parent directory if missing
241            File storeParentDirectory = fileStore.getParentFile();
242            if (storeParentDirectory != null && !storeParentDirectory.exists()) {
243                LOG.info("Parent directory of file store {} doesn't exist. Creating.", fileStore);
244                if (fileStore.getParentFile().mkdirs()) {
245                    LOG.info("Parent directory of file store {} successfully created.", fileStore);
246                } else {
247                    LOG.warn("Parent directory of file store {} cannot be created.", fileStore);
248                }
249            }
250            // create store if missing
251            if (!fileStore.exists()) {
252                FileUtil.createNewFile(fileStore);
253            }
254            // append to store
255            fos = new FileOutputStream(fileStore, true);
256            fos.write(messageId.getBytes());
257            fos.write(STORE_DELIMITER.getBytes());
258        } catch (IOException e) {
259            throw ObjectHelper.wrapRuntimeCamelException(e);
260        } finally {
261            IOHelper.close(fos, "Appending to file idempotent repository", LOG);
262        }
263    }
264
265    /**
266     * Trunks the file store when the max store size is hit by rewriting the 1st level cache
267     * to the file store.
268     */
269    protected void trunkStore() {
270        LOG.info("Trunking idempotent filestore: {}", fileStore);
271        FileOutputStream fos = null;
272        try {
273            fos = new FileOutputStream(fileStore);
274            for (String key : cache.keySet()) {
275                fos.write(key.getBytes());
276                fos.write(STORE_DELIMITER.getBytes());
277            }
278        } catch (IOException e) {
279            throw ObjectHelper.wrapRuntimeCamelException(e);
280        } finally {
281            IOHelper.close(fos, "Trunking file idempotent repository", LOG);
282        }
283    }
284
285    /**
286     * Loads the given file store into the 1st level cache
287     */
288    protected void loadStore() throws IOException {
289        // auto create starting directory if needed
290        if (!fileStore.exists()) {
291            LOG.debug("Creating filestore: {}", fileStore);
292            File parent = fileStore.getParentFile();
293            if (parent != null) {
294                parent.mkdirs();
295            }
296            boolean created = FileUtil.createNewFile(fileStore);
297            if (!created) {
298                throw new IOException("Cannot create filestore: " + fileStore);
299            }
300        }
301
302        LOG.trace("Loading to 1st level cache from idempotent filestore: {}", fileStore);
303
304        cache.clear();
305        Scanner scanner = null;
306        try {
307            scanner = new Scanner(fileStore);
308            scanner.useDelimiter(STORE_DELIMITER);
309            while (scanner.hasNextLine()) {
310                String line = scanner.nextLine();
311                cache.put(line, line);
312            }
313        } catch (IOException e) {
314            throw ObjectHelper.wrapRuntimeCamelException(e);
315        } finally {
316            if (scanner != null) {
317                scanner.close();
318            }
319        }
320
321        LOG.debug("Loaded {} to the 1st level cache from idempotent filestore: {}", cache.size(), fileStore);
322    }
323
324    @Override
325    protected void doStart() throws Exception {
326        ObjectHelper.notNull(fileStore, "fileStore", this);
327
328        // init store if not loaded before
329        if (init.compareAndSet(false, true)) {
330            loadStore();
331        }
332    }
333
334    @Override
335    protected void doStop() throws Exception {
336        // reset will trunk and clear the cache
337        trunkStore();
338        cache.clear();
339        init.set(false);
340    }
341
342}