001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.processor.idempotent;
018
019import java.util.Map;
020
021import org.apache.camel.api.management.ManagedAttribute;
022import org.apache.camel.api.management.ManagedOperation;
023import org.apache.camel.api.management.ManagedResource;
024import org.apache.camel.spi.IdempotentRepository;
025import org.apache.camel.support.ServiceSupport;
026import org.apache.camel.util.LRUCache;
027import org.apache.camel.util.LRUCacheFactory;
028
029/**
030 * A memory based implementation of {@link org.apache.camel.spi.IdempotentRepository}. 
031 * <p/>
032 * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
033 * memory leak.
034 *
035 * @version 
036 */
037@ManagedResource(description = "Memory based idempotent repository")
038public class MemoryIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
039    private Map<String, Object> cache;
040    private int cacheSize;
041
042    @SuppressWarnings("unchecked")
043    public MemoryIdempotentRepository() {
044        this.cache = LRUCacheFactory.newLRUCache(1000);
045    }
046
047    public MemoryIdempotentRepository(Map<String, Object> set) {
048        this.cache = set;
049    }
050
051    /**
052     * Creates a new memory based repository using a {@link LRUCache}
053     * with a default of 1000 entries in the cache.
054     */
055    public static IdempotentRepository<String> memoryIdempotentRepository() {
056        return new MemoryIdempotentRepository();
057    }
058
059    /**
060     * Creates a new memory based repository using a {@link LRUCache}.
061     *
062     * @param cacheSize  the cache size
063     */
064    @SuppressWarnings("unchecked")
065    public static IdempotentRepository<String> memoryIdempotentRepository(int cacheSize) {
066        return memoryIdempotentRepository(LRUCacheFactory.newLRUCache(cacheSize));
067    }
068
069    /**
070     * Creates a new memory based repository using the given {@link Map} to
071     * use to store the processed message ids.
072     * <p/>
073     * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a
074     * memory leak.
075     *
076     * @param cache  the cache
077     */
078    public static IdempotentRepository<String> memoryIdempotentRepository(Map<String, Object> cache) {
079        return new MemoryIdempotentRepository(cache);
080    }
081
082    @ManagedOperation(description = "Adds the key to the store")
083    public boolean add(String key) {
084        synchronized (cache) {
085            if (cache.containsKey(key)) {
086                return false;
087            } else {
088                cache.put(key, key);
089                return true;
090            }
091        }
092    }
093
094    @ManagedOperation(description = "Does the store contain the given key")
095    public boolean contains(String key) {
096        synchronized (cache) {
097            return cache.containsKey(key);
098        }
099    }
100
101    @ManagedOperation(description = "Remove the key from the store")
102    public boolean remove(String key) {
103        synchronized (cache) {
104            return cache.remove(key) != null;
105        }
106    }
107
108    public boolean confirm(String key) {
109        // noop
110        return true;
111    }
112    
113    @ManagedOperation(description = "Clear the store")
114    public void clear() {
115        synchronized (cache) {
116            cache.clear();
117        }
118    }
119
120    public Map<String, Object> getCache() {
121        return cache;
122    }
123
124    @ManagedAttribute(description = "The current cache size")
125    public int getCacheSize() {
126        return cache.size();
127    }
128
129    public void setCacheSize(int cacheSize) {
130        this.cacheSize = cacheSize;
131    }
132
133    @Override
134    @SuppressWarnings("unchecked")
135    protected void doStart() throws Exception {
136        if (cacheSize > 0) {
137            cache = LRUCacheFactory.newLRUCache(cacheSize);
138        }
139    }
140
141    @Override
142    protected void doStop() throws Exception {
143        cache.clear();
144    }
145}