001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.store.kahadb;
019
020import java.io.File;
021import java.io.IOException;
022import java.util.Date;
023import java.util.concurrent.atomic.AtomicBoolean;
024import java.util.concurrent.atomic.AtomicLong;
025import java.util.concurrent.locks.ReentrantReadWriteLock;
026
027import org.apache.activemq.broker.LockableServiceSupport;
028import org.apache.activemq.broker.Locker;
029import org.apache.activemq.store.SharedFileLocker;
030import org.apache.activemq.store.kahadb.data.KahaEntryType;
031import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
032import org.apache.activemq.store.kahadb.disk.journal.Journal;
033import org.apache.activemq.store.kahadb.disk.journal.Location;
034import org.apache.activemq.store.kahadb.disk.page.PageFile;
035import org.apache.activemq.store.kahadb.disk.page.Transaction;
036import org.apache.activemq.util.ByteSequence;
037import org.apache.activemq.util.DataByteArrayInputStream;
038import org.apache.activemq.util.DataByteArrayOutputStream;
039import org.apache.activemq.util.IOHelper;
040import org.apache.activemq.util.ServiceStopper;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044public abstract class AbstractKahaDBStore extends LockableServiceSupport {
045
    // Class logger (package-private visibility).
    static final Logger LOG = LoggerFactory.getLogger(AbstractKahaDBStore.class);

    /** Name of the system property from which {@link #LOG_SLOW_ACCESS_TIME} is read. */
    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
    /**
     * Threshold, in milliseconds, above which store and checkpoint operations log a
     * "Slow KahaDB access" message.  Defaults to 0, which disables that logging.
     */
    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);

    // Data directory; doStart() falls back to getDefaultDataDirectory() when this is null.
    protected File directory;
    // Index page file, created lazily by getPageFile().
    protected PageFile pageFile;
    // Journal, created lazily by getJournal().
    protected Journal journal;
    // Handed to the Journal as its size accumulator in createJournal() and read by size().
    protected AtomicLong journalSize = new AtomicLong(0);
    protected boolean failIfDatabaseIsLocked;
    // Checkpoint / cleanup periods in milliseconds, as compared against
    // System.currentTimeMillis() differences by the checkpoint worker thread.
    protected long checkpointInterval = 5*1000;
    protected long cleanupInterval = 30*1000;
    // Only referenced by the "disabled" log message in startCheckpoint() within this class.
    private boolean cleanupOnStop = true;
    // The following journal options are applied to new Journal instances in createJournal().
    protected boolean checkForCorruptJournalFiles = false;
    protected boolean checksumJournalFiles = true;
    protected boolean forceRecoverIndex = false;
    protected int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    protected int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    protected boolean archiveCorruptedIndex = false;
    // The index options below are applied to new PageFile instances in createPageFile().
    protected boolean enableIndexWriteAsync = false;
    protected boolean enableJournalDiskSyncs = false;
    protected boolean deleteAllJobs = false;
    protected int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
    protected boolean useIndexLFRUEviction = false;
    protected float indexLFUEvictionFactor = 0.2f;
    protected boolean ignoreMissingJournalfiles = false;
    protected int indexCacheSize = 1000;
    protected boolean enableIndexDiskSyncs = true;
    protected boolean enableIndexRecoveryFile = true;
    protected boolean enableIndexPageCaching = true;
    protected boolean archiveDataLogs;
    // When true, doStart() deletes the journal and page file before loading, then resets the flag.
    protected boolean purgeStoreOnStartup;
    protected File directoryArchive;

    // The checkpoint worker thread keeps looping while this is true.
    protected AtomicBoolean opened = new AtomicBoolean();
    protected Thread checkpointThread;
    // Guards creation / replacement of checkpointThread in startCheckpoint().
    protected final Object checkpointThreadLock = new Object();
    // Read side is held by store(); write side is held by checkpointUpdate(boolean).
    protected ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
    protected ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
085
    /**
     * Supplies the file name used by {@link #createPageFile()} when it constructs this
     * store's PageFile inside the data directory.
     *
     * @return the name to give this store's PageFile instance.
     */
    protected abstract String getPageFileName();
