public abstract class AbstractKahaDBStore extends LockableServiceSupport
brokerService, clockDaemon
Constructor and Description |
---|
AbstractKahaDBStore() |
Modifier and Type | Method and Description |
---|---|
protected void |
checkpointCleanup(boolean cleanup)
Called from the worker thread to start a checkpoint.
|
protected void |
checkpointUpdate(boolean cleanup)
Perform a checkpoint operation with optional cleanup.
|
protected abstract void |
checkpointUpdate(Transaction tx,
boolean cleanup)
Perform the checkpoint update operation.
|
Locker |
createDefaultLocker() |
protected Journal |
createJournal()
Create a new Journal instance and configure it using the currently set configuration
options.
|
protected PageFile |
createPageFile()
Create the PageFile instance and configure it using the configuration options
currently set.
|
protected void |
doStart() |
protected void |
doStop(ServiceStopper stopper) |
long |
getCheckpointInterval() |
long |
getCleanupInterval() |
boolean |
getCleanupOnStop() |
protected abstract File |
getDefaultDataDirectory() |
File |
getDirectory() |
File |
getDirectoryArchive() |
int |
getIndexCacheSize() |
float |
getIndexLFUEvictionFactor() |
int |
getIndexWriteBatchSize() |
Journal |
getJournal() |
int |
getJournalMaxFileLength() |
int |
getJournalMaxWriteBatchSize() |
PageFile |
getPageFile() |
protected abstract String |
getPageFileName() |
void |
init() |
boolean |
isArchiveCorruptedIndex() |
boolean |
isArchiveDataLogs() |
boolean |
isCheckForCorruptJournalFiles() |
boolean |
isChecksumJournalFiles() |
boolean |
isDeleteAllJobs() |
boolean |
isEnableIndexDiskSyncs() |
boolean |
isEnableIndexPageCaching() |
boolean |
isEnableIndexRecoveryFile() |
boolean |
isEnableIndexWriteAsync() |
boolean |
isEnableJournalDiskSyncs() |
boolean |
isFailIfDatabaseIsLocked() |
boolean |
isForceRecoverIndex() |
boolean |
isIgnoreMissingJournalfiles() |
boolean |
isPurgeStoreOnStartup() |
boolean |
isUseIndexLFRUEviction() |
abstract void |
load()
Loads the store from disk.
|
protected JournalCommand<?> |
load(Location location)
Loads a previously stored JournalCommand.
|
protected abstract void |
process(JournalCommand<?> command,
Location location)
Process a stored or recovered JournalCommand instance and update the DB Index with the
state changes that this command produces.
|
void |
setArchiveCorruptedIndex(boolean archiveCorruptedIndex) |
void |
setArchiveDataLogs(boolean archiveDataLogs) |
void |
setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) |
void |
setCheckpointInterval(long checkpointInterval) |
void |
setChecksumJournalFiles(boolean checksumJournalFiles) |
void |
setCleanupInterval(long cleanupInterval) |
void |
setCleanupOnStop(boolean cleanupOnStop) |
void |
setDeleteAllJobs(boolean deleteAllJobs) |
void |
setDirectory(File directory) |
void |
setDirectoryArchive(File directoryArchive) |
void |
setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) |
void |
setEnableIndexPageCaching(boolean enableIndexPageCaching) |
void |
setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) |
void |
setEnableIndexWriteAsync(boolean enableIndexWriteAsync) |
void |
setEnableJournalDiskSyncs(boolean syncWrites) |
void |
setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) |
void |
setForceRecoverIndex(boolean forceRecoverIndex) |
void |
setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) |
void |
setIndexCacheSize(int indexCacheSize) |
void |
setIndexLFUEvictionFactor(float indexLFUEvictionFactor) |
void |
setIndexWriteBatchSize(int indexWriteBatchSize) |
void |
setJournalMaxFileLength(int journalMaxFileLength) |
void |
setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) |
void |
setPurgeStoreOnStartup(boolean purge) |
void |
setUseIndexLFRUEviction(boolean useIndexLFRUEviction) |
long |
size() |
protected void |
startCheckpoint()
Starts the checkpoint Thread instance if not already running and not disabled
by configuration.
|
Location |
store(JournalCommand<?> command)
Store a command in the Journal and process to update the Store index.
|
Location |
store(JournalCommand<?> command,
boolean sync)
Store a command in the Journal and process to update the Store index.
|
Location |
store(JournalCommand<?> command,
boolean sync,
Runnable before,
Runnable after)
Store a command in the Journal and process to update the Store index.
|
Location |
store(JournalCommand<?> command,
boolean sync,
Runnable before,
Runnable after,
Runnable onJournalStoreComplete)
All updates are funneled through this method.
|
Location |
store(JournalCommand<?> command,
Runnable onJournalStoreComplete)
Store a command in the Journal and process to update the Store index.
|
protected ByteSequence |
toByteSequence(JournalCommand<?> data)
Creates a new ByteSequence that represents the marshaled form of the given Journal Command.
|
abstract void |
unload()
Unloads the state of the Store to disk and shuts down all resources assigned to this
KahaDB store implementation.
|
getBrokerService, getLocker, getLockKeepAlivePeriod, getScheduledThreadPoolExecutor, isUseLock, keepLockAlive, postStop, preStart, setBrokerService, setLocker, setLockKeepAlivePeriod, setScheduledThreadPoolExecutor, setUseLock, stopBroker
addServiceListener, dispose, isStarted, isStopped, isStopping, removeServiceListener, start, stop
public static final String PROPERTY_LOG_SLOW_ACCESS_TIME
public static final int LOG_SLOW_ACCESS_TIME
protected AtomicLong journalSize
protected boolean failIfDatabaseIsLocked
protected long checkpointInterval
protected long cleanupInterval
protected boolean checkForCorruptJournalFiles
protected boolean checksumJournalFiles
protected boolean forceRecoverIndex
protected int journalMaxFileLength
protected int journalMaxWriteBatchSize
protected boolean archiveCorruptedIndex
protected boolean enableIndexWriteAsync
protected boolean enableJournalDiskSyncs
protected boolean deleteAllJobs
protected int indexWriteBatchSize
protected boolean useIndexLFRUEviction
protected float indexLFUEvictionFactor
protected boolean ignoreMissingJournalfiles
protected int indexCacheSize
protected boolean enableIndexDiskSyncs
protected boolean enableIndexRecoveryFile
protected boolean enableIndexPageCaching
protected boolean archiveDataLogs
protected boolean purgeStoreOnStartup
protected File directoryArchive
protected AtomicBoolean opened
protected Thread checkpointThread
protected final Object checkpointThreadLock
protected ReentrantReadWriteLock checkpointLock
protected ReentrantReadWriteLock indexLock
public AbstractKahaDBStore()
protected abstract String getPageFileName()
protected abstract File getDefaultDataDirectory()
public abstract void load() throws IOException
IOException
- if an error occurs during the load.
public abstract void unload() throws IOException
IOException
- if an error occurs during the store unload.
protected void doStart() throws Exception
doStart
in class ServiceSupport
Exception
protected void doStop(ServiceStopper stopper) throws Exception
doStop
in class ServiceSupport
Exception
public PageFile getPageFile()
public Journal getJournal() throws IOException
IOException
public File getDirectory()
public void setDirectory(File directory)
public boolean isArchiveCorruptedIndex()
public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex)
public boolean isFailIfDatabaseIsLocked()
public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked)
public boolean isCheckForCorruptJournalFiles()
public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles)
public long getCheckpointInterval()
public void setCheckpointInterval(long checkpointInterval)
public long getCleanupInterval()
public void setCleanupInterval(long cleanupInterval)
public void setCleanupOnStop(boolean cleanupOnStop)
public boolean getCleanupOnStop()
public boolean isChecksumJournalFiles()
public void setChecksumJournalFiles(boolean checksumJournalFiles)
public boolean isForceRecoverIndex()
public void setForceRecoverIndex(boolean forceRecoverIndex)
public int getJournalMaxFileLength()
public void setJournalMaxFileLength(int journalMaxFileLength)
public int getJournalMaxWriteBatchSize()
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize)
public boolean isEnableIndexWriteAsync()
public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync)
public boolean isEnableJournalDiskSyncs()
public void setEnableJournalDiskSyncs(boolean syncWrites)
public boolean isDeleteAllJobs()
public void setDeleteAllJobs(boolean deleteAllJobs)
public boolean isArchiveDataLogs()
public void setArchiveDataLogs(boolean archiveDataLogs)
archiveDataLogs
- the archiveDataLogs to set
public File getDirectoryArchive()
public void setDirectoryArchive(File directoryArchive)
directoryArchive
- the directoryArchive to set
public int getIndexCacheSize()
public void setIndexCacheSize(int indexCacheSize)
public int getIndexWriteBatchSize()
public void setIndexWriteBatchSize(int indexWriteBatchSize)
public boolean isUseIndexLFRUEviction()
public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction)
public float getIndexLFUEvictionFactor()
public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor)
public boolean isEnableIndexDiskSyncs()
public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs)
public boolean isEnableIndexRecoveryFile()
public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile)
public boolean isEnableIndexPageCaching()
public void setEnableIndexPageCaching(boolean enableIndexPageCaching)
public boolean isPurgeStoreOnStartup()
public void setPurgeStoreOnStartup(boolean purge)
public boolean isIgnoreMissingJournalfiles()
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles)
public long size()
public Locker createDefaultLocker() throws IOException
IOException
public void init() throws Exception
init
in class LockableServiceSupport
Exception
public Location store(JournalCommand<?> command) throws IOException
command
- The specific JournalCommand to store and process.
IOException
- if an error occurs storing or processing the command.
public Location store(JournalCommand<?> command, boolean sync) throws IOException
command
- The specific JournalCommand to store and process.
sync
- Whether the store operation should be done synchronously (ignored if a completion callback is passed).
IOException
- if an error occurs storing or processing the command.
public Location store(JournalCommand<?> command, Runnable onJournalStoreComplete) throws IOException
command
- The specific JournalCommand to store and process.
onJournalStoreComplete
- The Runnable to call when the Journal write operation completes.
IOException
- if an error occurs storing or processing the command.
public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after) throws IOException
command
- The specific JournalCommand to store and process.
sync
- Whether the store operation should be done synchronously (ignored if a completion callback is passed).
before
- The Runnable instance to execute before performing the store and process operation.
after
- The Runnable instance to execute after performing the store and process operation.
IOException
- if an error occurs storing or processing the command.
public Location store(JournalCommand<?> command, boolean sync, Runnable before, Runnable after, Runnable onJournalStoreComplete) throws IOException
command
- The specific JournalCommand to store and process.
sync
- Whether the store operation should be done synchronously (ignored if a completion callback is passed).
before
- The Runnable instance to execute before performing the store and process operation.
after
- The Runnable instance to execute after performing the store and process operation.
onJournalStoreComplete
- Callback to be run when the journal write operation is complete.
IOException
- if an error occurs storing or processing the command.
protected JournalCommand<?> load(Location location) throws IOException
location
- The location of the journal command to read.
IOException
- if an error occurs reading the stored command.
protected abstract void process(JournalCommand<?> command, Location location) throws IOException
command
- The JournalCommand to process.
location
- The location in the Journal where the command was written or read from.
IOException
protected void checkpointUpdate(boolean cleanup) throws IOException
cleanup
- Should the method do a simple checkpoint or also perform a journal cleanup.
IOException
- if an error occurs during the checkpoint operation.
protected abstract void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException
tx
- The TX under which to perform the checkpoint update.
cleanup
- Should the checkpoint also do unused Journal file cleanup.
IOException
- if an error occurs while performing the checkpoint.
protected ByteSequence toByteSequence(JournalCommand<?> data) throws IOException
data
- The Journal Command that should be marshaled to bytes for writing.
IOException
- if an error occurs while serializing the command.
protected PageFile createPageFile()
protected Journal createJournal() throws IOException
IOException
- if an error occurs while creating the Journal object.
protected void startCheckpoint()
protected void checkpointCleanup(boolean cleanup) throws IOException
cleanup
- Should a cleanup of the journal occur during the checkpoint operation.
IOException
- if an error occurs during the checkpoint operation.
Copyright © 2005–2020 FuseSource, Corp. All rights reserved.