001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName; 020 021import java.io.File; 022import java.io.IOException; 023import java.util.Set; 024import java.util.concurrent.Callable; 025 026import javax.management.ObjectName; 027 028import org.apache.activemq.broker.BrokerService; 029import org.apache.activemq.broker.ConnectionContext; 030import org.apache.activemq.broker.LockableServiceSupport; 031import org.apache.activemq.broker.Locker; 032import org.apache.activemq.broker.jmx.AnnotatedMBean; 033import org.apache.activemq.broker.jmx.PersistenceAdapterView; 034import org.apache.activemq.broker.scheduler.JobSchedulerStore; 035import org.apache.activemq.command.ActiveMQDestination; 036import org.apache.activemq.command.ActiveMQQueue; 037import org.apache.activemq.command.ActiveMQTopic; 038import org.apache.activemq.command.LocalTransactionId; 039import org.apache.activemq.command.ProducerId; 040import org.apache.activemq.command.TransactionId; 041import org.apache.activemq.command.XATransactionId; 042import org.apache.activemq.protobuf.Buffer; 043import org.apache.activemq.store.JournaledStore; 044import org.apache.activemq.store.MessageStore; 045import org.apache.activemq.store.PersistenceAdapter; 046import org.apache.activemq.store.SharedFileLocker; 047import org.apache.activemq.store.TopicMessageStore; 048import org.apache.activemq.store.TransactionIdTransformer; 049import org.apache.activemq.store.TransactionIdTransformerAware; 050import org.apache.activemq.store.TransactionStore; 051import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId; 052import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 053import org.apache.activemq.store.kahadb.data.KahaXATransactionId; 054import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; 055import org.apache.activemq.usage.SystemUsage; 056import org.apache.activemq.util.ServiceStopper; 057 058/** 059 * An implementation of {@link PersistenceAdapter} designed for use with 060 * KahaDB - Embedded Lightweight Non-Relational Database 061 * 062 * @org.apache.xbean.XBean element="kahaDB" 063 * 064 */ 065public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore, TransactionIdTransformerAware { 066 private final KahaDBStore letter = new KahaDBStore(); 067 068 /** 069 * @param context 070 * @throws IOException 071 * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext) 072 */ 073 @Override 074 public void beginTransaction(ConnectionContext context) throws IOException { 075 this.letter.beginTransaction(context); 076 } 077 078 /** 079 * @param cleanup 080 * @throws IOException 081 * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean) 082 */ 083 @Override 084 public void checkpoint(boolean cleanup) throws IOException { 085 this.letter.checkpoint(cleanup); 086 } 087 088 /** 089 * @param context 090 * @throws IOException 091 * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext) 092 */ 093 @Override 094 public void commitTransaction(ConnectionContext context) throws IOException { 095 this.letter.commitTransaction(context); 096 } 097 098 /** 099 * @param destination 100 * @return MessageStore 101 * @throws IOException 102 * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) 103 */ 104 @Override 105 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 106 return this.letter.createQueueMessageStore(destination); 107 } 108 109 /** 110 * @param destination 111 * @return TopicMessageStore 112 * @throws IOException 113 * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) 114 */ 115 @Override 116 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 117 return this.letter.createTopicMessageStore(destination); 118 } 119 120 /** 121 * @return TransactionStore 122 * @throws IOException 123 * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore() 124 */ 125 @Override 126 public TransactionStore createTransactionStore() throws IOException { 127 return this.letter.createTransactionStore(); 128 } 129 130 /** 131 * @throws IOException 132 * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages() 133 */ 134 @Override 135 public void deleteAllMessages() throws IOException { 136 this.letter.deleteAllMessages(); 137 } 138 139 /** 140 * @return destinations 141 * @see org.apache.activemq.store.PersistenceAdapter#getDestinations() 142 */ 143 @Override 144 public Set<ActiveMQDestination> getDestinations() { 145 return this.letter.getDestinations(); 146 } 147 148 /** 149 * @return lastMessageBrokerSequenceId 150 * @throws IOException 151 * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId() 152 */ 153 @Override 154 public long getLastMessageBrokerSequenceId() throws IOException { 155 return this.letter.getLastMessageBrokerSequenceId(); 156 } 157 158 @Override 159 public long getLastProducerSequenceId(ProducerId id) throws IOException { 160 return this.letter.getLastProducerSequenceId(id); 161 } 162 163 @Override 164 public void allowIOResumption() { 165 this.letter.allowIOResumption(); 166 } 167 168 /** 169 * @param destination 170 * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue) 171 */ 172 @Override 173 public void removeQueueMessageStore(ActiveMQQueue destination) { 174 this.letter.removeQueueMessageStore(destination); 175 } 176 177 /** 178 * @param destination 179 * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic) 180 */ 181 @Override 182 public void removeTopicMessageStore(ActiveMQTopic destination) { 183 this.letter.removeTopicMessageStore(destination); 184 } 185 186 /** 187 * @param context 188 * @throws IOException 189 * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext) 190 */ 191 @Override 192 public void rollbackTransaction(ConnectionContext context) throws IOException { 193 this.letter.rollbackTransaction(context); 194 } 195 196 /** 197 * @param brokerName 198 * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String) 199 */ 200 @Override 201 public void setBrokerName(String brokerName) { 202 this.letter.setBrokerName(brokerName); 203 } 204 205 /** 206 * @param usageManager 207 * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage) 208 */ 209 @Override 210 public void setUsageManager(SystemUsage usageManager) { 211 this.letter.setUsageManager(usageManager); 212 } 213 214 /** 215 * @return the size of the store 216 * @see org.apache.activemq.store.PersistenceAdapter#size() 217 */ 218 @Override 219 public long size() { 220 return this.letter.isStarted() ? this.letter.size() : 0l; 221 } 222 223 /** 224 * @throws Exception 225 * @see org.apache.activemq.Service#start() 226 */ 227 @Override 228 public void doStart() throws Exception { 229 this.letter.start(); 230 231 if (brokerService != null && brokerService.isUseJmx()) { 232 PersistenceAdapterView view = new PersistenceAdapterView(this); 233 view.setInflightTransactionViewCallable(new Callable<String>() { 234 @Override 235 public String call() throws Exception { 236 return letter.getTransactions(); 237 } 238 }); 239 view.setDataViewCallable(new Callable<String>() { 240 @Override 241 public String call() throws Exception { 242 return letter.getJournal().getFileMap().keySet().toString(); 243 } 244 }); 245 AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, 246 createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString())); 247 } 248 } 249 250 /** 251 * @throws Exception 252 * @see org.apache.activemq.Service#stop() 253 */ 254 @Override 255 public void doStop(ServiceStopper stopper) throws Exception { 256 this.letter.stop(); 257 258 if (brokerService != null && brokerService.isUseJmx()) { 259 ObjectName brokerObjectName = brokerService.getBrokerObjectName(); 260 brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString())); 261 } 262 } 263 264 /** 265 * Get the journalMaxFileLength 266 * 267 * @return the journalMaxFileLength 268 */ 269 @Override 270 public int getJournalMaxFileLength() { 271 return this.letter.getJournalMaxFileLength(); 272 } 273 274 /** 275 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can 276 * be used 277 * 278 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 279 */ 280 public void setJournalMaxFileLength(int journalMaxFileLength) { 281 this.letter.setJournalMaxFileLength(journalMaxFileLength); 282 } 283 284 /** 285 * Set the max number of producers (LRU cache) to track for duplicate sends 286 */ 287 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 288 this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack); 289 } 290 291 public int getMaxFailoverProducersToTrack() { 292 return this.letter.getMaxFailoverProducersToTrack(); 293 } 294 295 /** 296 * set the audit window depth for duplicate suppression (should exceed the max transaction 297 * batch) 298 */ 299 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 300 this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth); 301 } 302 303 public int getFailoverProducersAuditDepth() { 304 return this.letter.getFailoverProducersAuditDepth(); 305 } 306 307 /** 308 * Get the checkpointInterval 309 * 310 * @return the checkpointInterval 311 */ 312 public long getCheckpointInterval() { 313 return this.letter.getCheckpointInterval(); 314 } 315 316 /** 317 * Set the checkpointInterval 318 * 319 * @param checkpointInterval 320 * the checkpointInterval to set 321 */ 322 public void setCheckpointInterval(long checkpointInterval) { 323 this.letter.setCheckpointInterval(checkpointInterval); 324 } 325 326 /** 327 * Get the cleanupInterval 328 * 329 * @return the cleanupInterval 330 */ 331 public long getCleanupInterval() { 332 return this.letter.getCleanupInterval(); 333 } 334 335 /** 336 * Set the cleanupInterval 337 * 338 * @param cleanupInterval 339 * the cleanupInterval to set 340 */ 341 public void setCleanupInterval(long cleanupInterval) { 342 this.letter.setCleanupInterval(cleanupInterval); 343 } 344 345 /** 346 * Get the indexWriteBatchSize 347 * 348 * @return the indexWriteBatchSize 349 */ 350 public int getIndexWriteBatchSize() { 351 return this.letter.getIndexWriteBatchSize(); 352 } 353 354 /** 355 * Set the indexWriteBatchSize 356 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 357 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 358 * @param indexWriteBatchSize 359 * the indexWriteBatchSize to set 360 */ 361 public void setIndexWriteBatchSize(int indexWriteBatchSize) { 362 this.letter.setIndexWriteBatchSize(indexWriteBatchSize); 363 } 364 365 /** 366 * Get the journalMaxWriteBatchSize 367 * 368 * @return the journalMaxWriteBatchSize 369 */ 370 public int getJournalMaxWriteBatchSize() { 371 return this.letter.getJournalMaxWriteBatchSize(); 372 } 373 374 /** 375 * Set the journalMaxWriteBatchSize 376 * * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 377 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 378 * @param journalMaxWriteBatchSize 379 * the journalMaxWriteBatchSize to set 380 */ 381 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 382 this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize); 383 } 384 385 /** 386 * Get the enableIndexWriteAsync 387 * 388 * @return the enableIndexWriteAsync 389 */ 390 public boolean isEnableIndexWriteAsync() { 391 return this.letter.isEnableIndexWriteAsync(); 392 } 393 394 /** 395 * Set the enableIndexWriteAsync 396 * 397 * @param enableIndexWriteAsync 398 * the enableIndexWriteAsync to set 399 */ 400 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 401 this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync); 402 } 403 404 /** 405 * Get the directory 406 * 407 * @return the directory 408 */ 409 @Override 410 public File getDirectory() { 411 return this.letter.getDirectory(); 412 } 413 414 /** 415 * @param dir 416 * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File) 417 */ 418 @Override 419 public void setDirectory(File dir) { 420 this.letter.setDirectory(dir); 421 } 422 423 /** 424 * @return the currently configured location of the KahaDB index files. 425 */ 426 public File getIndexDirectory() { 427 return this.letter.getIndexDirectory(); 428 } 429 430 /** 431 * Sets the directory where KahaDB index files should be written. 432 * 433 * @param indexDirectory 434 * the directory where the KahaDB store index files should be written. 435 */ 436 public void setIndexDirectory(File indexDirectory) { 437 this.letter.setIndexDirectory(indexDirectory); 438 } 439 440 /** 441 * Get the enableJournalDiskSyncs 442 * @deprecated use {@link #getJournalDiskSyncStrategy} instead 443 * @return the enableJournalDiskSyncs 444 */ 445 public boolean isEnableJournalDiskSyncs() { 446 return this.letter.isEnableJournalDiskSyncs(); 447 } 448 449 /** 450 * Set the enableJournalDiskSyncs 451 * 452 * @deprecated use {@link #setJournalDiskSyncStrategy} instead 453 * @param enableJournalDiskSyncs 454 * the enableJournalDiskSyncs to set 455 */ 456 public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) { 457 this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs); 458 } 459 460 /** 461 * @return 462 */ 463 public String getJournalDiskSyncStrategy() { 464 return letter.getJournalDiskSyncStrategy(); 465 } 466 467 public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() { 468 return letter.getJournalDiskSyncStrategyEnum(); 469 } 470 471 /** 472 * @param journalDiskSyncStrategy 473 */ 474 public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) { 475 letter.setJournalDiskSyncStrategy(journalDiskSyncStrategy); 476 } 477 478 /** 479 * @return 480 */ 481 public long getJournalDiskSyncInterval() { 482 return letter.getJournalDiskSyncInterval(); 483 } 484 485 /** 486 * @param journalDiskSyncInterval 487 */ 488 public void setJournalDiskSyncInterval(long journalDiskSyncInterval) { 489 letter.setJournalDiskSyncInterval(journalDiskSyncInterval); 490 } 491 492 /** 493 * Get the indexCacheSize 494 * 495 * @return the indexCacheSize 496 */ 497 public int getIndexCacheSize() { 498 return this.letter.getIndexCacheSize(); 499 } 500 501 /** 502 * Set the indexCacheSize 503 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 504 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor" 505 * @param indexCacheSize 506 * the indexCacheSize to set 507 */ 508 public void setIndexCacheSize(int indexCacheSize) { 509 this.letter.setIndexCacheSize(indexCacheSize); 510 } 511 512 /** 513 * Get the ignoreMissingJournalfiles 514 * 515 * @return the ignoreMissingJournalfiles 516 */ 517 public boolean isIgnoreMissingJournalfiles() { 518 return this.letter.isIgnoreMissingJournalfiles(); 519 } 520 521 /** 522 * Set the ignoreMissingJournalfiles 523 * 524 * @param ignoreMissingJournalfiles 525 * the ignoreMissingJournalfiles to set 526 */ 527 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 528 this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles); 529 } 530 531 public boolean isChecksumJournalFiles() { 532 return letter.isChecksumJournalFiles(); 533 } 534 535 public boolean isCheckForCorruptJournalFiles() { 536 return letter.isCheckForCorruptJournalFiles(); 537 } 538 539 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 540 letter.setChecksumJournalFiles(checksumJournalFiles); 541 } 542 543 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 544 letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); 545 } 546 547 @Override 548 public void setBrokerService(BrokerService brokerService) { 549 super.setBrokerService(brokerService); 550 letter.setBrokerService(brokerService); 551 } 552 553 public String getPreallocationScope() { 554 return letter.getPreallocationScope(); 555 } 556 557 public void setPreallocationScope(String preallocationScope) { 558 this.letter.setPreallocationScope(preallocationScope); 559 } 560 561 public String getPreallocationStrategy() { 562 return letter.getPreallocationStrategy(); 563 } 564 565 public void setPreallocationStrategy(String preallocationStrategy) { 566 this.letter.setPreallocationStrategy(preallocationStrategy); 567 } 568 569 public boolean isArchiveDataLogs() { 570 return letter.isArchiveDataLogs(); 571 } 572 573 public void setArchiveDataLogs(boolean archiveDataLogs) { 574 letter.setArchiveDataLogs(archiveDataLogs); 575 } 576 577 public File getDirectoryArchive() { 578 return letter.getDirectoryArchive(); 579 } 580 581 public void setDirectoryArchive(File directoryArchive) { 582 letter.setDirectoryArchive(directoryArchive); 583 } 584 585 public boolean isConcurrentStoreAndDispatchQueues() { 586 return letter.isConcurrentStoreAndDispatchQueues(); 587 } 588 589 public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) { 590 letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch); 591 } 592 593 public boolean isConcurrentStoreAndDispatchTopics() { 594 return letter.isConcurrentStoreAndDispatchTopics(); 595 } 596 597 public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) { 598 letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch); 599 } 600 601 public int getMaxAsyncJobs() { 602 return letter.getMaxAsyncJobs(); 603 } 604 /** 605 * @param maxAsyncJobs 606 * the maxAsyncJobs to set 607 */ 608 public void setMaxAsyncJobs(int maxAsyncJobs) { 609 letter.setMaxAsyncJobs(maxAsyncJobs); 610 } 611 612 /** 613 * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead 614 * 615 * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set 616 */ 617 @Deprecated 618 public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException { 619 getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay); 620 } 621 622 public boolean getForceRecoverIndex() { 623 return letter.getForceRecoverIndex(); 624 } 625 626 public void setForceRecoverIndex(boolean forceRecoverIndex) { 627 letter.setForceRecoverIndex(forceRecoverIndex); 628 } 629 630 public boolean isArchiveCorruptedIndex() { 631 return letter.isArchiveCorruptedIndex(); 632 } 633 634 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 635 letter.setArchiveCorruptedIndex(archiveCorruptedIndex); 636 } 637 638 public float getIndexLFUEvictionFactor() { 639 return letter.getIndexLFUEvictionFactor(); 640 } 641 642 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 643 letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor); 644 } 645 646 public boolean isUseIndexLFRUEviction() { 647 return letter.isUseIndexLFRUEviction(); 648 } 649 650 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 651 letter.setUseIndexLFRUEviction(useIndexLFRUEviction); 652 } 653 654 public void setEnableIndexDiskSyncs(boolean diskSyncs) { 655 letter.setEnableIndexDiskSyncs(diskSyncs); 656 } 657 658 public boolean isEnableIndexDiskSyncs() { 659 return letter.isEnableIndexDiskSyncs(); 660 } 661 662 public void setEnableIndexRecoveryFile(boolean enable) { 663 letter.setEnableIndexRecoveryFile(enable); 664 } 665 666 public boolean isEnableIndexRecoveryFile() { 667 return letter.isEnableIndexRecoveryFile(); 668 } 669 670 public void setEnableIndexPageCaching(boolean enable) { 671 letter.setEnableIndexPageCaching(enable); 672 } 673 674 public boolean isEnableIndexPageCaching() { 675 return letter.isEnableIndexPageCaching(); 676 } 677 678 public int getCompactAcksAfterNoGC() { 679 return letter.getCompactAcksAfterNoGC(); 680 } 681 682 /** 683 * Sets the number of GC cycles where no journal logs were removed before an attempt to 684 * move forward all the acks in the last log that contains them and is otherwise unreferenced. 685 * <p> 686 * A value of -1 will disable this feature. 687 * 688 * @param compactAcksAfterNoGC 689 * Number of empty GC cycles before we rewrite old ACKS. 690 */ 691 public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) { 692 this.letter.setCompactAcksAfterNoGC(compactAcksAfterNoGC); 693 } 694 695 public boolean isCompactAcksIgnoresStoreGrowth() { 696 return this.letter.isCompactAcksIgnoresStoreGrowth(); 697 } 698 699 /** 700 * Configure if Ack compaction will occur regardless of continued growth of the 701 * journal logs meaning that the store has not run out of space yet. Because the 702 * compaction operation can be costly this value is defaulted to off and the Ack 703 * compaction is only done when it seems that the store cannot grow and larger. 704 * 705 * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set 706 */ 707 public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { 708 this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth); 709 } 710 711 /** 712 * Returns whether Ack compaction is enabled 713 * 714 * @return enableAckCompaction 715 */ 716 public boolean isEnableAckCompaction() { 717 return letter.isEnableAckCompaction(); 718 } 719 720 /** 721 * Configure if the Ack compaction task should be enabled to run 722 * 723 * @param enableAckCompaction 724 */ 725 public void setEnableAckCompaction(boolean enableAckCompaction) { 726 letter.setEnableAckCompaction(enableAckCompaction); 727 } 728 729 public KahaDBStore getStore() { 730 return letter; 731 } 732 733 public KahaTransactionInfo createTransactionInfo(TransactionId txid) { 734 if (txid == null) { 735 return null; 736 } 737 KahaTransactionInfo rc = new KahaTransactionInfo(); 738 739 if (txid.isLocalTransaction()) { 740 LocalTransactionId t = (LocalTransactionId) txid; 741 KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId(); 742 kahaTxId.setConnectionId(t.getConnectionId().getValue()); 743 kahaTxId.setTransactionId(t.getValue()); 744 rc.setLocalTransactionId(kahaTxId); 745 } else { 746 XATransactionId t = (XATransactionId) txid; 747 KahaXATransactionId kahaTxId = new KahaXATransactionId(); 748 kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier())); 749 kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId())); 750 kahaTxId.setFormatId(t.getFormatId()); 751 rc.setXaTransactionId(kahaTxId); 752 } 753 return rc; 754 } 755 756 @Override 757 public Locker createDefaultLocker() throws IOException { 758 SharedFileLocker locker = new SharedFileLocker(); 759 locker.configure(this); 760 return locker; 761 } 762 763 @Override 764 public void init() throws Exception {} 765 766 @Override 767 public String toString() { 768 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; 769 return "KahaDBPersistenceAdapter[" + path + (getIndexDirectory() != null ? ",Index:" + getIndexDirectory().getAbsolutePath() : "") + "]"; 770 } 771 772 @Override 773 public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) { 774 getStore().setTransactionIdTransformer(transactionIdTransformer); 775 } 776 777 @Override 778 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 779 return this.letter.createJobSchedulerStore(); 780 } 781 782 /* 783 * When set, ensure that the cleanup/gc operation is executed during the stop procedure 784 */ 785 public void setCleanupOnStop(boolean cleanupOnStop) { 786 this.letter.setCleanupOnStop(cleanupOnStop); 787 } 788 789 public boolean getCleanupOnStop() { 790 return this.letter.getCleanupOnStop(); 791 } 792}