090
    /**
     * Supplies the fallback data directory.  It is consulted by {@link #doStart()} only
     * when no directory has been configured, i.e. {@link #getDirectory()} returns null.
     *
     * @return the location of the data directory if not set by configuration.
     */
    protected abstract File getDefaultDataDirectory();
095
    /**
     * Loads the store from disk.
     *
     * Based on configuration this method can either load an existing store or it can purge
     * an existing store and start in a clean state.
     *
     * Invoked by {@link #doStart()} while the index write lock is held.
     *
     * @throws IOException if an error occurs during the load.
     */
    public abstract void load() throws IOException;
105
    /**
     * Unloads the state of the Store to disk and shuts down all resources assigned to this
     * KahaDB store implementation.
     *
     * Invoked by {@link #doStop(ServiceStopper)} when the service is stopped.
     *
     * @throws IOException if an error occurs during the store unload.
     */
    public abstract void unload() throws IOException;
113
114    @Override
115    protected void doStart() throws Exception {
116        this.indexLock.writeLock().lock();
117        if (getDirectory() == null) {
118            setDirectory(getDefaultDataDirectory());
119        }
120        IOHelper.mkdirs(getDirectory());
121        try {
122            if (isPurgeStoreOnStartup()) {
123                getJournal().start();
124                getJournal().delete();
125                getJournal().close();
126                journal = null;
127                getPageFile().delete();
128                LOG.info("{} Persistence store purged.", this);
129                setPurgeStoreOnStartup(false);
130            }
131
132            load();
133            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
134        } finally {
135            this.indexLock.writeLock().unlock();
136        }
137    }
138
    /**
     * Stops the store by delegating to {@link #unload()}.
     *
     * @param stopper
     *      Not used by this implementation.
     *
     * @throws Exception if unloading the store fails.
     */
    @Override
    protected void doStop(ServiceStopper stopper) throws Exception {
        unload();
    }
143
144    public PageFile getPageFile() {
145        if (pageFile == null) {
146            pageFile = createPageFile();
147        }
148        return pageFile;
149    }
150
151    public Journal getJournal() throws IOException {
152        if (journal == null) {
153            journal = createJournal();
154        }
155        return journal;
156    }
157
    /**
     * @return the configured data directory, or null if none has been set yet.
     */
    public File getDirectory() {
        return directory;
    }
161
    /**
     * @param directory
     *      The data directory; it is handed to the PageFile, Journal and default locker
     *      when those are created.
     */
    public void setDirectory(File directory) {
        this.directory = directory;
    }
165
    /**
     * @return the archiveCorruptedIndex flag.  The flag is only stored here; this base
     *         class does not read it itself.
     */
    public boolean isArchiveCorruptedIndex() {
        return archiveCorruptedIndex;
    }
169
    /**
     * @param archiveCorruptedIndex
     *      The new value of the archiveCorruptedIndex flag (defaults to false).
     */
    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
        this.archiveCorruptedIndex = archiveCorruptedIndex;
    }
173
    /**
     * @return the failIfDatabaseIsLocked flag.  The flag is only stored here; this base
     *         class does not read it itself.
     */
    public boolean isFailIfDatabaseIsLocked() {
        return failIfDatabaseIsLocked;
    }
177
    /**
     * @param failIfDatabaseIsLocked
     *      The new value of the failIfDatabaseIsLocked flag.
     */
    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
    }
181
    /**
     * @return true if newly created Journals are configured to check for corruption on
     *         startup; enabling this also turns journal checksums on in createJournal().
     */
    public boolean isCheckForCorruptJournalFiles() {
        return checkForCorruptJournalFiles;
    }
185
    /**
     * @param checkForCorruptJournalFiles
     *      Whether Journals created by createJournal() should check for corruption on
     *      startup (defaults to false).
     */
    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
    }
189
    /**
     * @return the time in milliseconds between periodic checkpoints performed by the
     *         checkpoint worker thread.
     */
    public long getCheckpointInterval() {
        return checkpointInterval;
    }
193
    /**
     * @param checkpointInterval
     *      Milliseconds between periodic checkpoints (defaults to 5000).  The worker thread
     *      only performs plain checkpoints when this value is greater than zero.
     */
    public void setCheckpointInterval(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }
197
    /**
     * @return the time in milliseconds between periodic checkpoints that also perform a
     *         cleanup.
     */
    public long getCleanupInterval() {
        return cleanupInterval;
    }
