001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.camel.processor.idempotent; 018 019import java.io.File; 020import java.io.FileOutputStream; 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Map; 025import java.util.concurrent.atomic.AtomicBoolean; 026 027import org.apache.camel.api.management.ManagedAttribute; 028import org.apache.camel.api.management.ManagedOperation; 029import org.apache.camel.api.management.ManagedResource; 030import org.apache.camel.spi.IdempotentRepository; 031import org.apache.camel.support.ServiceSupport; 032import org.apache.camel.util.FileUtil; 033import org.apache.camel.util.IOHelper; 034import org.apache.camel.util.LRUCache; 035import org.apache.camel.util.LRUCacheFactory; 036import org.apache.camel.util.ObjectHelper; 037import org.apache.camel.util.Scanner; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * A file based implementation of {@link org.apache.camel.spi.IdempotentRepository}. 043 * <p/> 044 * This implementation provides a 1st-level in-memory {@link LRUCache} for fast check of the most 045 * frequently used keys. When {@link #add(String)} or {@link #contains(String)} methods are being used 046 * then in case of 1st-level cache miss, the underlying file is scanned which may cost additional performance. 047 * So try to find the right balance of the size of the 1st-level cache, the default size is 1000. 048 * The file store has a maximum capacity of 32mb by default (you can turn this off and have unlimited size). 049 * If the file store grows bigger than the maximum capacity, then the {@link #getDropOldestFileStore()} (is default 1000) 050 * number of entries from the file store is dropped to reduce the file store and make room for newer entries. 051 * 052 * @version 053 */ 054@ManagedResource(description = "File based idempotent repository") 055public class FileIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> { 056 private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class); 057 private static final String STORE_DELIMITER = "\n"; 058 059 private final AtomicBoolean init = new AtomicBoolean(); 060 061 private Map<String, Object> cache; 062 private File fileStore; 063 private long maxFileStoreSize = 32 * 1024 * 1000L; // 32mb store file 064 private long dropOldestFileStore = 1000; 065 066 public FileIdempotentRepository() { 067 } 068 069 public FileIdempotentRepository(File fileStore, Map<String, Object> set) { 070 this.fileStore = fileStore; 071 this.cache = set; 072 } 073 074 /** 075 * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache} 076 * as 1st level cache with a default of 1000 entries in the cache. 077 * 078 * @param fileStore the file store 079 */ 080 public static IdempotentRepository<String> fileIdempotentRepository(File fileStore) { 081 return fileIdempotentRepository(fileStore, 1000); 082 } 083 084 /** 085 * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache} 086 * as 1st level cache. 087 * 088 * @param fileStore the file store 089 * @param cacheSize the cache size 090 */ 091 @SuppressWarnings("unchecked") 092 public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize) { 093 return fileIdempotentRepository(fileStore, LRUCacheFactory.newLRUCache(cacheSize)); 094 } 095 096 /** 097 * Creates a new file based repository using a {@link org.apache.camel.util.LRUCache} 098 * as 1st level cache. 099 * 100 * @param fileStore the file store 101 * @param cacheSize the cache size 102 * @param maxFileStoreSize the max size in bytes for the filestore file 103 */ 104 @SuppressWarnings("unchecked") 105 public static IdempotentRepository<String> fileIdempotentRepository(File fileStore, int cacheSize, long maxFileStoreSize) { 106 FileIdempotentRepository repository = new FileIdempotentRepository(fileStore, LRUCacheFactory.newLRUCache(cacheSize)); 107 repository.setMaxFileStoreSize(maxFileStoreSize); 108 return repository; 109 } 110 111 /** 112 * Creates a new file based repository using the given {@link java.util.Map} 113 * as 1st level cache. 114 * <p/> 115 * Care should be taken to use a suitable underlying {@link java.util.Map} to avoid this class being a 116 * memory leak. 117 * 118 * @param store the file store 119 * @param cache the cache to use as 1st level cache 120 */ 121 public static IdempotentRepository<String> fileIdempotentRepository(File store, Map<String, Object> cache) { 122 return new FileIdempotentRepository(store, cache); 123 } 124 125 @ManagedOperation(description = "Adds the key to the store") 126 public boolean add(String key) { 127 synchronized (cache) { 128 if (cache.containsKey(key)) { 129 return false; 130 } else { 131 // always register the most used keys in the LRUCache 132 cache.put(key, key); 133 134 // now check the file store 135 boolean containsInFile = containsStore(key); 136 if (containsInFile) { 137 return false; 138 } 139 140 // its a new key so append to file store 141 appendToStore(key); 142 143 // check if we hit maximum capacity (if enabled) and report a warning about this 144 if (maxFileStoreSize > 0 && fileStore.length() > maxFileStoreSize) { 145 LOG.warn("Maximum capacity of file store: {} hit at {} bytes. Dropping {} oldest entries from the file store", fileStore, maxFileStoreSize, dropOldestFileStore); 146 trunkStore(); 147 } 148 149 return true; 150 } 151 } 152 } 153 154 @ManagedOperation(description = "Does the store contain the given key") 155 public boolean contains(String key) { 156 synchronized (cache) { 157 // check 1st-level first and then fallback to check the actual file 158 return cache.containsKey(key) || containsStore(key); 159 } 160 } 161 162 @ManagedOperation(description = "Remove the key from the store") 163 public boolean remove(String key) { 164 boolean answer; 165 synchronized (cache) { 166 answer = cache.remove(key) != null; 167 // remove from file cache also 168 removeFromStore(key); 169 } 170 return answer; 171 } 172 173 public boolean confirm(String key) { 174 // noop 175 return true; 176 } 177 178 @ManagedOperation(description = "Clear the store (danger this removes all entries)") 179 public void clear() { 180 synchronized (cache) { 181 cache.clear(); 182 if (cache instanceof LRUCache) { 183 ((LRUCache) cache).cleanUp(); 184 } 185 // clear file store 186 clearStore(); 187 } 188 } 189 190 public File getFileStore() { 191 return fileStore; 192 } 193 194 public void setFileStore(File fileStore) { 195 this.fileStore = fileStore; 196 } 197 198 @ManagedAttribute(description = "The file path for the store") 199 public String getFilePath() { 200 return fileStore.getPath(); 201 } 202 203 public Map<String, Object> getCache() { 204 return cache; 205 } 206 207 public void setCache(Map<String, Object> cache) { 208 this.cache = cache; 209 } 210 211 @ManagedAttribute(description = "The maximum file size for the file store in bytes") 212 public long getMaxFileStoreSize() { 213 return maxFileStoreSize; 214 } 215 216 /** 217 * Sets the maximum file size for the file store in bytes. 218 * You can set the value to 0 or negative to turn this off, and have unlimited file store size. 219 * <p/> 220 * The default is 32mb. 221 */ 222 @ManagedAttribute(description = "The maximum file size for the file store in bytes") 223 public void setMaxFileStoreSize(long maxFileStoreSize) { 224 this.maxFileStoreSize = maxFileStoreSize; 225 } 226 227 public long getDropOldestFileStore() { 228 return dropOldestFileStore; 229 } 230 231 /** 232 * Sets the number of oldest entries to drop from the file store when the maximum capacity is hit to reduce 233 * disk space to allow room for new entries. 234 * <p/> 235 * The default is 1000. 236 */ 237 @ManagedAttribute(description = "Number of oldest elements to drop from file store if maximum file size reached") 238 public void setDropOldestFileStore(long dropOldestFileStore) { 239 this.dropOldestFileStore = dropOldestFileStore; 240 } 241 242 /** 243 * Sets the 1st-level cache size. 244 * 245 * Setting cache size is only possible when using the default {@link LRUCache} cache implementation. 246 */ 247 @SuppressWarnings("unchecked") 248 public void setCacheSize(int size) { 249 if (cache != null && !(cache instanceof LRUCache)) { 250 throw new IllegalArgumentException("Setting cache size is only possible when using the default LRUCache cache implementation"); 251 } 252 if (cache != null) { 253 cache.clear(); 254 } 255 cache = LRUCacheFactory.newLRUCache(size); 256 } 257 258 @ManagedAttribute(description = "The current 1st-level cache size") 259 public int getCacheSize() { 260 if (cache != null) { 261 return cache.size(); 262 } 263 return 0; 264 } 265 266 /** 267 * Reset and clears the 1st-level cache to force it to reload from file 268 */ 269 @ManagedOperation(description = "Reset and reloads the file store") 270 public synchronized void reset() throws IOException { 271 synchronized (cache) { 272 // run the cleanup task first 273 if (cache instanceof LRUCache) { 274 ((LRUCache) cache).cleanUp(); 275 } 276 cache.clear(); 277 loadStore(); 278 } 279 } 280 281 /** 282 * Checks the file store if the key exists 283 * 284 * @param key the key 285 * @return <tt>true</tt> if exists in the file, <tt>false</tt> otherwise 286 */ 287 protected boolean containsStore(final String key) { 288 if (fileStore == null || !fileStore.exists()) { 289 return false; 290 } 291 292 try (Scanner scanner = new Scanner(fileStore, null, STORE_DELIMITER)) { 293 while (scanner.hasNext()) { 294 String line = scanner.next(); 295 if (line.equals(key)) { 296 return true; 297 } 298 } 299 } catch (IOException e) { 300 throw ObjectHelper.wrapRuntimeCamelException(e); 301 } 302 return false; 303 } 304 305 /** 306 * Appends the given key to the file store 307 * 308 * @param key the key 309 */ 310 protected void appendToStore(final String key) { 311 LOG.debug("Appending: {} to idempotent filestore: {}", key, fileStore); 312 FileOutputStream fos = null; 313 try { 314 // create store parent directory if missing 315 File storeParentDirectory = fileStore.getParentFile(); 316 if (storeParentDirectory != null && !storeParentDirectory.exists()) { 317 LOG.info("Parent directory of file store {} doesn't exist. Creating.", fileStore); 318 if (fileStore.getParentFile().mkdirs()) { 319 LOG.info("Parent directory of filestore: {} successfully created.", fileStore); 320 } else { 321 LOG.warn("Parent directory of filestore: {} cannot be created.", fileStore); 322 } 323 } 324 // create store if missing 325 if (!fileStore.exists()) { 326 FileUtil.createNewFile(fileStore); 327 } 328 // append to store 329 fos = new FileOutputStream(fileStore, true); 330 fos.write(key.getBytes()); 331 fos.write(STORE_DELIMITER.getBytes()); 332 } catch (IOException e) { 333 throw ObjectHelper.wrapRuntimeCamelException(e); 334 } finally { 335 IOHelper.close(fos, "Appending to file idempotent repository", LOG); 336 } 337 } 338 339 protected synchronized void removeFromStore(String key) { 340 LOG.debug("Removing: {} from idempotent filestore: {}", key, fileStore); 341 342 // we need to re-load the entire file and remove the key and then re-write the file 343 List<String> lines = new ArrayList<>(); 344 345 boolean found = false; 346 Scanner scanner = null; 347 try { 348 scanner = new Scanner(fileStore, null, STORE_DELIMITER); 349 while (scanner.hasNext()) { 350 String line = scanner.next(); 351 if (key.equals(line)) { 352 found = true; 353 } else { 354 lines.add(line); 355 } 356 } 357 } catch (IOException e) { 358 throw ObjectHelper.wrapRuntimeCamelException(e); 359 } finally { 360 if (scanner != null) { 361 scanner.close(); 362 } 363 } 364 365 if (found) { 366 // rewrite file 367 LOG.debug("Rewriting idempotent filestore: {} due to key: {} removed", fileStore, key); 368 FileOutputStream fos = null; 369 try { 370 fos = new FileOutputStream(fileStore); 371 for (String line : lines) { 372 fos.write(line.getBytes()); 373 fos.write(STORE_DELIMITER.getBytes()); 374 } 375 } catch (IOException e) { 376 throw ObjectHelper.wrapRuntimeCamelException(e); 377 } finally { 378 IOHelper.close(fos, "Rewriting file idempotent repository", LOG); 379 } 380 } 381 } 382 383 /** 384 * Clears the file-store (danger this deletes all entries) 385 */ 386 protected void clearStore() { 387 try { 388 FileUtil.deleteFile(fileStore); 389 FileUtil.createNewFile(fileStore); 390 } catch (IOException e) { 391 throw ObjectHelper.wrapRuntimeCamelException(e); 392 } 393 } 394 395 /** 396 * Trunks the file store when the max store size is hit by dropping the most oldest entries. 397 */ 398 protected synchronized void trunkStore() { 399 if (fileStore == null || !fileStore.exists()) { 400 return; 401 } 402 403 LOG.debug("Trunking: {} oldest entries from idempotent filestore: {}", dropOldestFileStore, fileStore); 404 405 // we need to re-load the entire file and remove the key and then re-write the file 406 List<String> lines = new ArrayList<>(); 407 408 Scanner scanner = null; 409 int count = 0; 410 try { 411 scanner = new Scanner(fileStore, null, STORE_DELIMITER); 412 while (scanner.hasNext()) { 413 String line = scanner.next(); 414 count++; 415 if (count > dropOldestFileStore) { 416 lines.add(line); 417 } 418 } 419 } catch (IOException e) { 420 throw ObjectHelper.wrapRuntimeCamelException(e); 421 } finally { 422 if (scanner != null) { 423 scanner.close(); 424 } 425 } 426 427 if (!lines.isEmpty()) { 428 // rewrite file 429 LOG.debug("Rewriting idempotent filestore: {} with {} entries:", fileStore, lines.size()); 430 FileOutputStream fos = null; 431 try { 432 fos = new FileOutputStream(fileStore); 433 for (String line : lines) { 434 fos.write(line.getBytes()); 435 fos.write(STORE_DELIMITER.getBytes()); 436 } 437 } catch (IOException e) { 438 throw ObjectHelper.wrapRuntimeCamelException(e); 439 } finally { 440 IOHelper.close(fos, "Rewriting file idempotent repository", LOG); 441 } 442 } else { 443 // its a small file so recreate the file 444 LOG.debug("Clearing idempotent filestore: {}", fileStore); 445 clearStore(); 446 } 447 } 448 449 /** 450 * Cleanup the 1st-level cache. 451 */ 452 protected void cleanup() { 453 // run the cleanup task first 454 if (cache instanceof LRUCache) { 455 ((LRUCache) cache).cleanUp(); 456 } 457 } 458 459 /** 460 * Loads the given file store into the 1st level cache 461 */ 462 protected void loadStore() throws IOException { 463 // auto create starting directory if needed 464 if (!fileStore.exists()) { 465 LOG.debug("Creating filestore: {}", fileStore); 466 File parent = fileStore.getParentFile(); 467 if (parent != null) { 468 parent.mkdirs(); 469 } 470 boolean created = FileUtil.createNewFile(fileStore); 471 if (!created) { 472 throw new IOException("Cannot create filestore: " + fileStore); 473 } 474 } 475 476 LOG.trace("Loading to 1st level cache from idempotent filestore: {}", fileStore); 477 478 cache.clear(); 479 try (Scanner scanner = new Scanner(fileStore, null, STORE_DELIMITER)) { 480 while (scanner.hasNext()) { 481 String line = scanner.next(); 482 cache.put(line, line); 483 } 484 } catch (IOException e) { 485 throw ObjectHelper.wrapRuntimeCamelException(e); 486 } 487 488 LOG.debug("Loaded {} to the 1st level cache from idempotent filestore: {}", cache.size(), fileStore); 489 } 490 491 @Override 492 @SuppressWarnings("unchecked") 493 protected void doStart() throws Exception { 494 ObjectHelper.notNull(fileStore, "fileStore", this); 495 496 if (this.cache == null) { 497 // default use a 1st level cache 498 this.cache = LRUCacheFactory.newLRUCache(1000); 499 } 500 501 // init store if not loaded before 502 if (init.compareAndSet(false, true)) { 503 loadStore(); 504 } 505 } 506 507 @Override 508 protected void doStop() throws Exception { 509 // run the cleanup task first 510 if (cache instanceof LRUCache) { 511 ((LRUCache) cache).cleanUp(); 512 } 513 514 cache.clear(); 515 init.set(false); 516 } 517 518}