001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor.idempotent;
018
019import java.io.File;
020import java.io.FileOutputStream;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.Scanner;
026import java.util.concurrent.atomic.AtomicBoolean;
027
028import org.apache.camel.api.management.ManagedAttribute;
029import org.apache.camel.api.management.ManagedOperation;
030import org.apache.camel.api.management.ManagedResource;
031import org.apache.camel.spi.IdempotentRepository;
032import org.apache.camel.support.ServiceSupport;
033import org.apache.camel.util.FileUtil;
034import org.apache.camel.util.IOHelper;
035import org.apache.camel.util.LRUCache;
036import org.apache.camel.util.LRUCacheFactory;
037import org.apache.camel.util.ObjectHelper;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
043 * <p/>
044 * This implementation provides a 1st-level in-memory {@link LRUCache} for fast check of the most
045 * frequently used keys. When {@link #add(String)} or {@link #contains(String)} methods are being used
046 * then in case of 1st-level cache miss, the underlying file is scanned which may cost additional performance.
047 * So try to find the right balance of the size of the 1st-level cache, the default size is 1000.
048 * The file store has a maximum capacity of 32mb by default (you can turn this off and have unlimited size).
049 * If the file store grows bigger than the maximum capacity, then the {@link #getDropOldestFileStore()} (is default 1000)
050 * number of entries from the file store is dropped to reduce the file store and make room for newer entries.
051 *
052 * @version 
053 */
054@ManagedResource(description = "File based idempotent repository")
055public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
056    private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class);
057    private static final String STORE_DELIMITER = "\n";
058
059    private final AtomicBoolean init = new AtomicBoolean();
060
061    private Map<String, Object> cache;
062    private File fileStore;
063    private long maxFileStoreSize = 32 * 1024 * 1000L; // 32mb store file
064    private long dropOldestFileStore = 1000;
065
066    public FileIdempotentRepository() {
067    }
068
069    public FileIdempotentRepository(File fileStore, Map<String, Object> set) {
070        this.fileStore = fileStore;
071        this.cache = set;
072    }
073
074    /**
075     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
076     * as 1st level cache with a default of 1000 entries in the cache.
077     *
078     * @param fileStore  the file store
079     */
080    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore) {
081        return fileIdempotentRepository(fileStore, 1000);
082    }
083
084    /**
085     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
086     * as 1st level cache.
087     *
088     * @param fileStore  the file store
089     * @param cacheSize  the cache size
090     */
091    @SuppressWarnings("unchecked")
092    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize) {
093        return fileIdempotentRepository(fileStore, LRUCacheFactory.newLRUCache(cacheSize));
094    }
095
096    /**
097     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
098     * as 1st level cache.
099     *
100     * @param fileStore  the file store
101     * @param cacheSize  the cache size
102     * @param maxFileStoreSize  the max size in bytes for the filestore file 
103     */
104    @SuppressWarnings("unchecked")
105    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize, long maxFileStoreSize) {
106        FileIdempotentRepository repository = new FileIdempotentRepository(fileStore, LRUCacheFactory.newLRUCache(cacheSize));
107        repository.setMaxFileStoreSize(maxFileStoreSize);
108        return repository;
109    }
110
111    /**
112     * Creates a new file based repository using the given {@link java.util.Map}
113     * as 1st level cache.
114     * <p/>
115     * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
116     * memory leak.
117     *
118     * @param store  the file store
119     * @param cache  the cache to use as 1st level cache
120     */
121    public static IdempotentRepository<String> fileIdempotentRepository(File store, Map<String, Object> cache) {
122        return new FileIdempotentRepository(store, cache);
123    }
124
125    @ManagedOperation(description = "Adds the key to the store")
126    public boolean add(String key) {
127        synchronized (cache) {
128            if (cache.containsKey(key)) {
129                return false;
130            } else {
131                // always register the most used keys in the LRUCache
132                cache.put(key, key);
133
134                // now check the file store
135                boolean containsInFile = containsStore(key);
136                if (containsInFile) {
137                    return false;
138                }
139
140                // its a new key so append to file store
141                appendToStore(key);
142
143                // check if we hit maximum capacity (if enabled) and report a warning about this
144                if (maxFileStoreSize > 0 && fileStore.length() > maxFileStoreSize) {
145                    LOG.warn("Maximum capacity of file store: {} hit at {} bytes. Dropping {} oldest entries from the file store", fileStore, maxFileStoreSize, dropOldestFileStore);
146                    trunkStore();
147                }
148
149                return true;
150            }
151        }
152    }
153
154    @ManagedOperation(description = "Does the store contain the given key")
155    public boolean contains(String key) {
156        synchronized (cache) {
157            // check 1st-level first and then fallback to check the actual file
158            return cache.containsKey(key) || containsStore(key);
159        }
160    }
161
162    @ManagedOperation(description = "Remove the key from the store")
163    public boolean remove(String key) {
164        boolean answer;
165        synchronized (cache) {
166            answer = cache.remove(key) != null;
167            // remove from file cache also
168            removeFromStore(key);
169        }
170        return answer;
171    }
172
173    public boolean confirm(String key) {
174        // noop
175        return true;
176    }
177    
178    @ManagedOperation(description = "Clear the store (danger this removes all entries)")
179    public void clear() {
180        synchronized (cache) {
181            cache.clear();
182            if (cache instanceof LRUCache) {
183                ((LRUCache) cache).cleanUp();
184            }
185            // clear file store
186            clearStore();
187        }
188    }
189
190    public File getFileStore() {
191        return fileStore;
192    }
193
194    public void setFileStore(File fileStore) {
195        this.fileStore = fileStore;
196    }
197
198    @ManagedAttribute(description = "The file path for the store")
199    public String getFilePath() {
200        return fileStore.getPath();
201    }
202
203    public Map<String, Object> getCache() {
204        return cache;
205    }
206
207    public void setCache(Map<String, Object> cache) {
208        this.cache = cache;
209    }
210
211    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
212    public long getMaxFileStoreSize() {
213        return maxFileStoreSize;
214    }
215
216    /**
217     * Sets the maximum file size for the file store in bytes.
218     * You can set the value to 0 or negative to turn this off, and have unlimited file store size.
219     * <p/>
220     * The default is 32mb.
221     */
222    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
223    public void setMaxFileStoreSize(long maxFileStoreSize) {
224        this.maxFileStoreSize = maxFileStoreSize;
225    }
226
227    public long getDropOldestFileStore() {
228        return dropOldestFileStore;
229    }
230
231    /**
232     * Sets the number of oldest entries to drop from the file store when the maximum capacity is hit to reduce
233     * disk space to allow room for new entries.
234     * <p/>
235     * The default is 1000.
236     */
237    @ManagedAttribute(description = "Number of oldest elements to drop from file store if maximum file size reached")
238    public void setDropOldestFileStore(long dropOldestFileStore) {
239        this.dropOldestFileStore = dropOldestFileStore;
240    }
241
242    /**
243     * Sets the 1st-level cache size.
244     *
245     * Setting cache size is only possible when using the default {@link LRUCache} cache implementation.
246     */
247    @SuppressWarnings("unchecked")
248    public void setCacheSize(int size) {
249        if (cache != null && !(cache instanceof LRUCache)) {
250            throw new IllegalArgumentException("Setting cache size is only possible when using the default LRUCache cache implementation");
251        }
252        if (cache != null) {
253            cache.clear();
254        }
255        cache = LRUCacheFactory.newLRUCache(size);
256    }
257
258    @ManagedAttribute(description = "The current 1st-level cache size")
259    public int getCacheSize() {
260        if (cache != null) {
261            return cache.size();
262        }
263        return 0;
264    }
265
266    /**
267     * Reset and clears the 1st-level cache to force it to reload from file
268     */
269    @ManagedOperation(description = "Reset and reloads the file store")
270    public synchronized void reset() throws IOException {
271        synchronized (cache) {
272            // run the cleanup task first
273            if (cache instanceof LRUCache) {
274                ((LRUCache) cache).cleanUp();
275            }
276            cache.clear();
277            loadStore();
278        }
279    }
280
281    /**
282     * Checks the file store if the key exists
283     *
284     * @param key  the key
285     * @return <tt>true</tt> if exists in the file, <tt>false</tt> otherwise
286     */
287    protected boolean containsStore(final String key) {
288        if (fileStore == null || !fileStore.exists()) {
289            return false;
290        }
291
292        Scanner scanner = null;
293        try {
294            scanner = new Scanner(fileStore);
295            scanner.useDelimiter(STORE_DELIMITER);
296            while (scanner.hasNextLine()) {
297                String line = scanner.nextLine();
298                if (line.equals(key)) {
299                    return true;
300                }
301            }
302        } catch (IOException e) {
303            throw ObjectHelper.wrapRuntimeCamelException(e);
304        } finally {
305            if (scanner != null) {
306                scanner.close();
307            }
308        }
309        return false;
310    }
311
312    /**
313     * Appends the given key to the file store
314     *
315     * @param key  the key
316     */
317    protected void appendToStore(final String key) {
318        LOG.debug("Appending: {} to idempotent filestore: {}", key, fileStore);
319        FileOutputStream fos = null;
320        try {
321            // create store parent directory if missing
322            File storeParentDirectory = fileStore.getParentFile();
323            if (storeParentDirectory != null && !storeParentDirectory.exists()) {
324                LOG.info("Parent directory of file store {} doesn't exist. Creating.", fileStore);
325                if (fileStore.getParentFile().mkdirs()) {
326                    LOG.info("Parent directory of filestore: {} successfully created.", fileStore);
327                } else {
328                    LOG.warn("Parent directory of filestore: {} cannot be created.", fileStore);
329                }
330            }
331            // create store if missing
332            if (!fileStore.exists()) {
333                FileUtil.createNewFile(fileStore);
334            }
335            // append to store
336            fos = new FileOutputStream(fileStore, true);
337            fos.write(key.getBytes());
338            fos.write(STORE_DELIMITER.getBytes());
339        } catch (IOException e) {
340            throw ObjectHelper.wrapRuntimeCamelException(e);
341        } finally {
342            IOHelper.close(fos, "Appending to file idempotent repository", LOG);
343        }
344    }
345
346    protected synchronized void removeFromStore(String key) {
347        LOG.debug("Removing: {} from idempotent filestore: {}", key, fileStore);
348
349        // we need to re-load the entire file and remove the key and then re-write the file
350        List<String> lines = new ArrayList<>();
351
352        boolean found = false;
353        Scanner scanner = null;
354        try {
355            scanner = new Scanner(fileStore);
356            scanner.useDelimiter(STORE_DELIMITER);
357            while (scanner.hasNextLine()) {
358                String line = scanner.nextLine();
359                if (key.equals(line)) {
360                    found = true;
361                } else {
362                    lines.add(line);
363                }
364            }
365        } catch (IOException e) {
366            throw ObjectHelper.wrapRuntimeCamelException(e);
367        } finally {
368            if (scanner != null) {
369                scanner.close();
370            }
371        }
372
373        if (found) {
374            // rewrite file
375            LOG.debug("Rewriting idempotent filestore: {} due to key: {} removed", fileStore, key);
376            FileOutputStream fos = null;
377            try {
378                fos = new FileOutputStream(fileStore);
379                for (String line : lines) {
380                    fos.write(line.getBytes());
381                    fos.write(STORE_DELIMITER.getBytes());
382                }
383            } catch (IOException e) {
384                throw ObjectHelper.wrapRuntimeCamelException(e);
385            } finally {
386                IOHelper.close(fos, "Rewriting file idempotent repository", LOG);
387            }
388        }
389    }
390
391    /**
392     * Clears the file-store (danger this deletes all entries)
393     */
394    protected void clearStore() {
395        try {
396            FileUtil.deleteFile(fileStore);
397            FileUtil.createNewFile(fileStore);
398        } catch (IOException e) {
399            throw ObjectHelper.wrapRuntimeCamelException(e);
400        }
401    }
402
403    /**
404     * Trunks the file store when the max store size is hit by dropping the most oldest entries.
405     */
406    protected synchronized void trunkStore() {
407        if (fileStore == null || !fileStore.exists()) {
408            return;
409        }
410
411        LOG.debug("Trunking: {} oldest entries from idempotent filestore: {}", dropOldestFileStore, fileStore);
412
413        // we need to re-load the entire file and remove the key and then re-write the file
414        List<String> lines = new ArrayList<>();
415
416        Scanner scanner = null;
417        int count = 0;
418        try {
419            scanner = new Scanner(fileStore);
420            scanner.useDelimiter(STORE_DELIMITER);
421            while (scanner.hasNextLine()) {
422                String line = scanner.nextLine();
423                count++;
424                if (count > dropOldestFileStore) {
425                    lines.add(line);
426                }
427            }
428        } catch (IOException e) {
429            throw ObjectHelper.wrapRuntimeCamelException(e);
430        } finally {
431            if (scanner != null) {
432                scanner.close();
433            }
434        }
435
436        if (!lines.isEmpty()) {
437            // rewrite file
438            LOG.debug("Rewriting idempotent filestore: {} with {} entries:", fileStore, lines.size());
439            FileOutputStream fos = null;
440            try {
441                fos = new FileOutputStream(fileStore);
442                for (String line : lines) {
443                    fos.write(line.getBytes());
444                    fos.write(STORE_DELIMITER.getBytes());
445                }
446            } catch (IOException e) {
447                throw ObjectHelper.wrapRuntimeCamelException(e);
448            } finally {
449                IOHelper.close(fos, "Rewriting file idempotent repository", LOG);
450            }
451        } else {
452            // its a small file so recreate the file
453            LOG.debug("Clearing idempotent filestore: {}", fileStore);
454            clearStore();
455        }
456    }
457
458    /**
459     * Cleanup the 1st-level cache.
460     */
461    protected void cleanup() {
462        // run the cleanup task first
463        if (cache instanceof LRUCache) {
464            ((LRUCache) cache).cleanUp();
465        }
466    }
467
468    /**
469     * Loads the given file store into the 1st level cache
470     */
471    protected void loadStore() throws IOException {
472        // auto create starting directory if needed
473        if (!fileStore.exists()) {
474            LOG.debug("Creating filestore: {}", fileStore);
475            File parent = fileStore.getParentFile();
476            if (parent != null) {
477                parent.mkdirs();
478            }
479            boolean created = FileUtil.createNewFile(fileStore);
480            if (!created) {
481                throw new IOException("Cannot create filestore: " + fileStore);
482            }
483        }
484
485        LOG.trace("Loading to 1st level cache from idempotent filestore: {}", fileStore);
486
487        cache.clear();
488        Scanner scanner = null;
489        try {
490            scanner = new Scanner(fileStore);
491            scanner.useDelimiter(STORE_DELIMITER);
492            while (scanner.hasNextLine()) {
493                String line = scanner.nextLine();
494                cache.put(line, line);
495            }
496        } catch (IOException e) {
497            throw ObjectHelper.wrapRuntimeCamelException(e);
498        } finally {
499            if (scanner != null) {
500                scanner.close();
501            }
502        }
503
504        LOG.debug("Loaded {} to the 1st level cache from idempotent filestore: {}", cache.size(), fileStore);
505    }
506
507    @Override
508    @SuppressWarnings("unchecked")
509    protected void doStart() throws Exception {
510        ObjectHelper.notNull(fileStore, "fileStore", this);
511
512        if (this.cache == null) {
513            // default use a 1st level cache
514            this.cache = LRUCacheFactory.newLRUCache(1000);
515        }
516
517        // init store if not loaded before
518        if (init.compareAndSet(false, true)) {
519            loadStore();
520        }
521    }
522
523    @Override
524    protected void doStop() throws Exception {
525        // run the cleanup task first
526        if (cache instanceof LRUCache) {
527            ((LRUCache) cache).cleanUp();
528        }
529
530        cache.clear();
531        init.set(false);
532    }
533
534}