001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.impl;
018
019import java.io.File;
020import java.io.FileOutputStream;
021import java.io.IOException;
022import java.util.HashMap;
023import java.util.Map;
024import java.util.concurrent.atomic.AtomicBoolean;
025
026import org.apache.camel.api.management.ManagedAttribute;
027import org.apache.camel.api.management.ManagedOperation;
028import org.apache.camel.api.management.ManagedResource;
029import org.apache.camel.spi.StateRepository;
030import org.apache.camel.support.ServiceSupport;
031import org.apache.camel.util.FileUtil;
032import org.apache.camel.util.IOHelper;
033import org.apache.camel.util.ObjectHelper;
034import org.apache.camel.util.Scanner;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * This {@link FileStateRepository} class is a file-based implementation of a {@link StateRepository}.
040 */
041@ManagedResource(description = "File based state repository")
042public class FileStateRepository extends ServiceSupport implements StateRepository<String, String> {
043    private static final Logger LOG = LoggerFactory.getLogger(FileStateRepository.class);
044    private static final String STORE_DELIMITER = "\n";
045    private static final String KEY_VALUE_DELIMITER = "=";
046    private final AtomicBoolean init = new AtomicBoolean();
047    private Map<String, String> cache;
048    private File fileStore;
049    private long maxFileStoreSize = 1024 * 1000L; // 1mb store file
050
051    public FileStateRepository() {
052        // default use a 1st level cache
053        this.cache = new HashMap<>();
054    }
055
056    public FileStateRepository(File fileStore, Map<String, String> cache) {
057        this.fileStore = fileStore;
058        this.cache = cache;
059    }
060
061    /**
062     * Creates a new file based repository using as 1st level cache
063     *
064     * @param fileStore the file store
065     */
066    public static FileStateRepository fileStateRepository(File fileStore) {
067        return fileStateRepository(fileStore, new HashMap<>());
068    }
069
070    /**
071     * Creates a new file based repository using a {@link HashMap} as 1st level cache.
072     *
073     * @param fileStore the file store
074     * @param maxFileStoreSize the max size in bytes for the fileStore file
075     */
076    public static FileStateRepository fileStateRepository(File fileStore, long maxFileStoreSize) {
077        FileStateRepository repository = new FileStateRepository(fileStore, new HashMap<>());
078        repository.setMaxFileStoreSize(maxFileStoreSize);
079        return repository;
080    }
081
082    /**
083     * Creates a new file based repository using the given {@link java.util.Map} as 1st level cache.
084     * <p/>
085     * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a
086     * memory leak.
087     *
088     * @param store the file store
089     * @param cache the cache to use as 1st level cache
090     */
091    public static FileStateRepository fileStateRepository(File store, Map<String, String> cache) {
092        return new FileStateRepository(store, cache);
093    }
094
095    @Override
096    @ManagedOperation(description = "Adds the value of the given key to the store")
097    public void setState(String key, String value) {
098        if (key.contains(KEY_VALUE_DELIMITER)) {
099            throw new IllegalArgumentException("Key " + key + " contains illegal character: " + KEY_VALUE_DELIMITER);
100        }
101        if (key.contains(STORE_DELIMITER)) {
102            throw new IllegalArgumentException("Key " + key + " contains illegal character: <newline>");
103        }
104        if (value.contains(STORE_DELIMITER)) {
105            throw new IllegalArgumentException("Value " + value + " contains illegal character: <newline>");
106        }
107        synchronized (cache) {
108            cache.put(key, value);
109            if (fileStore.length() < maxFileStoreSize) {
110                // just append to store
111                appendToStore(key, value);
112            } else {
113                // trunk store and flush the cache
114                trunkStore();
115            }
116        }
117    }
118
119    @Override
120    @ManagedOperation(description = "Gets the value of the given key from store")
121    public String getState(String key) {
122        synchronized (cache) {
123            return cache.get(key);
124        }
125    }
126
127    /**
128     * Resets and clears the store to force it to reload from file
129     */
130    @ManagedOperation(description = "Reset and reloads the file store")
131    public synchronized void reset() throws IOException {
132        synchronized (cache) {
133            // trunk and clear, before we reload the store
134            trunkStore();
135            cache.clear();
136            loadStore();
137        }
138    }
139
140    /**
141     * Appends the {@code <key,value>} pair to the file store
142     *
143     * @param key the state key
144     */
145    private void appendToStore(String key, String value) {
146        if (LOG.isDebugEnabled()) {
147            LOG.debug("Appending {}={} to state filestore: {}", key, value, fileStore);
148        }
149        FileOutputStream fos = null;
150        try {
151            // create store parent directory if missing
152            File storeParentDirectory = fileStore.getParentFile();
153            if (storeParentDirectory != null && !storeParentDirectory.exists()) {
154                LOG.info("Parent directory of file store {} doesn't exist. Creating.", fileStore);
155                if (fileStore.getParentFile().mkdirs()) {
156                    LOG.info("Parent directory of file store {} successfully created.", fileStore);
157                } else {
158                    LOG.warn("Parent directory of file store {} cannot be created.", fileStore);
159                }
160            }
161            // create store if missing
162            if (!fileStore.exists()) {
163                FileUtil.createNewFile(fileStore);
164            }
165            // append to store
166            fos = new FileOutputStream(fileStore, true);
167            fos.write(key.getBytes());
168            fos.write(KEY_VALUE_DELIMITER.getBytes());
169            fos.write(value.getBytes());
170            fos.write(STORE_DELIMITER.getBytes());
171        } catch (IOException e) {
172            throw ObjectHelper.wrapRuntimeCamelException(e);
173        } finally {
174            IOHelper.close(fos, "Appending to file state repository", LOG);
175        }
176    }
177
178    /**
179     * Trunks the file store when the max store size is hit by rewriting the 1st level cache
180     * to the file store.
181     */
182    protected void trunkStore() {
183        LOG.info("Trunking state filestore: {}", fileStore);
184        FileOutputStream fos = null;
185        try {
186            fos = new FileOutputStream(fileStore);
187            for (Map.Entry<String, String> entry : cache.entrySet()) {
188                fos.write(entry.getKey().getBytes());
189                fos.write(KEY_VALUE_DELIMITER.getBytes());
190                fos.write(entry.getValue().getBytes());
191                fos.write(STORE_DELIMITER.getBytes());
192            }
193        } catch (IOException e) {
194            throw ObjectHelper.wrapRuntimeCamelException(e);
195        } finally {
196            IOHelper.close(fos, "Trunking file state repository", LOG);
197        }
198    }
199
200    /**
201     * Loads the given file store into the 1st level cache
202     */
203    protected void loadStore() throws IOException {
204        // auto create starting directory if needed
205        if (!fileStore.exists()) {
206            LOG.debug("Creating filestore: {}", fileStore);
207            File parent = fileStore.getParentFile();
208            if (parent != null) {
209                parent.mkdirs();
210            }
211            boolean created = FileUtil.createNewFile(fileStore);
212            if (!created) {
213                throw new IOException("Cannot create filestore: " + fileStore);
214            }
215        }
216
217        LOG.trace("Loading to 1st level cache from state filestore: {}", fileStore);
218
219        cache.clear();
220        try (Scanner scanner = new Scanner(fileStore, null, STORE_DELIMITER)) {
221            while (scanner.hasNext()) {
222                String line = scanner.next();
223                int separatorIndex = line.indexOf(KEY_VALUE_DELIMITER);
224                String key = line.substring(0, separatorIndex);
225                String value = line.substring(separatorIndex + KEY_VALUE_DELIMITER.length());
226                cache.put(key, value);
227            }
228        } catch (IOException e) {
229            throw ObjectHelper.wrapRuntimeCamelException(e);
230        }
231
232        LOG.debug("Loaded {} to the 1st level cache from state filestore: {}", cache.size(), fileStore);
233    }
234
235    @Override
236    protected void doStart() throws Exception {
237        ObjectHelper.notNull(fileStore, "fileStore", this);
238
239        // init store if not loaded before
240        if (init.compareAndSet(false, true)) {
241            loadStore();
242        }
243    }
244
245    @Override
246    protected void doStop() throws Exception {
247        // reset will trunk and clear the cache
248        trunkStore();
249        cache.clear();
250        init.set(false);
251    }
252
253    public File getFileStore() {
254        return fileStore;
255    }
256
257    public void setFileStore(File fileStore) {
258        this.fileStore = fileStore;
259    }
260
261    @ManagedAttribute(description = "The file path for the store")
262    public String getFilePath() {
263        return fileStore.getPath();
264    }
265
266    public Map<String, String> getCache() {
267        return cache;
268    }
269
270    public void setCache(Map<String, String> cache) {
271        this.cache = cache;
272    }
273
274    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
275    public long getMaxFileStoreSize() {
276        return maxFileStoreSize;
277    }
278
279    /**
280     * Sets the maximum file size for the file store in bytes.
281     * <p/>
282     * The default is 1mb.
283     */
284    @ManagedAttribute(description = "The maximum file size for the file store in bytes")
285    public void setMaxFileStoreSize(long maxFileStoreSize) {
286        this.maxFileStoreSize = maxFileStoreSize;
287    }
288}