/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import static org.apache.activemq.broker.jmx.BrokerMBeanSupport.createPersistenceAdapterName;

import java.io.File;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Callable;

import javax.management.ObjectName;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.LockableServiceSupport;
import org.apache.activemq.broker.Locker;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.PersistenceAdapterView;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.JournaledStore;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.SharedFileLocker;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionIdTransformer;
import org.apache.activemq.store.TransactionIdTransformerAware;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;

/**
 * An implementation of {@link PersistenceAdapter} designed for use with
 * KahaDB - Embedded Lightweight Non-Relational Database.
 * <p>
 * Nearly every method of this class forwards to a single wrapped
 * {@link KahaDBStore} instance (the "letter"); the adapter itself adds the
 * JMX view registration in {@link #doStart()} / {@link #doStop(ServiceStopper)},
 * the default {@link Locker} and the {@link TransactionId} conversion.
 *
 * @org.apache.xbean.XBean element="kahaDB"
 *
 */
public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore, TransactionIdTransformerAware {

    /** The store every call of this adapter is delegated to. */
    private final KahaDBStore letter = new KahaDBStore();

    /**
     * @param context
     *            the connection context handed through to the store
     * @throws IOException
     * @see org.apache.activemq.store.PersistenceAdapter#beginTransaction(org.apache.activemq.broker.ConnectionContext)
     */
    @Override
    public void beginTransaction(ConnectionContext context) throws IOException {
        this.letter.beginTransaction(context);
    }

    /**
     * @param sync
     *            flag handed through to the store's checkpoint
     * @throws IOException
     * @see org.apache.activemq.store.PersistenceAdapter#checkpoint(boolean)
     */
    @Override
    public void checkpoint(boolean sync) throws IOException {
        this.letter.checkpoint(sync);
    }

    /**
     * @param context
     *            the connection context handed through to the store
     * @throws IOException
     * @see org.apache.activemq.store.PersistenceAdapter#commitTransaction(org.apache.activemq.broker.ConnectionContext)
     */
    @Override
    public void commitTransaction(ConnectionContext context) throws IOException {
        this.letter.commitTransaction(context);
    }

    /**
     * @param destination
     *            the queue to create a message store for
     * @return MessageStore
     * @throws IOException
     * @see org.apache.activemq.store.PersistenceAdapter#createQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
     */
    @Override
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
        return this.letter.createQueueMessageStore(destination);
    }

    /**
     * @param destination
     *            the topic to create a message store for
     * @return TopicMessageStore
     * @throws IOException
     * @see org.apache.activemq.store.PersistenceAdapter#createTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
     */
    @Override
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
        return this.letter.createTopicMessageStore(destination);
    }

    /**
     * @return TransactionStore
     * @throws IOException
     * @see org.apache.activemq.store.PersistenceAdapter#createTransactionStore()
     */
    @Override
    public TransactionStore createTransactionStore() throws IOException {
        return this.letter.createTransactionStore();
    }

    /**
     * @throws IOException
     * @see org.apache.activemq.store.PersistenceAdapter#deleteAllMessages()
     */
    @Override
    public void deleteAllMessages() throws IOException {
        this.letter.deleteAllMessages();
    }

    /**
     * @return destinations
     * @see org.apache.activemq.store.PersistenceAdapter#getDestinations()
     */
    @Override
    public Set<ActiveMQDestination> getDestinations() {
        return this.letter.getDestinations();
    }

    /**
     * @return lastMessageBrokerSequenceId
     * @throws IOException
     * @see org.apache.activemq.store.PersistenceAdapter#getLastMessageBrokerSequenceId()
     */
    @Override
    public long getLastMessageBrokerSequenceId() throws IOException {
        return this.letter.getLastMessageBrokerSequenceId();
    }

    /**
     * @param id
     *            the producer to look up
     * @return the last producer sequence id reported by the store for the given producer
     * @throws IOException
     */
    @Override
    public long getLastProducerSequenceId(ProducerId id) throws IOException {
        return this.letter.getLastProducerSequenceId(id);
    }

    /**
     * @param destination
     *            the queue whose message store is removed
     * @see org.apache.activemq.store.PersistenceAdapter#removeQueueMessageStore(org.apache.activemq.command.ActiveMQQueue)
     */
    @Override
    public void removeQueueMessageStore(ActiveMQQueue destination) {
        this.letter.removeQueueMessageStore(destination);
    }

    /**
     * @param destination
     *            the topic whose message store is removed
     * @see org.apache.activemq.store.PersistenceAdapter#removeTopicMessageStore(org.apache.activemq.command.ActiveMQTopic)
     */
    @Override
    public void removeTopicMessageStore(ActiveMQTopic destination) {
        this.letter.removeTopicMessageStore(destination);
    }

    /**
     * @param context
     *            the connection context handed through to the store
     * @throws IOException
     * @see org.apache.activemq.store.PersistenceAdapter#rollbackTransaction(org.apache.activemq.broker.ConnectionContext)
     */
    @Override
    public void rollbackTransaction(ConnectionContext context) throws IOException {
        this.letter.rollbackTransaction(context);
    }

    /**
     * @param brokerName
     *            the broker name handed through to the store
     * @see org.apache.activemq.store.PersistenceAdapter#setBrokerName(java.lang.String)
     */
    @Override
    public void setBrokerName(String brokerName) {
        this.letter.setBrokerName(brokerName);
    }

    /**
     * @param usageManager
     *            the usage manager handed through to the store
     * @see org.apache.activemq.store.PersistenceAdapter#setUsageManager(org.apache.activemq.usage.SystemUsage)
     */
    @Override
    public void setUsageManager(SystemUsage usageManager) {
        this.letter.setUsageManager(usageManager);
    }

    /**
     * @return the size of the store, or 0 while the store is not started
     * @see org.apache.activemq.store.PersistenceAdapter#size()
     */
    @Override
    public long size() {
        return this.letter.isStarted() ? this.letter.size() : 0L;
    }

    /**
     * Starts the wrapped store and, when a broker service with JMX enabled
     * has been set, registers a {@link PersistenceAdapterView} MBean for this
     * adapter.
     *
     * @throws Exception
     * @see org.apache.activemq.Service#start()
     */
    @Override
    public void doStart() throws Exception {
        this.letter.start();

        if (brokerService != null && brokerService.isUseJmx()) {
            PersistenceAdapterView view = new PersistenceAdapterView(this);
            view.setInflightTransactionViewCallable(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return letter.getTransactions();
                }
            });
            view.setDataViewCallable(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return letter.getJournal().getFileMap().keySet().toString();
                }
            });
            AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view,
                createPersistenceAdapterName(brokerService.getBrokerObjectName().toString(), toString()));
        }
    }

    /**
     * Stops the wrapped store and, when a broker service with JMX enabled has
     * been set, unregisters the MBean that {@link #doStart()} registered.
     *
     * @param stopper
     *            not used by this implementation
     * @throws Exception
     * @see org.apache.activemq.Service#stop()
     */
    @Override
    public void doStop(ServiceStopper stopper) throws Exception {
        try {
            this.letter.stop();
        } finally {
            // Unregister even when stopping the store fails; otherwise the view
            // registered in doStart() would stay behind under the same name.
            if (brokerService != null && brokerService.isUseJmx()) {
                ObjectName brokerObjectName = brokerService.getBrokerObjectName();
                brokerService.getManagementContext().unregisterMBean(createPersistenceAdapterName(brokerObjectName.toString(), toString()));
            }
        }
    }

    /**
     * Get the journalMaxFileLength
     *
     * @return the journalMaxFileLength
     */
    @Override
    public int getJournalMaxFileLength() {
        return this.letter.getJournalMaxFileLength();
    }

    /**
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can
     * be used
     *
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
     */
    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.letter.setJournalMaxFileLength(journalMaxFileLength);
    }

    /**
     * Set the max number of producers (LRU cache) to track for duplicate sends
     */
    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
        this.letter.setMaxFailoverProducersToTrack(maxFailoverProducersToTrack);
    }

    /**
     * @return the maxFailoverProducersToTrack value of the underlying store
     */
    public int getMaxFailoverProducersToTrack() {
        return this.letter.getMaxFailoverProducersToTrack();
    }

    /**
     * set the audit window depth for duplicate suppression (should exceed the max transaction
     * batch)
     */
    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
        this.letter.setFailoverProducersAuditDepth(failoverProducersAuditDepth);
    }

    /**
     * @return the failoverProducersAuditDepth value of the underlying store
     */
    public int getFailoverProducersAuditDepth() {
        return this.letter.getFailoverProducersAuditDepth();
    }

    /**
     * Get the checkpointInterval
     *
     * @return the checkpointInterval
     */
    public long getCheckpointInterval() {
        return this.letter.getCheckpointInterval();
    }

    /**
     * Set the checkpointInterval
     *
     * @param checkpointInterval
     *            the checkpointInterval to set
     */
    public void setCheckpointInterval(long checkpointInterval) {
        this.letter.setCheckpointInterval(checkpointInterval);
    }

    /**
     * Get the cleanupInterval
     *
     * @return the cleanupInterval
     */
    public long getCleanupInterval() {
        return this.letter.getCleanupInterval();
    }

    /**
     * Set the cleanupInterval
     *
     * @param cleanupInterval
     *            the cleanupInterval to set
     */
    public void setCleanupInterval(long cleanupInterval) {
        this.letter.setCleanupInterval(cleanupInterval);
    }

    /**
     * Get the indexWriteBatchSize
     *
     * @return the indexWriteBatchSize
     */
    public int getIndexWriteBatchSize() {
        return this.letter.getIndexWriteBatchSize();
    }

    /**
     * Set the indexWriteBatchSize
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     * @param indexWriteBatchSize
     *            the indexWriteBatchSize to set
     */
    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
        this.letter.setIndexWriteBatchSize(indexWriteBatchSize);
    }

    /**
     * Get the journalMaxWriteBatchSize
     *
     * @return the journalMaxWriteBatchSize
     */
    public int getJournalMaxWriteBatchSize() {
        return this.letter.getJournalMaxWriteBatchSize();
    }

    /**
     * Set the journalMaxWriteBatchSize
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     * @param journalMaxWriteBatchSize
     *            the journalMaxWriteBatchSize to set
     */
    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
        this.letter.setJournalMaxWriteBatchSize(journalMaxWriteBatchSize);
    }

    /**
     * Get the enableIndexWriteAsync
     *
     * @return the enableIndexWriteAsync
     */
    public boolean isEnableIndexWriteAsync() {
        return this.letter.isEnableIndexWriteAsync();
    }

    /**
     * Set the enableIndexWriteAsync
     *
     * @param enableIndexWriteAsync
     *            the enableIndexWriteAsync to set
     */
    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.letter.setEnableIndexWriteAsync(enableIndexWriteAsync);
    }

    /**
     * Get the directory
     *
     * @return the directory
     */
    @Override
    public File getDirectory() {
        return this.letter.getDirectory();
    }

    /**
     * @param dir
     *            the directory handed through to the store
     * @see org.apache.activemq.store.PersistenceAdapter#setDirectory(java.io.File)
     */
    @Override
    public void setDirectory(File dir) {
        this.letter.setDirectory(dir);
    }

    /**
     * @return the currently configured location of the KahaDB index files.
     */
    public File getIndexDirectory() {
        return this.letter.getIndexDirectory();
    }

    /**
     * Sets the directory where KahaDB index files should be written.
     *
     * @param indexDirectory
     *            the directory where the KahaDB store index files should be written.
     */
    public void setIndexDirectory(File indexDirectory) {
        this.letter.setIndexDirectory(indexDirectory);
    }

    /**
     * Get the enableJournalDiskSyncs
     * @deprecated use {@link #getJournalDiskSyncStrategy} instead
     * @return the enableJournalDiskSyncs
     */
    @Deprecated
    public boolean isEnableJournalDiskSyncs() {
        return this.letter.isEnableJournalDiskSyncs();
    }

    /**
     * Set the enableJournalDiskSyncs
     *
     * @deprecated use {@link #setJournalDiskSyncStrategy} instead
     * @param enableJournalDiskSyncs
     *            the enableJournalDiskSyncs to set
     */
    @Deprecated
    public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs) {
        this.letter.setEnableJournalDiskSyncs(enableJournalDiskSyncs);
    }

    /**
     * @return the journalDiskSyncStrategy of the underlying store
     */
    public String getJournalDiskSyncStrategy() {
        return letter.getJournalDiskSyncStrategy();
    }

    /**
     * @param journalDiskSyncStrategy
     *            the journalDiskSyncStrategy to set on the underlying store
     */
    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
        letter.setJournalDiskSyncStrategy(journalDiskSyncStrategy);
    }

    /**
     * @return the journalDiskSyncInterval of the underlying store
     */
    public long getJournalDiskSyncInterval() {
        return letter.getJournalDiskSyncInterval();
    }

    /**
     * @param journalDiskSyncInterval
     *            the journalDiskSyncInterval to set on the underlying store
     */
    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
        letter.setJournalDiskSyncInterval(journalDiskSyncInterval);
    }

    /**
     * Get the indexCacheSize
     *
     * @return the indexCacheSize
     */
    public int getIndexCacheSize() {
        return this.letter.getIndexCacheSize();
    }

    /**
     * Set the indexCacheSize
     * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used
     * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
     * @param indexCacheSize
     *            the indexCacheSize to set
     */
    public void setIndexCacheSize(int indexCacheSize) {
        this.letter.setIndexCacheSize(indexCacheSize);
    }

    /**
     * Get the ignoreMissingJournalfiles
     *
     * @return the ignoreMissingJournalfiles
     */
    public boolean isIgnoreMissingJournalfiles() {
        return this.letter.isIgnoreMissingJournalfiles();
    }

    /**
     * Set the ignoreMissingJournalfiles
     *
     * @param ignoreMissingJournalfiles
     *            the ignoreMissingJournalfiles to set
     */
    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
        this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
    }

    /**
     * @return the checksumJournalFiles flag of the underlying store
     */
    public boolean isChecksumJournalFiles() {
        return letter.isChecksumJournalFiles();
    }

    /**
     * @return the checkForCorruptJournalFiles flag of the underlying store
     */
    public boolean isCheckForCorruptJournalFiles() {
        return letter.isCheckForCorruptJournalFiles();
    }

    /**
     * @param checksumJournalFiles
     *            the checksumJournalFiles flag to set on the underlying store
     */
    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
        letter.setChecksumJournalFiles(checksumJournalFiles);
    }

    /**
     * @param checkForCorruptJournalFiles
     *            the checkForCorruptJournalFiles flag to set on the underlying store
     */
    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
        letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
    }

    /**
     * Records the broker service on this adapter (via the superclass) and
     * also hands it to the underlying store.
     *
     * @param brokerService
     *            the broker service to set
     */
    @Override
    public void setBrokerService(BrokerService brokerService) {
        super.setBrokerService(brokerService);
        letter.setBrokerService(brokerService);
    }

    /**
     * @return the preallocationScope of the underlying store
     */
    public String getPreallocationScope() {
        return letter.getPreallocationScope();
    }

    /**
     * @param preallocationScope
     *            the preallocationScope to set on the underlying store
     */
    public void setPreallocationScope(String preallocationScope) {
        this.letter.setPreallocationScope(preallocationScope);
    }

    /**
     * @return the preallocationStrategy of the underlying store
     */
    public String getPreallocationStrategy() {
        return letter.getPreallocationStrategy();
    }

    /**
     * @param preallocationStrategy
     *            the preallocationStrategy to set on the underlying store
     */
    public void setPreallocationStrategy(String preallocationStrategy) {
        this.letter.setPreallocationStrategy(preallocationStrategy);
    }

    /**
     * @return the archiveDataLogs flag of the underlying store
     */
    public boolean isArchiveDataLogs() {
        return letter.isArchiveDataLogs();
    }

    /**
     * @param archiveDataLogs
     *            the archiveDataLogs flag to set on the underlying store
     */
    public void setArchiveDataLogs(boolean archiveDataLogs) {
        letter.setArchiveDataLogs(archiveDataLogs);
    }

    /**
     * @return the directoryArchive of the underlying store
     */
    public File getDirectoryArchive() {
        return letter.getDirectoryArchive();
    }

    /**
     * @param directoryArchive
     *            the directoryArchive to set on the underlying store
     */
    public void setDirectoryArchive(File directoryArchive) {
        letter.setDirectoryArchive(directoryArchive);
    }

    /**
     * @return the concurrentStoreAndDispatchQueues flag of the underlying store
     */
    public boolean isConcurrentStoreAndDispatchQueues() {
        return letter.isConcurrentStoreAndDispatchQueues();
    }

    /**
     * @param concurrentStoreAndDispatch
     *            the concurrentStoreAndDispatchQueues flag to set on the underlying store
     */
    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatch) {
        letter.setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatch);
    }

    /**
     * @return the concurrentStoreAndDispatchTopics flag of the underlying store
     */
    public boolean isConcurrentStoreAndDispatchTopics() {
        return letter.isConcurrentStoreAndDispatchTopics();
    }

    /**
     * @param concurrentStoreAndDispatch
     *            the concurrentStoreAndDispatchTopics flag to set on the underlying store
     */
    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
        letter.setConcurrentStoreAndDispatchTopics(concurrentStoreAndDispatch);
    }

    /**
     * @return the maxAsyncJobs of the underlying store
     */
    public int getMaxAsyncJobs() {
        return letter.getMaxAsyncJobs();
    }

    /**
     * @param maxAsyncJobs
     *            the maxAsyncJobs to set
     */
    public void setMaxAsyncJobs(int maxAsyncJobs) {
        letter.setMaxAsyncJobs(maxAsyncJobs);
    }

    /**
     * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead
     *
     * @param databaseLockedWaitDelay the databaseLockedWaitDelay to set
     */
    @Deprecated
    public void setDatabaseLockedWaitDelay(int databaseLockedWaitDelay) throws IOException {
        getLocker().setLockAcquireSleepInterval(databaseLockedWaitDelay);
    }

    /**
     * @return the forceRecoverIndex flag of the underlying store
     */
    public boolean getForceRecoverIndex() {
        return letter.getForceRecoverIndex();
    }

    /**
     * @param forceRecoverIndex
     *            the forceRecoverIndex flag to set on the underlying store
     */
    public void setForceRecoverIndex(boolean forceRecoverIndex) {
        letter.setForceRecoverIndex(forceRecoverIndex);
    }

    /**
     * @return the archiveCorruptedIndex flag of the underlying store
     */
    public boolean isArchiveCorruptedIndex() {
        return letter.isArchiveCorruptedIndex();
    }

    /**
     * @param archiveCorruptedIndex
     *            the archiveCorruptedIndex flag to set on the underlying store
     */
    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
        letter.setArchiveCorruptedIndex(archiveCorruptedIndex);
    }

    /**
     * @return the indexLFUEvictionFactor of the underlying store
     */
    public float getIndexLFUEvictionFactor() {
        return letter.getIndexLFUEvictionFactor();
    }

    /**
     * @param indexLFUEvictionFactor
     *            the indexLFUEvictionFactor to set on the underlying store
     */
    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
        letter.setIndexLFUEvictionFactor(indexLFUEvictionFactor);
    }

    /**
     * @return the useIndexLFRUEviction flag of the underlying store
     */
    public boolean isUseIndexLFRUEviction() {
        return letter.isUseIndexLFRUEviction();
    }

    /**
     * @param useIndexLFRUEviction
     *            the useIndexLFRUEviction flag to set on the underlying store
     */
    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
        letter.setUseIndexLFRUEviction(useIndexLFRUEviction);
    }

    /**
     * @param diskSyncs
     *            the enableIndexDiskSyncs flag to set on the underlying store
     */
    public void setEnableIndexDiskSyncs(boolean diskSyncs) {
        letter.setEnableIndexDiskSyncs(diskSyncs);
    }

    /**
     * @return the enableIndexDiskSyncs flag of the underlying store
     */
    public boolean isEnableIndexDiskSyncs() {
        return letter.isEnableIndexDiskSyncs();
    }

    /**
     * @param enable
     *            the enableIndexRecoveryFile flag to set on the underlying store
     */
    public void setEnableIndexRecoveryFile(boolean enable) {
        letter.setEnableIndexRecoveryFile(enable);
    }

    /**
     * @return the enableIndexRecoveryFile flag of the underlying store
     */
    public boolean isEnableIndexRecoveryFile() {
        return letter.isEnableIndexRecoveryFile();
    }

    /**
     * @param enable
     *            the enableIndexPageCaching flag to set on the underlying store
     */
    public void setEnableIndexPageCaching(boolean enable) {
        letter.setEnableIndexPageCaching(enable);
    }

    /**
     * @return the enableIndexPageCaching flag of the underlying store
     */
    public boolean isEnableIndexPageCaching() {
        return letter.isEnableIndexPageCaching();
    }

    /**
     * @return the compactAcksAfterNoGC value of the underlying store
     */
    public int getCompactAcksAfterNoGC() {
        return letter.getCompactAcksAfterNoGC();
    }

    /**
     * Sets the number of GC cycles where no journal logs were removed before an attempt to
     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
     * <p>
     * A value of -1 will disable this feature.
     *
     * @param compactAcksAfterNoGC
     *            Number of empty GC cycles before we rewrite old ACKS.
     */
    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
        this.letter.setCompactAcksAfterNoGC(compactAcksAfterNoGC);
    }

    /**
     * @return the compactAcksIgnoresStoreGrowth flag of the underlying store
     */
    public boolean isCompactAcksIgnoresStoreGrowth() {
        return this.letter.isCompactAcksIgnoresStoreGrowth();
    }

    /**
     * Configure if Ack compaction will occur regardless of continued growth of the
     * journal logs meaning that the store has not run out of space yet. Because the
     * compaction operation can be costly this value is defaulted to off and the Ack
     * compaction is only done when it seems that the store cannot grow any larger.
     *
     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
     */
    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
        this.letter.setCompactAcksIgnoresStoreGrowth(compactAcksIgnoresStoreGrowth);
    }

    /**
     * Returns whether Ack compaction is enabled
     *
     * @return enableAckCompaction
     */
    public boolean isEnableAckCompaction() {
        return letter.isEnableAckCompaction();
    }

    /**
     * Configure if the Ack compaction task should be enabled to run
     *
     * @param enableAckCompaction
     *            the enableAckCompaction flag to set on the underlying store
     */
    public void setEnableAckCompaction(boolean enableAckCompaction) {
        letter.setEnableAckCompaction(enableAckCompaction);
    }

    /**
     * @return the wrapped {@link KahaDBStore} this adapter delegates to
     */
    public KahaDBStore getStore() {
        return letter;
    }

    /**
     * Converts a broker {@link TransactionId} into a {@link KahaTransactionInfo}.
     * <p>
     * A local transaction is copied into a {@link KahaLocalTransactionId}
     * (connection id and value); any other id is cast to
     * {@link XATransactionId} and copied into a {@link KahaXATransactionId}
     * (branch qualifier, global transaction id and format id).
     *
     * @param txid
     *            the transaction id to convert, may be null
     * @return the converted transaction info, or null when txid is null
     */
    public KahaTransactionInfo createTransactionInfo(TransactionId txid) {
        if (txid == null) {
            return null;
        }
        KahaTransactionInfo rc = new KahaTransactionInfo();

        if (txid.isLocalTransaction()) {
            LocalTransactionId t = (LocalTransactionId) txid;
            KahaLocalTransactionId kahaTxId = new KahaLocalTransactionId();
            kahaTxId.setConnectionId(t.getConnectionId().getValue());
            kahaTxId.setTransactionId(t.getValue());
            rc.setLocalTransactionId(kahaTxId);
        } else {
            XATransactionId t = (XATransactionId) txid;
            KahaXATransactionId kahaTxId = new KahaXATransactionId();
            kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
            kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
            kahaTxId.setFormatId(t.getFormatId());
            rc.setXaTransactionId(kahaTxId);
        }
        return rc;
    }

    /**
     * Creates a {@link SharedFileLocker} configured with this adapter.
     *
     * @return the newly created locker
     * @throws IOException
     */
    @Override
    public Locker createDefaultLocker() throws IOException {
        SharedFileLocker locker = new SharedFileLocker();
        locker.configure(this);
        return locker;
    }

    /**
     * No initialisation is performed by this adapter.
     */
    @Override
    public void init() throws Exception {}

    /**
     * Describes the adapter by its data directory and, when one is set, its
     * index directory. The same string is used to build the JMX object name
     * in {@link #doStart()} and {@link #doStop(ServiceStopper)}.
     */
    @Override
    public String toString() {
        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
        return "KahaDBPersistenceAdapter[" + path + (getIndexDirectory() != null ? ",Index:" + getIndexDirectory().getAbsolutePath() : "") + "]";
    }

    /**
     * @param transactionIdTransformer
     *            the transformer handed through to the underlying store
     */
    @Override
    public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
        getStore().setTransactionIdTransformer(transactionIdTransformer);
    }

    /**
     * @return the job scheduler store created by the underlying store
     * @throws IOException
     * @throws UnsupportedOperationException
     */
    @Override
    public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException {
        return this.letter.createJobSchedulerStore();
    }
}