201
    /**
     * @param cleanupInterval
     *      Milliseconds between periodic cleanups (defaults to 30000).  The worker thread
     *      only performs cleanups when this value is greater than zero.
     */
    public void setCleanupInterval(long cleanupInterval) {
        this.cleanupInterval = cleanupInterval;
    }
205
    /**
     * @param cleanupOnStop
     *      The new value of the cleanupOnStop flag (defaults to true).  Within this class
     *      the flag only influences the wording of the log message emitted when periodic
     *      checkpoints are disabled.
     */
    public void setCleanupOnStop(boolean cleanupOnStop) {
        this.cleanupOnStop = cleanupOnStop;
    }
209
    /**
     * @return the cleanupOnStop flag.
     */
    public boolean getCleanupOnStop() {
        return this.cleanupOnStop;
    }
213
    /**
     * @return true if checksums are requested for Journals created by createJournal().
     */
    public boolean isChecksumJournalFiles() {
        return checksumJournalFiles;
    }
217
    /**
     * @param checksumJournalFiles
     *      Whether Journals created by createJournal() should use checksums (defaults to
     *      true).  Checksums are also enabled whenever the corrupt-journal check is on.
     */
    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
        this.checksumJournalFiles = checksumJournalFiles;
    }
221
    /**
     * @return the forceRecoverIndex flag.  The flag is only stored here; this base class
     *         does not read it itself.
     */
    public boolean isForceRecoverIndex() {
        return forceRecoverIndex;
    }
225
    /**
     * @param forceRecoverIndex
     *      The new value of the forceRecoverIndex flag (defaults to false).
     */
    public void setForceRecoverIndex(boolean forceRecoverIndex) {
        this.forceRecoverIndex = forceRecoverIndex;
    }
229
    /**
     * @return the maximum file length passed to Journals created by createJournal().
     */
    public int getJournalMaxFileLength() {
        return journalMaxFileLength;
    }
233
    /**
     * @param journalMaxFileLength
     *      The maximum file length to configure on newly created Journals (defaults to
     *      Journal.DEFAULT_MAX_FILE_LENGTH).
     */
    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.journalMaxFileLength = journalMaxFileLength;
    }
237
    /**
     * @return the write batch size passed to Journals created by createJournal().
     */
    public int getJournalMaxWriteBatchSize() {
        return journalMaxWriteBatchSize;
    }
241
    /**
     * @param journalMaxWriteBatchSize
     *      The write batch size to configure on newly created Journals (defaults to
     *      Journal.DEFAULT_MAX_WRITE_BATCH_SIZE).
     */
    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
    }
245
    /**
     * @return true if PageFiles created by createPageFile() have their write thread enabled.
     */
    public boolean isEnableIndexWriteAsync() {
        return enableIndexWriteAsync;
    }
249
    /**
     * @param enableIndexWriteAsync
     *      Whether newly created PageFiles should enable their write thread (defaults to
     *      false).
     */
    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.enableIndexWriteAsync = enableIndexWriteAsync;
    }
253
    /**
     * @return the enableJournalDiskSyncs flag, which createJournal() passes to the
     *         Journal's setEnableAsyncDiskSync option.
     */
    public boolean isEnableJournalDiskSyncs() {
        return enableJournalDiskSyncs;
    }
257
    /**
     * @param syncWrites
     *      The new value of the enableJournalDiskSyncs flag (defaults to false).
     */
    public void setEnableJournalDiskSyncs(boolean syncWrites) {
        this.enableJournalDiskSyncs = syncWrites;
    }
261
    /**
     * @return the deleteAllJobs flag.  The flag is only stored here; this base class does
     *         not read it itself.
     */
    public boolean isDeleteAllJobs() {
        return deleteAllJobs;
    }
265
    /**
     * @param deleteAllJobs
     *      The new value of the deleteAllJobs flag (defaults to false).
     */
    public void setDeleteAllJobs(boolean deleteAllJobs) {
        this.deleteAllJobs = deleteAllJobs;
    }
269
    /**
     * @return the archiveDataLogs flag, which createJournal() passes to newly created
     *         Journals.
     */
    public boolean isArchiveDataLogs() {
        return this.archiveDataLogs;
    }
276
    /**
     * @param archiveDataLogs
     *      The archiveDataLogs flag to configure on newly created Journals.
     */
    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }
