001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.store.kahadb; 019 020import java.io.File; 021import java.io.IOException; 022import java.util.Date; 023import java.util.concurrent.atomic.AtomicBoolean; 024import java.util.concurrent.atomic.AtomicLong; 025import java.util.concurrent.locks.ReentrantReadWriteLock; 026 027import org.apache.activemq.broker.LockableServiceSupport; 028import org.apache.activemq.broker.Locker; 029import org.apache.activemq.store.SharedFileLocker; 030import org.apache.activemq.store.kahadb.data.KahaEntryType; 031import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 032import org.apache.activemq.store.kahadb.disk.journal.Journal; 033import org.apache.activemq.store.kahadb.disk.journal.Location; 034import org.apache.activemq.store.kahadb.disk.page.PageFile; 035import org.apache.activemq.store.kahadb.disk.page.Transaction; 036import org.apache.activemq.util.ByteSequence; 037import org.apache.activemq.util.DataByteArrayInputStream; 038import org.apache.activemq.util.DataByteArrayOutputStream; 039import org.apache.activemq.util.IOHelper; 040import org.apache.activemq.util.ServiceStopper; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044public abstract class AbstractKahaDBStore extends LockableServiceSupport { 045 046 static final Logger LOG = LoggerFactory.getLogger(AbstractKahaDBStore.class); 047 048 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 049 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0); 050 051 protected File directory; 052 protected PageFile pageFile; 053 protected Journal journal; 054 protected AtomicLong journalSize = new AtomicLong(0); 055 protected boolean failIfDatabaseIsLocked; 056 protected long checkpointInterval = 5*1000; 057 protected long cleanupInterval = 30*1000; 058 private boolean cleanupOnStop = true; 059 protected boolean checkForCorruptJournalFiles = false; 060 protected boolean checksumJournalFiles = true; 061 protected boolean forceRecoverIndex = false; 062 protected int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 063 protected int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 064 protected boolean archiveCorruptedIndex = false; 065 protected boolean enableIndexWriteAsync = false; 066 protected boolean enableJournalDiskSyncs = false; 067 protected boolean deleteAllJobs = false; 068 protected int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 069 protected boolean useIndexLFRUEviction = false; 070 protected float indexLFUEvictionFactor = 0.2f; 071 protected boolean ignoreMissingJournalfiles = false; 072 protected int indexCacheSize = 1000; 073 protected boolean enableIndexDiskSyncs = true; 074 protected boolean enableIndexRecoveryFile = true; 075 protected boolean enableIndexPageCaching = true; 076 protected boolean archiveDataLogs; 077 protected boolean purgeStoreOnStartup; 078 protected File directoryArchive; 079 080 protected AtomicBoolean opened = new AtomicBoolean(); 081 protected Thread checkpointThread; 082 protected final Object checkpointThreadLock = new Object(); 083 protected ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); 084 protected ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); 085 086 /** 087 * @return the name to give this store's PageFile instance. 088 */ 089 protected abstract String getPageFileName(); 090 091 /** 092 * @return the location of the data directory if no set by configuration. 093 */ 094 protected abstract File getDefaultDataDirectory(); 095 096 /** 097 * Loads the store from disk. 098 * 099 * Based on configuration this method can either load an existing store or it can purge 100 * an existing store and start in a clean state. 101 * 102 * @throws IOException if an error occurs during the load. 103 */ 104 public abstract void load() throws IOException; 105 106 /** 107 * Unload the state of the Store to disk and shuts down all resources assigned to this 108 * KahaDB store implementation. 109 * 110 * @throws IOException if an error occurs during the store unload. 111 */ 112 public abstract void unload() throws IOException; 113 114 @Override 115 protected void doStart() throws Exception { 116 this.indexLock.writeLock().lock(); 117 if (getDirectory() == null) { 118 setDirectory(getDefaultDataDirectory()); 119 } 120 IOHelper.mkdirs(getDirectory()); 121 try { 122 if (isPurgeStoreOnStartup()) { 123 getJournal().start(); 124 getJournal().delete(); 125 getJournal().close(); 126 journal = null; 127 getPageFile().delete(); 128 LOG.info("{} Persistence store purged.", this); 129 setPurgeStoreOnStartup(false); 130 } 131 132 load(); 133 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 134 } finally { 135 this.indexLock.writeLock().unlock(); 136 } 137 } 138 139 @Override 140 protected void doStop(ServiceStopper stopper) throws Exception { 141 unload(); 142 } 143 144 public PageFile getPageFile() { 145 if (pageFile == null) { 146 pageFile = createPageFile(); 147 } 148 return pageFile; 149 } 150 151 public Journal getJournal() throws IOException { 152 if (journal == null) { 153 journal = createJournal(); 154 } 155 return journal; 156 } 157 158 public File getDirectory() { 159 return directory; 160 } 161 162 public void setDirectory(File directory) { 163 this.directory = directory; 164 } 165 166 public boolean isArchiveCorruptedIndex() { 167 return archiveCorruptedIndex; 168 } 169 170 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 171 this.archiveCorruptedIndex = archiveCorruptedIndex; 172 } 173 174 public boolean isFailIfDatabaseIsLocked() { 175 return failIfDatabaseIsLocked; 176 } 177 178 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 179 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 180 } 181 182 public boolean isCheckForCorruptJournalFiles() { 183 return checkForCorruptJournalFiles; 184 } 185 186 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 187 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; 188 } 189 190 public long getCheckpointInterval() { 191 return checkpointInterval; 192 } 193 194 public void setCheckpointInterval(long checkpointInterval) { 195 this.checkpointInterval = checkpointInterval; 196 } 197 198 public long getCleanupInterval() { 199 return cleanupInterval; 200 } 201 202 public void setCleanupInterval(long cleanupInterval) { 203 this.cleanupInterval = cleanupInterval; 204 } 205 206 public void setCleanupOnStop(boolean cleanupOnStop) { 207 this.cleanupOnStop = cleanupOnStop; 208 } 209 210 public boolean getCleanupOnStop() { 211 return this.cleanupOnStop; 212 } 213 214 public boolean isChecksumJournalFiles() { 215 return checksumJournalFiles; 216 } 217 218 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 219 this.checksumJournalFiles = checksumJournalFiles; 220 } 221 222 public boolean isForceRecoverIndex() { 223 return forceRecoverIndex; 224 } 225 226 public void setForceRecoverIndex(boolean forceRecoverIndex) { 227 this.forceRecoverIndex = forceRecoverIndex; 228 } 229 230 public int getJournalMaxFileLength() { 231 return journalMaxFileLength; 232 } 233 234 public void setJournalMaxFileLength(int journalMaxFileLength) { 235 this.journalMaxFileLength = journalMaxFileLength; 236 } 237 238 public int getJournalMaxWriteBatchSize() { 239 return journalMaxWriteBatchSize; 240 } 241 242 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 243 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 244 } 245 246 public boolean isEnableIndexWriteAsync() { 247 return enableIndexWriteAsync; 248 } 249 250 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 251 this.enableIndexWriteAsync = enableIndexWriteAsync; 252 } 253 254 public boolean isEnableJournalDiskSyncs() { 255 return enableJournalDiskSyncs; 256 } 257 258 public void setEnableJournalDiskSyncs(boolean syncWrites) { 259 this.enableJournalDiskSyncs = syncWrites; 260 } 261 262 public boolean isDeleteAllJobs() { 263 return deleteAllJobs; 264 } 265 266 public void setDeleteAllJobs(boolean deleteAllJobs) { 267 this.deleteAllJobs = deleteAllJobs; 268 } 269 270 /** 271 * @return the archiveDataLogs 272 */ 273 public boolean isArchiveDataLogs() { 274 return this.archiveDataLogs; 275 } 276 277 /** 278 * @param archiveDataLogs the archiveDataLogs to set 279 */ 280 public void setArchiveDataLogs(boolean archiveDataLogs) { 281 this.archiveDataLogs = archiveDataLogs; 282 } 283 284 /** 285 * @return the directoryArchive 286 */ 287 public File getDirectoryArchive() { 288 return this.directoryArchive; 289 } 290 291 /** 292 * @param directoryArchive the directoryArchive to set 293 */ 294 public void setDirectoryArchive(File directoryArchive) { 295 this.directoryArchive = directoryArchive; 296 } 297 298 public int getIndexCacheSize() { 299 return indexCacheSize; 300 } 301 302 public void setIndexCacheSize(int indexCacheSize) { 303 this.indexCacheSize = indexCacheSize; 304 } 305 306 public int getIndexWriteBatchSize() { 307 return indexWriteBatchSize; 308 } 309 310 public void setIndexWriteBatchSize(int indexWriteBatchSize) { 311 this.indexWriteBatchSize = indexWriteBatchSize; 312 } 313 314 public boolean isUseIndexLFRUEviction() { 315 return useIndexLFRUEviction; 316 } 317 318 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 319 this.useIndexLFRUEviction = useIndexLFRUEviction; 320 } 321 322 public float getIndexLFUEvictionFactor() { 323 return indexLFUEvictionFactor; 324 } 325 326 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 327 this.indexLFUEvictionFactor = indexLFUEvictionFactor; 328 } 329 330 public boolean isEnableIndexDiskSyncs() { 331 return enableIndexDiskSyncs; 332 } 333 334 public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) { 335 this.enableIndexDiskSyncs = enableIndexDiskSyncs; 336 } 337 338 public boolean isEnableIndexRecoveryFile() { 339 return enableIndexRecoveryFile; 340 } 341 342 public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) { 343 this.enableIndexRecoveryFile = enableIndexRecoveryFile; 344 } 345 346 public boolean isEnableIndexPageCaching() { 347 return enableIndexPageCaching; 348 } 349 350 public void setEnableIndexPageCaching(boolean enableIndexPageCaching) { 351 this.enableIndexPageCaching = enableIndexPageCaching; 352 } 353 354 public boolean isPurgeStoreOnStartup() { 355 return this.purgeStoreOnStartup; 356 } 357 358 public void setPurgeStoreOnStartup(boolean purge) { 359 this.purgeStoreOnStartup = purge; 360 } 361 362 public boolean isIgnoreMissingJournalfiles() { 363 return ignoreMissingJournalfiles; 364 } 365 366 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 367 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; 368 } 369 370 public long size() { 371 if (!isStarted()) { 372 return 0; 373 } 374 try { 375 return journalSize.get() + pageFile.getDiskSize(); 376 } catch (IOException e) { 377 throw new RuntimeException(e); 378 } 379 } 380 381 @Override 382 public Locker createDefaultLocker() throws IOException { 383 SharedFileLocker locker = new SharedFileLocker(); 384 locker.setDirectory(this.getDirectory()); 385 return locker; 386 } 387 388 @Override 389 public void init() throws Exception { 390 } 391 392 /** 393 * Store a command in the Journal and process to update the Store index. 394 * 395 * @param command 396 * The specific JournalCommand to store and process. 397 * 398 * @returns the Location where the data was written in the Journal. 399 * 400 * @throws IOException if an error occurs storing or processing the command. 401 */ 402 public Location store(JournalCommand<?> command) throws IOException { 403 return store(command, isEnableIndexDiskSyncs(), null, null, null); 404 } 405 406 /** 407 * Store a command in the Journal and process to update the Store index. 408 * 409 * @param command 410 * The specific JournalCommand to store and process. 411 * @param sync 412 * Should the store operation be done synchronously. (ignored if completion passed). 413 * 414 * @returns the Location where the data was written in the Journal. 415 * 416 * @throws IOException if an error occurs storing or processing the command. 417 */ 418 public Location store(JournalCommand<?> command, boolean sync) throws IOException { 419 return store(command, sync, null, null, null); 420 } 421 422 /** 423 * Store a command in the Journal and process to update the Store index. 424 * 425 * @param command 426 * The specific JournalCommand to store and process. 427 * @param onJournalStoreComplete 428 * The Runnable to call when the Journal write operation completes. 429 * 430 * @returns the Location where the data was written in the Journal. 431 * 432 * @throws IOException if an error occurs storing or processing the command. 433 */ 434 public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException { 435 return store(command, isEnableIndexDiskSyncs(), null, null, onJournalStoreComplete); 436 } 437 438 /** 439 * Store a command in the Journal and process to update the Store index. 440 * 441 * @param command 442 * The specific JournalCommand to store and process. 443 * @param sync 444 * Should the store operation be done synchronously. (ignored if completion passed). 445 * @param before 446 * The Runnable instance to execute before performing the store and process operation. 447 * @param after 448 * The Runnable instance to execute after performing the store and process operation. 449 * 450 * @returns the Location where the data was written in the Journal. 451 * 452 * @throws IOException if an error occurs storing or processing the command. 453 */ 454 public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException { 455 return store(command, sync, before, after, null); 456 } 457 458 /** 459 * All updated are are funneled through this method. The updates are converted to a 460 * JournalMessage which is logged to the journal and then the data from the JournalMessage 461 * is used to update the index just like it would be done during a recovery process. 462 * 463 * @param command 464 * The specific JournalCommand to store and process. 465 * @param sync 466 * Should the store operation be done synchronously. (ignored if completion passed). 467 * @param before 468 * The Runnable instance to execute before performing the store and process operation. 469 * @param after 470 * The Runnable instance to execute after performing the store and process operation. 471 * @param onJournalStoreComplete 472 * Callback to be run when the journal write operation is complete. 473 * 474 * @returns the Location where the data was written in the Journal. 475 * 476 * @throws IOException if an error occurs storing or processing the command. 477 */ 478 public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException { 479 try { 480 481 if (before != null) { 482 before.run(); 483 } 484 485 ByteSequence sequence = toByteSequence(command); 486 Location location; 487 checkpointLock.readLock().lock(); 488 try { 489 490 long start = System.currentTimeMillis(); 491 location = onJournalStoreComplete == null ? journal.write(sequence, sync) : 492 journal.write(sequence, onJournalStoreComplete); 493 long start2 = System.currentTimeMillis(); 494 495 process(command, location); 496 497 long end = System.currentTimeMillis(); 498 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 499 LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms", 500 (start2-start), (end-start2)); 501 } 502 } finally { 503 checkpointLock.readLock().unlock(); 504 } 505 506 if (after != null) { 507 after.run(); 508 } 509 510 if (checkpointThread != null && !checkpointThread.isAlive()) { 511 startCheckpoint(); 512 } 513 return location; 514 } catch (IOException ioe) { 515 LOG.error("KahaDB failed to store to Journal", ioe); 516 if (brokerService != null) { 517 brokerService.handleIOException(ioe); 518 } 519 throw ioe; 520 } 521 } 522 523 /** 524 * Loads a previously stored JournalMessage 525 * 526 * @param location 527 * The location of the journal command to read. 528 * 529 * @return a new un-marshaled JournalCommand instance. 530 * 531 * @throws IOException if an error occurs reading the stored command. 532 */ 533 protected JournalCommand<?> load(Location location) throws IOException { 534 ByteSequence data = journal.read(location); 535 DataByteArrayInputStream is = new DataByteArrayInputStream(data); 536 byte readByte = is.readByte(); 537 KahaEntryType type = KahaEntryType.valueOf(readByte); 538 if (type == null) { 539 try { 540 is.close(); 541 } catch (IOException e) { 542 } 543 throw new IOException("Could not load journal record. Invalid location: " + location); 544 } 545 JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); 546 message.mergeFramed(is); 547 return message; 548 } 549 550 /** 551 * Process a stored or recovered JournalCommand instance and update the DB Index with the 552 * state changes that this command produces. This can be called either as a new DB operation 553 * or as a replay during recovery operations. 554 * 555 * @param command 556 * The JournalCommand to process. 557 * @param location 558 * The location in the Journal where the command was written or read from. 559 */ 560 protected abstract void process(JournalCommand<?> command, Location location) throws IOException; 561 562 /** 563 * Perform a checkpoint operation with optional cleanup. 564 * 565 * Called by the checkpoint background thread periodically to initiate a checkpoint operation 566 * and if the cleanup flag is set a cleanup sweep should be done to allow for release of no 567 * longer needed journal log files etc. 568 * 569 * @param cleanup 570 * Should the method do a simple checkpoint or also perform a journal cleanup. 571 * 572 * @throws IOException if an error occurs during the checkpoint operation. 573 */ 574 protected void checkpointUpdate(final boolean cleanup) throws IOException { 575 checkpointLock.writeLock().lock(); 576 try { 577 this.indexLock.writeLock().lock(); 578 try { 579 pageFile.tx().execute(new Transaction.Closure<IOException>() { 580 @Override 581 public void execute(Transaction tx) throws IOException { 582 checkpointUpdate(tx, cleanup); 583 } 584 }); 585 } finally { 586 this.indexLock.writeLock().unlock(); 587 } 588 589 } finally { 590 checkpointLock.writeLock().unlock(); 591 } 592 } 593 594 /** 595 * Perform the checkpoint update operation. If the cleanup flag is true then the 596 * operation should also purge any unused Journal log files. 597 * 598 * This method must always be called with the checkpoint and index write locks held. 599 * 600 * @param tx 601 * The TX under which to perform the checkpoint update. 602 * @param cleanup 603 * Should the checkpoint also do unused Journal file cleanup. 604 * 605 * @throws IOException if an error occurs while performing the checkpoint. 606 */ 607 protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException; 608 609 /** 610 * Creates a new ByteSequence that represents the marshaled form of the given Journal Command. 611 * 612 * @param command 613 * The Journal Command that should be marshaled to bytes for writing. 614 * 615 * @return the byte representation of the given journal command. 616 * 617 * @throws IOException if an error occurs while serializing the command. 618 */ 619 protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException { 620 int size = data.serializedSizeFramed(); 621 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 622 os.writeByte(data.type().getNumber()); 623 data.writeFramed(os); 624 return os.toByteSequence(); 625 } 626 627 /** 628 * Create the PageFile instance and configure it using the configuration options 629 * currently set. 630 * 631 * @return the newly created and configured PageFile instance. 632 */ 633 protected PageFile createPageFile() { 634 PageFile index = new PageFile(getDirectory(), getPageFileName()); 635 index.setEnableWriteThread(isEnableIndexWriteAsync()); 636 index.setWriteBatchSize(getIndexWriteBatchSize()); 637 index.setPageCacheSize(getIndexCacheSize()); 638 index.setUseLFRUEviction(isUseIndexLFRUEviction()); 639 index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); 640 index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); 641 index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); 642 index.setEnablePageCaching(isEnableIndexPageCaching()); 643 return index; 644 } 645 646 /** 647 * Create a new Journal instance and configure it using the currently set configuration 648 * options. If an archive directory is configured than this method will attempt to create 649 * that directory if it does not already exist. 650 * 651 * @return the newly created an configured Journal instance. 652 * 653 * @throws IOException if an error occurs while creating the Journal object. 654 */ 655 protected Journal createJournal() throws IOException { 656 Journal manager = new Journal(); 657 manager.setDirectory(getDirectory()); 658 manager.setMaxFileLength(getJournalMaxFileLength()); 659 manager.setCheckForCorruptionOnStartup(isCheckForCorruptJournalFiles()); 660 manager.setChecksum(isChecksumJournalFiles() || isCheckForCorruptJournalFiles()); 661 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 662 manager.setArchiveDataLogs(isArchiveDataLogs()); 663 manager.setSizeAccumulator(journalSize); 664 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); 665 if (getDirectoryArchive() != null) { 666 IOHelper.mkdirs(getDirectoryArchive()); 667 manager.setDirectoryArchive(getDirectoryArchive()); 668 } 669 return manager; 670 } 671 672 /** 673 * Starts the checkpoint Thread instance if not already running and not disabled 674 * by configuration. 675 */ 676 protected void startCheckpoint() { 677 if (checkpointInterval == 0 && cleanupInterval == 0) { 678 LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart"); 679 return; 680 } 681 synchronized (checkpointThreadLock) { 682 boolean start = false; 683 if (checkpointThread == null) { 684 start = true; 685 } else if (!checkpointThread.isAlive()) { 686 start = true; 687 LOG.info("KahaDB: Recovering checkpoint thread after death"); 688 } 689 if (start) { 690 checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") { 691 @Override 692 public void run() { 693 try { 694 long lastCleanup = System.currentTimeMillis(); 695 long lastCheckpoint = System.currentTimeMillis(); 696 // Sleep for a short time so we can periodically check 697 // to see if we need to exit this thread. 698 long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); 699 while (opened.get()) { 700 Thread.sleep(sleepTime); 701 long now = System.currentTimeMillis(); 702 if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) { 703 checkpointCleanup(true); 704 lastCleanup = now; 705 lastCheckpoint = now; 706 } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) { 707 checkpointCleanup(false); 708 lastCheckpoint = now; 709 } 710 } 711 } catch (InterruptedException e) { 712 // Looks like someone really wants us to exit this thread... 713 } catch (IOException ioe) { 714 LOG.error("Checkpoint failed", ioe); 715 brokerService.handleIOException(ioe); 716 } 717 } 718 }; 719 720 checkpointThread.setDaemon(true); 721 checkpointThread.start(); 722 } 723 } 724 } 725 726 /** 727 * Called from the worker thread to start a checkpoint. 728 * 729 * This method ensure that the store is in an opened state and optionaly logs information 730 * related to slow store access times. 731 * 732 * @param cleanup 733 * Should a cleanup of the journal occur during the checkpoint operation. 734 * 735 * @throws IOException if an error occurs during the checkpoint operation. 736 */ 737 protected void checkpointCleanup(final boolean cleanup) throws IOException { 738 long start; 739 this.indexLock.writeLock().lock(); 740 try { 741 start = System.currentTimeMillis(); 742 if (!opened.get()) { 743 return; 744 } 745 } finally { 746 this.indexLock.writeLock().unlock(); 747 } 748 checkpointUpdate(cleanup); 749 long end = System.currentTimeMillis(); 750 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 751 LOG.info("Slow KahaDB access: cleanup took {}", (end - start)); 752 } 753 } 754}