001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.DataInput; 024import java.io.DataOutput; 025import java.io.EOFException; 026import java.io.File; 027import java.io.IOException; 028import java.io.InputStream; 029import java.io.InterruptedIOException; 030import java.io.ObjectInputStream; 031import java.io.ObjectOutputStream; 032import java.io.OutputStream; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.Collections; 037import java.util.Date; 038import java.util.HashMap; 039import java.util.HashSet; 040import java.util.Iterator; 041import java.util.LinkedHashMap; 042import java.util.LinkedHashSet; 043import java.util.LinkedList; 044import java.util.List; 045import java.util.Map; 046import java.util.Map.Entry; 047import java.util.Set; 048import java.util.SortedSet; 049import java.util.TreeMap; 050import java.util.TreeSet; 051 052import java.util.concurrent.ConcurrentHashMap; 053import 
java.util.concurrent.ConcurrentMap; 054import java.util.concurrent.Executors; 055import java.util.concurrent.ScheduledExecutorService; 056import java.util.concurrent.ThreadFactory; 057import java.util.concurrent.TimeUnit; 058import java.util.concurrent.atomic.AtomicBoolean; 059import java.util.concurrent.atomic.AtomicLong; 060import java.util.concurrent.atomic.AtomicReference; 061import java.util.concurrent.locks.ReentrantReadWriteLock; 062 063import org.apache.activemq.ActiveMQMessageAuditNoSync; 064import org.apache.activemq.broker.BrokerService; 065import org.apache.activemq.broker.BrokerServiceAware; 066import org.apache.activemq.command.TransactionId; 067import org.apache.activemq.openwire.OpenWireFormat; 068import org.apache.activemq.protobuf.Buffer; 069import org.apache.activemq.store.MessageStore; 070import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; 071import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 072import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 073import org.apache.activemq.store.kahadb.data.KahaDestination; 074import org.apache.activemq.store.kahadb.data.KahaEntryType; 075import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 076import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; 077import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 078import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 079import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand; 080import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 081import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 082import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 083import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 084import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 085import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 086import 
org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; 087import org.apache.activemq.store.kahadb.disk.index.ListIndex; 088import org.apache.activemq.store.kahadb.disk.journal.DataFile; 089import org.apache.activemq.store.kahadb.disk.journal.Journal; 090import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; 091import org.apache.activemq.store.kahadb.disk.journal.Location; 092import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender; 093import org.apache.activemq.store.kahadb.disk.page.Page; 094import org.apache.activemq.store.kahadb.disk.page.PageFile; 095import org.apache.activemq.store.kahadb.disk.page.Transaction; 096import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; 097import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; 098import org.apache.activemq.store.kahadb.disk.util.Marshaller; 099import org.apache.activemq.store.kahadb.disk.util.Sequence; 100import org.apache.activemq.store.kahadb.disk.util.SequenceSet; 101import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 102import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; 103import org.apache.activemq.util.ByteSequence; 104import org.apache.activemq.util.DataByteArrayInputStream; 105import org.apache.activemq.util.DataByteArrayOutputStream; 106import org.apache.activemq.util.IOExceptionSupport; 107import org.apache.activemq.util.IOHelper; 108import org.apache.activemq.util.ServiceStopper; 109import org.apache.activemq.util.ServiceSupport; 110import org.apache.activemq.util.ThreadPoolUtils; 111import org.slf4j.Logger; 112import org.slf4j.LoggerFactory; 113import org.slf4j.MDC; 114 115public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { 116 117 protected BrokerService brokerService; 118 119 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 120 public static final int LOG_SLOW_ACCESS_TIME = 
Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
    protected static final Buffer UNMATCHED = new Buffer(new byte[0]);
    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);

    static final int CLOSED_STATE = 1;
    static final int OPEN_STATE = 2;
    static final long NOT_ACKED = -1;

    /** Version number written into every metadata record by {@link Metadata#write(DataOutput)}. */
    static final int VERSION = 5;

    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;

    /**
     * In-memory form of the index metadata record. The serialized layout is
     * defined by {@link #read(DataInput)} and {@link #write(DataOutput)}: state,
     * destinations index page id, three optional locations, the version, an
     * optional ack-map location and finally the openwire version.
     */
    protected class Metadata {
        protected Page<Metadata> page;
        protected int state;
        protected BTreeIndex<String, StoredDestination> destinations;
        protected Location lastUpdate;
        protected Location firstInProgressTransactionLocation;
        protected Location producerSequenceIdTrackerLocation = null;
        protected Location ackMessageFileMapLocation = null;
        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<>();
        protected int version = VERSION;
        protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION;

        /**
         * Populates this record from {@code is}. Trailing fields are read
         * tolerantly: an {@link EOFException} on one of them is treated as
         * an older, shorter record rather than as an error.
         *
         * @param is source positioned at the start of a metadata record
         * @throws IOException on any read failure other than the tolerated EOFs
         */
        public void read(DataInput is) throws IOException {
            state = is.readInt();
            destinations = new BTreeIndex<>(pageFile, is.readLong());
            lastUpdate = readOptionalLocation(is);
            firstInProgressTransactionLocation = readOptionalLocation(is);
            try {
                producerSequenceIdTrackerLocation = readOptionalLocation(is);
            } catch (EOFException expectedOnUpgrade) {
                // record ends before this field: leave the current value untouched
            }
            try {
                version = is.readInt();
            } catch (EOFException expectedOnUpgrade) {
                version = 1;
            }
            // the ack-map location is only consumed from the stream for version 5 and later
            ackMessageFileMapLocation = version >= 5 ? readOptionalLocation(is) : null;
            try {
                openwireVersion = is.readInt();
            } catch (EOFException expectedOnUpgrade) {
                openwireVersion = OpenWireFormat.DEFAULT_VERSION;
            }
            LOG.info("KahaDB is version " + version);
        }

        /**
         * Serializes this record. Note that the constant {@link #VERSION} is
         * written, not the {@code version} field that was last read.
         *
         * @param os destination for the record
         * @throws IOException if writing fails
         */
        public void write(DataOutput os) throws IOException {
            os.writeInt(state);
            os.writeLong(destinations.getPageId());

            writeOptionalLocation(os, lastUpdate);
            writeOptionalLocation(os, firstInProgressTransactionLocation);
            writeOptionalLocation(os, producerSequenceIdTrackerLocation);

            os.writeInt(VERSION);
            writeOptionalLocation(os, ackMessageFileMapLocation);
            os.writeInt(this.openwireVersion);
        }

        /** Reads a presence flag and, when it is set, the location that follows it; otherwise yields null. */
        private Location readOptionalLocation(DataInput is) throws IOException {
            return is.readBoolean() ? LocationMarshaller.INSTANCE.readPayload(is) : null;
        }

        /** Writes a presence flag followed by the location payload when {@code location} is non-null. */
        private void writeOptionalLocation(DataOutput os, Location location) throws IOException {
            final boolean present = location != null;
            os.writeBoolean(present);
            if (present) {
                LocationMarshaller.INSTANCE.writePayload(location, os);
            }
        }
    }

    /** Marshaller that delegates to {@link Metadata#read(DataInput)} and {@link Metadata#write(DataOutput)}. */
    class MetadataMarshaller extends VariableMarshaller<Metadata> {
        @Override
        public Metadata readPayload(DataInput dataIn) throws IOException {
            final Metadata loaded = createMetadata();
            loaded.read(dataIn);
            return loaded;
        }

        @Override
        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
            object.write(dataOut);
        }
    }

    protected PageFile pageFile;
    protected Journal journal;
    protected Metadata metadata = new Metadata();

    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();

    protected boolean failIfDatabaseIsLocked;

    protected boolean deleteAllMessages;
    protected File directory = DEFAULT_DIRECTORY;
    protected File indexDirectory = null;
    protected ScheduledExecutorService scheduler;
    // monitor guarding creation and shutdown of 'scheduler'
    private final Object schedulerLock = new Object();

    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
    protected boolean archiveDataLogs;
    protected File directoryArchive;
    protected AtomicLong journalSize = new AtomicLong(0);
    // the three intervals below are compared against System.currentTimeMillis() deltas
    long journalDiskSyncInterval = 1000;
    long checkpointInterval = 5*1000;
    long cleanupInterval = 30*1000;
    boolean cleanupOnStop = true;
    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    boolean enableIndexWriteAsync = false;
    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();

    protected AtomicBoolean opened = new AtomicBoolean();
    private boolean ignoreMissingJournalfiles = false;
    private int indexCacheSize = 10000;
    private boolean checkForCorruptJournalFiles = false;
    private boolean checksumJournalFiles = true;
    protected boolean forceRecoverIndex = false;

    private boolean archiveCorruptedIndex = false;
    private boolean useIndexLFRUEviction = false;
    private float indexLFUEvictionFactor = 0.2f;
    private boolean enableIndexDiskSyncs = true;
    private boolean enableIndexRecoveryFile = true;
    private boolean enableIndexPageCaching = true;
    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();

    private boolean enableAckCompaction = false;
    private int compactAcksAfterNoGC = 10;
    private boolean