283
    /**
     * @return the archive directory configured for the Journal, or null if none is set.
     */
    public File getDirectoryArchive() {
        return this.directoryArchive;
    }
290
    /**
     * @param directoryArchive
     *      The archive directory to set.  When non-null, createJournal() creates the
     *      directory if needed and passes it to the Journal.
     */
    public void setDirectoryArchive(File directoryArchive) {
        this.directoryArchive = directoryArchive;
    }
297
    /**
     * @return the page cache size passed to PageFiles created by createPageFile().
     */
    public int getIndexCacheSize() {
        return indexCacheSize;
    }
301
    /**
     * @param indexCacheSize
     *      The page cache size to configure on newly created PageFiles (defaults to 1000).
     */
    public void setIndexCacheSize(int indexCacheSize) {
        this.indexCacheSize = indexCacheSize;
    }
305
    /**
     * @return the write batch size passed to PageFiles created by createPageFile().
     */
    public int getIndexWriteBatchSize() {
        return indexWriteBatchSize;
    }
309
    /**
     * @param indexWriteBatchSize
     *      The write batch size to configure on newly created PageFiles (defaults to
     *      PageFile.DEFAULT_WRITE_BATCH_SIZE).
     */
    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
        this.indexWriteBatchSize = indexWriteBatchSize;
    }
313
    /**
     * @return the useIndexLFRUEviction flag, which createPageFile() passes to the
     *         PageFile's setUseLFRUEviction option.
     */
    public boolean isUseIndexLFRUEviction() {
        return useIndexLFRUEviction;
    }
317
    /**
     * @param useIndexLFRUEviction
     *      The new value of the useIndexLFRUEviction flag (defaults to false).
     */
    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
        this.useIndexLFRUEviction = useIndexLFRUEviction;
    }
321
    /**
     * @return the eviction factor that createPageFile() passes to the PageFile's
     *         setLFUEvictionFactor option.
     */
    public float getIndexLFUEvictionFactor() {
        return indexLFUEvictionFactor;
    }
325
    /**
     * @param indexLFUEvictionFactor
     *      The eviction factor to configure on newly created PageFiles (defaults to 0.2).
     */
    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
    }
329
    /**
     * @return true if disk syncs are enabled on PageFiles created by createPageFile().
     *         The same flag is used as the sync argument by the store() overloads that
     *         do not take an explicit sync parameter.
     */
    public boolean isEnableIndexDiskSyncs() {
        return enableIndexDiskSyncs;
    }
333
    /**
     * @param enableIndexDiskSyncs
     *      Whether newly created PageFiles should enable disk syncs (defaults to true).
     */
    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
    }
337
    /**
     * @return true if the recovery file is enabled on PageFiles created by createPageFile().
     */
    public boolean isEnableIndexRecoveryFile() {
        return enableIndexRecoveryFile;
    }
341
    /**
     * @param enableIndexRecoveryFile
     *      Whether newly created PageFiles should enable their recovery file (defaults to
     *      true).
     */
    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
    }
345
    /**
     * @return true if page caching is enabled on PageFiles created by createPageFile().
     */
    public boolean isEnableIndexPageCaching() {
        return enableIndexPageCaching;
    }
349
    /**
     * @param enableIndexPageCaching
     *      Whether newly created PageFiles should enable page caching (defaults to true).
     */
    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
        this.enableIndexPageCaching = enableIndexPageCaching;
    }
353
    /**
     * @return true if the next doStart() should delete the journal and page file before
     *         loading the store.
     */
    public boolean isPurgeStoreOnStartup() {
        return this.purgeStoreOnStartup;
    }
357
    /**
     * @param purge
     *      Whether the store should be purged on the next start.  doStart() resets the
     *      flag to false once the purge has been performed.
     */
    public void setPurgeStoreOnStartup(boolean purge) {
        this.purgeStoreOnStartup = purge;
    }
361
    /**
     * @return the ignoreMissingJournalfiles flag.  The flag is only stored here; this base
     *         class does not read it itself.
     */
    public boolean isIgnoreMissingJournalfiles() {
        return ignoreMissingJournalfiles;
    }
365
    /**
     * @param ignoreMissingJournalfiles
     *      The new value of the ignoreMissingJournalfiles flag (defaults to false).
     */
    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
    }
