001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;
020
021import java.io.File;
022import java.io.IOException;
023import java.util.Set;
024import java.util.concurrent.Callable;
025
026import javax.management.ObjectName;
027
028import org.apache.activemq.broker.BrokerService;
029import org.apache.activemq.broker.ConnectionContext;
030import org.apache.activemq.broker.LockableServiceSupport;
031import org.apache.activemq.broker.Locker;
032import org.apache.activemq.broker.jmx.AnnotatedMBean;
033import org.apache.activemq.broker.jmx.PersistenceAdapterView;
034import org.apache.activemq.broker.scheduler.JobSchedulerStore;
035import org.apache.activemq.command.ActiveMQDestination;
036import org.apache.activemq.command.ActiveMQQueue;
037import org.apache.activemq.command.ActiveMQTopic;
038import org.apache.activemq.command.LocalTransactionId;
039import org.apache.activemq.command.ProducerId;
040import org.apache.activemq.command.TransactionId;
041import org.apache.activemq.command.XATransactionId;
042import org.apache.activemq.protobuf.Buffer;
043import org.apache.activemq.store.JournaledStore;
044import org.apache.activemq.store.MessageStore;
045import org.apache.activemq.store.PersistenceAdapter;
046import org.apache.activemq.store.SharedFileLocker;
047import org.apache.activemq.store.TopicMessageStore;
048import org.apache.activemq.store.TransactionIdTransformer;
049import org.apache.activemq.store.TransactionIdTransformerAware;
050import org.apache.activemq.store.TransactionStore;
051import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
052import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
053import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
054import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
055import org.apache.activemq.usage.SystemUsage;
056import org.apache.activemq.util.ServiceStopper;
057
058/**
059 * An implementation of {@link PersistenceAdapter} designed for use with
060 * KahaDB - Embedded Lightweight Non-Relational Database
061 *
062 * @org.apache.xbean.XBean element="kahaDB"
063 *
064 */
065public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore, TransactionIdTransformerAware {
066    private final KahaDBStore letter = new KahaDBStore();
067
068    /**
069     * @param context
070     * @throws IOException
071     * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
072     */
073    @Override
074    public void beginTransaction(ConnectionContext context) throws IOException {
075        this.letter.beginTransaction(context);
076    }
077
078    /**
079     * @param cleanup
080     * @throws IOException
081     * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
082     */
083    @Override
084    public void checkpoint(boolean cleanup) throws IOException {
085        this.letter.checkpoint(cleanup);
086    }
087
088    /**
089     * @param context
090     * @throws IOException
091     * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
092     */
093    @Override
094    public void commitTransaction(ConnectionContext context) throws IOException {
095        this.letter.commitTransaction(context);
096    }
097
098    /**
099     * @param destination
100     * @return MessageStore
101     * @throws IOException
102     * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
103     */
104    @Override
105    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
106        return this.letter.createQueueMessageStore(destination);
107    }
108
109    /**
110     * @param destination
111     * @return TopicMessageStore
112     * @throws IOException
113     * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
114     */
115    @Override
116    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
117        return this.letter.createTopicMessageStore(destination);
118    }
119
120    /**
121     * @return TransactionStore
122     * @throws IOException
123     * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
124     */
125    @Override
126    public TransactionStore createTransactionStore() throws IOException {
127        return this.letter.createTransactionStore();
128    }
129
130    /**
131     * @throws IOException
132     * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
133     */
134    @Override
135    public void deleteAllMessages() throws IOException {
136        this.letter.deleteAllMessages();
137    }
138
139    /**
140     * @return destinations
141     * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
142     */
143    @Override
144    public Set<ActiveMQDestination> getDestinations() {
145        return this.letter.getDestinations();
146    }
147
148    /**
149     * @return lastMessageBrokerSequenceId
150     * @throws IOException
151     * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
152     */
153    @Override
154    public long getLastMessageBrokerSequenceId() throws IOException {
155        return this.letter.getLastMessageBrokerSequenceId();
156    }
157
158    @Override
159    public long getLastProducerSequenceId(ProducerId id) throws IOException {
160        return this.letter.getLastProducerSequenceId(id);
161    }
162
163    @Override
164    public void allowIOResumption() {
165        this.letter.allowIOResumption();
166    }
167
168    /**
169     * @param destination
170     * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
171     */
172    @Override
173    public void removeQueueMessageStore(ActiveMQQueue destination) {
174        this.letter.removeQueueMessageStore(destination);
175    }
176
177    /**
178     * @param destination
179     * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
180     */
181    @Override
182    public void removeTopicMessageStore(ActiveMQTopic destination) {
183        this.letter.removeTopicMessageStore(destination);
184    }
185
186    /**
187     * @param context
188     * @throws IOException
189     * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
190     */
191    @Override
192    public void rollbackTransaction(ConnectionContext context) throws IOException {
193        this.letter.rollbackTransaction(context);
194    }
195
196    /**
197     * @param brokerName
198     * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
199     */
200    @Override
201    public void setBrokerName(String brokerName) {
202        this.letter.setBrokerName(brokerName);
203    }
204
205    /**
206     * @param usageManager
207     * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
208     */
209    @Override
210    public void setUsageManager(SystemUsage usageManager) {
211        this.letter.setUsageManager(usageManager);
212    }
213
214    /**
215     * @return the size of the store
216     * @see org.apache.activemq.store.PersistenceAdapter#size()
217     */
218    @Override
219    public long size() {
220        return this.letter.isStarted() ? this.letter.size() : 0l;
221    }
222
223    /**
224     * @throws Exception
225     * @see org.apache.activemq.Service#start()
226     */
227    @Override
228    public void doStart() throws Exception {
229        this.letter.start();
230
231        if (brokerService != null && brokerService.isUseJmx()) {
232            PersistenceAdapterView view = new PersistenceAdapterView(this);
233            view.setInflightTransactionViewCallable(new Callable<String>() {
234                @Override
235                public String call() throws Exception {
236                    return letter.getTransactions();
237                }
238            });
239            view.setDataViewCallable(new Callable<String>() {
240                @Override
241                public String call() throws Exception {
242                    return letter.getJournal().getFileMap().keySet().toString();
243                }
244            });
245            AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view,
246                    createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString()));
247        }
248    }
249
250    /**
251     * @throws Exception
252     * @see org.apache.activemq.Service#stop()
253     */
254    @Override
255    public void doStop(ServiceStopper stopper) throws Exception {
256        this.letter.stop();
257
258        if (brokerService != null && brokerService.isUseJmx()) {
259            ObjectName brokerObjectName = brokerService.getBrokerObjectName();
260            brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString()));
261        }
262    }
263
264    /**
265     * Get the journalMaxFileLength
266     *
267     * @return the journalMaxFileLength
268     */
269    @Override
270    public int getJournalMaxFileLength() {
271        return this.letter.getJournalMaxFileLength();
272    }
273
274    /**
275     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
276     * be used
277     *
278     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
279     */
280    public void setJournalMaxFileLength(int journalMaxFileLength) {
281        this.letter.setJournalMaxFileLength(journalMaxFileLength);
282    }
283
284    /**
285     * Set the max number of producers (LRU cache) to track for duplicate sends
286     */
287    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
288        this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
289    }
290
291    public int getMaxFailoverProducersToTrack() {
292        return this.letter.getMaxFailoverProducersToTrack();
293    }
294
295    /**
296     * set the audit window depth for duplicate suppression (should exceed the max transaction
297     * batch)
298     */
299    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
300        this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
301    }
302
303    public int getFailoverProducersAuditDepth() {
304        return this.letter.getFailoverProducersAuditDepth();
305    }
306
307    /**
308     * Get the checkpointInterval
309     *
310     * @return the checkpointInterval
311     */
312    public long getCheckpointInterval() {
313        return this.letter.getCheckpointInterval();
314    }
315
316    /**
317     * Set the checkpointInterval
318     *
319     * @param checkpointInterval
320     *            the checkpointInterval to set
321     */
322    public void setCheckpointInterval(long checkpointInterval) {
323        this.letter.setCheckpointInterval(checkpointInterval);
324    }
325
326    /**
327     * Get the cleanupInterval
328     *
329     * @return the cleanupInterval
330     */
331    public long getCleanupInterval() {
332        return this.letter.getCleanupInterval();
333    }
334
335    /**
336     * Set the cleanupInterval
337     *
338     * @param cleanupInterval
339     *            the cleanupInterval to set
340     */
341    public void setCleanupInterval(long cleanupInterval) {
342        this.letter.setCleanupInterval(cleanupInterval);
343    }
344
345    /**
346     * Get the indexWriteBatchSize
347     *
348     * @return the indexWriteBatchSize
349     */
350    public int getIndexWriteBatchSize() {
351        return this.letter.getIndexWriteBatchSize();
352    }
353
354    /**
355     * Set the indexWriteBatchSize
356     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
357     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
358     * @param indexWriteBatchSize
359     *            the indexWriteBatchSize to set
360     */
361    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
362        this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
363    }
364
365    /**
366     * Get the journalMaxWriteBatchSize
367     *
368     * @return the journalMaxWriteBatchSize
369     */
370    public int getJournalMaxWriteBatchSize() {
371        return this.letter.getJournalMaxWriteBatchSize();
372    }
373
374    /**
375     * Set the journalMaxWriteBatchSize
376     *  * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
377     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
378     * @param journalMaxWriteBatchSize
379     *            the journalMaxWriteBatchSize to set
380     */
381    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
382        this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
383    }
384
385    /**
386     * Get the enableIndexWriteAsync
387     *
388     * @return the enableIndexWriteAsync
389     */
390    public boolean isEnableIndexWriteAsync() {
391        return this.letter.isEnableIndexWriteAsync();
392    }
393
394    /**
395     * Set the enableIndexWriteAsync
396     *
397     * @param enableIndexWriteAsync
398     *            the enableIndexWriteAsync to set
399     */
400    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
401        this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
402    }
403
404    /**
405     * Get the directory
406     *
407     * @return the directory
408     */
409    @Override
410    public File getDirectory() {
411        return this.letter.getDirectory();
412    }
413
414    /**
415     * @param dir
416     * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
417     */
418    @Override
419    public void setDirectory(File dir) {
420        this.letter.setDirectory(dir);
421    }
422
423    /**
424     * @return the currently configured location of the KahaDB index files.
425     */
426    public File getIndexDirectory() {
427        return this.letter.getIndexDirectory();
428    }
429
430    /**
431     * Sets the directory where KahaDB index files should be written.
432     *
433     * @param indexDirectory
434     *        the directory where the KahaDB store index files should be written.
435     */
436    public void setIndexDirectory(File indexDirectory) {
437        this.letter.setIndexDirectory(indexDirectory);
438    }
439
440    /**
441     * Get the enableJournalDiskSyncs
442     * @deprecated use {@link #getJournalDiskSyncStrategy} instead
443     * @return the enableJournalDiskSyncs
444     */
445    public boolean isEnableJournalDiskSyncs() {
446        return this.letter.isEnableJournalDiskSyncs();
447    }
448
449    /**
450     * Set the enableJournalDiskSyncs
451     *
452     * @deprecated use {@link #setJournalDiskSyncStrategy} instead
453     * @param enableJournalDiskSyncs
454     *            the enableJournalDiskSyncs to set
455     */
456    public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
457        this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
458    }
459
460    /**
461     * @return
462     */
463    public String getJournalDiskSyncStrategy() {
464        return letter.getJournalDiskSyncStrategy();
465    }
466
467    public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() {
468        return letter.getJournalDiskSyncStrategyEnum();
469    }
470
471    /**
472     * @param journalDiskSyncStrategy
473     */
474    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
475        letter.setJournalDiskSyncStrategy(journalDiskSyncStrategy);
476    }
477
478    /**
479     * @return
480     */
481    public long getJournalDiskSyncInterval() {
482        return letter.getJournalDiskSyncInterval();
483    }
484
485    /**
486     * @param journalDiskSyncInterval
487     */
488    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
489        letter.setJournalDiskSyncInterval(journalDiskSyncInterval);
490    }
491
492    /**
493     * Get the indexCacheSize
494     *
495     * @return the indexCacheSize
496     */
497    public int getIndexCacheSize() {
498        return this.letter.getIndexCacheSize();
499    }
500
501    /**
502     * Set the indexCacheSize
503     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
504     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
505     * @param indexCacheSize
506     *            the indexCacheSize to set
507     */
508    public void setIndexCacheSize(int indexCacheSize) {
509        this.letter.setIndexCacheSize(indexCacheSize);
510    }
511
512    /**
513     * Get the ignoreMissingJournalfiles
514     *
515     * @return the ignoreMissingJournalfiles
516     */
517    public boolean isIgnoreMissingJournalfiles() {
518        return this.letter.isIgnoreMissingJournalfiles();
519    }
520
521    /**
522     * Set the ignoreMissingJournalfiles
523     *
524     * @param ignoreMissingJournalfiles
525     *            the ignoreMissingJournalfiles to set
526     */
527    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
528        this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
529    }
530
531    public boolean isChecksumJournalFiles() {
532        return letter.isChecksumJournalFiles();
533    }
534
535    public boolean isCheckForCorruptJournalFiles() {
536        return letter.isCheckForCorruptJournalFiles();
537    }
538
539    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
540        letter.setChecksumJournalFiles(checksumJournalFiles);
541    }
542
543    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
544        letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
545    }
546
547    @Override
548    public void setBrokerService(BrokerService brokerService) {
549        super.setBrokerService(brokerService);
550        letter.setBrokerService(brokerService);
551    }
552
553    public String getPreallocationScope() {
554        return letter.getPreallocationScope();
555    }
556
557    public void setPreallocationScope(String preallocationScope) {
558        this.letter.setPreallocationScope(preallocationScope);
559    }
560
561    public String getPreallocationStrategy() {
562        return letter.getPreallocationStrategy();
563    }
564
565    public void setPreallocationStrategy(String preallocationStrategy) {
566        this.letter.setPreallocationStrategy(preallocationStrategy);
567    }
568
569    public boolean isArchiveDataLogs() {
570        return letter.isArchiveDataLogs();
571    }
572
573    public void setArchiveDataLogs(boolean archiveDataLogs) {
574        letter.setArchiveDataLogs(archiveDataLogs);
575    }
576
577    public File getDirectoryArchive() {
578        return letter.getDirectoryArchive();
579    }
580
581    public void setDirectoryArchive(File directoryArchive) {
582        letter.setDirectoryArchive(directoryArchive);
583    }
584
585    public boolean isConcurrentStoreAndDispatchQueues() {
586        return letter.isConcurrentStoreAndDispatchQueues();
587    }
588
589    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
590        letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
591    }
592
593    public boolean isConcurrentStoreAndDispatchTopics() {
594        return letter.isConcurrentStoreAndDispatchTopics();
595    }
596
597    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
598        letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
599    }
600
601    public int getMaxAsyncJobs() {
602        return letter.getMaxAsyncJobs();
603    }
604    /**
605     * @param maxAsyncJobs
606     *            the maxAsyncJobs to set
607     */
608    public void setMaxAsyncJobs(int maxAsyncJobs) {
609        letter.setMaxAsyncJobs(maxAsyncJobs);
610    }
611
612    /**
613     * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
614     *
615     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
616     */
617    @Deprecated
618    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
619       getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
620    }
621
622    public boolean getForceRecoverIndex() {
623        return letter.getForceRecoverIndex();
624    }
625
626    public void setForceRecoverIndex(boolean forceRecoverIndex) {
627        letter.setForceRecoverIndex(forceRecoverIndex);
628    }
629
630    public boolean isArchiveCorruptedIndex() {
631        return letter.isArchiveCorruptedIndex();
632    }
633
634    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
635        letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
636    }
637
638    public float getIndexLFUEvictionFactor() {
639        return letter.getIndexLFUEvictionFactor();
640    }
641
642    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
643        letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
644    }
645
646    public boolean isUseIndexLFRUEviction() {
647        return letter.isUseIndexLFRUEviction();
648    }
649
650    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
651        letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
652    }
653
654    public void setEnableIndexDiskSyncs(boolean diskSyncs) {
655        letter.setEnableIndexDiskSyncs(diskSyncs);
656    }
657
658    public boolean isEnableIndexDiskSyncs() {
659        return letter.isEnableIndexDiskSyncs();
660    }
661
662    public void setEnableIndexRecoveryFile(boolean enable) {
663        letter.setEnableIndexRecoveryFile(enable);
664    }
665
666    public boolean  isEnableIndexRecoveryFile() {
667        return letter.isEnableIndexRecoveryFile();
668    }
669
670    public void setEnableIndexPageCaching(boolean enable) {
671        letter.setEnableIndexPageCaching(enable);
672    }
673
674    public boolean isEnableIndexPageCaching() {
675        return letter.isEnableIndexPageCaching();
676    }
677
678    public int getCompactAcksAfterNoGC() {
679        return letter.getCompactAcksAfterNoGC();
680    }
681
682    /**
683     * Sets the number of GC cycles where no journal logs were removed before an attempt to
684     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
685     * <p>
686     * A value of -1 will disable this feature.
687     *
688     * @param compactAcksAfterNoGC
689     *      Number of empty GC cycles before we rewrite old ACKS.
690     */
691    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
692        this.letter.setCompactAcksAfterNoGC(compactAcksAfterNoGC);
693    }
694
695    public boolean isCompactAcksIgnoresStoreGrowth() {
696        return this.letter.isCompactAcksIgnoresStoreGrowth();
697    }
698
699    /**
700     * Configure if Ack compaction will occur regardless of continued growth of the
701     * journal logs meaning that the store has not run out of space yet.  Because the
702     * compaction operation can be costly this value is defaulted to off and the Ack
703     * compaction is only done when it seems that the store cannot grow and larger.
704     *
705     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
706     */
707    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
708        this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth);
709    }
710
711    /**
712     * Returns whether Ack compaction is enabled
713     *
714     * @return enableAckCompaction
715     */
716    public boolean isEnableAckCompaction() {
717        return letter.isEnableAckCompaction();
718    }
719
720    /**
721     * Configure if the Ack compaction task should be enabled to run
722     *
723     * @param enableAckCompaction
724     */
725    public void setEnableAckCompaction(boolean enableAckCompaction) {
726        letter.setEnableAckCompaction(enableAckCompaction);
727    }
728
729    public KahaDBStore getStore() {
730        return letter;
731    }
732
733    public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
734        if (txid == null) {
735            return null;
736        }
737        KahaTransactionInfo rc = new KahaTransactionInfo();
738
739        if (txid.isLocalTransaction()) {
740            LocalTransactionId t = (LocalTransactionId) txid;
741            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
742            kahaTxId.setConnectionId(t.getConnectionId().getValue());
743            kahaTxId.setTransactionId(t.getValue());
744            rc.setLocalTransactionId(kahaTxId);
745        } else {
746            XATransactionId t = (XATransactionId) txid;
747            KahaXATransactionId kahaTxId = new KahaXATransactionId();
748            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
749            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
750            kahaTxId.setFormatId(t.getFormatId());
751            rc.setXaTransactionId(kahaTxId);
752        }
753        return rc;
754    }
755
756    @Override
757    public Locker createDefaultLocker() throws IOException {
758        SharedFileLocker locker = new SharedFileLocker();
759        locker.configure(this);
760        return locker;
761    }
762
763    @Override
764    public void init() throws Exception {}
765
766    @Override
767    public String toString() {
768        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
769        return "KahaDBPersistenceAdapter[" + path + (getIndexDirectory() != null ? ",Index:" + getIndexDirectory().getAbsolutePath() : "") +  "]";
770    }
771
772    @Override
773    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
774        getStore().setTransactionIdTransformer(transactionIdTransformer);
775    }
776
777    @Override
778    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
779        return this.letter.createJobSchedulerStore();
780    }
781
782    /*
783     * When set, ensure that the cleanup/gc operation is executed during the stop procedure
784     */
785    public void setCleanupOnStop(boolean cleanupOnStop) {
786        this.letter.setCleanupOnStop(cleanupOnStop);
787    }
788
789    public boolean getCleanupOnStop() {
790        return this.letter.getCleanupOnStop();
791    }
792}