001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor.idempotent;
018
019import java.io.File;
020import java.io.FileOutputStream;
021import java.io.IOException;
022import java.util.Map;
023import java.util.Scanner;
024import java.util.concurrent.atomic.AtomicBoolean;
025
026import org.apache.camel.api.management.ManagedAttribute;
027import org.apache.camel.api.management.ManagedOperation;
028import org.apache.camel.api.management.ManagedResource;
029import org.apache.camel.spi.IdempotentRepository;
030import org.apache.camel.support.ServiceSupport;
031import org.apache.camel.util.FileUtil;
032import org.apache.camel.util.IOHelper;
033import org.apache.camel.util.LRUCache;
034import org.apache.camel.util.ObjectHelper;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
040 * <p/>
041 * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
042 * memory leak.
043 *
044 * @version 
045 */
046@ManagedResource(description = "File based idempotent repository")
047public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
048    private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class);
049    private static final String STORE_DELIMITER = "\n";
050    private Map<String, Object> cache;
051    private File fileStore;
052    private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
053    private AtomicBoolean init = new AtomicBoolean();
054
055    public FileIdempotentRepository() {
056        // default use a 1st level cache 
057        this.cache = new LRUCache<String, Object>(1000);
058    }
059
060    public FileIdempotentRepository(File fileStore, Map<String, Object> set) {
061        this.fileStore = fileStore;
062        this.cache = set;
063    }
064
065    /**
066     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
067     * as 1st level cache with a default of 1000 entries in the cache.
068     *
069     * @param fileStore  the file store
070     */
071    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore) {
072        return fileIdempotentRepository(fileStore, 1000);
073    }
074
075    /**
076     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
077     * as 1st level cache.
078     *
079     * @param fileStore  the file store
080     * @param cacheSize  the cache size
081     */
082    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize) {
083        return fileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
084    }
085
086    /**
087     * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache}
088     * as 1st level cache.
089     *
090     * @param fileStore  the file store
091     * @param cacheSize  the cache size
092     * @param maxFileStoreSize  the max size in bytes for the filestore file 
093     */
094    public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize, long maxFileStoreSize) {
095        FileIdempotentRepository repository = new FileIdempotentRepository(fileStore, new LRUCache<String, Object>(cacheSize));
096        repository.setMaxFileStoreSize(maxFileStoreSize);
097        return repository;
098    }
099
100    /**
101     * Creates a new file based repository using the given {@link java.util.Map}
102     * as 1st level cache.
103     * <p/>
104     * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
105     * memory leak.
106     *
107     * @param store  the file store
108     * @param cache  the cache to use as 1st level cache
109     */
110    public static IdempotentRepository<String> fileIdempotentRepository(File store, Map<String, Object> cache) {
111        return new FileIdempotentRepository(store, cache);
112    }
113
114    @ManagedOperation(description = "Adds the key to the store")
115    public boolean add(String key) {
116        synchronized (cache) {
117            if (cache.containsKey(key)) {
118                return false;
119            } else {
120                cache.put(key, key);
121                if (fileStore.length() < maxFileStoreSize) {
122                    // just append to store
123                    appendToStore(key);
124                } else {
125                    // trunk store and flush the cache
126                    trunkStore();
127                }
128
129                return true;
130            }
131        }
132    }
133
134    @ManagedOperation(description = "Does the store contain the given key")
135    public boolean contains(String key) {
136        synchronized (cache) {
137            return cache.containsKey(key);
138        }
139    }
140
141    @ManagedOperation(description = "Remove the key from the store")
142    public boolean remove(String key) {
143        boolean answer;
144        synchronized (cache) {
145            answer = cache.remove(key) != null;
146            // trunk store and flush the cache on remove
147            trunkStore();
148        }
149        return answer;
150    }
151
152    public boolean confirm(String key) {
153        // noop
154        return true;
155    }
156    
157    @ManagedOperation(description = "Clear the store")
158    public void clear() {
159        synchronized (cache) {
160            cache.clear();
161            if (cache instanceof LRUCache) {
162                ((LRUCache) cache).cleanUp();
163            }
164        }
165    }
166
167    public File getFileStore() {
168        return fileStore;
169    }
170
171    public void setFileStore(File fileStore) {
172        this.fileStore = fileStore;
173    }
174
175    @ManagedAttribute(description = "The file path for the store")
176    public String getFilePath() {
177        return fileStore.getPath();
178    }
179
180    public Map<String, Object> getCache() {
181        return cache;
182    }
183
184    public void setCache(Map<String, Object> cache) {
185        this.cache = cache;
186    }
187
188    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
189    public long getMaxFileStoreSize() {
190        return maxFileStoreSize;
191    }
192
193    /**
194     * Sets the maximum file size for the file store in bytes.
195     * <p/>
196     * The default is 1mb.
197     */
198    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
199    public void setMaxFileStoreSize(long maxFileStoreSize) {
200        this.maxFileStoreSize = maxFileStoreSize;
201    }
202
203    /**
204     * Sets the cache size
205     */
206    public void setCacheSize(int size) {
207        if (cache != null) {
208            cache.clear();
209        }
210        cache = new LRUCache<String, Object>(size);
211    }
212
213    @ManagedAttribute(description = "The current cache size")
214    public int getCacheSize() {
215        if (cache != null) {
216            return cache.size();
217        }
218        return 0;
219    }
220
221    /**
222     * Reset and clears the store to force it to reload from file
223     */
224    @ManagedOperation(description = "Reset and reloads the file store")
225    public synchronized void reset() throws IOException {
226        synchronized (cache) {
227            // trunk and clear, before we reload the store
228            trunkStore();
229            cache.clear();
230            if (cache instanceof LRUCache) {
231                ((LRUCache) cache).cleanUp();
232            }
233            loadStore();
234        }
235    }
236
237    /**
238     * Appends the given message id to the file store
239     *
240     * @param messageId  the message id
241     */
242    protected void appendToStore(final String messageId) {
243        LOG.debug("Appending {} to idempotent filestore: {}", messageId, fileStore);
244        FileOutputStream fos = null;
245        try {
246            // create store parent directory if missing
247            File storeParentDirectory = fileStore.getParentFile();
248            if (storeParentDirectory != null && !storeParentDirectory.exists()) {
249                LOG.info("Parent directory of file store {} doesn't exist. Creating.", fileStore);
250                if (fileStore.getParentFile().mkdirs()) {
251                    LOG.info("Parent directory of file store {} successfully created.", fileStore);
252                } else {
253                    LOG.warn("Parent directory of file store {} cannot be created.", fileStore);
254                }
255            }
256            // create store if missing
257            if (!fileStore.exists()) {
258                FileUtil.createNewFile(fileStore);
259            }
260            // append to store
261            fos = new FileOutputStream(fileStore, true);
262            fos.write(messageId.getBytes());
263            fos.write(STORE_DELIMITER.getBytes());
264        } catch (IOException e) {
265            throw ObjectHelper.wrapRuntimeCamelException(e);
266        } finally {
267            IOHelper.close(fos, "Appending to file idempotent repository", LOG);
268        }
269    }
270
271    /**
272     * Trunks the file store when the max store size is hit by rewriting the 1st level cache
273     * to the file store.
274     */
275    protected void trunkStore() {
276        LOG.info("Trunking idempotent filestore: {}", fileStore);
277        FileOutputStream fos = null;
278        try {
279            fos = new FileOutputStream(fileStore);
280            for (String key : cache.keySet()) {
281                fos.write(key.getBytes());
282                fos.write(STORE_DELIMITER.getBytes());
283            }
284        } catch (IOException e) {
285            throw ObjectHelper.wrapRuntimeCamelException(e);
286        } finally {
287            IOHelper.close(fos, "Trunking file idempotent repository", LOG);
288        }
289    }
290
291    /**
292     * Loads the given file store into the 1st level cache
293     */
294    protected void loadStore() throws IOException {
295        // auto create starting directory if needed
296        if (!fileStore.exists()) {
297            LOG.debug("Creating filestore: {}", fileStore);
298            File parent = fileStore.getParentFile();
299            if (parent != null) {
300                parent.mkdirs();
301            }
302            boolean created = FileUtil.createNewFile(fileStore);
303            if (!created) {
304                throw new IOException("Cannot create filestore: " + fileStore);
305            }
306        }
307
308        LOG.trace("Loading to 1st level cache from idempotent filestore: {}", fileStore);
309
310        cache.clear();
311        Scanner scanner = null;
312        try {
313            scanner = new Scanner(fileStore);
314            scanner.useDelimiter(STORE_DELIMITER);
315            while (scanner.hasNextLine()) {
316                String line = scanner.nextLine();
317                cache.put(line, line);
318            }
319        } catch (IOException e) {
320            throw ObjectHelper.wrapRuntimeCamelException(e);
321        } finally {
322            if (scanner != null) {
323                scanner.close();
324            }
325        }
326
327        LOG.debug("Loaded {} to the 1st level cache from idempotent filestore: {}", cache.size(), fileStore);
328    }
329
330    @Override
331    protected void doStart() throws Exception {
332        ObjectHelper.notNull(fileStore, "fileStore", this);
333
334        // init store if not loaded before
335        if (init.compareAndSet(false, true)) {
336            loadStore();
337        }
338    }
339
340    @Override
341    protected void doStop() throws Exception {
342        // reset will trunk and clear the cache
343        trunkStore();
344        cache.clear();
345        if (cache instanceof LRUCache) {
346            ((LRUCache) cache).cleanUp();
347        }
348        init.set(false);
349    }
350
351}