001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.idempotent;
018    
019    import java.io.File;
020    import java.io.FileOutputStream;
021    import java.io.IOException;
022    import java.util.Map;
023    import java.util.Scanner;
024    import java.util.concurrent.atomic.AtomicBoolean;
025    
026    import org.apache.camel.api.management.ManagedAttribute;
027    import org.apache.camel.api.management.ManagedOperation;
028    import org.apache.camel.api.management.ManagedResource;
029    import org.apache.camel.spi.IdempotentRepository;
030    import org.apache.camel.support.ServiceSupport;
031    import org.apache.camel.util.FileUtil;
032    import org.apache.camel.util.IOHelper;
033    import org.apache.camel.util.LRUCache;
034    import org.apache.camel.util.ObjectHelper;
035    import org.slf4j.Logger;
036    import org.slf4j.LoggerFactory;
037    
038    /**
039     * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
040     * <p/>
041     * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
042     * memory leak.
043     *
044     * @version 
045     */
046    @ManagedResource(description = "File based idempotent repository")
047    public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
048        private static final transient Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class);
049        private static final String STORE_DELIMITER = "\n";
050        private Map<String, Object> cache;
051        private File fileStore;
052        private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
053        private AtomicBoolean init = new AtomicBoolean();
054    
055        public FileIdempotentRepository() {
056            // default use a 1st level cache 
057            this.cache = new LRUCache<String, Object>(1000);
058        }
059    
060        public FileIdempotentRepository(File fileStore, Map<String, Object> set) {
061            this.fileStore = fileStore;
062            this.cache = set;
063        }
064    
065        /**
066         * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
067         * as 1st level cache with a default of 1000 entries in the cache.
068         *
069         * @param fileStore  the file store
070         */
071        public static IdempotentRepository<String> fileIdempotentRepository(File fileStore) {
072            return fileIdempotentRepository(fileStore, 1000);
073        }
074    
075        /**
076         * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
077         * as 1st level cache.
078         *
079         * @param fileStore  the file store
080         * @param cacheSize  the cache size
081         */
082        public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize) {
083            return fileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
084        }
085    
086        /**
087         * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
088         * as 1st level cache.
089         *
090         * @param fileStore  the file store
091         * @param cacheSize  the cache size
092         * @param maxFileStoreSize  the max size in bytes for the filestore file 
093         */
094        public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize, long maxFileStoreSize) {
095            FileIdempotentRepository repository = new FileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
096            repository.setMaxFileStoreSize(maxFileStoreSize);
097            return repository;
098        }
099    
100        /**
101         * Creates a new file based repository using the given {@link java.util.Map}
102         * as 1st level cache.
103         * <p/>
104         * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
105         * memory leak.
106         *
107         * @param store  the file store
108         * @param cache  the cache to use as 1st level cache
109         */
110        public static IdempotentRepository<String> fileIdempotentRepository(File store, Map<String, Object> cache) {
111            return new FileIdempotentRepository(store, cache);
112        }
113    
114        @ManagedOperation(description = "Adds the key to the store")
115        public boolean add(String key) {
116            synchronized (cache) {
117                if (cache.containsKey(key)) {
118                    return false;
119                } else {
120                    cache.put(key, key);
121                    if (fileStore.length() < maxFileStoreSize) {
122                        // just append to store
123                        appendToStore(key);
124                    } else {
125                        // trunk store and flush the cache
126                        trunkStore();
127                    }
128    
129                    return true;
130                }
131            }
132        }
133    
134        @ManagedOperation(description = "Does the store contain the given key")
135        public boolean contains(String key) {
136            synchronized (cache) {
137                return cache.containsKey(key);
138            }
139        }
140    
141        @ManagedOperation(description = "Remove the key from the store")
142        public boolean remove(String key) {
143            boolean answer;
144            synchronized (cache) {
145                answer = cache.remove(key) != null;
146                // trunk store and flush the cache on remove
147                trunkStore();
148            }
149            return answer;
150        }
151    
152        public boolean confirm(String key) {
153            // noop
154            return true;
155        }
156    
157        public File getFileStore() {
158            return fileStore;
159        }
160    
161        public void setFileStore(File fileStore) {
162            this.fileStore = fileStore;
163        }
164    
165        @ManagedAttribute(description = "The file path for the store")
166        public String getFilePath() {
167            return fileStore.getPath();
168        }
169    
170        public Map<String, Object> getCache() {
171            return cache;
172        }
173    
174        public void setCache(Map<String, Object> cache) {
175            this.cache = cache;
176        }
177    
178        @ManagedAttribute(description = "The maximum file size for the file store in bytes")
179        public long getMaxFileStoreSize() {
180            return maxFileStoreSize;
181        }
182    
183        /**
184         * Sets the maximum file size for the file store in bytes.
185         * <p/>
186         * The default is 1mb.
187         */
188        @ManagedAttribute(description = "The maximum file size for the file store in bytes")
189        public void setMaxFileStoreSize(long maxFileStoreSize) {
190            this.maxFileStoreSize = maxFileStoreSize;
191        }
192    
193        /**
194         * Sets the cache size
195         */
196        public void setCacheSize(int size) {
197            if (cache != null) {
198                cache.clear();
199            }
200            cache = new LRUCache<String, Object>(size);
201        }
202    
203        @ManagedAttribute(description = "The current cache size")
204        public int getCacheSize() {
205            if (cache != null) {
206                return cache.size();
207            }
208            return 0;
209        }
210    
211        /**
212         * Reset and clears the store to force it to reload from file
213         */
214        @ManagedOperation(description = "Reset and reloads the file store")
215        public synchronized void reset() {
216            synchronized (cache) {
217                // trunk and clear, before we reload the store
218                trunkStore();
219                cache.clear();
220                loadStore();
221            }
222        }
223    
224        /**
225         * Appends the given message id to the file store
226         *
227         * @param messageId  the message id
228         */
229        protected void appendToStore(final String messageId) {
230            LOG.debug("Appending {} to idempotent filestore: {}", messageId, fileStore);
231            FileOutputStream fos = null;
232            try {
233                // create store if missing
234                if (!fileStore.exists()) {
235                    FileUtil.createNewFile(fileStore);
236                }
237                // append to store
238                fos = new FileOutputStream(fileStore, true);
239                fos.write(messageId.getBytes());
240                fos.write(STORE_DELIMITER.getBytes());
241            } catch (IOException e) {
242                throw ObjectHelper.wrapRuntimeCamelException(e);
243            } finally {
244                IOHelper.close(fos, "Appending to file idempotent repository", LOG);
245            }
246        }
247    
248        /**
249         * Trunks the file store when the max store size is hit by rewriting the 1st level cache
250         * to the file store.
251         */
252        protected void trunkStore() {
253            LOG.info("Trunking idempotent filestore: {}", fileStore);
254            FileOutputStream fos = null;
255            try {
256                fos = new FileOutputStream(fileStore);
257                for (String key : cache.keySet()) {
258                    fos.write(key.getBytes());
259                    fos.write(STORE_DELIMITER.getBytes());
260                }
261            } catch (IOException e) {
262                throw ObjectHelper.wrapRuntimeCamelException(e);
263            } finally {
264                IOHelper.close(fos, "Trunking file idempotent repository", LOG);
265            }
266        }
267    
268        /**
269         * Loads the given file store into the 1st level cache
270         */
271        protected void loadStore() {
272            LOG.trace("Loading to 1st level cache from idempotent filestore: {}", fileStore);
273    
274            if (!fileStore.exists()) {
275                return;
276            }
277    
278            cache.clear();
279            Scanner scanner = null;
280            try {
281                scanner = new Scanner(fileStore);
282                scanner.useDelimiter(STORE_DELIMITER);
283                while (scanner.hasNextLine()) {
284                    String line = scanner.nextLine();
285                    cache.put(line, line);
286                }
287            } catch (IOException e) {
288                throw ObjectHelper.wrapRuntimeCamelException(e);
289            } finally {
290                if (scanner != null) {
291                    scanner.close();
292                }
293            }
294    
295            LOG.debug("Loaded {} to the 1st level cache from idempotent filestore: {}", cache.size(), fileStore);
296        }
297    
298        @Override
299        protected void doStart() throws Exception {
300            // init store if not loaded before
301            if (init.compareAndSet(false, true)) {
302                loadStore();
303            }
304        }
305    
306        @Override
307        protected void doStop() throws Exception {
308            // reset will trunk and clear the cache
309            trunkStore();
310            cache.clear();
311            init.set(false);
312        }
313    
314    }