001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor.idempotent;
018
019import java.io.File;
020import java.io.FileOutputStream;
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.List;
024import java.util.Map;
025import java.util.concurrent.atomic.AtomicBoolean;
026
027import org.apache.camel.api.management.ManagedAttribute;
028import org.apache.camel.api.management.ManagedOperation;
029import org.apache.camel.api.management.ManagedResource;
030import org.apache.camel.spi.IdempotentRepository;
031import org.apache.camel.support.ServiceSupport;
032import org.apache.camel.util.FileUtil;
033import org.apache.camel.util.IOHelper;
034import org.apache.camel.util.LRUCache;
035import org.apache.camel.util.LRUCacheFactory;
036import org.apache.camel.util.ObjectHelper;
037import org.apache.camel.util.Scanner;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041/**
042 * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
043 * <p/>
044 * This implementation provides a 1st-level in-memory {@link LRUCache} for fast check of the most
045 * frequently used keys. When {@link #add(String)} or {@link #contains(String)} methods are being used
046 * then in case of 1st-level cache miss, the underlying file is scanned which may cost additional performance.
047 * So try to find the right balance of the size of the 1st-level cache, the default size is 1000.
048 * The file store has a maximum capacity of 32mb by default (you can turn this off and have unlimited size).
049 * If the file store grows bigger than the maximum capacity, then the {@link #getDropOldestFileStore()} (is default 1000)
050 * number of entries from the file store is dropped to reduce the file store and make room for newer entries.
051 *
052 * @version 
053 */
054@ManagedResource(description = "File based idempotent repository")
055public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
056    private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class);
057    private static final String STORE_DELIMITER = "\n";
058
059    private final AtomicBoolean init = new AtomicBoolean();
060
061    private Map<String, Object> cache;
062    private File fileStore;
063    private long maxFileStoreSize = 32 * 1024 * 1000L; // 32mb store file
064    private long dropOldestFileStore = 1000;
065
066    public FileIdempotentRepository() {
067    }
068
069    public FileIdempotentRepository(File fileStore, Map<String, Object> set) {
070        this.fileStore = fileStore;
071        this.cache = set;
072    }
073
074    /**
075     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
076     * as 1st level cache with a default of 1000 entries in the cache.
077     *
078     * @param fileStore  the file store
079     */
080    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore) {
081        return fileIdempotentRepository(fileStore, 1000);
082    }
083
084    /**
085     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
086     * as 1st level cache.
087     *
088     * @param fileStore  the file store
089     * @param cacheSize  the cache size
090     */
091    @SuppressWarnings("unchecked")
092    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize) {
093        return fileIdempotentRepository(fileStore, LRUCacheFactory.newLRUCache(cacheSize));
094    }
095
096    /**
097     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
098     * as 1st level cache.
099     *
100     * @param fileStore  the file store
101     * @param cacheSize  the cache size
102     * @param maxFileStoreSize  the max size in bytes for the filestore file 
103     */
104    @SuppressWarnings("unchecked")
105    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize, long maxFileStoreSize) {
106        FileIdempotentRepository repository = new FileIdempotentRepository(fileStore, LRUCacheFactory.newLRUCache(cacheSize));
107        repository.setMaxFileStoreSize(maxFileStoreSize);
108        return repository;
109    }
110
111    /**
112     * Creates a new file based repository using the given {@link java.util.Map}
113     * as 1st level cache.
114     * <p/>
115     * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
116     * memory leak.
117     *
118     * @param store  the file store
119     * @param cache  the cache to use as 1st level cache
120     */
121    public static IdempotentRepository<String> fileIdempotentRepository(File store, Map<String, Object> cache) {
122        return new FileIdempotentRepository(store, cache);
123    }
124
125    @ManagedOperation(description = "Adds the key to the store")
126    public boolean add(String key) {
127        synchronized (cache) {
128            if (cache.containsKey(key)) {
129                return false;
130            } else {
131                // always register the most used keys in the LRUCache
132                cache.put(key, key);
133
134                // now check the file store
135                boolean containsInFile = containsStore(key);
136                if (containsInFile) {
137                    return false;
138                }
139
140                // its a new key so append to file store
141                appendToStore(key);
142
143                // check if we hit maximum capacity (if enabled) and report a warning about this
144                if (maxFileStoreSize > 0 && fileStore.length() > maxFileStoreSize) {
145                    LOG.warn("Maximum capacity of file store: {} hit at {} bytes. Dropping {} oldest entries from the file store", fileStore, maxFileStoreSize, dropOldestFileStore);
146                    trunkStore();
147                }
148
149                return true;
150            }
151        }
152    }
153
154    @ManagedOperation(description = "Does the store contain the given key")
155    public boolean contains(String key) {
156        synchronized (cache) {
157            // check 1st-level first and then fallback to check the actual file
158            return cache.containsKey(key) || containsStore(key);
159        }
160    }
161
162    @ManagedOperation(description = "Remove the key from the store")
163    public boolean remove(String key) {
164        boolean answer;
165        synchronized (cache) {
166            answer = cache.remove(key) != null;
167            // remove from file cache also
168            removeFromStore(key);
169        }
170        return answer;
171    }
172
173    public boolean confirm(String key) {
174        // noop
175        return true;
176    }
177    
178    @ManagedOperation(description = "Clear the store (danger this removes all entries)")
179    public void clear() {
180        synchronized (cache) {
181            cache.clear();
182            if (cache instanceof LRUCache) {
183                ((LRUCache) cache).cleanUp();
184            }
185            // clear file store
186            clearStore();
187        }
188    }
189
190    public File getFileStore() {
191        return fileStore;
192    }
193
194    public void setFileStore(File fileStore) {
195        this.fileStore = fileStore;
196    }
197
198    @ManagedAttribute(description = "The file path for the store")
199    public String getFilePath() {
200        return fileStore.getPath();
201    }
202
203    public Map<String, Object> getCache() {
204        return cache;
205    }
206
207    public void setCache(Map<String, Object> cache) {
208        this.cache = cache;
209    }
210
211    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
212    public long getMaxFileStoreSize() {
213        return maxFileStoreSize;
214    }
215
216    /**
217     * Sets the maximum file size for the file store in bytes.
218     * You can set the value to 0 or negative to turn this off, and have unlimited file store size.
219     * <p/>
220     * The default is 32mb.
221     */
222    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
223    public void setMaxFileStoreSize(long maxFileStoreSize) {
224        this.maxFileStoreSize = maxFileStoreSize;
225    }
226
227    public long getDropOldestFileStore() {
228        return dropOldestFileStore;
229    }
230
231    /**
232     * Sets the number of oldest entries to drop from the file store when the maximum capacity is hit to reduce
233     * disk space to allow room for new entries.
234     * <p/>
235     * The default is 1000.
236     */
237    @ManagedAttribute(description = "Number of oldest elements to drop from file store if maximum file size reached")
238    public void setDropOldestFileStore(long dropOldestFileStore) {
239        this.dropOldestFileStore = dropOldestFileStore;
240    }
241
242    /**
243     * Sets the 1st-level cache size.
244     *
245     * Setting cache size is only possible when using the default {@link LRUCache} cache implementation.
246     */
247    @SuppressWarnings("unchecked")
248    public void setCacheSize(int size) {
249        if (cache != null && !(cache instanceof LRUCache)) {
250            throw new IllegalArgumentException("Setting cache size is only possible when using the default LRUCache cache implementation");
251        }
252        if (cache != null) {
253            cache.clear();
254        }
255        cache = LRUCacheFactory.newLRUCache(size);
256    }
257
258    @ManagedAttribute(description = "The current 1st-level cache size")
259    public int getCacheSize() {
260        if (cache != null) {
261            return cache.size();
262        }
263        return 0;
264    }
265
266    /**
267     * Reset and clears the 1st-level cache to force it to reload from file
268     */
269    @ManagedOperation(description = "Reset and reloads the file store")
270    public synchronized void reset() throws IOException {
271        synchronized (cache) {
272            // run the cleanup task first
273            if (cache instanceof LRUCache) {
274                ((LRUCache) cache).cleanUp();
275            }
276            cache.clear();
277            loadStore();
278        }
279    }
280
281    /**
282     * Checks the file store if the key exists
283     *
284     * @param key  the key
285     * @return <tt>true</tt> if exists in the file, <tt>false</tt> otherwise
286     */
287    protected boolean containsStore(final String key) {
288        if (fileStore == null || !fileStore.exists()) {
289            return false;
290        }
291
292        try (Scanner scanner = new Scanner(fileStore, null, STORE_DELIMITER)) {
293            while (scanner.hasNext()) {
294                String line = scanner.next();
295                if (line.equals(key)) {
296                    return true;
297                }
298            }
299        } catch (IOException e) {
300            throw ObjectHelper.wrapRuntimeCamelException(e);
301        }
302        return false;
303    }
304
305    /**
306     * Appends the given key to the file store
307     *
308     * @param key  the key
309     */
310    protected void appendToStore(final String key) {
311        LOG.debug("Appending: {} to idempotent filestore: {}", key, fileStore);
312        FileOutputStream fos = null;
313        try {
314            // create store parent directory if missing
315            File storeParentDirectory = fileStore.getParentFile();
316            if (storeParentDirectory != null && !storeParentDirectory.exists()) {
317                LOG.info("Parent directory of file store {} doesn't exist. Creating.", fileStore);
318                if (fileStore.getParentFile().mkdirs()) {
319                    LOG.info("Parent directory of filestore: {} successfully created.", fileStore);
320                } else {
321                    LOG.warn("Parent directory of filestore: {} cannot be created.", fileStore);
322                }
323            }
324            // create store if missing
325            if (!fileStore.exists()) {
326                FileUtil.createNewFile(fileStore);
327            }
328            // append to store
329            fos = new FileOutputStream(fileStore, true);
330            fos.write(key.getBytes());
331            fos.write(STORE_DELIMITER.getBytes());
332        } catch (IOException e) {
333            throw ObjectHelper.wrapRuntimeCamelException(e);
334        } finally {
335            IOHelper.close(fos, "Appending to file idempotent repository", LOG);
336        }
337    }
338
339    protected synchronized void removeFromStore(String key) {
340        LOG.debug("Removing: {} from idempotent filestore: {}", key, fileStore);
341
342        // we need to re-load the entire file and remove the key and then re-write the file
343        List<String> lines = new ArrayList<>();
344
345        boolean found = false;
346        Scanner scanner = null;
347        try {
348            scanner = new Scanner(fileStore, null, STORE_DELIMITER);
349            while (scanner.hasNext()) {
350                String line = scanner.next();
351                if (key.equals(line)) {
352                    found = true;
353                } else {
354                    lines.add(line);
355                }
356            }
357        } catch (IOException e) {
358            throw ObjectHelper.wrapRuntimeCamelException(e);
359        } finally {
360            if (scanner != null) {
361                scanner.close();
362            }
363        }
364
365        if (found) {
366            // rewrite file
367            LOG.debug("Rewriting idempotent filestore: {} due to key: {} removed", fileStore, key);
368            FileOutputStream fos = null;
369            try {
370                fos = new FileOutputStream(fileStore);
371                for (String line : lines) {
372                    fos.write(line.getBytes());
373                    fos.write(STORE_DELIMITER.getBytes());
374                }
375            } catch (IOException e) {
376                throw ObjectHelper.wrapRuntimeCamelException(e);
377            } finally {
378                IOHelper.close(fos, "Rewriting file idempotent repository", LOG);
379            }
380        }
381    }
382
383    /**
384     * Clears the file-store (danger this deletes all entries)
385     */
386    protected void clearStore() {
387        try {
388            FileUtil.deleteFile(fileStore);
389            FileUtil.createNewFile(fileStore);
390        } catch (IOException e) {
391            throw ObjectHelper.wrapRuntimeCamelException(e);
392        }
393    }
394
395    /**
396     * Trunks the file store when the max store size is hit by dropping the most oldest entries.
397     */
398    protected synchronized void trunkStore() {
399        if (fileStore == null || !fileStore.exists()) {
400            return;
401        }
402
403        LOG.debug("Trunking: {} oldest entries from idempotent filestore: {}", dropOldestFileStore, fileStore);
404
405        // we need to re-load the entire file and remove the key and then re-write the file
406        List<String> lines = new ArrayList<>();
407
408        Scanner scanner = null;
409        int count = 0;
410        try {
411            scanner = new Scanner(fileStore, null, STORE_DELIMITER);
412            while (scanner.hasNext()) {
413                String line = scanner.next();
414                count++;
415                if (count > dropOldestFileStore) {
416                    lines.add(line);
417                }
418            }
419        } catch (IOException e) {
420            throw ObjectHelper.wrapRuntimeCamelException(e);
421        } finally {
422            if (scanner != null) {
423                scanner.close();
424            }
425        }
426
427        if (!lines.isEmpty()) {
428            // rewrite file
429            LOG.debug("Rewriting idempotent filestore: {} with {} entries:", fileStore, lines.size());
430            FileOutputStream fos = null;
431            try {
432                fos = new FileOutputStream(fileStore);
433                for (String line : lines) {
434                    fos.write(line.getBytes());
435                    fos.write(STORE_DELIMITER.getBytes());
436                }
437            } catch (IOException e) {
438                throw ObjectHelper.wrapRuntimeCamelException(e);
439            } finally {
440                IOHelper.close(fos, "Rewriting file idempotent repository", LOG);
441            }
442        } else {
443            // its a small file so recreate the file
444            LOG.debug("Clearing idempotent filestore: {}", fileStore);
445            clearStore();
446        }
447    }
448
449    /**
450     * Cleanup the 1st-level cache.
451     */
452    protected void cleanup() {
453        // run the cleanup task first
454        if (cache instanceof LRUCache) {
455            ((LRUCache) cache).cleanUp();
456        }
457    }
458
459    /**
460     * Loads the given file store into the 1st level cache
461     */
462    protected void loadStore() throws IOException {
463        // auto create starting directory if needed
464        if (!fileStore.exists()) {
465            LOG.debug("Creating filestore: {}", fileStore);
466            File parent = fileStore.getParentFile();
467            if (parent != null) {
468                parent.mkdirs();
469            }
470            boolean created = FileUtil.createNewFile(fileStore);
471            if (!created) {
472                throw new IOException("Cannot create filestore: " + fileStore);
473            }
474        }
475
476        LOG.trace("Loading to 1st level cache from idempotent filestore: {}", fileStore);
477
478        cache.clear();
479        try (Scanner scanner = new Scanner(fileStore, null, STORE_DELIMITER)) {
480            while (scanner.hasNext()) {
481                String line = scanner.next();
482                cache.put(line, line);
483            }
484        } catch (IOException e) {
485            throw ObjectHelper.wrapRuntimeCamelException(e);
486        }
487
488        LOG.debug("Loaded {} to the 1st level cache from idempotent filestore: {}", cache.size(), fileStore);
489    }
490
491    @Override
492    @SuppressWarnings("unchecked")
493    protected void doStart() throws Exception {
494        ObjectHelper.notNull(fileStore, "fileStore", this);
495
496        if (this.cache == null) {
497            // default use a 1st level cache
498            this.cache = LRUCacheFactory.newLRUCache(1000);
499        }
500
501        // init store if not loaded before
502        if (init.compareAndSet(false, true)) {
503            loadStore();
504        }
505    }
506
507    @Override
508    protected void doStop() throws Exception {
509        // run the cleanup task first
510        if (cache instanceof LRUCache) {
511            ((LRUCache) cache).cleanUp();
512        }
513
514        cache.clear();
515        init.set(false);
516    }
517
518}