369
370    public long size() {
371        if (!isStarted()) {
372            return 0;
373        }
374        try {
375            return journalSize.get() + pageFile.getDiskSize();
376        } catch (IOException e) {
377            throw new RuntimeException(e);
378        }
379    }
380
381    @Override
382    public Locker createDefaultLocker() throws IOException {
383        SharedFileLocker locker = new SharedFileLocker();
384        locker.setDirectory(this.getDirectory());
385        return locker;
386    }
387
    /**
     * No-op: this class performs no work in init().
     */
    @Override
    public void init() throws Exception {
        // Intentionally empty.
    }
391
    /**
     * Store a command in the Journal and process to update the Store index.
     *
     * Delegates to the five-argument store method without before/after hooks or a journal
     * completion callback, using {@link #isEnableIndexDiskSyncs()} as the sync flag.
     * NOTE(review): the sync flag comes from the index disk-sync setting rather than the
     * journal one - confirm this is intended.
     *
     * @param command
     *      The specific JournalCommand to store and process.
     *
     * @return the Location where the data was written in the Journal.
     *
     * @throws IOException if an error occurs storing or processing the command.
     */
    public Location store(JournalCommand<?> command) throws IOException {
        return store(command, isEnableIndexDiskSyncs(), null, null, null);
    }
405
    /**
     * Store a command in the Journal and process to update the Store index.
     *
     * Delegates to the five-argument store method without before/after hooks or a journal
     * completion callback.
     *
     * @param command
     *      The specific JournalCommand to store and process.
     * @param sync
     *      Should the store operation be done synchronously. (ignored if completion passed).
     *
     * @return the Location where the data was written in the Journal.
     *
     * @throws IOException if an error occurs storing or processing the command.
     */
    public Location store(JournalCommand<?> command, boolean sync) throws IOException {
        return store(command, sync, null, null, null);
    }
421
    /**
     * Store a command in the Journal and process to update the Store index.
     *
     * Delegates to the five-argument store method without before/after hooks, passing
     * {@link #isEnableIndexDiskSyncs()} as the sync flag.
     *
     * @param command
     *      The specific JournalCommand to store and process.
     * @param onJournalStoreComplete
     *      The Runnable to call when the Journal write operation completes.
     *
     * @return the Location where the data was written in the Journal.
     *
     * @throws IOException if an error occurs storing or processing the command.
     */
    public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException {
        return store(command, isEnableIndexDiskSyncs(), null, null, onJournalStoreComplete);
    }
437
    /**
     * Store a command in the Journal and process to update the Store index.
     *
     * Delegates to the five-argument store method without a journal completion callback.
     *
     * @param command
     *      The specific JournalCommand to store and process.
     * @param sync
     *      Should the store operation be done synchronously. (ignored if completion passed).
     * @param before
     *      The Runnable instance to execute before performing the store and process operation.
     * @param after
     *      The Runnable instance to execute after performing the store and process operation.
     *
     * @return the Location where the data was written in the Journal.
     *
     * @throws IOException if an error occurs storing or processing the command.
     */
    public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException {
        return store(command, sync, before, after, null);
    }