compactAcksIgnoresStoreGrowth = false; 284 private int checkPointCyclesWithNoGC; 285 private int journalLogOnLastCompactionCheck; 286 287 //only set when using JournalDiskSyncStrategy.PERIODIC 288 protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>(); 289 290 @Override 291 public void doStart() throws Exception { 292 load(); 293 } 294 295 @Override 296 public void doStop(ServiceStopper stopper) throws Exception { 297 unload(); 298 } 299 300 public void allowIOResumption() { 301 if (pageFile != null) { 302 pageFile.allowIOResumption(); 303 } 304 if (journal != null) { 305 journal.allowIOResumption(); 306 } 307 } 308 309 private void loadPageFile() throws IOException { 310 this.indexLock.writeLock().lock(); 311 try { 312 final PageFile pageFile = getPageFile(); 313 pageFile.load(); 314 pageFile.tx().execute(new Transaction.Closure<IOException>() { 315 @Override 316 public void execute(Transaction tx) throws IOException { 317 if (pageFile.getPageCount() == 0) { 318 // First time this is created.. Initialize the metadata 319 Page<Metadata> page = tx.allocate(); 320 assert page.getPageId() == 0; 321 page.set(metadata); 322 metadata.page = page; 323 metadata.state = CLOSED_STATE; 324 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId()); 325 326 tx.store(metadata.page, metadataMarshaller, true); 327 } else { 328 Page<Metadata> page = tx.load(0, metadataMarshaller); 329 metadata = page.get(); 330 metadata.page = page; 331 } 332 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); 333 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); 334 metadata.destinations.load(tx); 335 } 336 }); 337 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. 
338 // Perhaps we should just keep an index of file 339 storedDestinations.clear(); 340 pageFile.tx().execute(new Transaction.Closure<IOException>() { 341 @Override 342 public void execute(Transaction tx) throws IOException { 343 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { 344 Entry<String, StoredDestination> entry = iterator.next(); 345 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); 346 storedDestinations.put(entry.getKey(), sd); 347 348 if (checkForCorruptJournalFiles) { 349 // sanity check the index also 350 if (!entry.getValue().locationIndex.isEmpty(tx)) { 351 if (entry.getValue().orderIndex.nextMessageId <= 0) { 352 throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey()); 353 } 354 } 355 } 356 } 357 } 358 }); 359 pageFile.flush(); 360 } finally { 361 this.indexLock.writeLock().unlock(); 362 } 363 } 364 365 private void startCheckpoint() { 366 if (checkpointInterval == 0 && cleanupInterval == 0) { 367 LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart"); 368 return; 369 } 370 synchronized (schedulerLock) { 371 if (scheduler == null || scheduler.isShutdown()) { 372 scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { 373 374 @Override 375 public Thread newThread(Runnable r) { 376 Thread schedulerThread = new Thread(r); 377 378 schedulerThread.setName("ActiveMQ Journal Checkpoint Worker"); 379 schedulerThread.setDaemon(true); 380 381 return schedulerThread; 382 } 383 }); 384 385 // Short intervals for check-point and cleanups 386 long delay; 387 if (journal.isJournalDiskSyncPeriodic()) { 388 delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500); 389 } else { 390 delay = Math.min(checkpointInterval > 0 ? 
checkpointInterval : cleanupInterval, 500); 391 } 392 393 scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS); 394 } 395 } 396 } 397 398 private final class CheckpointRunner implements Runnable { 399 400 private long lastCheckpoint = System.currentTimeMillis(); 401 private long lastCleanup = System.currentTimeMillis(); 402 private long lastSync = System.currentTimeMillis(); 403 private Location lastAsyncUpdate = null; 404 405 @Override 406 public void run() { 407 try { 408 // Decide on cleanup vs full checkpoint here. 409 if (opened.get()) { 410 long now = System.currentTimeMillis(); 411 if (journal.isJournalDiskSyncPeriodic() && 412 journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) { 413 Location currentUpdate = lastAsyncJournalUpdate.get(); 414 if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) { 415 lastAsyncUpdate = currentUpdate; 416 if (LOG.isTraceEnabled()) { 417 LOG.trace("Writing trace command to trigger journal sync"); 418 } 419 store(new KahaTraceCommand(), true, null, null); 420 } 421 lastSync = now; 422 } 423 if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) { 424 checkpointCleanup(true); 425 lastCleanup = now; 426 lastCheckpoint = now; 427 } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) { 428 checkpointCleanup(false); 429 lastCheckpoint = now; 430 } 431 } 432 } catch (IOException ioe) { 433 LOG.error("Checkpoint failed", ioe); 434 brokerService.handleIOException(ioe); 435 } catch (Throwable e) { 436 LOG.error("Checkpoint failed", e); 437 brokerService.handleIOException(IOExceptionSupport.create(e)); 438 } 439 } 440 } 441 442 public void open() throws IOException { 443 if( opened.compareAndSet(false, true) ) { 444 getJournal().start(); 445 try { 446 loadPageFile(); 447 } catch (Throwable t) { 448 LOG.warn("Index corrupted. Recovering the index through journal replay. 
Cause:" + t); 449 if (LOG.isDebugEnabled()) { 450 LOG.debug("Index load failure", t); 451 } 452 // try to recover index 453 try { 454 pageFile.unload(); 455 } catch (Exception ignore) {} 456 if (archiveCorruptedIndex) { 457 pageFile.archive(); 458 } else { 459 pageFile.delete(); 460 } 461 metadata = createMetadata(); 462 pageFile = null; 463 loadPageFile(); 464 } 465 recover(); 466 startCheckpoint(); 467 } 468 } 469 470 public void load() throws IOException { 471 this.indexLock.writeLock().lock(); 472 IOHelper.mkdirs(directory); 473 try { 474 if (deleteAllMessages) { 475 getJournal().setCheckForCorruptionOnStartup(false); 476 getJournal().start(); 477 getJournal().delete(); 478 getJournal().close(); 479 journal = null; 480 getPageFile().delete(); 481 LOG.info("Persistence store purged."); 482 deleteAllMessages = false; 483 } 484 485 open(); 486 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 487 } finally { 488 this.indexLock.writeLock().unlock(); 489 } 490 } 491 492 public void close() throws IOException, InterruptedException { 493 if( opened.compareAndSet(true, false)) { 494 checkpointLock.writeLock().lock(); 495 try { 496 if (metadata.page != null) { 497 checkpointUpdate(getCleanupOnStop()); 498 } 499 pageFile.unload(); 500 metadata = createMetadata(); 501 } finally { 502 checkpointLock.writeLock().unlock(); 503 } 504 journal.close(); 505 synchronized(schedulerLock) { 506 if (scheduler != null) { 507 ThreadPoolUtils.shutdownGraceful(scheduler, -1); 508 scheduler = null; 509 } 510 } 511 // clear the cache and journalSize on shutdown of the store 512 storeCache.clear(); 513 journalSize.set(0); 514 } 515 } 516 517 public void unload() throws IOException, InterruptedException { 518 this.indexLock.writeLock().lock(); 519 try { 520 if( pageFile != null && pageFile.isLoaded() ) { 521 metadata.state = CLOSED_STATE; 522 metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0]; 523 524 if (metadata.page != null) { 525 
pageFile.tx().execute(new Transaction.Closure<IOException>() { 526 @Override 527 public void execute(Transaction tx) throws IOException { 528 tx.store(metadata.page, metadataMarshaller, true); 529 } 530 }); 531 } 532 } 533 } finally { 534 this.indexLock.writeLock().unlock(); 535 } 536 close(); 537 } 538 539 // public for testing 540 @SuppressWarnings("rawtypes") 541 public Location[] getInProgressTxLocationRange() { 542 Location[] range = new Location[]{null, null}; 543 synchronized (inflightTransactions) { 544 if (!inflightTransactions.isEmpty()) { 545 for (List<Operation> ops : inflightTransactions.values()) { 546 if (!ops.isEmpty()) { 547 trackMaxAndMin(range, ops); 548 } 549 } 550 } 551 if (!preparedTransactions.isEmpty()) { 552 for (List<Operation> ops : preparedTransactions.values()) { 553 if (!ops.isEmpty()) { 554 trackMaxAndMin(range, ops); 555 } 556 } 557 } 558 } 559 return range; 560 } 561 562 @SuppressWarnings("rawtypes") 563 private void trackMaxAndMin(Location[] range, List<Operation> ops) { 564 Location t = ops.get(0).getLocation(); 565 if (range[0] == null || t.compareTo(range[0]) <= 0) { 566 range[0] = t; 567 } 568 t = ops.get(ops.size() -1).getLocation(); 569 if (range[1] == null || t.compareTo(range[1]) >= 0) { 570 range[1] = t; 571 } 572 } 573 574 class TranInfo { 575 TransactionId id; 576 Location location; 577 578 class opCount { 579 int add; 580 int remove; 581 } 582 HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>(); 583 584 @SuppressWarnings("rawtypes") 585 public void track(Operation operation) { 586 if (location == null ) { 587 location = operation.getLocation(); 588 } 589 KahaDestination destination; 590 boolean isAdd = false; 591 if (operation instanceof AddOperation) { 592 AddOperation add = (AddOperation) operation; 593 destination = add.getCommand().getDestination(); 594 isAdd = true; 595 } else { 596 RemoveOperation removeOpperation = (RemoveOperation) operation; 597 destination = 
removeOpperation.getCommand().getDestination(); 598 } 599 opCount opCount = destinationOpCount.get(destination); 600 if (opCount == null) { 601 opCount = new opCount(); 602 destinationOpCount.put(destination, opCount); 603 } 604 if (isAdd) { 605 opCount.add++; 606 } else { 607 opCount.remove++; 608 } 609 } 610 611 @Override 612 public String toString() { 613 StringBuffer buffer = new StringBuffer(); 614 buffer.append(location).append(";").append(id).append(";\n"); 615 for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) { 616 buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); 617 } 618 return buffer.toString(); 619 } 620 } 621 622 @SuppressWarnings("rawtypes") 623 public String getTransactions() { 624 625 ArrayList<TranInfo> infos = new ArrayList<TranInfo>(); 626 synchronized (inflightTransactions) { 627 if (!inflightTransactions.isEmpty()) { 628 for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) { 629 TranInfo info = new TranInfo(); 630 info.id = entry.getKey(); 631 for (Operation operation : entry.getValue()) { 632 info.track(operation); 633 } 634 infos.add(info); 635 } 636 } 637 } 638 synchronized (preparedTransactions) { 639 if (!preparedTransactions.isEmpty()) { 640 for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) { 641 TranInfo info = new TranInfo(); 642 info.id = entry.getKey(); 643 for (Operation operation : entry.getValue()) { 644 info.track(operation); 645 } 646 infos.add(info); 647 } 648 } 649 } 650 return infos.toString(); 651 } 652 653 /** 654 * Move all the messages that were in the journal into long term storage. We 655 * just replay and do a checkpoint. 
656 * 657 * @throws IOException 658 * @throws IOException 659 * @throws IllegalStateException 660 */ 661 private void recover() throws IllegalStateException, IOException { 662 this.indexLock.writeLock().lock(); 663 try { 664 665 long start = System.currentTimeMillis(); 666 boolean requiresJournalReplay = recoverProducerAudit(); 667 requiresJournalReplay |= recoverAckMessageFileMap(); 668 Location lastIndoubtPosition = getRecoveryPosition(); 669 Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition; 670 if (recoveryPosition != null) { 671 int redoCounter = 0; 672 int dataFileRotationTracker = recoveryPosition.getDataFileId(); 673 LOG.info("Recovering from the journal @" + recoveryPosition); 674 while (recoveryPosition != null) { 675 try { 676 JournalCommand<?> message = load(recoveryPosition); 677 metadata.lastUpdate = recoveryPosition; 678 process(message, recoveryPosition, lastIndoubtPosition); 679 redoCounter++; 680 } catch (IOException failedRecovery) { 681 if (isIgnoreMissingJournalfiles()) { 682 LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery); 683 // track this dud location 684 journal.corruptRecoveryLocation(recoveryPosition); 685 } else { 686 throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery); 687 } 688 } 689 recoveryPosition = journal.getNextLocation(recoveryPosition); 690 // hold on to the minimum number of open files during recovery 691 if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) { 692 dataFileRotationTracker = recoveryPosition.getDataFileId(); 693 journal.cleanup(); 694 } 695 if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { 696 LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered .."); 697 } 698 } 699 if (LOG.isInfoEnabled()) { 700 long end = System.currentTimeMillis(); 701 LOG.info("Recovery replayed " + redoCounter + " operations from the journal 
in " + ((end - start) / 1000.0f) + " seconds."); 702 } 703 } 704 705 // We may have to undo some index updates. 706 pageFile.tx().execute(new Transaction.Closure<IOException>() { 707 @Override 708 public void execute(Transaction tx) throws IOException { 709 recoverIndex(tx); 710 } 711 }); 712 713 // rollback any recovered inflight local transactions, and discard any inflight XA transactions. 714 Set<TransactionId> toRollback = new HashSet<TransactionId>(); 715 Set<TransactionId> toDiscard = new HashSet<TransactionId>(); 716 synchronized (inflightTransactions) { 717 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { 718 TransactionId id = it.next(); 719 if (id.isLocalTransaction()) { 720 toRollback.add(id); 721 } else { 722 toDiscard.add(id); 723 } 724 } 725 for (TransactionId tx: toRollback) { 726 if (LOG.isDebugEnabled()) { 727 LOG.debug("rolling back recovered indoubt local transaction " + tx); 728 } 729 store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null); 730 } 731 for (TransactionId tx: toDiscard) { 732 if (LOG.isDebugEnabled()) { 733 LOG.debug("discarding recovered in-flight XA transaction " + tx); 734 } 735 inflightTransactions.remove(tx); 736 } 737 } 738 739 synchronized (preparedTransactions) { 740 for (TransactionId txId : preparedTransactions.keySet()) { 741 LOG.warn("Recovered prepared XA TX: [{}]", txId); 742 } 743 } 744 745 } finally { 746 this.indexLock.writeLock().unlock(); 747 } 748 } 749 750 @SuppressWarnings("unused") 751 private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { 752 return TransactionIdConversion.convertToLocal(tx); 753 } 754 755 private Location minimum(Location x, 756 Location y) { 757 Location min = null; 758 if (x != null) { 759 min = x; 760 if (y != null) { 761 int compare = y.compareTo(x); 762 if (compare < 0) { 763 min = y; 764 } 765 } 766 } else { 767 min = y; 768 } 769 return min; 770 } 771 772 
private boolean recoverProducerAudit() throws IOException { 773 boolean requiresReplay = true; 774 if (metadata.producerSequenceIdTrackerLocation != null) { 775 try { 776 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); 777 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); 778 int maxNumProducers = getMaxFailoverProducersToTrack(); 779 int maxAuditDepth = getFailoverProducersAuditDepth(); 780 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); 781 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); 782 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); 783 requiresReplay = false; 784 } catch (Exception e) { 785 LOG.warn("Cannot recover message audit", e); 786 } 787 } 788 // got no audit stored so got to recreate via replay from start of the journal 789 return requiresReplay; 790 } 791 792 @SuppressWarnings("unchecked") 793 private boolean recoverAckMessageFileMap() throws IOException { 794 boolean requiresReplay = true; 795 if (metadata.ackMessageFileMapLocation != null) { 796 try { 797 KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); 798 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); 799 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); 800 requiresReplay = false; 801 } catch (Exception e) { 802 LOG.warn("Cannot recover ackMessageFileMap", e); 803 } 804 } 805 // got no ackMessageFileMap stored so got to recreate via replay from start of the journal 806 return requiresReplay; 807 } 808 809 protected void recoverIndex(Transaction tx) throws IOException { 810 long start = System.currentTimeMillis(); 811 // It is possible index updates got applied before the journal updates.. 
812 // in that case we need to removed references to messages that are not in the journal 813 final Location lastAppendLocation = journal.getLastAppendLocation(); 814 long undoCounter=0; 815 816 // Go through all the destinations to see if they have messages past the lastAppendLocation 817 for (StoredDestination sd : storedDestinations.values()) { 818 819 final ArrayList<Long> matches = new ArrayList<Long>(); 820 // Find all the Locations that are >= than the last Append Location. 821 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) { 822 @Override 823 protected void matched(Location key, Long value) { 824 matches.add(value); 825 } 826 }); 827 828 for (Long sequenceId : matches) { 829 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 830 if (keys != null) { 831 sd.locationIndex.remove(tx, keys.location); 832 sd.messageIdIndex.remove(tx, keys.messageId); 833 metadata.producerSequenceIdTracker.rollback(keys.messageId); 834 undoCounter++; 835 // TODO: do we need to modify the ack positions for the pub sub case? 836 } 837 } 838 } 839 840 if (undoCounter > 0) { 841 // The rolledback operations are basically in flight journal writes. To avoid getting 842 // these the end user should do sync writes to the journal. 843 if (LOG.isInfoEnabled()) { 844 long end = System.currentTimeMillis(); 845 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 846 } 847 } 848 849 undoCounter = 0; 850 start = System.currentTimeMillis(); 851 852 // Lets be extra paranoid here and verify that all the datafiles being referenced 853 // by the indexes still exists. 
854 855 final SequenceSet ss = new SequenceSet(); 856 for (StoredDestination sd : storedDestinations.values()) { 857 // Use a visitor to cut down the number of pages that we load 858 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 859 int last=-1; 860 861 @Override 862 public boolean isInterestedInKeysBetween(Location first, Location second) { 863 if( first==null ) { 864 return !ss.contains(0, second.getDataFileId()); 865 } else if( second==null ) { 866 return true; 867 } else { 868 return !ss.contains(first.getDataFileId(), second.getDataFileId()); 869 } 870 } 871 872 @Override 873 public void visit(List<Location> keys, List<Long> values) { 874 for (Location l : keys) { 875 int fileId = l.getDataFileId(); 876 if( last != fileId ) { 877 ss.add(fileId); 878 last = fileId; 879 } 880 } 881 } 882 883 }); 884 } 885 HashSet<Integer> missingJournalFiles = new HashSet<Integer>(); 886 while (!ss.isEmpty()) { 887 missingJournalFiles.add((int) ss.removeFirst()); 888 } 889 890 for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) { 891 missingJournalFiles.add(entry.getKey()); 892 for (Integer i : entry.getValue()) { 893 missingJournalFiles.add(i); 894 } 895 } 896 897 missingJournalFiles.removeAll(journal.getFileMap().keySet()); 898 899 if (!missingJournalFiles.isEmpty()) { 900 LOG.warn("Some journal files are missing: " + missingJournalFiles); 901 } 902 903 ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>(); 904 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>(); 905 for (Integer missing : missingJournalFiles) { 906 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0))); 907 } 908 909 if (checkForCorruptJournalFiles) { 910 Collection<DataFile> dataFiles = journal.getFileMap().values(); 911 for (DataFile dataFile : dataFiles) { 912 int id = 
dataFile.getDataFileId(); 913 // eof to next file id 914 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0))); 915 Sequence seq = dataFile.getCorruptedBlocks().getHead(); 916 while (seq != null) { 917 BTreeVisitor.BetweenVisitor<Location, Long> visitor = 918 new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)); 919 missingPredicates.add(visitor); 920 knownCorruption.add(visitor); 921 seq = seq.getNext(); 922 } 923 } 924 } 925 926 if (!missingPredicates.isEmpty()) { 927 for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) { 928 final StoredDestination sd = sdEntry.getValue(); 929 final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>(); 930 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) { 931 @Override 932 protected void matched(Location key, Long value) { 933 matches.put(value, key); 934 } 935 }); 936 937 // If some message references are affected by the missing data files... 938 if (!matches.isEmpty()) { 939 940 // We either 'gracefully' recover dropping the missing messages or 941 // we error out. 942 if( ignoreMissingJournalfiles ) { 943 // Update the index to remove the references to the missing data 944 for (Long sequenceId : matches.keySet()) { 945 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 946 sd.locationIndex.remove(tx, keys.location); 947 sd.messageIdIndex.remove(tx, keys.messageId); 948 LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location); 949 undoCounter++; 950 // TODO: do we need to modify the ack positions for the pub sub case? 
951 } 952 } else { 953 LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches); 954 throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected."); 955 } 956 } 957 } 958 } 959 960 if (!ignoreMissingJournalfiles) { 961 if (!knownCorruption.isEmpty()) { 962 LOG.error("Detected corrupt journal files. " + knownCorruption); 963 throw new IOException("Detected corrupt journal files. " + knownCorruption); 964 } 965 966 if (!missingJournalFiles.isEmpty()) { 967 LOG.error("Detected missing journal files. " + missingJournalFiles); 968 throw new IOException("Detected missing journal files. " + missingJournalFiles); 969 } 970 } 971 972 if (undoCounter > 0) { 973 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user 974 // should do sync writes to the journal. 975 if (LOG.isInfoEnabled()) { 976 long end = System.currentTimeMillis(); 977 LOG.info("Detected missing/corrupt journal files. 
Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 978 } 979 } 980 } 981 982 private Location nextRecoveryPosition; 983 private Location lastRecoveryPosition; 984 985 public void incrementalRecover() throws IOException { 986 this.indexLock.writeLock().lock(); 987 try { 988 if( nextRecoveryPosition == null ) { 989 if( lastRecoveryPosition==null ) { 990 nextRecoveryPosition = getRecoveryPosition(); 991 } else { 992 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 993 } 994 } 995 while (nextRecoveryPosition != null) { 996 lastRecoveryPosition = nextRecoveryPosition; 997 metadata.lastUpdate = lastRecoveryPosition; 998 JournalCommand<?> message = load(lastRecoveryPosition); 999 process(message, lastRecoveryPosition, (IndexAware) null); 1000 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 1001 } 1002 } finally { 1003 this.indexLock.writeLock().unlock(); 1004 } 1005 } 1006 1007 public Location getLastUpdatePosition() throws IOException { 1008 return metadata.lastUpdate; 1009 } 1010 1011 private Location getRecoveryPosition() throws IOException { 1012 1013 if (!this.forceRecoverIndex) { 1014 1015 // If we need to recover the transactions.. 1016 if (metadata.firstInProgressTransactionLocation != null) { 1017 return metadata.firstInProgressTransactionLocation; 1018 } 1019 1020 // Perhaps there were no transactions... 1021 if( metadata.lastUpdate!=null) { 1022 // Start replay at the record after the last one recorded in the index file. 1023 return getNextInitializedLocation(metadata.lastUpdate); 1024 } 1025 } 1026 // This loads the first position. 
1027 return journal.getNextLocation(null); 1028 } 1029 1030 private Location getNextInitializedLocation(Location location) throws IOException { 1031 Location mayNotBeInitialized = journal.getNextLocation(location); 1032 if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) { 1033 // need to init size and type to skip 1034 return journal.getNextLocation(mayNotBeInitialized); 1035 } else { 1036 return mayNotBeInitialized; 1037 } 1038 } 1039 1040 protected void checkpointCleanup(final boolean cleanup) throws IOException { 1041 long start; 1042 this.indexLock.writeLock().lock(); 1043 try { 1044 start = System.currentTimeMillis(); 1045 if( !opened.get() ) { 1046 return; 1047 } 1048 } finally { 1049 this.indexLock.writeLock().unlock(); 1050 } 1051 checkpointUpdate(cleanup); 1052 long end = System.currentTimeMillis(); 1053 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1054 if (LOG.isInfoEnabled()) { 1055 LOG.info("Slow KahaDB access: cleanup took " + (end - start)); 1056 } 1057 } 1058 } 1059 1060 public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException { 1061 int size = data.serializedSizeFramed(); 1062 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 1063 os.writeByte(data.type().getNumber()); 1064 data.writeFramed(os); 1065 return os.toByteSequence(); 1066 } 1067 1068 // ///////////////////////////////////////////////////////////////// 1069 // Methods call by the broker to update and query the store. 
1070 // ///////////////////////////////////////////////////////////////// 1071 public Location store(JournalCommand<?> data) throws IOException { 1072 return store(data, false, null,null); 1073 } 1074 1075 public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException { 1076 return store(data, false, null, null, onJournalStoreComplete); 1077 } 1078 1079 public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException { 1080 return store(data, sync, before, after, null); 1081 } 1082 1083 /** 1084 * All updated are are funneled through this method. The updates are converted 1085 * to a JournalMessage which is logged to the journal and then the data from 1086 * the JournalMessage is used to update the index just like it would be done 1087 * during a recovery process. 1088 */ 1089 public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException { 1090 try { 1091 ByteSequence sequence = toByteSequence(data); 1092 Location location; 1093 1094 checkpointLock.readLock().lock(); 1095 try { 1096 1097 long start = System.currentTimeMillis(); 1098 location = onJournalStoreComplete == null ? 
journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; 1099 long start2 = System.currentTimeMillis(); 1100 //Track the last async update so we know if we need to sync at the next checkpoint 1101 if (!sync && journal.isJournalDiskSyncPeriodic()) { 1102 lastAsyncJournalUpdate.set(location); 1103 } 1104 process(data, location, before); 1105 1106 long end = System.currentTimeMillis(); 1107 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1108 if (LOG.isInfoEnabled()) { 1109 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); 1110 } 1111 } 1112 } finally { 1113 checkpointLock.readLock().unlock(); 1114 } 1115 1116 if (after != null) { 1117 after.run(); 1118 } 1119 1120 return location; 1121 } catch (IOException ioe) { 1122 LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe); 1123 brokerService.handleIOException(ioe); 1124 throw ioe; 1125 } 1126 } 1127 1128 /** 1129 * Loads a previously stored JournalMessage 1130 * 1131 * @param location 1132 * @return 1133 * @throws IOException 1134 */ 1135 public JournalCommand<?> load(Location location) throws IOException { 1136 long start = System.currentTimeMillis(); 1137 ByteSequence data = journal.read(location); 1138 long end = System.currentTimeMillis(); 1139 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 1140 if (LOG.isInfoEnabled()) { 1141 LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms"); 1142 } 1143 } 1144 DataByteArrayInputStream is = new DataByteArrayInputStream(data); 1145 byte readByte = is.readByte(); 1146 KahaEntryType type = KahaEntryType.valueOf(readByte); 1147 if( type == null ) { 1148 try { 1149 is.close(); 1150 } catch (IOException e) {} 1151 throw new IOException("Could not load journal record, null type information from: " + readByte + " at location: "+location); 1152 } 1153 JournalCommand<?> message = 
(JournalCommand<?>)type.createMessage(); 1154 message.mergeFramed(is); 1155 return message; 1156 } 1157 1158 /** 1159 * do minimal recovery till we reach the last inDoubtLocation 1160 * @param data 1161 * @param location 1162 * @param inDoubtlocation 1163 * @throws IOException 1164 */ 1165 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException { 1166 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { 1167 initMessageStore(data); 1168 process(data, location, (IndexAware) null); 1169 } else { 1170 // just recover producer audit 1171 data.visit(new Visitor() { 1172 @Override 1173 public void visit(KahaAddMessageCommand command) throws IOException { 1174 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1175 } 1176 }); 1177 } 1178 } 1179 1180 private void initMessageStore(JournalCommand<?> data) throws IOException { 1181 data.visit(new Visitor() { 1182 @Override 1183 public void visit(KahaAddMessageCommand command) throws IOException { 1184 final KahaDestination destination = command.getDestination(); 1185 if (!storedDestinations.containsKey(key(destination))) { 1186 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1187 @Override 1188 public void execute(Transaction tx) throws IOException { 1189 getStoredDestination(destination, tx); 1190 } 1191 }); 1192 } 1193 } 1194 }); 1195 } 1196 1197 // ///////////////////////////////////////////////////////////////// 1198 // Journaled record processing methods. Once the record is journaled, 1199 // these methods handle applying the index updates. 
These may be called 1200 // from the recovery method too so they need to be idempotent 1201 // ///////////////////////////////////////////////////////////////// 1202 1203 void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException { 1204 data.visit(new Visitor() { 1205 @Override 1206 public void visit(KahaAddMessageCommand command) throws IOException { 1207 process(command, location, onSequenceAssignedCallback); 1208 } 1209 1210 @Override 1211 public void visit(KahaRemoveMessageCommand command) throws IOException { 1212 process(command, location); 1213 } 1214 1215 @Override 1216 public void visit(KahaPrepareCommand command) throws IOException { 1217 process(command, location); 1218 } 1219 1220 @Override 1221 public void visit(KahaCommitCommand command) throws IOException { 1222 process(command, location, onSequenceAssignedCallback); 1223 } 1224 1225 @Override 1226 public void visit(KahaRollbackCommand command) throws IOException { 1227 process(command, location); 1228 } 1229 1230 @Override 1231 public void visit(KahaRemoveDestinationCommand command) throws IOException { 1232 process(command, location); 1233 } 1234 1235 @Override 1236 public void visit(KahaSubscriptionCommand command) throws IOException { 1237 process(command, location); 1238 } 1239 1240 @Override 1241 public void visit(KahaProducerAuditCommand command) throws IOException { 1242 processLocation(location); 1243 } 1244 1245 @Override 1246 public void visit(KahaAckMessageFileMapCommand command) throws IOException { 1247 processLocation(location); 1248 } 1249 1250 @Override 1251 public void visit(KahaTraceCommand command) { 1252 processLocation(location); 1253 } 1254 1255 @Override 1256 public void visit(KahaUpdateMessageCommand command) throws IOException { 1257 process(command, location); 1258 } 1259 1260 @Override 1261 public void visit(KahaRewrittenDataFileCommand command) throws IOException { 1262 process(command, location); 1263 } 
1264 }); 1265 } 1266 1267 @SuppressWarnings("rawtypes") 1268 protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException { 1269 if (command.hasTransactionInfo()) { 1270 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1271 inflightTx.add(new AddOperation(command, location, runWithIndexLock)); 1272 } else { 1273 this.indexLock.writeLock().lock(); 1274 try { 1275 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1276 @Override 1277 public void execute(Transaction tx) throws IOException { 1278 long assignedIndex = updateIndex(tx, command, location); 1279 if (runWithIndexLock != null) { 1280 runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex); 1281 } 1282 } 1283 }); 1284 1285 } finally { 1286 this.indexLock.writeLock().unlock(); 1287 } 1288 } 1289 } 1290 1291 protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException { 1292 this.indexLock.writeLock().lock(); 1293 try { 1294 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1295 @Override 1296 public void execute(Transaction tx) throws IOException { 1297 updateIndex(tx, command, location); 1298 } 1299 }); 1300 } finally { 1301 this.indexLock.writeLock().unlock(); 1302 } 1303 } 1304 1305 @SuppressWarnings("rawtypes") 1306 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { 1307 if (command.hasTransactionInfo()) { 1308 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1309 inflightTx.add(new RemoveOperation(command, location)); 1310 } else { 1311 this.indexLock.writeLock().lock(); 1312 try { 1313 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1314 @Override 1315 public void execute(Transaction tx) throws IOException { 1316 updateIndex(tx, command, location); 1317 } 1318 }); 1319 } finally { 1320 this.indexLock.writeLock().unlock(); 1321 } 
1322 } 1323 } 1324 1325 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException { 1326 this.indexLock.writeLock().lock(); 1327 try { 1328 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1329 @Override 1330 public void execute(Transaction tx) throws IOException { 1331 updateIndex(tx, command, location); 1332 } 1333 }); 1334 } finally { 1335 this.indexLock.writeLock().unlock(); 1336 } 1337 } 1338 1339 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { 1340 this.indexLock.writeLock().lock(); 1341 try { 1342 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1343 @Override 1344 public void execute(Transaction tx) throws IOException { 1345 updateIndex(tx, command, location); 1346 } 1347 }); 1348 } finally { 1349 this.indexLock.writeLock().unlock(); 1350 } 1351 } 1352 1353 protected void processLocation(final Location location) { 1354 this.indexLock.writeLock().lock(); 1355 try { 1356 metadata.lastUpdate = location; 1357 } finally { 1358 this.indexLock.writeLock().unlock(); 1359 } 1360 } 1361 1362 @SuppressWarnings("rawtypes") 1363 protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException { 1364 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1365 List<Operation> inflightTx; 1366 synchronized (inflightTransactions) { 1367 inflightTx = inflightTransactions.remove(key); 1368 if (inflightTx == null) { 1369 inflightTx = preparedTransactions.remove(key); 1370 } 1371 } 1372 if (inflightTx == null) { 1373 // only non persistent messages in this tx 1374 if (before != null) { 1375 before.sequenceAssignedWithIndexLocked(-1); 1376 } 1377 return; 1378 } 1379 1380 final List<Operation> messagingTx = inflightTx; 1381 indexLock.writeLock().lock(); 1382 try { 1383 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1384 @Override 1385 public 
void execute(Transaction tx) throws IOException { 1386 for (Operation op : messagingTx) { 1387 op.execute(tx); 1388 recordAckMessageReferenceLocation(location, op.getLocation()); 1389 } 1390 } 1391 }); 1392 metadata.lastUpdate = location; 1393 } finally { 1394 indexLock.writeLock().unlock(); 1395 } 1396 } 1397 1398 @SuppressWarnings("rawtypes") 1399 protected void process(KahaPrepareCommand command, Location location) { 1400 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1401 List<Operation> tx = null; 1402 synchronized (inflightTransactions) { 1403 tx = inflightTransactions.remove(key); 1404 if (tx != null) { 1405 preparedTransactions.put(key, tx); 1406 } 1407 } 1408 if (tx != null && !tx.isEmpty()) { 1409 indexLock.writeLock().lock(); 1410 try { 1411 for (Operation op : tx) { 1412 recordAckMessageReferenceLocation(location, op.getLocation()); 1413 } 1414 } finally { 1415 indexLock.writeLock().unlock(); 1416 } 1417 } 1418 } 1419 1420 @SuppressWarnings("rawtypes") 1421 protected void process(KahaRollbackCommand command, Location location) throws IOException { 1422 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1423 List<Operation> updates = null; 1424 synchronized (inflightTransactions) { 1425 updates = inflightTransactions.remove(key); 1426 if (updates == null) { 1427 updates = preparedTransactions.remove(key); 1428 } 1429 } 1430 if (key.isXATransaction() && updates != null && !updates.isEmpty()) { 1431 indexLock.writeLock().lock(); 1432 try { 1433 for (Operation op : updates) { 1434 recordAckMessageReferenceLocation(location, op.getLocation()); 1435 } 1436 } finally { 1437 indexLock.writeLock().unlock(); 1438 } 1439 } 1440 } 1441 1442 protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException { 1443 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1444 1445 // Mark the current journal file as a compacted file 
so that gc checks can skip 1446 // over logs that are smaller compaction type logs. 1447 DataFile current = journal.getDataFileById(location.getDataFileId()); 1448 current.setTypeCode(command.getRewriteType()); 1449 1450 if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) { 1451 // Move offset so that next location read jumps to next file. 1452 location.setOffset(journalMaxFileLength); 1453 } 1454 } 1455 1456 // ///////////////////////////////////////////////////////////////// 1457 // These methods do the actual index updates. 1458 // ///////////////////////////////////////////////////////////////// 1459 1460 protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); 1461 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>(); 1462 1463 long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { 1464 StoredDestination sd = getExistingStoredDestination(command.getDestination(), tx); 1465 if (sd == null) { 1466 // if the store no longer exists, skip 1467 return -1; 1468 } 1469 // Skip adding the message to the index if this is a topic and there are 1470 // no subscriptions. 1471 if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) { 1472 return -1; 1473 } 1474 1475 // Add the message. 1476 int priority = command.getPrioritySupported() ? 
command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY; 1477 long id = sd.orderIndex.getNextMessageId(); 1478 Long previous = sd.locationIndex.put(tx, location, id); 1479 if (previous == null) { 1480 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); 1481 if (previous == null) { 1482 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); 1483 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { 1484 addAckLocationForNewMessage(tx, sd, id); 1485 } 1486 metadata.lastUpdate = location; 1487 } else { 1488 1489 MessageKeys messageKeys = sd.orderIndex.get(tx, previous); 1490 if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { 1491 // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt 1492 LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId()); 1493 } 1494 sd.messageIdIndex.put(tx, command.getMessageId(), previous); 1495 sd.locationIndex.remove(tx, location); 1496 id = -1; 1497 } 1498 } else { 1499 // restore the previous value.. Looks like this was a redo of a previously 1500 // added message. We don't want to assign it a new id as the other indexes would 1501 // be wrong.. 
1502 sd.locationIndex.put(tx, location, previous); 1503 // ensure sequence is not broken 1504 sd.orderIndex.revertNextMessageId(); 1505 metadata.lastUpdate = location; 1506 } 1507 // record this id in any event, initial send or recovery 1508 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1509 return id; 1510 } 1511 1512 void trackPendingAdd(KahaDestination destination, Long seq) { 1513 StoredDestination sd = storedDestinations.get(key(destination)); 1514 if (sd != null) { 1515 sd.trackPendingAdd(seq); 1516 } 1517 } 1518 1519 void trackPendingAddComplete(KahaDestination destination, Long seq) { 1520 StoredDestination sd = storedDestinations.get(key(destination)); 1521 if (sd != null) { 1522 sd.trackPendingAddComplete(seq); 1523 } 1524 } 1525 1526 void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException { 1527 KahaAddMessageCommand command = updateMessageCommand.getMessage(); 1528 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1529 1530 Long id = sd.messageIdIndex.get(tx, command.getMessageId()); 1531 if (id != null) { 1532 MessageKeys previousKeys = sd.orderIndex.put( 1533 tx, 1534 command.getPrioritySupported() ? 
command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY, 1535 id, 1536 new MessageKeys(command.getMessageId(), location) 1537 ); 1538 sd.locationIndex.put(tx, location, id); 1539 // on first update previous is original location, on recovery/replay it may be the updated location 1540 if(previousKeys != null && !previousKeys.location.equals(location)) { 1541 sd.locationIndex.remove(tx, previousKeys.location); 1542 } 1543 metadata.lastUpdate = location; 1544 } else { 1545 //Add the message if it can't be found 1546 this.updateIndex(tx, command, location); 1547 } 1548 } 1549 1550 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { 1551 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1552 if (!command.hasSubscriptionKey()) { 1553 1554 // In the queue case we just remove the message from the index.. 1555 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId()); 1556 if (sequenceId != null) { 1557 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 1558 if (keys != null) { 1559 sd.locationIndex.remove(tx, keys.location); 1560 recordAckMessageReferenceLocation(ackLocation, keys.location); 1561 metadata.lastUpdate = ackLocation; 1562 } else if (LOG.isDebugEnabled()) { 1563 LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId()); 1564 } 1565 } else if (LOG.isDebugEnabled()) { 1566 LOG.debug("message not found in sequence id index: " + command.getMessageId()); 1567 } 1568 } else { 1569 // In the topic case we need remove the message once it's been acked 1570 // by all the subs 1571 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId()); 1572 1573 // Make sure it's a valid message id... 
1574 if (sequence != null) { 1575 String subscriptionKey = command.getSubscriptionKey(); 1576 if (command.getAck() != UNMATCHED) { 1577 sd.orderIndex.get(tx, sequence); 1578 byte priority = sd.orderIndex.lastGetPriority(); 1579 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority)); 1580 } 1581 1582 MessageKeys keys = sd.orderIndex.get(tx, sequence); 1583 if (keys != null) { 1584 recordAckMessageReferenceLocation(ackLocation, keys.location); 1585 } 1586 // The following method handles deleting un-referenced messages. 1587 removeAckLocation(tx, sd, subscriptionKey, sequence); 1588 metadata.lastUpdate = ackLocation; 1589 } else if (LOG.isDebugEnabled()) { 1590 LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); 1591 } 1592 1593 } 1594 } 1595 1596 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { 1597 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); 1598 if (referenceFileIds == null) { 1599 referenceFileIds = new HashSet<Integer>(); 1600 referenceFileIds.add(messageLocation.getDataFileId()); 1601 metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); 1602 } else { 1603 Integer id = Integer.valueOf(messageLocation.getDataFileId()); 1604 if (!referenceFileIds.contains(id)) { 1605 referenceFileIds.add(id); 1606 } 1607 } 1608 } 1609 1610 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { 1611 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1612 sd.orderIndex.remove(tx); 1613 1614 sd.locationIndex.clear(tx); 1615 sd.locationIndex.unload(tx); 1616 tx.free(sd.locationIndex.getPageId()); 1617 1618 sd.messageIdIndex.clear(tx); 1619 sd.messageIdIndex.unload(tx); 1620 tx.free(sd.messageIdIndex.getPageId()); 1621 1622 if (sd.subscriptions != null) { 1623 
sd.subscriptions.clear(tx); 1624 sd.subscriptions.unload(tx); 1625 tx.free(sd.subscriptions.getPageId()); 1626 1627 sd.subscriptionAcks.clear(tx); 1628 sd.subscriptionAcks.unload(tx); 1629 tx.free(sd.subscriptionAcks.getPageId()); 1630 1631 sd.ackPositions.clear(tx); 1632 sd.ackPositions.unload(tx); 1633 tx.free(sd.ackPositions.getHeadPageId()); 1634 1635 sd.subLocations.clear(tx); 1636 sd.subLocations.unload(tx); 1637 tx.free(sd.subLocations.getHeadPageId()); 1638 } 1639 1640 String key = key(command.getDestination()); 1641 storedDestinations.remove(key); 1642 metadata.destinations.remove(tx, key); 1643 storeCache.remove(key(command.getDestination())); 1644 } 1645 1646 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { 1647 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1648 final String subscriptionKey = command.getSubscriptionKey(); 1649 1650 // If set then we are creating it.. otherwise we are destroying the sub 1651 if (command.hasSubscriptionInfo()) { 1652 Location existing = sd.subLocations.get(tx, subscriptionKey); 1653 if (existing != null && existing.compareTo(location) == 0) { 1654 // replay on recovery, ignore 1655 LOG.trace("ignoring journal replay of replay of sub from: " + location); 1656 return; 1657 } 1658 1659 sd.subscriptions.put(tx, subscriptionKey, command); 1660 sd.subLocations.put(tx, subscriptionKey, location); 1661 long ackLocation=NOT_ACKED; 1662 if (!command.getRetroactive()) { 1663 ackLocation = sd.orderIndex.nextMessageId-1; 1664 } else { 1665 addAckLocationForRetroactiveSub(tx, sd, subscriptionKey); 1666 } 1667 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); 1668 sd.subscriptionCache.add(subscriptionKey); 1669 } else { 1670 // delete the sub... 
1671 sd.subscriptions.remove(tx, subscriptionKey); 1672 sd.subLocations.remove(tx, subscriptionKey); 1673 sd.subscriptionAcks.remove(tx, subscriptionKey); 1674 sd.subscriptionCache.remove(subscriptionKey); 1675 removeAckLocationsForSub(tx, sd, subscriptionKey); 1676 1677 if (sd.subscriptions.isEmpty(tx)) { 1678 // remove the stored destination 1679 KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand(); 1680 removeDestinationCommand.setDestination(command.getDestination()); 1681 updateIndex(tx, removeDestinationCommand, null); 1682 } 1683 } 1684 } 1685 1686 private void checkpointUpdate(final boolean cleanup) throws IOException { 1687 checkpointLock.writeLock().lock(); 1688 try { 1689 this.indexLock.writeLock().lock(); 1690 try { 1691 Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() { 1692 @Override 1693 public Set<Integer> execute(Transaction tx) throws IOException { 1694 return checkpointUpdate(tx, cleanup); 1695 } 1696 }); 1697 pageFile.flush(); 1698 // after the index update such that partial removal does not leave dangling references in the index. 
1699 journal.removeDataFiles(filesToGc); 1700 } finally { 1701 this.indexLock.writeLock().unlock(); 1702 } 1703 1704 } finally { 1705 checkpointLock.writeLock().unlock(); 1706 } 1707 } 1708 1709 /** 1710 * @param tx 1711 * @throws IOException 1712 */ 1713 Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { 1714 MDC.put("activemq.persistenceDir", getDirectory().getName()); 1715 LOG.debug("Checkpoint started."); 1716 1717 // reflect last update exclusive of current checkpoint 1718 Location lastUpdate = metadata.lastUpdate; 1719 1720 metadata.state = OPEN_STATE; 1721 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); 1722 metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap(); 1723 Location[] inProgressTxRange = getInProgressTxLocationRange(); 1724 metadata.firstInProgressTransactionLocation = inProgressTxRange[0]; 1725 tx.store(metadata.page, metadataMarshaller, true); 1726 1727 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(); 1728 if (cleanup) { 1729 1730 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1731 gcCandidateSet.addAll(completeFileSet); 1732 1733 if (LOG.isTraceEnabled()) { 1734 LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); 1735 } 1736 1737 if (lastUpdate != null) { 1738 // we won't delete past the last update, ackCompaction journal can be a candidate in error 1739 gcCandidateSet.removeAll(new TreeSet<Integer>(gcCandidateSet.tailSet(lastUpdate.getDataFileId()))); 1740 } 1741 1742 // Don't GC files under replication 1743 if( journalFilesBeingReplicated!=null ) { 1744 gcCandidateSet.removeAll(journalFilesBeingReplicated); 1745 } 1746 1747 if (metadata.producerSequenceIdTrackerLocation != null) { 1748 int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId(); 1749 if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) { 1750 // rewrite so we don't 
prevent gc 1751 metadata.producerSequenceIdTracker.setModified(true); 1752 if (LOG.isTraceEnabled()) { 1753 LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation); 1754 } 1755 } 1756 gcCandidateSet.remove(dataFileId); 1757 if (LOG.isTraceEnabled()) { 1758 LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet); 1759 } 1760 } 1761 1762 if (metadata.ackMessageFileMapLocation != null) { 1763 int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId(); 1764 gcCandidateSet.remove(dataFileId); 1765 if (LOG.isTraceEnabled()) { 1766 LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet); 1767 } 1768 } 1769 1770 // Don't GC files referenced by in-progress tx 1771 if (inProgressTxRange[0] != null) { 1772 for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { 1773 gcCandidateSet.remove(pendingTx); 1774 } 1775 } 1776 if (LOG.isTraceEnabled()) { 1777 LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); 1778 } 1779 1780 // Go through all the destinations to see if any of them can remove GC candidates. 
1781 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) { 1782 if( gcCandidateSet.isEmpty() ) { 1783 break; 1784 } 1785 1786 // Use a visitor to cut down the number of pages that we load 1787 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 1788 int last=-1; 1789 @Override 1790 public boolean isInterestedInKeysBetween(Location first, Location second) { 1791 if( first==null ) { 1792 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1); 1793 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1794 subset.remove(second.getDataFileId()); 1795 } 1796 return !subset.isEmpty(); 1797 } else if( second==null ) { 1798 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId()); 1799 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1800 subset.remove(first.getDataFileId()); 1801 } 1802 return !subset.isEmpty(); 1803 } else { 1804 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1); 1805 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1806 subset.remove(first.getDataFileId()); 1807 } 1808 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1809 subset.remove(second.getDataFileId()); 1810 } 1811 return !subset.isEmpty(); 1812 } 1813 } 1814 1815 @Override 1816 public void visit(List<Location> keys, List<Long> values) { 1817 for (Location l : keys) { 1818 int fileId = l.getDataFileId(); 1819 if( last != fileId ) { 1820 gcCandidateSet.remove(fileId); 1821 last = fileId; 1822 } 1823 } 1824 } 1825 }); 1826 1827 // Durable Subscription 1828 if (entry.getValue().subLocations != null) { 1829 Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx); 1830 while (iter.hasNext()) { 1831 Entry<String, Location> subscription = iter.next(); 1832 int dataFileId = subscription.getValue().getDataFileId(); 1833 1834 // Move subscription along if it has no 
outstanding messages that need ack'd 1835 // and its in the last log file in the journal. 1836 if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) { 1837 final StoredDestination destination = entry.getValue(); 1838 final String subscriptionKey = subscription.getKey(); 1839 SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey); 1840 1841 // When pending is size one that is the next message Id meaning there 1842 // are no pending messages currently. 1843 if (pendingAcks == null || pendingAcks.isEmpty() || 1844 (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) { 1845 1846 if (LOG.isTraceEnabled()) { 1847 LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId); 1848 } 1849 1850 final KahaSubscriptionCommand kahaSub = 1851 destination.subscriptions.get(tx, subscriptionKey); 1852 destination.subLocations.put( 1853 tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub)); 1854 1855 // Skips the remove from candidates if we rewrote the subscription 1856 // in order to prevent duplicate subscription commands on recover. 1857 // If another subscription is on the same file and isn't rewritten 1858 // than it will remove the file from the set. 
1859 continue; 1860 } 1861 } 1862 1863 gcCandidateSet.remove(dataFileId); 1864 } 1865 } 1866 1867 if (LOG.isTraceEnabled()) { 1868 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); 1869 } 1870 } 1871 1872 // check we are not deleting file with ack for in-use journal files 1873 if (LOG.isTraceEnabled()) { 1874 LOG.trace("gc candidates: " + gcCandidateSet); 1875 LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap); 1876 } 1877 1878 boolean ackMessageFileMapMod = false; 1879 Iterator<Integer> candidates = gcCandidateSet.iterator(); 1880 while (candidates.hasNext()) { 1881 Integer candidate = candidates.next(); 1882 Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate); 1883 if (referencedFileIds != null) { 1884 for (Integer referencedFileId : referencedFileIds) { 1885 if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) { 1886 // active file that is not targeted for deletion is referenced so don't delete 1887 candidates.remove(); 1888 break; 1889 } 1890 } 1891 if (gcCandidateSet.contains(candidate)) { 1892 ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null); 1893 } else { 1894 if (LOG.isTraceEnabled()) { 1895 LOG.trace("not removing data file: " + candidate 1896 + " as contained ack(s) refer to referenced file: " + referencedFileIds); 1897 } 1898 } 1899 } 1900 } 1901 1902 if (!gcCandidateSet.isEmpty()) { 1903 LOG.debug("Cleanup removing the data files: {}", gcCandidateSet); 1904 for (Integer candidate : gcCandidateSet) { 1905 for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) { 1906 ackMessageFileMapMod |= ackFiles.remove(candidate); 1907 } 1908 } 1909 if (ackMessageFileMapMod) { 1910 checkpointUpdate(tx, false); 1911 } 1912 } else if (isEnableAckCompaction()) { 1913 if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) { 1914 // First check length of journal to make sure it makes sense to even try. 
1915 // 1916 // If there is only one journal file with Acks in it we don't need to move 1917 // it since it won't be chained to any later logs. 1918 // 1919 // If the logs haven't grown since the last time then we need to compact 1920 // otherwise there seems to still be room for growth and we don't need to incur 1921 // the overhead. Depending on configuration this check can be avoided and 1922 // Ack compaction will run any time the store has not GC'd a journal file in 1923 // the configured amount of cycles. 1924 if (metadata.ackMessageFileMap.size() > 1 && 1925 (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) { 1926 1927 LOG.trace("No files GC'd checking if threshold to ACK compaction has been met."); 1928 try { 1929 scheduler.execute(new AckCompactionRunner()); 1930 } catch (Exception ex) { 1931 LOG.warn("Error on queueing the Ack Compactor", ex); 1932 } 1933 } else { 1934 LOG.trace("Journal activity detected, no Ack compaction scheduled."); 1935 } 1936 1937 checkPointCyclesWithNoGC = 0; 1938 } else { 1939 LOG.trace("Not yet time to check for compaction: {} of {} cycles", 1940 checkPointCyclesWithNoGC, getCompactAcksAfterNoGC()); 1941 } 1942 1943 journalLogOnLastCompactionCheck = journal.getCurrentDataFileId(); 1944 } 1945 } 1946 MDC.remove("activemq.persistenceDir"); 1947 1948 LOG.debug("Checkpoint done."); 1949 return gcCandidateSet; 1950 } 1951 1952 private final class AckCompactionRunner implements Runnable { 1953 1954 @Override 1955 public void run() { 1956 1957 int journalToAdvance = -1; 1958 Set<Integer> journalLogsReferenced = new HashSet<Integer>(); 1959 1960 //flag to know whether the ack forwarding completed without an exception 1961 boolean forwarded = false; 1962 1963 try { 1964 //acquire the checkpoint lock to prevent other threads from 1965 //running a checkpoint while this is running 1966 // 1967 //Normally this task runs on the same executor as the checkpoint task 1968 //so this ack 
compaction runner wouldn't run at the same time as the checkpoint task. 1969 // 1970 //However, there are two cases where this isn't always true. 1971 //First, the checkpoint() method is public and can be called through the 1972 //PersistenceAdapter interface by someone at the same time this is running. 1973 //Second, a checkpoint is called during shutdown without using the executor. 1974 // 1975 //In the future it might be better to just remove the checkpointLock entirely 1976 //and only use the executor but this would need to be examined for any unintended 1977 //consequences 1978 checkpointLock.readLock().lock(); 1979 1980 try { 1981 1982 // Lock index to capture the ackMessageFileMap data 1983 indexLock.writeLock().lock(); 1984 1985 // Map keys might not be sorted, find the earliest log file to forward acks 1986 // from and move only those, future cycles can chip away at more as needed. 1987 // We won't move files that are themselves rewritten on a previous compaction. 1988 List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet()); 1989 Collections.sort(journalFileIds); 1990 for (Integer journalFileId : journalFileIds) { 1991 DataFile current = journal.getDataFileById(journalFileId); 1992 if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { 1993 journalToAdvance = journalFileId; 1994 break; 1995 } 1996 } 1997 1998 // Check if we found one, or if we only found the current file being written to. 
1999 if (journalToAdvance == -1 || blockedFromCompaction(journalToAdvance)) { 2000 return; 2001 } 2002 2003 journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance)); 2004 2005 } finally { 2006 indexLock.writeLock().unlock(); 2007 } 2008 2009 try { 2010 // Background rewrite of the old acks 2011 forwardAllAcks(journalToAdvance, journalLogsReferenced); 2012 forwarded = true; 2013 } catch (IOException ioe) { 2014 LOG.error("Forwarding of acks failed", ioe); 2015 brokerService.handleIOException(ioe); 2016 } catch (Throwable e) { 2017 LOG.error("Forwarding of acks failed", e); 2018 brokerService.handleIOException(IOExceptionSupport.create(e)); 2019 } 2020 } finally { 2021 checkpointLock.readLock().unlock(); 2022 } 2023 2024 try { 2025 if (forwarded) { 2026 // Checkpoint with changes from the ackMessageFileMap 2027 checkpointUpdate(false); 2028 } 2029 } catch (IOException ioe) { 2030 LOG.error("Checkpoint failed", ioe); 2031 brokerService.handleIOException(ioe); 2032 } catch (Throwable e) { 2033 LOG.error("Checkpoint failed", e); 2034 brokerService.handleIOException(IOExceptionSupport.create(e)); 2035 } 2036 } 2037 } 2038 2039 // called with the index lock held 2040 private boolean blockedFromCompaction(int journalToAdvance) { 2041 // don't forward the current data file 2042 if (journalToAdvance == journal.getCurrentDataFileId()) { 2043 return true; 2044 } 2045 // don't forward any data file with inflight transaction records because it will whack the tx - data file link 2046 // in the ack map when all acks are migrated (now that the ack map is not just for acks) 2047 // TODO: prepare records can be dropped but completion records (maybe only commit outcomes) need to be migrated 2048 // as part of the forward work. 
2049 Location[] inProgressTxRange = getInProgressTxLocationRange(); 2050 if (inProgressTxRange[0] != null) { 2051 for (int pendingTx = inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { 2052 if (journalToAdvance == pendingTx) { 2053 LOG.trace("Compaction target:{} blocked by inflight transaction records: {}", journalToAdvance, inProgressTxRange); 2054 return true; 2055 } 2056 } 2057 } 2058 return false; 2059 } 2060 2061 private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException { 2062 LOG.trace("Attempting to move all acks in journal:{} to the front. Referenced files:{}", journalToRead, journalLogsReferenced); 2063 2064 DataFile forwardsFile = journal.reserveDataFile(); 2065 forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE); 2066 LOG.trace("Reserved file for forwarded acks: {}", forwardsFile); 2067 2068 Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>(); 2069 2070 try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) { 2071 KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand(); 2072 compactionMarker.setSourceDataFileId(journalToRead); 2073 compactionMarker.setRewriteType(forwardsFile.getTypeCode()); 2074 2075 ByteSequence payload = toByteSequence(compactionMarker); 2076 appender.storeItem(payload, Journal.USER_RECORD_TYPE, false); 2077 LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead); 2078 2079 final Location limit = new Location(journalToRead + 1, 0); 2080 Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0), limit); 2081 while (nextLocation != null) { 2082 JournalCommand<?> command = null; 2083 try { 2084 command = load(nextLocation); 2085 } catch (IOException ex) { 2086 LOG.trace("Error loading command during ack forward: {}", nextLocation); 2087 } 2088 2089 if 
(shouldForward(command)) { 2090 payload = toByteSequence(command); 2091 Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false); 2092 updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced); 2093 } 2094 2095 nextLocation = getNextLocationForAckForward(nextLocation, limit); 2096 } 2097 } 2098 2099 LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations); 2100 2101 // Lock index while we update the ackMessageFileMap. 2102 indexLock.writeLock().lock(); 2103 2104 // Update the ack map with the new locations of the acks 2105 for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) { 2106 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); 2107 if (referenceFileIds == null) { 2108 referenceFileIds = new HashSet<Integer>(); 2109 referenceFileIds.addAll(entry.getValue()); 2110 metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); 2111 } else { 2112 referenceFileIds.addAll(entry.getValue()); 2113 } 2114 } 2115 2116 // remove the old location data from the ack map so that the old journal log file can 2117 // be removed on next GC. 
2118 metadata.ackMessageFileMap.remove(journalToRead); 2119 2120 indexLock.writeLock().unlock(); 2121 2122 LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); 2123 } 2124 2125 private boolean shouldForward(JournalCommand<?> command) { 2126 boolean result = false; 2127 if (command != null) { 2128 if (command instanceof KahaRemoveMessageCommand) { 2129 result = true; 2130 } else if (command instanceof KahaCommitCommand) { 2131 KahaCommitCommand kahaCommitCommand = (KahaCommitCommand) command; 2132 if (kahaCommitCommand.hasTransactionInfo() && kahaCommitCommand.getTransactionInfo().hasXaTransactionId()) { 2133 result = true; 2134 } 2135 } 2136 } 2137 return result; 2138 } 2139 2140 private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) { 2141 //getNextLocation() can throw an IOException, we should handle it and set 2142 //nextLocation to null and abort gracefully 2143 //Should not happen in the normal case 2144 Location location = null; 2145 try { 2146 location = journal.getNextLocation(nextLocation, limit); 2147 } catch (IOException e) { 2148 LOG.warn("Failed to load next journal location after: {}, reason: {}", nextLocation, e); 2149 if (LOG.isDebugEnabled()) { 2150 LOG.debug("Failed to load next journal location after: {}", nextLocation, e); 2151 } 2152 } 2153 return location; 2154 } 2155 2156 final Runnable nullCompletionCallback = new Runnable() { 2157 @Override 2158 public void run() { 2159 } 2160 }; 2161 2162 private Location checkpointProducerAudit() throws IOException { 2163 if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) { 2164 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2165 ObjectOutputStream oout = new ObjectOutputStream(baos); 2166 oout.writeObject(metadata.producerSequenceIdTracker); 2167 oout.flush(); 2168 oout.close(); 2169 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 
2170 Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback); 2171 try { 2172 location.getLatch().await(); 2173 if (location.getException().get() != null) { 2174 throw location.getException().get(); 2175 } 2176 } catch (InterruptedException e) { 2177 throw new InterruptedIOException(e.toString()); 2178 } 2179 return location; 2180 } 2181 return metadata.producerSequenceIdTrackerLocation; 2182 } 2183 2184 private Location checkpointAckMessageFileMap() throws IOException { 2185 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2186 ObjectOutputStream oout = new ObjectOutputStream(baos); 2187 oout.writeObject(metadata.ackMessageFileMap); 2188 oout.flush(); 2189 oout.close(); 2190 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 2191 Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback); 2192 try { 2193 location.getLatch().await(); 2194 } catch (InterruptedException e) { 2195 throw new InterruptedIOException(e.toString()); 2196 } 2197 return location; 2198 } 2199 2200 private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException { 2201 2202 ByteSequence sequence = toByteSequence(subscription); 2203 Location location = journal.write(sequence, nullCompletionCallback) ; 2204 2205 try { 2206 location.getLatch().await(); 2207 } catch (InterruptedException e) { 2208 throw new InterruptedIOException(e.toString()); 2209 } 2210 return location; 2211 } 2212 2213 public HashSet<Integer> getJournalFilesBeingReplicated() { 2214 return journalFilesBeingReplicated; 2215 } 2216 2217 // ///////////////////////////////////////////////////////////////// 2218 // StoredDestination related implementation methods. 
2219 // ///////////////////////////////////////////////////////////////// 2220 2221 protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); 2222 2223 static class MessageKeys { 2224 final String messageId; 2225 final Location location; 2226 2227 public MessageKeys(String messageId, Location location) { 2228 this.messageId=messageId; 2229 this.location=location; 2230 } 2231 2232 @Override 2233 public String toString() { 2234 return "["+messageId+","+location+"]"; 2235 } 2236 } 2237 2238 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> { 2239 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); 2240 2241 @Override 2242 public MessageKeys readPayload(DataInput dataIn) throws IOException { 2243 return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn)); 2244 } 2245 2246 @Override 2247 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { 2248 dataOut.writeUTF(object.messageId); 2249 LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); 2250 } 2251 } 2252 2253 class LastAck { 2254 long lastAckedSequence; 2255 byte priority; 2256 2257 public LastAck(LastAck source) { 2258 this.lastAckedSequence = source.lastAckedSequence; 2259 this.priority = source.priority; 2260 } 2261 2262 public LastAck() { 2263 this.priority = MessageOrderIndex.HI; 2264 } 2265 2266 public LastAck(long ackLocation) { 2267 this.lastAckedSequence = ackLocation; 2268 this.priority = MessageOrderIndex.LO; 2269 } 2270 2271 public LastAck(long ackLocation, byte priority) { 2272 this.lastAckedSequence = ackLocation; 2273 this.priority = priority; 2274 } 2275 2276 @Override 2277 public String toString() { 2278 return "[" + lastAckedSequence + ":" + priority + "]"; 2279 } 2280 } 2281 2282 protected class LastAckMarshaller implements Marshaller<LastAck> { 2283 2284 @Override 2285 public void writePayload(LastAck object, 
DataOutput dataOut) throws IOException { 2286 dataOut.writeLong(object.lastAckedSequence); 2287 dataOut.writeByte(object.priority); 2288 } 2289 2290 @Override 2291 public LastAck readPayload(DataInput dataIn) throws IOException { 2292 LastAck lastAcked = new LastAck(); 2293 lastAcked.lastAckedSequence = dataIn.readLong(); 2294 if (metadata.version >= 3) { 2295 lastAcked.priority = dataIn.readByte(); 2296 } 2297 return lastAcked; 2298 } 2299 2300 @Override 2301 public int getFixedSize() { 2302 return 9; 2303 } 2304 2305 @Override 2306 public LastAck deepCopy(LastAck source) { 2307 return new LastAck(source); 2308 } 2309 2310 @Override 2311 public boolean isDeepCopySupported() { 2312 return true; 2313 } 2314 } 2315 2316 class StoredDestination { 2317 2318 MessageOrderIndex orderIndex = new MessageOrderIndex(); 2319 BTreeIndex<Location, Long> locationIndex; 2320 BTreeIndex<String, Long> messageIdIndex; 2321 2322 // These bits are only set for Topics 2323 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 2324 BTreeIndex<String, LastAck> subscriptionAcks; 2325 HashMap<String, MessageOrderCursor> subscriptionCursors; 2326 ListIndex<String, SequenceSet> ackPositions; 2327 ListIndex<String, Location> subLocations; 2328 2329 // Transient data used to track which Messages are no longer needed. 
2330 final HashSet<String> subscriptionCache = new LinkedHashSet<String>(); 2331 2332 public void trackPendingAdd(Long seq) { 2333 orderIndex.trackPendingAdd(seq); 2334 } 2335 2336 public void trackPendingAddComplete(Long seq) { 2337 orderIndex.trackPendingAddComplete(seq); 2338 } 2339 2340 @Override 2341 public String toString() { 2342 return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size(); 2343 } 2344 } 2345 2346 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 2347 2348 @Override 2349 public StoredDestination readPayload(final DataInput dataIn) throws IOException { 2350 final StoredDestination value = new StoredDestination(); 2351 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2352 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong()); 2353 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 2354 2355 if (dataIn.readBoolean()) { 2356 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); 2357 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong()); 2358 if (metadata.version >= 4) { 2359 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong()); 2360 } else { 2361 // upgrade 2362 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2363 @Override 2364 public void execute(Transaction tx) throws IOException { 2365 LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>(); 2366 2367 if (metadata.version >= 3) { 2368 // migrate 2369 BTreeIndex<Long, HashSet<String>> oldAckPositions = 2370 new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong()); 2371 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); 2372 oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); 2373 
oldAckPositions.load(tx); 2374 2375 2376 // Do the initial build of the data in memory before writing into the store 2377 // based Ack Positions List to avoid a lot of disk thrashing. 2378 Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx); 2379 while (iterator.hasNext()) { 2380 Entry<Long, HashSet<String>> entry = iterator.next(); 2381 2382 for(String subKey : entry.getValue()) { 2383 SequenceSet pendingAcks = temp.get(subKey); 2384 if (pendingAcks == null) { 2385 pendingAcks = new SequenceSet(); 2386 temp.put(subKey, pendingAcks); 2387 } 2388 2389 pendingAcks.add(entry.getKey()); 2390 } 2391 } 2392 } 2393 // Now move the pending messages to ack data into the store backed 2394 // structure. 2395 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2396 value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2397 value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2398 value.ackPositions.load(tx); 2399 for(String subscriptionKey : temp.keySet()) { 2400 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); 2401 } 2402 2403 } 2404 }); 2405 } 2406 2407 if (metadata.version >= 5) { 2408 value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong()); 2409 } else { 2410 // upgrade 2411 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2412 @Override 2413 public void execute(Transaction tx) throws IOException { 2414 value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2415 value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2416 value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2417 value.subLocations.load(tx); 2418 } 2419 }); 2420 } 2421 } 2422 if (metadata.version >= 2) { 2423 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2424 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 
} else {
                // upgrade
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                        value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
                        value.orderIndex.lowPriorityIndex.load(tx);

                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                        value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
                        value.orderIndex.highPriorityIndex.load(tx);
                    }
                });
            }

            return value;
        }

        /**
         * Writes the page ids that locate a destination's indexes.
         * <p>
         * Written in this order: the default priority order index, the location index and the
         * message id index; a boolean telling whether the subscription indexes exist, followed
         * (only when they do) by the subscriptions and subscriptionAcks page ids and the head
         * page ids of ackPositions and subLocations; finally the low and high priority order
         * indexes.
         *
         * @param value the destination whose index page ids are written
         * @param dataOut the output the ids are written to
         * @throws IOException if writing to {@code dataOut} fails
         */
        @Override
        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
            dataOut.writeLong(value.locationIndex.getPageId());
            dataOut.writeLong(value.messageIdIndex.getPageId());
            // Only topics carry subscription indexes; the flag tells the reader whether to expect them.
            if (value.subscriptions != null) {
                dataOut.writeBoolean(true);
                dataOut.writeLong(value.subscriptions.getPageId());
                dataOut.writeLong(value.subscriptionAcks.getPageId());
                dataOut.writeLong(value.ackPositions.getHeadPageId());
                dataOut.writeLong(value.subLocations.getHeadPageId());
            } else {
                dataOut.writeBoolean(false);
            }
            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
        }
    }

    /**
     * Marshaller that stores {@link KahaSubscriptionCommand} values in their framed form.
     */
    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();

        /**
         * Reads one framed command.
         *
         * @param dataIn the input to read from; it is cast to {@link InputStream}, so it must
         *        also be an {@code InputStream}
         * @return the command populated from the framed data
         * @throws IOException if reading fails
         */
        @Override
        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
            rc.mergeFramed((InputStream)dataIn);
            return rc;
        }

        /**
         * Writes one command in framed form.
         *
         * @param object the command to write
         * @param dataOut the output to write to; it is cast to {@link OutputStream}, so it must
         *        also be an {@code OutputStream}
         * @throws IOException if writing fails
         */
        @Override
        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
            object.writeFramed((OutputStream)dataOut);
        }
    }

    /**
     * Returns the {@link StoredDestination} for a destination, loading (or creating) it through
     * {@link #loadStoredDestination} and caching it in {@code storedDestinations} on first use.
     *
     * @param destination the destination to look up
     * @param tx the index transaction used when the destination has to be loaded
     * @return the cached or freshly loaded stored destination
     * @throws IOException if loading the destination's indexes fails
     */
    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
        String key = key(destination);
        StoredDestination rc = storedDestinations.get(key);
        if (rc == null) {
            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
            rc = loadStoredDestination(tx, key, topic);
            // Cache it. We may want to remove/unload destinations from the
            // cache that are not used for a while
            // to reduce memory usage.
            storedDestinations.put(key, rc);
        }
        return rc;
    }

    /**
     * Returns the {@link StoredDestination} for a destination only if it already exists.
     * <p>
     * A cached entry is returned directly. Otherwise the destination is loaded only when
     * {@code metadata.destinations} already contains its key, so this method never allocates
     * indexes for an unknown destination.
     *
     * @param destination the destination to look up
     * @param tx the index transaction used for the lookup
     * @return the stored destination, or {@code null} when it is neither cached nor stored
     * @throws IOException if the index lookup or load fails
     */
    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
        String key = key(destination);
        StoredDestination rc = storedDestinations.get(key);
        if (rc == null && metadata.destinations.containsKey(tx, key)) {
            rc = getStoredDestination(destination, tx);
        }
        return rc;
    }

    /**
     * Loads the indexes of a destination, allocating them first when the destination is not yet
     * present in {@code metadata.destinations}.
     * <p>
     * After the indexes are loaded, topics additionally get their subscription cache filled from
     * {@code subscriptionAcks} and their {@code nextMessageId} adjusted from the stored acks.
     * When {@code metadata.version} is below 3 the ack positions are rebuilt from the order
     * index, and when it is below {@code VERSION} the destination is stored again.
     *
     * @param tx the index transaction all reads, allocations and writes run in
     * @param key the key of the destination in {@code metadata.destinations}
     * @param topic {@code true} when the subscription related indexes must be allocated and loaded
     * @return the loaded stored destination
     * @throws IOException if any index operation fails
     */
    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
        // Try to load the existing indexes..
        StoredDestination rc = metadata.destinations.get(tx, key);
        if (rc == null) {
            // Brand new destination.. allocate indexes for it.
            rc = new StoredDestination();
            rc.orderIndex.allocate(tx);
            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());

            if (topic) {
                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
                rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
            }
            metadata.destinations.put(tx, key, rc);
        }

        // Configure the marshalers and load.
        rc.orderIndex.load(tx);

        // Figure out the next key using the last entry in the destination.
        rc.orderIndex.configureLast(tx);

        rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE);
        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
        rc.locationIndex.load(tx);

        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
        rc.messageIdIndex.load(tx);

        // If it was a topic...
        if (topic) {

            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
            rc.subscriptions.load(tx);

            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
            rc.subscriptionAcks.load(tx);

            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
            rc.ackPositions.load(tx);

            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
            rc.subLocations.load(tx);

            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();

            if (metadata.version < 3) {

                // on upgrade need to fill ackLocation with available messages past last ack
                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
                    Entry<String, LastAck> entry = iterator.next();
                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
                        Long sequence = orderIterator.next().getKey();
                        addAckLocation(tx, rc, sequence, entry.getKey());
                    }
                    // modify so it is upgraded
                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
                }
            }


            // Configure the subscription cache
            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
                Entry<String, LastAck> entry = iterator.next();
                rc.subscriptionCache.add(entry.getKey());
            }

            // nextMessageId is still 0 when configureLast() found no entry in any order index.
            if (rc.orderIndex.nextMessageId == 0) {
                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
                if (!rc.subscriptionAcks.isEmpty(tx)) {
                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
                        Entry<String, LastAck> entry = iterator.next();
                        rc.orderIndex.nextMessageId =
                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
                    }
                }
            } else {
                // update based on ackPositions for unmatched, last entry is always the next
                Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
                while (subscriptions.hasNext()) {
                    Entry<String, SequenceSet> subscription = subscriptions.next();
                    SequenceSet pendingAcks = subscription.getValue();
                    if (pendingAcks != null && !pendingAcks.isEmpty()) {
                        for (Long sequenceId : pendingAcks) {
                            rc.orderIndex.nextMessageId = Math.max(rc.orderIndex.nextMessageId, sequenceId);
                        }
                    }
                }
            }
        }

        if (metadata.version < VERSION) {
            // store again after upgrade
            metadata.destinations.put(tx, key, rc);
        }
        return rc;
    }

    /**
     * This is a map to cache MessageStores for a specific
     * KahaDestination key
     */
    protected final ConcurrentMap<String, MessageStore> storeCache =
            new ConcurrentHashMap<>();

    /**
     * Records that a subscription still has to acknowledge a message sequence.
     * <p>
     * A missing sequence set is created and added to {@code ackPositions}; an existing one is
     * extended and written back with {@code put}.
     *
     * @param tx the index transaction
     * @param sd the destination whose ack positions are updated
     * @param messageSequence the sequence awaiting acknowledgement
     * @param subscriptionKey the subscription that must acknowledge it
     * @throws IOException if the index update fails
     */
    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
        if (sequences == null) {
            sequences = new SequenceSet();
            sequences.add(messageSequence);
            sd.ackPositions.add(tx, subscriptionKey, sequences);
        } else {
            sequences.add(messageSequence);
            sd.ackPositions.put(tx, subscriptionKey, sequences);
        }
    }

    // new sub is interested in potentially all existing messages
    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
        SequenceSet allOutstanding = new SequenceSet();
Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
        while (iterator.hasNext()) {
            SequenceSet set = iterator.next().getValue();
            for (Long entry : set) {
                allOutstanding.add(entry);
            }
        }
        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
    }

    /**
     * Registers a newly added message with every subscription in {@code sd.subscriptionCache}.
     * <p>
     * Each subscription's sequence set receives the range
     * {@code [messageSequence, messageSequence + 1]}. A missing set is created and added to
     * {@code ackPositions}; an existing one is extended and written back with {@code put}.
     *
     * @param tx the index transaction
     * @param sd the destination the message was added to
     * @param messageSequence the sequence assigned to the new message
     * @throws IOException if an index update fails
     */
    // on a new message add, all existing subs are interested in this message
    private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
        for (String subscriptionKey : sd.subscriptionCache) {
            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
            if (sequences == null) {
                sequences = new SequenceSet();
                sequences.add(new Sequence(messageSequence, messageSequence + 1));
                sd.ackPositions.add(tx, subscriptionKey, sequences);
            } else {
                sequences.add(new Sequence(messageSequence, messageSequence + 1));
                sd.ackPositions.put(tx, subscriptionKey, sequences);
            }
        }
    }

    /**
     * Drops all pending ack positions of a subscription and deletes every message that no
     * remaining subscription still references.
     *
     * @param tx the index transaction
     * @param sd the destination the subscription belongs to
     * @param subscriptionKey the subscription being removed
     * @throws IOException if an index operation fails
     */
    private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
        if (sd.ackPositions.isEmpty(tx)) {
            return;
        }

        SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
        if (sequences == null || sequences.isEmpty()) {
            return;
        }

        // First decide which sequences became unreferenced, then delete them; the reference
        // check only reads ackPositions while the deletes only touch the message indexes.
        ArrayList<Long> unreferenced = new ArrayList<Long>();
        for (Long sequenceId : sequences) {
            if (!isSequenceReferenced(tx, sd, sequenceId)) {
                unreferenced.add(sequenceId);
            }
        }

        for (Long sequenceId : unreferenced) {
            deleteIndexEntriesForSequence(tx, sd, sequenceId);
        }
    }

    /**
     * Removes the order, location and message id index entries of one message sequence.
     * <p>
     * The entries to delete are obtained from {@code sd.orderIndex.getDeleteList}; nothing is
     * removed when that list stays empty.
     *
     * @param tx the index transaction
     * @param sd the destination that owns the indexes
     * @param sequenceId the sequence whose index entries are deleted
     * @throws IOException if an index operation fails
     */
    private void deleteIndexEntriesForSequence(Transaction tx, StoredDestination sd, Long sequenceId) throws IOException {
        // Find all the entries that need to get deleted.
        ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
        sd.orderIndex.getDeleteList(tx, deletes, sequenceId);

        // Do the actual deletes.
        for (Entry<Long, MessageKeys> entry : deletes) {
            sd.locationIndex.remove(tx, entry.getValue().location);
            sd.messageIdIndex.remove(tx, entry.getValue().messageId);
            sd.orderIndex.remove(tx, entry.getKey());
        }
    }

    /**
     * Tells whether any subscription in {@code sd.subscriptionCache} still has the given
     * sequence in its pending ack positions.
     *
     * @param tx the index transaction
     * @param sd the destination to inspect
     * @param sequenceId the message sequence to look for
     * @return {@code true} if at least one subscription's sequence set contains the sequence
     * @throws IOException if reading the ack positions fails
     */
    private boolean isSequenceReferenced(final Transaction tx, final StoredDestination sd, final Long sequenceId) throws IOException {
        for (String subscriptionKey : sd.subscriptionCache) {
            SequenceSet sequence = sd.ackPositions.get(tx, subscriptionKey);
            if (sequence != null && sequence.contains(sequenceId)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Removes one message sequence from a subscription's pending ack positions and deletes the
     * message from the indexes once no subscription references it any more.
     * <p>
     * Nothing happens when {@code messageSequence} is {@code null} or when the subscription has
     * no (or an empty) sequence set.
     *
     * @param tx the index transaction
     * @param sd the destination the subscription belongs to
     * @param subscriptionKey the subscription acknowledging the message
     * @param messageSequence the acknowledged sequence, may be {@code null}
     * @throws IOException if an index operation fails
     */
    private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
        // Remove the sub from the previous location set..
        if (messageSequence == null) {
            return;
        }

        SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
        if (range == null || range.isEmpty()) {
            return;
        }

        range.remove(messageSequence);
        if (!range.isEmpty()) {
            sd.ackPositions.put(tx, subscriptionKey, range);
        } else {
            sd.ackPositions.remove(tx, subscriptionKey);
        }

        // Check if the message is reference by any other subscription.
        if (isSequenceReferenced(tx, sd, messageSequence)) {
            return;
        }
        deleteIndexEntriesForSequence(tx, sd, messageSequence);
    }

    /**
     * Looks up the last ack stored for a subscription.
     *
     * @param tx the index transaction
     * @param sd the destination the subscription belongs to
     * @param subscriptionKey the subscription to look up
     * @return the value stored in {@code sd.subscriptionAcks} for the key
     * @throws IOException if the index lookup fails
     */
    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
        return sd.subscriptionAcks.get(tx, subscriptionKey);
    }

    /**
     * Returns the pending ack positions of a subscription.
     *
     * @param tx the index transaction
     * @param sd the destination the subscription belongs to
     * @param subscriptionKey the subscription to look up
     * @return the stored sequence set, or {@code null} when the destination has no
     *         {@code ackPositions} index
     * @throws IOException if the index lookup fails
     */
    protected SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
        if (sd.ackPositions != null) {
            final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
            return messageSequences;
        }

        return null;
    }

    /**
     * Counts the messages a subscription still has to acknowledge.
     *
     * @param tx the index transaction
     * @param sd the destination the subscription belongs to
     * @param subscriptionKey the subscription to count for
     * @return the range size of the subscription's sequence set minus one (never negative), or
     *         {@code 0} when there is no sequence set
     * @throws IOException if the index lookup fails
     */
    protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
        if (sd.ackPositions != null) {
            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
            if (messageSequences != null) {
                long result = messageSequences.rangeSize();
                // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
                return result > 0 ? result - 1 : 0;
            }
        }

        return 0;
    }

    /**
     * Builds the lookup key of a destination as {@code <type number>:<name>}.
     *
     * @param destination the destination to build the key for
     * @return the key used for {@code storedDestinations} and {@code metadata.destinations}
     */
    protected String key(KahaDestination destination) {
        return destination.getType().getNumber() + ":" + destination.getName();
    }

    // /////////////////////////////////////////////////////////////////
    // Transaction related implementation methods.
