001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor.idempotent; 018 019import java.util.Map; 020 021import org.apache.camel.api.management.ManagedAttribute; 022import org.apache.camel.api.management.ManagedOperation; 023import org.apache.camel.api.management.ManagedResource; 024import org.apache.camel.spi.IdempotentRepository; 025import org.apache.camel.support.ServiceSupport; 026import org.apache.camel.util.LRUCache; 027import org.apache.camel.util.LRUCacheFactory; 028 029/** 030 * A memory based implementation of {@link org.apache.camel.spi.IdempotentRepository}. 031 * <p/> 032 * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a 033 * memory leak. 034 * 035 * @version 036 */ 037@ManagedResource(description = "Memory based idempotent repository") 038public class MemoryIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> { 039 private Map<String, Object> cache; 040 private int cacheSize; 041 042 @SuppressWarnings("unchecked") 043 public MemoryIdempotentRepository() { 044 this.cache = LRUCacheFactory.newLRUCache(1000); 045 } 046 047 public MemoryIdempotentRepository(Map<String, Object> set) { 048 this.cache = set; 049 } 050 051 /** 052 * Creates a new memory based repository using a {@link LRUCache} 053 * with a default of 1000 entries in the cache. 054 */ 055 public static IdempotentRepository<String> memoryIdempotentRepository() { 056 return new MemoryIdempotentRepository(); 057 } 058 059 /** 060 * Creates a new memory based repository using a {@link LRUCache}. 061 * 062 * @param cacheSize the cache size 063 */ 064 @SuppressWarnings("unchecked") 065 public static IdempotentRepository<String> memoryIdempotentRepository(int cacheSize) { 066 return memoryIdempotentRepository(LRUCacheFactory.newLRUCache(cacheSize)); 067 } 068 069 /** 070 * Creates a new memory based repository using the given {@link Map} to 071 * use to store the processed message ids. 072 * <p/> 073 * Care should be taken to use a suitable underlying {@link Map} to avoid this class being a 074 * memory leak. 075 * 076 * @param cache the cache 077 */ 078 public static IdempotentRepository<String> memoryIdempotentRepository(Map<String, Object> cache) { 079 return new MemoryIdempotentRepository(cache); 080 } 081 082 @ManagedOperation(description = "Adds the key to the store") 083 public boolean add(String key) { 084 synchronized (cache) { 085 if (cache.containsKey(key)) { 086 return false; 087 } else { 088 cache.put(key, key); 089 return true; 090 } 091 } 092 } 093 094 @ManagedOperation(description = "Does the store contain the given key") 095 public boolean contains(String key) { 096 synchronized (cache) { 097 return cache.containsKey(key); 098 } 099 } 100 101 @ManagedOperation(description = "Remove the key from the store") 102 public boolean remove(String key) { 103 synchronized (cache) { 104 return cache.remove(key) != null; 105 } 106 } 107 108 public boolean confirm(String key) { 109 // noop 110 return true; 111 } 112 113 @ManagedOperation(description = "Clear the store") 114 public void clear() { 115 synchronized (cache) { 116 cache.clear(); 117 } 118 } 119 120 public Map<String, Object> getCache() { 121 return cache; 122 } 123 124 @ManagedAttribute(description = "The current cache size") 125 public int getCacheSize() { 126 return cache.size(); 127 } 128 129 public void setCacheSize(int cacheSize) { 130 this.cacheSize = cacheSize; 131 } 132 133 @Override 134 @SuppressWarnings("unchecked") 135 protected void doStart() throws Exception { 136 if (cacheSize > 0) { 137 cache = LRUCacheFactory.newLRUCache(cacheSize); 138 } 139 } 140 141 @Override 142 protected void doStop() throws Exception { 143 cache.clear(); 144 } 145}