457
458    /**
459     * All updated are are funneled through this method. The updates are converted to a
460     * JournalMessage which is logged to the journal and then the data from the JournalMessage
461     * is used to update the index just like it would be done during a recovery process.
462     *
463     * @param command
464     *      The specific JournalCommand to store and process.
465     * @param sync
466     *      Should the store operation be done synchronously. (ignored if completion passed).
467     * @param before
468     *      The Runnable instance to execute before performing the store and process operation.
469     * @param after
470     *      The Runnable instance to execute after performing the store and process operation.
471     * @param onJournalStoreComplete
472     *      Callback to be run when the journal write operation is complete.
473     *
474     * @returns the Location where the data was written in the Journal.
475     *
476     * @throws IOException if an error occurs storing or processing the command.
477     */
478    public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
479        try {
480
481            if (before != null) {
482                before.run();
483            }
484
485            ByteSequence sequence = toByteSequence(command);
486            Location location;
487            checkpointLock.readLock().lock();
488            try {
489
490                long start = System.currentTimeMillis();
491                location = onJournalStoreComplete == null ? journal.write(sequence, sync) :
492                                                            journal.write(sequence, onJournalStoreComplete);
493                long start2 = System.currentTimeMillis();
494
495                process(command, location);
496
497                long end = System.currentTimeMillis();
498                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
499                    LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms",
500                             (start2-start), (end-start2));
501                }
502            } finally {
503                checkpointLock.readLock().unlock();
504            }
505
506            if (after != null) {
507                after.run();
508            }
509
510            if (checkpointThread != null && !checkpointThread.isAlive()) {
511                startCheckpoint();
512            }
513            return location;
514        } catch (IOException ioe) {
515            LOG.error("KahaDB failed to store to Journal", ioe);
516            if (brokerService != null) {
517                brokerService.handleIOException(ioe);
518            }
519            throw ioe;
520        }
521    }
522
523    /**
524     * Loads a previously stored JournalMessage
525     *
526     * @param location
527     *      The location of the journal command to read.
528     *
529     * @return a new un-marshaled JournalCommand instance.
530     *
531     * @throws IOException if an error occurs reading the stored command.
532     */
533    protected JournalCommand<?> load(Location location) throws IOException {
534        ByteSequence data = journal.read(location);
535        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
536        byte readByte = is.readByte();
537        KahaEntryType type = KahaEntryType.valueOf(readByte);
538        if (type == null) {
539            try {
540                is.close();
541            } catch (IOException e) {
542            }
543            throw new IOException("Could not load journal record. Invalid location: " + location);
544        }
545        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
546        message.mergeFramed(is);
547        return message;
548    }
549
    /**
     * Process a stored or recovered JournalCommand instance and update the DB Index with the
     * state changes that this command produces.  This can be called either as a new DB operation
     * or as a replay during recovery operations.
     *
     * When invoked from the store methods of this class, the checkpoint read lock is held.
     *
     * @param command
     *      The JournalCommand to process.
     * @param location
     *      The location in the Journal where the command was written or read from.
     *
     * @throws IOException if an error occurs while processing the command.
     */
    protected abstract void process(JournalCommand<?> command, Location location) throws IOException;
561
562    /**
563     * Perform a checkpoint operation with optional cleanup.
564     *
565     * Called by the checkpoint background thread periodically to initiate a checkpoint operation
566     * and if the cleanup flag is set a cleanup sweep should be done to allow for release of no
567     * longer needed journal log files etc.
568     *
569     * @param cleanup
570     *      Should the method do a simple checkpoint or also perform a journal cleanup.
571     *
572     * @throws IOException if an error occurs during the checkpoint operation.
573     */
574    protected void checkpointUpdate(final boolean cleanup) throws IOException {
575        checkpointLock.writeLock().lock();
576        try {
577            this.indexLock.writeLock().lock();
578            try {
579                pageFile.tx().execute(new Transaction.Closure<IOException>() {
580                    @Override
581                    public void execute(Transaction tx) throws IOException {
582                        checkpointUpdate(tx, cleanup);
583                    }
584                });
585            } finally {
586                this.indexLock.writeLock().unlock();
587            }
588
589        } finally {
590            checkpointLock.writeLock().unlock();
591        }
592    }
593
    /**
     * Perform the checkpoint update operation.  If the cleanup flag is true then the
     * operation should also purge any unused Journal log files.
     *
     * This method must always be called with the checkpoint and index write locks held,
     * as the non-transactional checkpointUpdate variant of this class does.
     *
     * @param tx
     *      The TX under which to perform the checkpoint update.
     * @param cleanup
     *      Should the checkpoint also do unused Journal file cleanup.
     *
     * @throws IOException if an error occurs while performing the checkpoint.
     */
    protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException;