2774 // ///////////////////////////////////////////////////////////////// 2775 @SuppressWarnings("rawtypes") 2776 private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2777 @SuppressWarnings("rawtypes") 2778 protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2779 2780 @SuppressWarnings("rawtypes") 2781 private List<Operation> getInflightTx(KahaTransactionInfo info) { 2782 TransactionId key = TransactionIdConversion.convert(info); 2783 List<Operation> tx; 2784 synchronized (inflightTransactions) { 2785 tx = inflightTransactions.get(key); 2786 if (tx == null) { 2787 tx = Collections.synchronizedList(new ArrayList<Operation>()); 2788 inflightTransactions.put(key, tx); 2789 } 2790 } 2791 return tx; 2792 } 2793 2794 @SuppressWarnings("unused") 2795 private TransactionId key(KahaTransactionInfo transactionInfo) { 2796 return TransactionIdConversion.convert(transactionInfo); 2797 } 2798 2799 abstract class Operation <T extends JournalCommand<T>> { 2800 final T command; 2801 final Location location; 2802 2803 public Operation(T command, Location location) { 2804 this.command = command; 2805 this.location = location; 2806 } 2807 2808 public Location getLocation() { 2809 return location; 2810 } 2811 2812 public T getCommand() { 2813 return command; 2814 } 2815 2816 abstract public void execute(Transaction tx) throws IOException; 2817 } 2818 2819 class AddOperation extends Operation<KahaAddMessageCommand> { 2820 final IndexAware runWithIndexLock; 2821 public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) { 2822 super(command, location); 2823 this.runWithIndexLock = runWithIndexLock; 2824 } 2825 2826 @Override 2827 public void execute(Transaction tx) throws IOException { 2828 long seq = updateIndex(tx, command, location); 2829 if (runWithIndexLock != null) { 2830 
runWithIndexLock.sequenceAssignedWithIndexLocked(seq); 2831 } 2832 } 2833 } 2834 2835 class RemoveOperation extends Operation<KahaRemoveMessageCommand> { 2836 2837 public RemoveOperation(KahaRemoveMessageCommand command, Location location) { 2838 super(command, location); 2839 } 2840 2841 @Override 2842 public void execute(Transaction tx) throws IOException { 2843 updateIndex(tx, command, location); 2844 } 2845 } 2846 2847 // ///////////////////////////////////////////////////////////////// 2848 // Initialization related implementation methods. 2849 // ///////////////////////////////////////////////////////////////// 2850 2851 private PageFile createPageFile() throws IOException { 2852 if (indexDirectory == null) { 2853 indexDirectory = directory; 2854 } 2855 IOHelper.mkdirs(indexDirectory); 2856 PageFile index = new PageFile(indexDirectory, "db"); 2857 index.setEnableWriteThread(isEnableIndexWriteAsync()); 2858 index.setWriteBatchSize(getIndexWriteBatchSize()); 2859 index.setPageCacheSize(indexCacheSize); 2860 index.setUseLFRUEviction(isUseIndexLFRUEviction()); 2861 index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); 2862 index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); 2863 index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); 2864 index.setEnablePageCaching(isEnableIndexPageCaching()); 2865 return index; 2866 } 2867 2868 protected Journal createJournal() throws IOException { 2869 Journal manager = new Journal(); 2870 manager.setDirectory(directory); 2871 manager.setMaxFileLength(getJournalMaxFileLength()); 2872 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); 2873 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); 2874 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 2875 manager.setArchiveDataLogs(isArchiveDataLogs()); 2876 manager.setSizeAccumulator(journalSize); 2877 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); 2878 
manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase())); 2879 manager.setPreallocationStrategy( 2880 Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase())); 2881 manager.setJournalDiskSyncStrategy(journalDiskSyncStrategy); 2882 if (getDirectoryArchive() != null) { 2883 IOHelper.mkdirs(getDirectoryArchive()); 2884 manager.setDirectoryArchive(getDirectoryArchive()); 2885 } 2886 return manager; 2887 } 2888 2889 private Metadata createMetadata() { 2890 Metadata md = new Metadata(); 2891 md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth()); 2892 md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack()); 2893 return md; 2894 } 2895 2896 public int getJournalMaxWriteBatchSize() { 2897 return journalMaxWriteBatchSize; 2898 } 2899 2900 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 2901 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 2902 } 2903 2904 public File getDirectory() { 2905 return directory; 2906 } 2907 2908 public void setDirectory(File directory) { 2909 this.directory = directory; 2910 } 2911 2912 public boolean isDeleteAllMessages() { 2913 return deleteAllMessages; 2914 } 2915 2916 public void setDeleteAllMessages(boolean deleteAllMessages) { 2917 this.deleteAllMessages = deleteAllMessages; 2918 } 2919 2920 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { 2921 this.setIndexWriteBatchSize = setIndexWriteBatchSize; 2922 } 2923 2924 public int getIndexWriteBatchSize() { 2925 return setIndexWriteBatchSize; 2926 } 2927 2928 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 2929 this.enableIndexWriteAsync = enableIndexWriteAsync; 2930 } 2931 2932 boolean isEnableIndexWriteAsync() { 2933 return enableIndexWriteAsync; 2934 } 2935 2936 /** 2937 * @deprecated use {@link #getJournalDiskSyncStrategyEnum} or {@link #getJournalDiskSyncStrategy} instead 2938 * 
@return 2939 */ 2940 public boolean isEnableJournalDiskSyncs() { 2941 return journalDiskSyncStrategy == JournalDiskSyncStrategy.ALWAYS; 2942 } 2943 2944 /** 2945 * @deprecated use {@link #setEnableJournalDiskSyncs} instead 2946 * @param syncWrites 2947 */ 2948 public void setEnableJournalDiskSyncs(boolean syncWrites) { 2949 if (syncWrites) { 2950 journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; 2951 } else { 2952 journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER; 2953 } 2954 } 2955 2956 public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() { 2957 return journalDiskSyncStrategy; 2958 } 2959 2960 public String getJournalDiskSyncStrategy() { 2961 return journalDiskSyncStrategy.name(); 2962 } 2963 2964 public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) { 2965 this.journalDiskSyncStrategy = JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase()); 2966 } 2967 2968 public long getJournalDiskSyncInterval() { 2969 return journalDiskSyncInterval; 2970 } 2971 2972 public void setJournalDiskSyncInterval(long journalDiskSyncInterval) { 2973 this.journalDiskSyncInterval = journalDiskSyncInterval; 2974 } 2975 2976 public long getCheckpointInterval() { 2977 return checkpointInterval; 2978 } 2979 2980 public void setCheckpointInterval(long checkpointInterval) { 2981 this.checkpointInterval = checkpointInterval; 2982 } 2983 2984 public long getCleanupInterval() { 2985 return cleanupInterval; 2986 } 2987 2988 public void setCleanupInterval(long cleanupInterval) { 2989 this.cleanupInterval = cleanupInterval; 2990 } 2991 2992 public boolean getCleanupOnStop() { 2993 return cleanupOnStop; 2994 } 2995 2996 public void setCleanupOnStop(boolean cleanupOnStop) { 2997 this.cleanupOnStop = cleanupOnStop; 2998 } 2999 3000 public void setJournalMaxFileLength(int journalMaxFileLength) { 3001 this.journalMaxFileLength = journalMaxFileLength; 3002 } 3003 3004 public int getJournalMaxFileLength() { 3005 return journalMaxFileLength; 
}

    /**
     * Sets the maximum number of producers tracked by the metadata's producer sequence id tracker.
     */
    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
    }

    /**
     * @return the maximum number of producers tracked by the metadata's producer sequence id tracker
     */
    public int getMaxFailoverProducersToTrack() {
        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
    }

    /**
     * Sets the audit depth of the metadata's producer sequence id tracker.
     */
    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
    }

    /**
     * @return the audit depth of the metadata's producer sequence id tracker
     */
    public int getFailoverProducersAuditDepth() {
        return this.metadata.producerSequenceIdTracker.getAuditDepth();
    }

    /**
     * Returns the index page file, creating it on first access.
     *
     * @return the page file
     * @throws IOException if the page file has to be created and that fails
     */
    public PageFile getPageFile() throws IOException {
        if (pageFile == null) {
            pageFile = createPageFile();
        }
        return pageFile;
    }

    /**
     * Returns the journal, creating it on first access.
     *
     * @return the journal
     * @throws IOException if the journal has to be created and that fails
     */
    public Journal getJournal() throws IOException {
        if (journal == null) {
            journal = createJournal();
        }
        return journal;
    }

    protected Metadata getMetadata() {
        return metadata;
    }

    public boolean isFailIfDatabaseIsLocked() {
        return failIfDatabaseIsLocked;
    }

    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
    }

    public boolean isIgnoreMissingJournalfiles() {
        return ignoreMissingJournalfiles;
    }

    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
    }

    public int getIndexCacheSize() {
        return indexCacheSize;
    }

    public void setIndexCacheSize(int indexCacheSize) {
        this.indexCacheSize = indexCacheSize;
    }

    public boolean isCheckForCorruptJournalFiles() {
        return checkForCorruptJournalFiles;
    }

    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
    }

    public boolean isChecksumJournalFiles() {
        return checksumJournalFiles;
    }

    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
        this.checksumJournalFiles = checksumJournalFiles;
    }

    @Override
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }

    /**
     * @return the archiveDataLogs
     */
    public boolean isArchiveDataLogs() {
        return this.archiveDataLogs;
    }

    /**
     * @param archiveDataLogs the archiveDataLogs to set
     */
    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    /**
     * @return the directoryArchive
     */
    public File getDirectoryArchive() {
        return this.directoryArchive;
    }

    /**
     * @param directoryArchive the directoryArchive to set
     */
    public void setDirectoryArchive(File directoryArchive) {
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveCorruptedIndex() {
        return archiveCorruptedIndex;
    }

    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
        this.archiveCorruptedIndex = archiveCorruptedIndex;
    }

    public float getIndexLFUEvictionFactor() {
        return indexLFUEvictionFactor;
    }

    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
    }

    public boolean isUseIndexLFRUEviction() {
        return useIndexLFRUEviction;
    }

    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
        this.useIndexLFRUEviction = useIndexLFRUEviction;
    }

    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
    }

    public void
setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
    }

    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
        this.enableIndexPageCaching = enableIndexPageCaching;
    }

    public boolean isEnableIndexDiskSyncs() {
        return enableIndexDiskSyncs;
    }

    public boolean isEnableIndexRecoveryFile() {
        return enableIndexRecoveryFile;
    }

    public boolean isEnableIndexPageCaching() {
        return enableIndexPageCaching;
    }

    // /////////////////////////////////////////////////////////////////
    // Internal conversion methods.
    // /////////////////////////////////////////////////////////////////

    /**
     * Holds one cursor position for each of the default, low and high priority order indexes.
     */
    class MessageOrderCursor{
        long defaultCursorPosition;
        long lowPriorityCursorPosition;
        long highPriorityCursorPosition;
        MessageOrderCursor(){
        }

        /** Creates a cursor whose three positions all start at {@code position}. */
        MessageOrderCursor(long position){
            this.defaultCursorPosition=position;
            this.lowPriorityCursorPosition=position;
            this.highPriorityCursorPosition=position;
        }

        /** Creates a cursor with the same three positions as {@code other}. */
        MessageOrderCursor(MessageOrderCursor other){
            this.defaultCursorPosition=other.defaultCursorPosition;
            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
        }

        /** @return a new cursor with the same positions as this one */
        MessageOrderCursor copy() {
            return new MessageOrderCursor(this);
        }

        /** Sets all three positions back to 0. */
        void reset() {
            this.defaultCursorPosition=0;
            this.highPriorityCursorPosition=0;
            this.lowPriorityCursorPosition=0;
        }

        /** Advances every position that is not 0 by one; positions equal to 0 are left untouched. */
        void increment() {
            if (defaultCursorPosition!=0) {
                defaultCursorPosition++;
            }
            if (highPriorityCursorPosition!=0) {
                highPriorityCursorPosition++;
            }
            if (lowPriorityCursorPosition!=0) {
                lowPriorityCursorPosition++;
            }
        }

        @Override
        public String toString() {
           return "MessageOrderCursor:[def:" + defaultCursorPosition
                   + ", low:" + lowPriorityCursorPosition
                   + ", high:" + highPriorityCursorPosition + "]";
        }

        /** Copies the three positions of {@code other} into this cursor. */
        public void sync(MessageOrderCursor other) {
            this.defaultCursorPosition=other.defaultCursorPosition;
            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
        }
    }

    /**
     * Keeps the message order of a destination in three BTree indexes (default, low and high
     * priority) keyed by message sequence, together with the iteration cursor and the next
     * sequence to assign.
     */
    class MessageOrderIndex {
        static final byte HI = 9;
        static final byte LO = 0;
        static final byte DEF = 4;

        long nextMessageId;
        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
        BTreeIndex<Long, MessageKeys> highPriorityIndex;
        final MessageOrderCursor cursor = new MessageOrderCursor();
        Long lastDefaultKey;
        Long lastHighKey;
        Long lastLowKey;
        byte lastGetPriority;
        final List<Long> pendingAdditions = new LinkedList<Long>();

        /**
         * Removes a sequence from whichever index holds it, trying the default index first, then
         * the high and finally the low priority index (the latter two only when present).
         *
         * @param tx the index transaction
         * @param key the message sequence to remove
         * @return the removed keys, or {@code null} when no index contained the sequence
         * @throws IOException if an index operation fails
         */
        MessageKeys remove(Transaction tx, Long key) throws IOException {
            MessageKeys result = defaultPriorityIndex.remove(tx, key);
            if (result == null && highPriorityIndex!=null) {
                result = highPriorityIndex.remove(tx, key);
                if (result ==null && lowPriorityIndex!=null) {
                    result = lowPriorityIndex.remove(tx, key);
                }
            }
            return result;
        }

        /**
         * Sets the key and value marshallers on all three indexes and loads them.
         *
         * @param tx the index transaction
         * @throws IOException if loading an index fails
         */
        void load(Transaction tx) throws IOException {
            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
            defaultPriorityIndex.load(tx);
            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
            lowPriorityIndex.load(tx);
            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
            highPriorityIndex.load(tx);
        }

        void allocate(Transaction tx) throws IOException {
defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
            if (metadata.version >= 2) {
                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
            }
        }

        /**
         * Sets {@code nextMessageId} to one past the largest last key found in the three indexes.
         * {@code nextMessageId} is left unchanged when all indexes are empty.
         *
         * @param tx the index transaction
         * @throws IOException if reading an index fails
         */
        void configureLast(Transaction tx) throws IOException {
            // Figure out the next key using the last entry in the destination.
            TreeSet<Long> orderedSet = new TreeSet<Long>();

            addLast(orderedSet, highPriorityIndex, tx);
            addLast(orderedSet, defaultPriorityIndex, tx);
            addLast(orderedSet, lowPriorityIndex, tx);

            if (!orderedSet.isEmpty()) {
                nextMessageId = orderedSet.last() + 1;
            }
        }

        /**
         * Adds the last key of {@code index} to {@code orderedSet}; does nothing when the index is
         * {@code null} or has no last entry.
         */
        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
            if (index != null) {
                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
                if (lastEntry != null) {
                    orderedSet.add(lastEntry.getKey());
                }
            }
        }

        /**
         * Discards all indexes and starts over with freshly allocated, loaded ones and a reset
         * cursor.
         *
         * @param tx the index transaction
         * @throws IOException if an index operation fails
         */
        void clear(Transaction tx) throws IOException {
            this.remove(tx);
            this.resetCursorPosition();
            this.allocate(tx);
            this.load(tx);
            this.configureLast(tx);
        }

        /**
         * Clears and unloads each index and frees its page; the low and high priority indexes are
         * skipped when they are {@code null}.
         *
         * @param tx the index transaction
         * @throws IOException if an index operation fails
         */
        void remove(Transaction tx) throws IOException {
            defaultPriorityIndex.clear(tx);
            defaultPriorityIndex.unload(tx);
            tx.free(defaultPriorityIndex.getPageId());
            if (lowPriorityIndex != null) {
                lowPriorityIndex.clear(tx);
                lowPriorityIndex.unload(tx);

                tx.free(lowPriorityIndex.getPageId());
            }
            if (highPriorityIndex != null) {
                highPriorityIndex.clear(tx);
                highPriorityIndex.unload(tx);
                tx.free(highPriorityIndex.getPageId());
            }
        }

        /** Resets the cursor to 0 and forgets the last keys seen. */
        void resetCursorPosition() {
            this.cursor.reset();
            lastDefaultKey = null;
            lastHighKey = null;
            lastLowKey = null;
        }

        /**
         * Positions all three cursors just past {@code sequence} and records it as the last key
         * of every priority. A {@code null} sequence leaves the state unchanged.
         *
         * @param tx the index transaction (not used by this method)
         * @param sequence the last sequence already handled, may be {@code null}
         * @throws IOException declared for callers; not thrown by this method
         */
        void setBatch(Transaction tx, Long sequence) throws IOException {
            if (sequence != null) {
                // A primitive is enough here; the value was only ever unboxed again.
                long nextPosition = sequence.longValue() + 1;
                lastDefaultKey = sequence;
                cursor.defaultCursorPosition = nextPosition;
                lastHighKey = sequence;
                cursor.highPriorityCursorPosition = nextPosition;
                lastLowKey = sequence;
                cursor.lowPriorityCursorPosition = nextPosition;
            }
        }

        /**
         * Positions the cursors from a stored last ack.
         * <p>
         * The cursors are first set from {@code last.lastAckedSequence}. If all three positions
         * are 0 afterwards, the positions are set to {@code lastAckedSequence + 1} depending on
         * {@code last.priority}: {@code DEF} sets default and high, {@code HI} sets high only,
         * {@code LO} sets all three.
         *
         * @param tx the index transaction
         * @param last the last ack to position from
         * @throws IOException declared for callers
         */
        void setBatch(Transaction tx, LastAck last) throws IOException {
            setBatch(tx, last.lastAckedSequence);
            if (cursor.defaultCursorPosition == 0
                    && cursor.highPriorityCursorPosition == 0
                    && cursor.lowPriorityCursorPosition == 0) {
                long next = last.lastAckedSequence + 1;
                switch (last.priority) {
                    case DEF:
                        cursor.defaultCursorPosition = next;
                        cursor.highPriorityCursorPosition = next;
                        break;
                    case HI:
                        cursor.highPriorityCursorPosition = next;
                        break;
                    case LO:
                        cursor.lowPriorityCursorPosition = next;
                        cursor.defaultCursorPosition = next;
                        cursor.highPriorityCursorPosition = next;
                        break;
                }
            }
        }

        /**
         * Moves each cursor just past the last key recorded for its priority (when one is
         * recorded) and then forgets the recorded keys.
         */
        void stoppedIterating() {
            if (lastDefaultKey!=null) {
                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
            }
            if (lastHighKey!=null) {
                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
            }
            if (lastLowKey!=null) {
                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
            }
            lastDefaultKey = null;
            lastHighKey = null;
            lastLowKey = null;
        }

        /**
         * Appends the entry stored for {@code sequenceId} to {@code deletes}, looking in the
         * default, then the high, then the low priority index. Nothing is appended when no index
         * contains the sequence.
         *
         * @param tx the index transaction
         * @param deletes the list the entry is appended to
         * @param sequenceId the message sequence to look up
         * @throws IOException if reading an index fails
         */
        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
                throws IOException {
            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
            }
        }

        /**
         * Appends the first entry of an iteration over {@code index} that starts at
         * {@code sequenceId} to {@code deletes}.
         */
        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {

            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
            deletes.add(iterator.next());
        }

        /** @return the current {@code nextMessageId}, which is then incremented */
        long getNextMessageId() {
            return nextMessageId++;
        }

        /** Decrements {@code nextMessageId}, undoing one {@link #getNextMessageId()} call. */
        void revertNextMessageId() {
            nextMessageId--;
        }

        /**
         * Looks a sequence up in the default, then the high, then the low priority index and
         * records in {@code lastGetPriority} where the lookup ended ({@code DEF}, {@code HI} or
         * {@code LO}; {@code LO} also when nothing was found).
         * <p>
         * A missing ({@code null}) high or low priority index is skipped, matching how
         * {@link #remove(Transaction, Long)} treats those indexes as optional.
         *
         * @param tx the index transaction
         * @param key the message sequence to look up
         * @return the stored keys, or {@code null} when no index contains the sequence
         * @throws IOException if reading an index fails
         */
        MessageKeys get(Transaction tx, Long key) throws IOException {
            MessageKeys result = defaultPriorityIndex.get(tx, key);
            if (result != null) {
                lastGetPriority = DEF;
                return result;
            }
            if (highPriorityIndex != null) {
                result = highPriorityIndex.get(tx, key);
            }
            if (result != null) {
                lastGetPriority = HI;
                return result;
            }
            if (lowPriorityIndex != null) {
                result = lowPriorityIndex.get(tx, key);
            }
            lastGetPriority = LO;
            return result;
        }

        /**
         * Stores a message under its sequence in the index matching its priority: the default
         * index for {@code javax.jms.Message.DEFAULT_PRIORITY}, the high priority index above it
         * and the low priority index below it.
         *
         * @param tx the index transaction
         * @param priority the JMS priority of the message
         * @param key the message sequence
         * @param value the keys to store
         * @return whatever the selected index's {@code put} returns
         * @throws IOException if the index update fails
         */
        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
                return defaultPriorityIndex.put(tx, key, value);
            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
                return highPriorityIndex.put(tx, key, value);
            } else {
                return lowPriorityIndex.put(tx, key, value);
            }
        }

        /** @return an iterator over the indexes starting at this index's own cursor */
        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
            return new MessageOrderIterator(tx,cursor,this);
        }

        /** @return an iterator over the indexes starting at the given cursor */
        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
            return new MessageOrderIterator(tx,m,this);
        }

        /** @return the priority marker recorded by the most recent {@link #get} call */
        public byte lastGetPriority() {
            return lastGetPriority;
        }

        /**
         * @param sequence the message sequence to test
         * @return {@code true} if any cursor position is greater than 0 and not below {@code sequence}
         */
        public boolean alreadyDispatched(Long sequence) {
            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
        }

        /** Records a sequence as a pending addition. */
        public void trackPendingAdd(Long seq) {
            synchronized (pendingAdditions) {
                pendingAdditions.add(seq);
            }
        }

        /** Removes a sequence from the pending additions. */
        public void trackPendingAddComplete(Long seq) {
            synchronized (pendingAdditions) {
                pendingAdditions.remove(seq);
            }
        }

        /**
         * @return the first element of the pending additions list, or {@code null} when it is
         *         empty. NOTE(review): this is the minimum only if sequences are tracked in
         *         ascending order - confirm against callers.
         */
        public Long minPendingAdd() {
            synchronized (pendingAdditions) {
                if (!pendingAdditions.isEmpty()) {
                    return pendingAdditions.get(0);
                } else {
                    return null;
                }
            }
        }


        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
            Iterator<Entry<Long, MessageKeys>>currentIterator;
            final Iterator<Entry<Long, MessageKeys>>highIterator;
            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
            final Iterator<Entry<Long, MessageKeys>>lowIterator;

            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
                // The first pending addition is passed to every index iterator as its limit.
                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
                if (highPriorityIndex != null) {
                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
                } else {
                    this.highIterator = null;
                }
                if (lowPriorityIndex != null) {
                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
                } else {
                    this.lowIterator = null;
                }
            }

            @Override
            public boolean hasNext() {
                if (currentIterator == null) {
                    if (highIterator != null) {
                        if (highIterator.hasNext()) {
                            currentIterator = highIterator;
                            return currentIterator.hasNext();
                        }
                        if (defaultIterator.hasNext()) {
                            currentIterator = defaultIterator;
                            return
currentIterator.hasNext(); 3498 } 3499 if (lowIterator.hasNext()) { 3500 currentIterator = lowIterator; 3501 return currentIterator.hasNext(); 3502 } 3503 return false; 3504 } else { 3505 currentIterator = defaultIterator; 3506 return currentIterator.hasNext(); 3507 } 3508 } 3509 if (highIterator != null) { 3510 if (currentIterator.hasNext()) { 3511 return true; 3512 } 3513 if (currentIterator == highIterator) { 3514 if (defaultIterator.hasNext()) { 3515 currentIterator = defaultIterator; 3516 return currentIterator.hasNext(); 3517 } 3518 if (lowIterator.hasNext()) { 3519 currentIterator = lowIterator; 3520 return currentIterator.hasNext(); 3521 } 3522 return false; 3523 } 3524 3525 if (currentIterator == defaultIterator) { 3526 if (lowIterator.hasNext()) { 3527 currentIterator = lowIterator; 3528 return currentIterator.hasNext(); 3529 } 3530 return false; 3531 } 3532 } 3533 return currentIterator.hasNext(); 3534 } 3535 3536 @Override 3537 public Entry<Long, MessageKeys> next() { 3538 Entry<Long, MessageKeys> result = currentIterator.next(); 3539 if (result != null) { 3540 Long key = result.getKey(); 3541 if (highIterator != null) { 3542 if (currentIterator == defaultIterator) { 3543 lastDefaultKey = key; 3544 } else if (currentIterator == highIterator) { 3545 lastHighKey = key; 3546 } else { 3547 lastLowKey = key; 3548 } 3549 } else { 3550 lastDefaultKey = key; 3551 } 3552 } 3553 return result; 3554 } 3555 3556 @Override 3557 public void remove() { 3558 throw new UnsupportedOperationException(); 3559 } 3560 3561 } 3562 } 3563 3564 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> { 3565 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller(); 3566 3567 @Override 3568 public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException { 3569 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 3570 ObjectOutputStream oout = new ObjectOutputStream(baos); 3571 
oout.writeObject(object); 3572 oout.flush(); 3573 oout.close(); 3574 byte[] data = baos.toByteArray(); 3575 dataOut.writeInt(data.length); 3576 dataOut.write(data); 3577 } 3578 3579 @Override 3580 @SuppressWarnings("unchecked") 3581 public HashSet<String> readPayload(DataInput dataIn) throws IOException { 3582 int dataLen = dataIn.readInt(); 3583 byte[] data = new byte[dataLen]; 3584 dataIn.readFully(data); 3585 ByteArrayInputStream bais = new ByteArrayInputStream(data); 3586 ObjectInputStream oin = new ObjectInputStream(bais); 3587 try { 3588 return (HashSet<String>) oin.readObject(); 3589 } catch (ClassNotFoundException cfe) { 3590 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe); 3591 ioe.initCause(cfe); 3592 throw ioe; 3593 } 3594 } 3595 } 3596 3597 public File getIndexDirectory() { 3598 return indexDirectory; 3599 } 3600 3601 public void setIndexDirectory(File indexDirectory) { 3602 this.indexDirectory = indexDirectory; 3603 } 3604 3605 interface IndexAware { 3606 public void sequenceAssignedWithIndexLocked(long index); 3607 } 3608 3609 public String getPreallocationScope() { 3610 return preallocationScope; 3611 } 3612 3613 public void setPreallocationScope(String preallocationScope) { 3614 this.preallocationScope = preallocationScope; 3615 } 3616 3617 public String getPreallocationStrategy() { 3618 return preallocationStrategy; 3619 } 3620 3621 public void setPreallocationStrategy(String preallocationStrategy) { 3622 this.preallocationStrategy = preallocationStrategy; 3623 } 3624 3625 public int getCompactAcksAfterNoGC() { 3626 return compactAcksAfterNoGC; 3627 } 3628 3629 /** 3630 * Sets the number of GC cycles where no journal logs were removed before an attempt to 3631 * move forward all the acks in the last log that contains them and is otherwise unreferenced. 3632 * <p> 3633 * A value of -1 will disable this feature. 3634 * 3635 * @param compactAcksAfterNoGC 3636 * Number of empty GC cycles before we rewrite old ACKS. 
3637 */ 3638 public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) { 3639 this.compactAcksAfterNoGC = compactAcksAfterNoGC; 3640 } 3641 3642 /** 3643 * Returns whether Ack compaction will ignore that the store is still growing 3644 * and run more often. 3645 * 3646 * @return the compactAcksIgnoresStoreGrowth current value. 3647 */ 3648 public boolean isCompactAcksIgnoresStoreGrowth() { 3649 return compactAcksIgnoresStoreGrowth; 3650 } 3651 3652 /** 3653 * Configure if Ack compaction will occur regardless of continued growth of the 3654 * journal logs meaning that the store has not run out of space yet. Because the 3655 * compaction operation can be costly this value is defaulted to off and the Ack 3656 * compaction is only done when it seems that the store cannot grow and larger. 3657 * 3658 * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set 3659 */ 3660 public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { 3661 this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth; 3662 } 3663 3664 /** 3665 * Returns whether Ack compaction is enabled 3666 * 3667 * @return enableAckCompaction 3668 */ 3669 public boolean isEnableAckCompaction() { 3670 return enableAckCompaction; 3671 } 3672 3673 /** 3674 * Configure if the Ack compaction task should be enabled to run 3675 * 3676 * @param enableAckCompaction 3677 */ 3678 public void setEnableAckCompaction(boolean enableAckCompaction) { 3679 this.enableAckCompaction = enableAckCompaction; 3680 } 3681}