608
609    /**
610     * Creates a new ByteSequence that represents the marshaled form of the given Journal Command.
611     *
612     * @param command
613     *      The Journal Command that should be marshaled to bytes for writing.
614     *
615     * @return the byte representation of the given journal command.
616     *
617     * @throws IOException if an error occurs while serializing the command.
618     */
619    protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
620        int size = data.serializedSizeFramed();
621        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
622        os.writeByte(data.type().getNumber());
623        data.writeFramed(os);
624        return os.toByteSequence();
625    }
626
627    /**
628     * Create the PageFile instance and configure it using the configuration options
629     * currently set.
630     *
631     * @return the newly created and configured PageFile instance.
632     */
633    protected PageFile createPageFile() {
634        PageFile index = new PageFile(getDirectory(), getPageFileName());
635        index.setEnableWriteThread(isEnableIndexWriteAsync());
636        index.setWriteBatchSize(getIndexWriteBatchSize());
637        index.setPageCacheSize(getIndexCacheSize());
638        index.setUseLFRUEviction(isUseIndexLFRUEviction());
639        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
640        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
641        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
642        index.setEnablePageCaching(isEnableIndexPageCaching());
643        return index;
644    }
645
646    /**
647     * Create a new Journal instance and configure it using the currently set configuration
648     * options.  If an archive directory is configured than this method will attempt to create
649     * that directory if it does not already exist.
650     *
651     * @return the newly created an configured Journal instance.
652     *
653     * @throws IOException if an error occurs while creating the Journal object.
654     */
655    protected Journal createJournal() throws IOException {
656        Journal manager = new Journal();
657        manager.setDirectory(getDirectory());
658        manager.setMaxFileLength(getJournalMaxFileLength());
659        manager.setCheckForCorruptionOnStartup(isCheckForCorruptJournalFiles());
660        manager.setChecksum(isChecksumJournalFiles() || isCheckForCorruptJournalFiles());
661        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
662        manager.setArchiveDataLogs(isArchiveDataLogs());
663        manager.setSizeAccumulator(journalSize);
664        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
665        if (getDirectoryArchive() != null) {
666            IOHelper.mkdirs(getDirectoryArchive());
667            manager.setDirectoryArchive(getDirectoryArchive());
668        }
669        return manager;
670    }
671
672    /**
673     * Starts the checkpoint Thread instance if not already running and not disabled
674     * by configuration.
675     */
676    protected void startCheckpoint() {
677        if (checkpointInterval == 0 && cleanupInterval == 0) {
678            LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart");
679            return;
680        }
681        synchronized (checkpointThreadLock) {
682            boolean start = false;
683            if (checkpointThread == null) {
684                start = true;
685            } else if (!checkpointThread.isAlive()) {
686                start = true;
687                LOG.info("KahaDB: Recovering checkpoint thread after death");
688            }
689            if (start) {
690                checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
691                    @Override
692                    public void run() {
693                        try {
694                            long lastCleanup = System.currentTimeMillis();
695                            long lastCheckpoint = System.currentTimeMillis();
696                            // Sleep for a short time so we can periodically check
697                            // to see if we need to exit this thread.
698                            long sleepTime = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
699                            while (opened.get()) {
700                                Thread.sleep(sleepTime);
701                                long now = System.currentTimeMillis();
702                                if( cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval) ) {
703                                    checkpointCleanup(true);
704                                    lastCleanup = now;
705                                    lastCheckpoint = now;
706                                } else if( checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval )) {
707                                    checkpointCleanup(false);
708                                    lastCheckpoint = now;
709                                }
710                            }
711                        } catch (InterruptedException e) {
712                            // Looks like someone really wants us to exit this thread...
713                        } catch (IOException ioe) {
714                            LOG.error("Checkpoint failed", ioe);
715                            brokerService.handleIOException(ioe);
716                        }
717                    }
718                };
719
720                checkpointThread.setDaemon(true);
721                checkpointThread.start();
722            }
723        }
724    }
725
726    /**
727     * Called from the worker thread to start a checkpoint.
728     *
729     * This method ensure that the store is in an opened state and optionaly logs information
730     * related to slow store access times.
731     *
732     * @param cleanup
733     *      Should a cleanup of the journal occur during the checkpoint operation.
734     *
735     * @throws IOException if an error occurs during the checkpoint operation.
736     */
737    protected void checkpointCleanup(final boolean cleanup) throws IOException {
738        long start;
739        this.indexLock.writeLock().lock();
740        try {
741            start = System.currentTimeMillis();
742            if (!opened.get()) {
743                return;
744            }
745        } finally {
746            this.indexLock.writeLock().unlock();
747        }
748        checkpointUpdate(cleanup);
749        long end = System.currentTimeMillis();
750        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
751            LOG.info("Slow KahaDB access: cleanup took {}", (end - start));
752        }
753    }
754}