001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.DataInput; 024import java.io.DataOutput; 025import java.io.EOFException; 026import java.io.File; 027import java.io.IOException; 028import java.io.InputStream; 029import java.io.InterruptedIOException; 030import java.io.ObjectInputStream; 031import java.io.ObjectOutputStream; 032import java.io.OutputStream; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.Collections; 037import java.util.Date; 038import java.util.HashMap; 039import java.util.HashSet; 040import java.util.Iterator; 041import java.util.LinkedHashMap; 042import java.util.LinkedHashSet; 043import java.util.LinkedList; 044import java.util.List; 045import java.util.Map; 046import java.util.Map.Entry; 047import java.util.Set; 048import java.util.SortedSet; 049import java.util.TreeMap; 050import java.util.TreeSet; 051 052import java.util.concurrent.ConcurrentHashMap; 053import java.util.concurrent.ConcurrentMap; 054import java.util.concurrent.Executors; 055import java.util.concurrent.ScheduledExecutorService; 056import java.util.concurrent.ThreadFactory; 057import java.util.concurrent.TimeUnit; 058import java.util.concurrent.atomic.AtomicBoolean; 059import java.util.concurrent.atomic.AtomicLong; 060import java.util.concurrent.atomic.AtomicReference; 061import java.util.concurrent.locks.ReentrantReadWriteLock; 062 063import org.apache.activemq.ActiveMQMessageAuditNoSync; 064import org.apache.activemq.broker.BrokerService; 065import org.apache.activemq.broker.BrokerServiceAware; 066import org.apache.activemq.command.TransactionId; 067import org.apache.activemq.openwire.OpenWireFormat; 068import org.apache.activemq.protobuf.Buffer; 069import org.apache.activemq.store.MessageStore; 070import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; 071import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 072import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 073import org.apache.activemq.store.kahadb.data.KahaDestination; 074import org.apache.activemq.store.kahadb.data.KahaEntryType; 075import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 076import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; 077import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 078import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 079import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand; 080import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 081import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 082import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 083import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 084import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 085import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 086import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; 087import org.apache.activemq.store.kahadb.disk.index.ListIndex; 088import org.apache.activemq.store.kahadb.disk.journal.DataFile; 089import org.apache.activemq.store.kahadb.disk.journal.Journal; 090import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; 091import org.apache.activemq.store.kahadb.disk.journal.Location; 092import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender; 093import org.apache.activemq.store.kahadb.disk.page.Page; 094import org.apache.activemq.store.kahadb.disk.page.PageFile; 095import org.apache.activemq.store.kahadb.disk.page.Transaction; 096import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; 097import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; 098import org.apache.activemq.store.kahadb.disk.util.Marshaller; 099import org.apache.activemq.store.kahadb.disk.util.Sequence; 100import org.apache.activemq.store.kahadb.disk.util.SequenceSet; 101import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 102import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; 103import org.apache.activemq.util.ByteSequence; 104import org.apache.activemq.util.DataByteArrayInputStream; 105import org.apache.activemq.util.DataByteArrayOutputStream; 106import org.apache.activemq.util.IOExceptionSupport; 107import org.apache.activemq.util.IOHelper; 108import org.apache.activemq.util.ServiceStopper; 109import org.apache.activemq.util.ServiceSupport; 110import org.apache.activemq.util.ThreadPoolUtils; 111import org.slf4j.Logger; 112import org.slf4j.LoggerFactory; 113import org.slf4j.MDC; 114 115public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { 116 117 protected BrokerService brokerService; 118 119 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 120 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0); 121 public static final File DEFAULT_DIRECTORY = new File("KahaDB"); 122 protected static final Buffer UNMATCHED; 123 static { 124 UNMATCHED = new Buffer(new byte[]{}); 125 } 126 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class); 127 128 static final int CLOSED_STATE = 1; 129 static final int OPEN_STATE = 2; 130 static final long NOT_ACKED = -1; 131 132 static final int VERSION = 5; 133 134 static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1; 135 136 protected class Metadata { 137 protected Page<Metadata> page; 138 protected int state; 139 protected BTreeIndex<String, StoredDestination> destinations; 140 protected Location lastUpdate; 141 protected Location firstInProgressTransactionLocation; 142 protected Location producerSequenceIdTrackerLocation = null; 143 protected Location ackMessageFileMapLocation = null; 144 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); 145 protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>(); 146 protected int version = VERSION; 147 protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION; 148 149 public void read(DataInput is) throws IOException { 150 state = is.readInt(); 151 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong()); 152 if (is.readBoolean()) { 153 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); 154 } else { 155 lastUpdate = null; 156 } 157 if (is.readBoolean()) { 158 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is); 159 } else { 160 firstInProgressTransactionLocation = null; 161 } 162 try { 163 if (is.readBoolean()) { 164 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is); 165 } else { 166 producerSequenceIdTrackerLocation = null; 167 } 168 } catch (EOFException expectedOnUpgrade) { 169 } 170 try { 171 version = is.readInt(); 172 } catch (EOFException expectedOnUpgrade) { 173 version = 1; 174 } 175 if (version >= 5 && is.readBoolean()) { 176 ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is); 177 } else { 178 ackMessageFileMapLocation = null; 179 } 180 try { 181 openwireVersion = is.readInt(); 182 } catch (EOFException expectedOnUpgrade) { 183 openwireVersion = OpenWireFormat.DEFAULT_VERSION; 184 } 185 LOG.info("KahaDB is version " + version); 186 } 187 188 public void write(DataOutput os) throws IOException { 189 os.writeInt(state); 190 os.writeLong(destinations.getPageId()); 191 192 if (lastUpdate != null) { 193 os.writeBoolean(true); 194 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os); 195 } else { 196 os.writeBoolean(false); 197 } 198 199 if (firstInProgressTransactionLocation != null) { 200 os.writeBoolean(true); 201 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os); 202 } else { 203 os.writeBoolean(false); 204 } 205 206 if (producerSequenceIdTrackerLocation != null) { 207 os.writeBoolean(true); 208 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os); 209 } else { 210 os.writeBoolean(false); 211 } 212 os.writeInt(VERSION); 213 if (ackMessageFileMapLocation != null) { 214 os.writeBoolean(true); 215 LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os); 216 } else { 217 os.writeBoolean(false); 218 } 219 os.writeInt(this.openwireVersion); 220 } 221 } 222 223 class MetadataMarshaller extends VariableMarshaller<Metadata> { 224 @Override 225 public Metadata readPayload(DataInput dataIn) throws IOException { 226 Metadata rc = createMetadata(); 227 rc.read(dataIn); 228 return rc; 229 } 230 231 @Override 232 public void writePayload(Metadata object, DataOutput dataOut) throws IOException { 233 object.write(dataOut); 234 } 235 } 236 237 protected PageFile pageFile; 238 protected Journal journal; 239 protected Metadata metadata = new Metadata(); 240 241 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); 242 243 protected boolean failIfDatabaseIsLocked; 244 245 protected boolean deleteAllMessages; 246 protected File directory = DEFAULT_DIRECTORY; 247 protected File indexDirectory = null; 248 protected ScheduledExecutorService scheduler; 249 private final Object schedulerLock = new Object(); 250 251 protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; 252 protected boolean archiveDataLogs; 253 protected File directoryArchive; 254 protected AtomicLong journalSize = new AtomicLong(0); 255 long journalDiskSyncInterval = 1000; 256 long checkpointInterval = 5*1000; 257 long cleanupInterval = 30*1000; 258 boolean cleanupOnStop = true; 259 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 260 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 261 boolean enableIndexWriteAsync = false; 262 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 263 private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name(); 264 private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name(); 265 266 protected AtomicBoolean opened = new AtomicBoolean(); 267 private boolean ignoreMissingJournalfiles = false; 268 private int indexCacheSize = 10000; 269 private boolean checkForCorruptJournalFiles = false; 270 private boolean checksumJournalFiles = true; 271 protected boolean forceRecoverIndex = false; 272 273 private boolean archiveCorruptedIndex = false; 274 private boolean useIndexLFRUEviction = false; 275 private float indexLFUEvictionFactor = 0.2f; 276 private boolean enableIndexDiskSyncs = true; 277 private boolean enableIndexRecoveryFile = true; 278 private boolean enableIndexPageCaching = true; 279 ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); 280 281 private boolean enableAckCompaction = false; 282 private int compactAcksAfterNoGC = 10; 283 private boolean compactAcksIgnoresStoreGrowth = false; 284 private int checkPointCyclesWithNoGC; 285 private int journalLogOnLastCompactionCheck; 286 287 //only set when using JournalDiskSyncStrategy.PERIODIC 288 protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>(); 289 290 @Override 291 public void doStart() throws Exception { 292 load(); 293 } 294 295 @Override 296 public void doStop(ServiceStopper stopper) throws Exception { 297 unload(); 298 } 299 300 public void allowIOResumption() { 301 if (pageFile != null) { 302 pageFile.allowIOResumption(); 303 } 304 if (journal != null) { 305 journal.allowIOResumption(); 306 } 307 } 308 309 private void loadPageFile() throws IOException { 310 this.indexLock.writeLock().lock(); 311 try { 312 final PageFile pageFile = getPageFile(); 313 pageFile.load(); 314 pageFile.tx().execute(new Transaction.Closure<IOException>() { 315 @Override 316 public void execute(Transaction tx) throws IOException { 317 if (pageFile.getPageCount() == 0) { 318 // First time this is created.. Initialize the metadata 319 Page<Metadata> page = tx.allocate(); 320 assert page.getPageId() == 0; 321 page.set(metadata); 322 metadata.page = page; 323 metadata.state = CLOSED_STATE; 324 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId()); 325 326 tx.store(metadata.page, metadataMarshaller, true); 327 } else { 328 Page<Metadata> page = tx.load(0, metadataMarshaller); 329 metadata = page.get(); 330 metadata.page = page; 331 } 332 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); 333 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); 334 metadata.destinations.load(tx); 335 } 336 }); 337 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. 338 // Perhaps we should just keep an index of file 339 storedDestinations.clear(); 340 pageFile.tx().execute(new Transaction.Closure<IOException>() { 341 @Override 342 public void execute(Transaction tx) throws IOException { 343 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { 344 Entry<String, StoredDestination> entry = iterator.next(); 345 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); 346 storedDestinations.put(entry.getKey(), sd); 347 348 if (checkForCorruptJournalFiles) { 349 // sanity check the index also 350 if (!entry.getValue().locationIndex.isEmpty(tx)) { 351 if (entry.getValue().orderIndex.nextMessageId <= 0) { 352 throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey()); 353 } 354 } 355 } 356 } 357 } 358 }); 359 pageFile.flush(); 360 } finally { 361 this.indexLock.writeLock().unlock(); 362 } 363 } 364 365 private void startCheckpoint() { 366 if (checkpointInterval == 0 && cleanupInterval == 0) { 367 LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart"); 368 return; 369 } 370 synchronized (schedulerLock) { 371 if (scheduler == null || scheduler.isShutdown()) { 372 scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { 373 374 @Override 375 public Thread newThread(Runnable r) { 376 Thread schedulerThread = new Thread(r); 377 378 schedulerThread.setName("ActiveMQ Journal Checkpoint Worker"); 379 schedulerThread.setDaemon(true); 380 381 return schedulerThread; 382 } 383 }); 384 385 // Short intervals for check-point and cleanups 386 long delay; 387 if (journal.isJournalDiskSyncPeriodic()) { 388 delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500); 389 } else { 390 delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); 391 } 392 393 scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS); 394 } 395 } 396 } 397 398 private final class CheckpointRunner implements Runnable { 399 400 private long lastCheckpoint = System.currentTimeMillis(); 401 private long lastCleanup = System.currentTimeMillis(); 402 private long lastSync = System.currentTimeMillis(); 403 private Location lastAsyncUpdate = null; 404 405 @Override 406 public void run() { 407 try { 408 // Decide on cleanup vs full checkpoint here. 409 if (opened.get()) { 410 long now = System.currentTimeMillis(); 411 if (journal.isJournalDiskSyncPeriodic() && 412 journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) { 413 Location currentUpdate = lastAsyncJournalUpdate.get(); 414 if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) { 415 lastAsyncUpdate = currentUpdate; 416 if (LOG.isTraceEnabled()) { 417 LOG.trace("Writing trace command to trigger journal sync"); 418 } 419 store(new KahaTraceCommand(), true, null, null); 420 } 421 lastSync = now; 422 } 423 if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) { 424 checkpointCleanup(true); 425 lastCleanup = now; 426 lastCheckpoint = now; 427 } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) { 428 checkpointCleanup(false); 429 lastCheckpoint = now; 430 } 431 } 432 } catch (IOException ioe) { 433 LOG.error("Checkpoint failed", ioe); 434 brokerService.handleIOException(ioe); 435 } catch (Throwable e) { 436 LOG.error("Checkpoint failed", e); 437 brokerService.handleIOException(IOExceptionSupport.create(e)); 438 } 439 } 440 } 441 442 public void open() throws IOException { 443 if( opened.compareAndSet(false, true) ) { 444 getJournal().start(); 445 try { 446 loadPageFile(); 447 } catch (Throwable t) { 448 LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t); 449 if (LOG.isDebugEnabled()) { 450 LOG.debug("Index load failure", t); 451 } 452 // try to recover index 453 try { 454 pageFile.unload(); 455 } catch (Exception ignore) {} 456 if (archiveCorruptedIndex) { 457 pageFile.archive(); 458 } else { 459 pageFile.delete(); 460 } 461 metadata = createMetadata(); 462 pageFile = null; 463 loadPageFile(); 464 } 465 startCheckpoint(); 466 recover(); 467 } 468 } 469 470 public void load() throws IOException { 471 this.indexLock.writeLock().lock(); 472 IOHelper.mkdirs(directory); 473 try { 474 if (deleteAllMessages) { 475 getJournal().setCheckForCorruptionOnStartup(false); 476 getJournal().start(); 477 getJournal().delete(); 478 getJournal().close(); 479 journal = null; 480 getPageFile().delete(); 481 LOG.info("Persistence store purged."); 482 deleteAllMessages = false; 483 } 484 485 open(); 486 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 487 } finally { 488 this.indexLock.writeLock().unlock(); 489 } 490 } 491 492 public void close() throws IOException, InterruptedException { 493 if( opened.compareAndSet(true, false)) { 494 checkpointLock.writeLock().lock(); 495 try { 496 if (metadata.page != null) { 497 checkpointUpdate(getCleanupOnStop()); 498 } 499 pageFile.unload(); 500 metadata = createMetadata(); 501 } finally { 502 checkpointLock.writeLock().unlock(); 503 } 504 journal.close(); 505 synchronized(schedulerLock) { 506 if (scheduler != null) { 507 ThreadPoolUtils.shutdownGraceful(scheduler, -1); 508 scheduler = null; 509 } 510 } 511 // clear the cache and journalSize on shutdown of the store 512 storeCache.clear(); 513 journalSize.set(0); 514 } 515 } 516 517 public void unload() throws IOException, InterruptedException { 518 this.indexLock.writeLock().lock(); 519 try { 520 if( pageFile != null && pageFile.isLoaded() ) { 521 metadata.state = CLOSED_STATE; 522 metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0]; 523 524 if (metadata.page != null) { 525 pageFile.tx().execute(new Transaction.Closure<IOException>() { 526 @Override 527 public void execute(Transaction tx) throws IOException { 528 tx.store(metadata.page, metadataMarshaller, true); 529 } 530 }); 531 } 532 } 533 } finally { 534 this.indexLock.writeLock().unlock(); 535 } 536 close(); 537 } 538 539 // public for testing 540 @SuppressWarnings("rawtypes") 541 public Location[] getInProgressTxLocationRange() { 542 Location[] range = new Location[]{null, null}; 543 synchronized (inflightTransactions) { 544 if (!inflightTransactions.isEmpty()) { 545 for (List<Operation> ops : inflightTransactions.values()) { 546 if (!ops.isEmpty()) { 547 trackMaxAndMin(range, ops); 548 } 549 } 550 } 551 if (!preparedTransactions.isEmpty()) { 552 for (List<Operation> ops : preparedTransactions.values()) { 553 if (!ops.isEmpty()) { 554 trackMaxAndMin(range, ops); 555 } 556 } 557 } 558 } 559 return range; 560 } 561 562 @SuppressWarnings("rawtypes") 563 private void trackMaxAndMin(Location[] range, List<Operation> ops) { 564 Location t = ops.get(0).getLocation(); 565 if (range[0] == null || t.compareTo(range[0]) <= 0) { 566 range[0] = t; 567 } 568 t = ops.get(ops.size() -1).getLocation(); 569 if (range[1] == null || t.compareTo(range[1]) >= 0) { 570 range[1] = t; 571 } 572 } 573 574 class TranInfo { 575 TransactionId id; 576 Location location; 577 578 class opCount { 579 int add; 580 int remove; 581 } 582 HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>(); 583 584 @SuppressWarnings("rawtypes") 585 public void track(Operation operation) { 586 if (location == null ) { 587 location = operation.getLocation(); 588 } 589 KahaDestination destination; 590 boolean isAdd = false; 591 if (operation instanceof AddOperation) { 592 AddOperation add = (AddOperation) operation; 593 destination = add.getCommand().getDestination(); 594 isAdd = true; 595 } else { 596 RemoveOperation removeOpperation = (RemoveOperation) operation; 597 destination = removeOpperation.getCommand().getDestination(); 598 } 599 opCount opCount = destinationOpCount.get(destination); 600 if (opCount == null) { 601 opCount = new opCount(); 602 destinationOpCount.put(destination, opCount); 603 } 604 if (isAdd) { 605 opCount.add++; 606 } else { 607 opCount.remove++; 608 } 609 } 610 611 @Override 612 public String toString() { 613 StringBuffer buffer = new StringBuffer(); 614 buffer.append(location).append(";").append(id).append(";\n"); 615 for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) { 616 buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); 617 } 618 return buffer.toString(); 619 } 620 } 621 622 @SuppressWarnings("rawtypes") 623 public String getTransactions() { 624 625 ArrayList<TranInfo> infos = new ArrayList<TranInfo>(); 626 synchronized (inflightTransactions) { 627 if (!inflightTransactions.isEmpty()) { 628 for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) { 629 TranInfo info = new TranInfo(); 630 info.id = entry.getKey(); 631 for (Operation operation : entry.getValue()) { 632 info.track(operation); 633 } 634 infos.add(info); 635 } 636 } 637 } 638 synchronized (preparedTransactions) { 639 if (!preparedTransactions.isEmpty()) { 640 for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) { 641 TranInfo info = new TranInfo(); 642 info.id = entry.getKey(); 643 for (Operation operation : entry.getValue()) { 644 info.track(operation); 645 } 646 infos.add(info); 647 } 648 } 649 } 650 return infos.toString(); 651 } 652 653 /** 654 * Move all the messages that were in the journal into long term storage. We 655 * just replay and do a checkpoint. 656 * 657 * @throws IOException 658 * @throws IOException 659 * @throws IllegalStateException 660 */ 661 private void recover() throws IllegalStateException, IOException { 662 this.indexLock.writeLock().lock(); 663 try { 664 665 long start = System.currentTimeMillis(); 666 boolean requiresJournalReplay = recoverProducerAudit(); 667 requiresJournalReplay |= recoverAckMessageFileMap(); 668 Location lastIndoubtPosition = getRecoveryPosition(); 669 Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition; 670 if (recoveryPosition != null) { 671 int redoCounter = 0; 672 int dataFileRotationTracker = recoveryPosition.getDataFileId(); 673 LOG.info("Recovering from the journal @" + recoveryPosition); 674 while (recoveryPosition != null) { 675 try { 676 JournalCommand<?> message = load(recoveryPosition); 677 metadata.lastUpdate = recoveryPosition; 678 process(message, recoveryPosition, lastIndoubtPosition); 679 redoCounter++; 680 } catch (IOException failedRecovery) { 681 if (isIgnoreMissingJournalfiles()) { 682 LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery); 683 // track this dud location 684 journal.corruptRecoveryLocation(recoveryPosition); 685 } else { 686 throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery); 687 } 688 } 689 recoveryPosition = journal.getNextLocation(recoveryPosition); 690 // hold on to the minimum number of open files during recovery 691 if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) { 692 dataFileRotationTracker = recoveryPosition.getDataFileId(); 693 journal.cleanup(); 694 } 695 if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { 696 LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered .."); 697 } 698 } 699 if (LOG.isInfoEnabled()) { 700 long end = System.currentTimeMillis(); 701 LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); 702 } 703 } 704 705 // We may have to undo some index updates. 706 pageFile.tx().execute(new Transaction.Closure<IOException>() { 707 @Override 708 public void execute(Transaction tx) throws IOException { 709 recoverIndex(tx); 710 } 711 }); 712 713 // rollback any recovered inflight local transactions, and discard any inflight XA transactions. 714 Set<TransactionId> toRollback = new HashSet<TransactionId>(); 715 Set<TransactionId> toDiscard = new HashSet<TransactionId>(); 716 synchronized (inflightTransactions) { 717 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { 718 TransactionId id = it.next(); 719 if (id.isLocalTransaction()) { 720 toRollback.add(id); 721 } else { 722 toDiscard.add(id); 723 } 724 } 725 for (TransactionId tx: toRollback) { 726 if (LOG.isDebugEnabled()) { 727 LOG.debug("rolling back recovered indoubt local transaction " + tx); 728 } 729 store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null); 730 } 731 for (TransactionId tx: toDiscard) { 732 if (LOG.isDebugEnabled()) { 733 LOG.debug("discarding recovered in-flight XA transaction " + tx); 734 } 735 inflightTransactions.remove(tx); 736 } 737 } 738 739 synchronized (preparedTransactions) { 740 for (TransactionId txId : preparedTransactions.keySet()) { 741 LOG.warn("Recovered prepared XA TX: [{}]", txId); 742 } 743 } 744 745 } finally { 746 this.indexLock.writeLock().unlock(); 747 } 748 } 749 750 @SuppressWarnings("unused") 751 private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { 752 return TransactionIdConversion.convertToLocal(tx); 753 } 754 755 private Location minimum(Location x, 756 Location y) { 757 Location min = null; 758 if (x != null) { 759 min = x; 760 if (y != null) { 761 int compare = y.compareTo(x); 762 if (compare < 0) { 763 min = y; 764 } 765 } 766 } else { 767 min = y; 768 } 769 return min; 770 } 771 772 private boolean recoverProducerAudit() throws IOException { 773 boolean requiresReplay = true; 774 if (metadata.producerSequenceIdTrackerLocation != null) { 775 try { 776 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); 777 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); 778 int maxNumProducers = getMaxFailoverProducersToTrack(); 779 int maxAuditDepth = getFailoverProducersAuditDepth(); 780 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); 781 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); 782 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); 783 requiresReplay = false; 784 } catch (Exception e) { 785 LOG.warn("Cannot recover message audit", e); 786 } 787 } 788 // got no audit stored so got to recreate via replay from start of the journal 789 return requiresReplay; 790 } 791 792 @SuppressWarnings("unchecked") 793 private boolean recoverAckMessageFileMap() throws IOException { 794 boolean requiresReplay = true; 795 if (metadata.ackMessageFileMapLocation != null) { 796 try { 797 KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); 798 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); 799 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); 800 requiresReplay = false; 801 } catch (Exception e) { 802 LOG.warn("Cannot recover ackMessageFileMap", e); 803 } 804 } 805 // got no ackMessageFileMap stored so got to recreate via replay from start of the journal 806 return requiresReplay; 807 } 808 809 protected void recoverIndex(Transaction tx) throws IOException { 810 long start = System.currentTimeMillis(); 811 // It is possible index updates got applied before the journal updates.. 812 // in that case we need to removed references to messages that are not in the journal 813 final Location lastAppendLocation = journal.getLastAppendLocation(); 814 long undoCounter=0; 815 816 // Go through all the destinations to see if they have messages past the lastAppendLocation 817 for (StoredDestination sd : storedDestinations.values()) { 818 819 final ArrayList<Long> matches = new ArrayList<Long>(); 820 // Find all the Locations that are >= than the last Append Location. 821 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) { 822 @Override 823 protected void matched(Location key, Long value) { 824 matches.add(value); 825 } 826 }); 827 828 for (Long sequenceId : matches) { 829 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 830 sd.locationIndex.remove(tx, keys.location); 831 sd.messageIdIndex.remove(tx, keys.messageId); 832 metadata.producerSequenceIdTracker.rollback(keys.messageId); 833 undoCounter++; 834 // TODO: do we need to modify the ack positions for the pub sub case? 835 } 836 } 837 838 if (undoCounter > 0) { 839 // The rolledback operations are basically in flight journal writes. To avoid getting 840 // these the end user should do sync writes to the journal. 841 if (LOG.isInfoEnabled()) { 842 long end = System.currentTimeMillis(); 843 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 844 } 845 } 846 847 undoCounter = 0; 848 start = System.currentTimeMillis(); 849 850 // Lets be extra paranoid here and verify that all the datafiles being referenced 851 // by the indexes still exists. 852 853 final SequenceSet ss = new SequenceSet(); 854 for (StoredDestination sd : storedDestinations.values()) { 855 // Use a visitor to cut down the number of pages that we load 856 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 857 int last=-1; 858 859 @Override 860 public boolean isInterestedInKeysBetween(Location first, Location second) { 861 if( first==null ) { 862 return !ss.contains(0, second.getDataFileId()); 863 } else if( second==null ) { 864 return true; 865 } else { 866 return !ss.contains(first.getDataFileId(), second.getDataFileId()); 867 } 868 } 869 870 @Override 871 public void visit(List<Location> keys, List<Long> values) { 872 for (Location l : keys) { 873 int fileId = l.getDataFileId(); 874 if( last != fileId ) { 875 ss.add(fileId); 876 last = fileId; 877 } 878 } 879 } 880 881 }); 882 } 883 HashSet<Integer> missingJournalFiles = new HashSet<Integer>(); 884 while (!ss.isEmpty()) { 885 missingJournalFiles.add((int) ss.removeFirst()); 886 } 887 888 for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) { 889 missingJournalFiles.add(entry.getKey()); 890 for (Integer i : entry.getValue()) { 891 missingJournalFiles.add(i); 892 } 893 } 894 895 missingJournalFiles.removeAll(journal.getFileMap().keySet()); 896 897 if (!missingJournalFiles.isEmpty()) { 898 LOG.warn("Some journal files are missing: " + missingJournalFiles); 899 } 900 901 ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>(); 902 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>(); 903 for (Integer missing : missingJournalFiles) { 904 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0))); 905 } 906 907 if (checkForCorruptJournalFiles) { 908 Collection<DataFile> dataFiles = journal.getFileMap().values(); 909 for (DataFile dataFile : dataFiles) { 910 int id = dataFile.getDataFileId(); 911 // eof to next file id 912 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0))); 913 Sequence seq = dataFile.getCorruptedBlocks().getHead(); 914 while (seq != null) { 915 BTreeVisitor.BetweenVisitor<Location, Long> visitor = 916 new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)); 917 missingPredicates.add(visitor); 918 knownCorruption.add(visitor); 919 seq = seq.getNext(); 920 } 921 } 922 } 923 924 if (!missingPredicates.isEmpty()) { 925 for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) { 926 final StoredDestination sd = sdEntry.getValue(); 927 final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>(); 928 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) { 929 @Override 930 protected void matched(Location key, Long value) { 931 matches.put(value, key); 932 } 933 }); 934 935 // If some message references are affected by the missing data files... 936 if (!matches.isEmpty()) { 937 938 // We either 'gracefully' recover dropping the missing messages or 939 // we error out. 940 if( ignoreMissingJournalfiles ) { 941 // Update the index to remove the references to the missing data 942 for (Long sequenceId : matches.keySet()) { 943 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 944 sd.locationIndex.remove(tx, keys.location); 945 sd.messageIdIndex.remove(tx, keys.messageId); 946 LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location); 947 undoCounter++; 948 // TODO: do we need to modify the ack positions for the pub sub case? 949 } 950 } else { 951 LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches); 952 throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected."); 953 } 954 } 955 } 956 } 957 958 if (!ignoreMissingJournalfiles) { 959 if (!knownCorruption.isEmpty()) { 960 LOG.error("Detected corrupt journal files. " + knownCorruption); 961 throw new IOException("Detected corrupt journal files. " + knownCorruption); 962 } 963 964 if (!missingJournalFiles.isEmpty()) { 965 LOG.error("Detected missing journal files. " + missingJournalFiles); 966 throw new IOException("Detected missing journal files. " + missingJournalFiles); 967 } 968 } 969 970 if (undoCounter > 0) { 971 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user 972 // should do sync writes to the journal. 973 if (LOG.isInfoEnabled()) { 974 long end = System.currentTimeMillis(); 975 LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 976 } 977 } 978 } 979 980 private Location nextRecoveryPosition; 981 private Location lastRecoveryPosition; 982 983 public void incrementalRecover() throws IOException { 984 this.indexLock.writeLock().lock(); 985 try { 986 if( nextRecoveryPosition == null ) { 987 if( lastRecoveryPosition==null ) { 988 nextRecoveryPosition = getRecoveryPosition(); 989 } else { 990 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 991 } 992 } 993 while (nextRecoveryPosition != null) { 994 lastRecoveryPosition = nextRecoveryPosition; 995 metadata.lastUpdate = lastRecoveryPosition; 996 JournalCommand<?> message = load(lastRecoveryPosition); 997 process(message, lastRecoveryPosition, (IndexAware) null); 998 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 999 } 1000 } finally { 1001 this.indexLock.writeLock().unlock(); 1002 } 1003 } 1004 1005 public Location getLastUpdatePosition() throws IOException { 1006 return metadata.lastUpdate; 1007 } 1008 1009 private Location getRecoveryPosition() throws IOException { 1010 1011 if (!this.forceRecoverIndex) { 1012 1013 // If we need to recover the transactions.. 1014 if (metadata.firstInProgressTransactionLocation != null) { 1015 return metadata.firstInProgressTransactionLocation; 1016 } 1017 1018 // Perhaps there were no transactions... 1019 if( metadata.lastUpdate!=null) { 1020 // Start replay at the record after the last one recorded in the index file. 1021 return getNextInitializedLocation(metadata.lastUpdate); 1022 } 1023 } 1024 // This loads the first position. 1025 return journal.getNextLocation(null); 1026 } 1027 1028 private Location getNextInitializedLocation(Location location) throws IOException { 1029 Location mayNotBeInitialized = journal.getNextLocation(location); 1030 if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) { 1031 // need to init size and type to skip 1032 return journal.getNextLocation(mayNotBeInitialized); 1033 } else { 1034 return mayNotBeInitialized; 1035 } 1036 } 1037 1038 protected void checkpointCleanup(final boolean cleanup) throws IOException { 1039 long start; 1040 this.indexLock.writeLock().lock(); 1041 try { 1042 start = System.currentTimeMillis(); 1043 if( !opened.get() ) { 1044 return; 1045 } 1046 } finally { 1047 this.indexLock.writeLock().unlock(); 1048 } 1049 checkpointUpdate(cleanup); 1050 long end = System.currentTimeMillis(); 1051 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1052 if (LOG.isInfoEnabled()) { 1053 LOG.info("Slow KahaDB access: cleanup took " + (end - start)); 1054 } 1055 } 1056 } 1057 1058 public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException { 1059 int size = data.serializedSizeFramed(); 1060 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 1061 os.writeByte(data.type().getNumber()); 1062 data.writeFramed(os); 1063 return os.toByteSequence(); 1064 } 1065 1066 // ///////////////////////////////////////////////////////////////// 1067 // Methods call by the broker to update and query the store. 1068 // ///////////////////////////////////////////////////////////////// 1069 public Location store(JournalCommand<?> data) throws IOException { 1070 return store(data, false, null,null); 1071 } 1072 1073 public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException { 1074 return store(data, false, null, null, onJournalStoreComplete); 1075 } 1076 1077 public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException { 1078 return store(data, sync, before, after, null); 1079 } 1080 1081 /** 1082 * All updated are are funneled through this method. The updates are converted 1083 * to a JournalMessage which is logged to the journal and then the data from 1084 * the JournalMessage is used to update the index just like it would be done 1085 * during a recovery process. 1086 */ 1087 public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException { 1088 try { 1089 ByteSequence sequence = toByteSequence(data); 1090 Location location; 1091 1092 checkpointLock.readLock().lock(); 1093 try { 1094 1095 long start = System.currentTimeMillis(); 1096 location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; 1097 long start2 = System.currentTimeMillis(); 1098 //Track the last async update so we know if we need to sync at the next checkpoint 1099 if (!sync && journal.isJournalDiskSyncPeriodic()) { 1100 lastAsyncJournalUpdate.set(location); 1101 } 1102 process(data, location, before); 1103 1104 long end = System.currentTimeMillis(); 1105 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1106 if (LOG.isInfoEnabled()) { 1107 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); 1108 } 1109 } 1110 } finally { 1111 checkpointLock.readLock().unlock(); 1112 } 1113 1114 if (after != null) { 1115 after.run(); 1116 } 1117 1118 return location; 1119 } catch (IOException ioe) { 1120 LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe); 1121 brokerService.handleIOException(ioe); 1122 throw ioe; 1123 } 1124 } 1125 1126 /** 1127 * Loads a previously stored JournalMessage 1128 * 1129 * @param location 1130 * @return 1131 * @throws IOException 1132 */ 1133 public JournalCommand<?> load(Location location) throws IOException { 1134 long start = System.currentTimeMillis(); 1135 ByteSequence data = journal.read(location); 1136 long end = System.currentTimeMillis(); 1137 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 1138 if (LOG.isInfoEnabled()) { 1139 LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms"); 1140 } 1141 } 1142 DataByteArrayInputStream is = new DataByteArrayInputStream(data); 1143 byte readByte = is.readByte(); 1144 KahaEntryType type = KahaEntryType.valueOf(readByte); 1145 if( type == null ) { 1146 try { 1147 is.close(); 1148 } catch (IOException e) {} 1149 throw new IOException("Could not load journal record, null type information from: " + readByte + " at location: "+location); 1150 } 1151 JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); 1152 message.mergeFramed(is); 1153 return message; 1154 } 1155 1156 /** 1157 * do minimal recovery till we reach the last inDoubtLocation 1158 * @param data 1159 * @param location 1160 * @param inDoubtlocation 1161 * @throws IOException 1162 */ 1163 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException { 1164 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { 1165 process(data, location, (IndexAware) null); 1166 } else { 1167 // just recover producer audit 1168 data.visit(new Visitor() { 1169 @Override 1170 public void visit(KahaAddMessageCommand command) throws IOException { 1171 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1172 } 1173 }); 1174 } 1175 } 1176 1177 // ///////////////////////////////////////////////////////////////// 1178 // Journaled record processing methods. Once the record is journaled, 1179 // these methods handle applying the index updates. These may be called 1180 // from the recovery method too so they need to be idempotent 1181 // ///////////////////////////////////////////////////////////////// 1182 1183 void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException { 1184 data.visit(new Visitor() { 1185 @Override 1186 public void visit(KahaAddMessageCommand command) throws IOException { 1187 process(command, location, onSequenceAssignedCallback); 1188 } 1189 1190 @Override 1191 public void visit(KahaRemoveMessageCommand command) throws IOException { 1192 process(command, location); 1193 } 1194 1195 @Override 1196 public void visit(KahaPrepareCommand command) throws IOException { 1197 process(command, location); 1198 } 1199 1200 @Override 1201 public void visit(KahaCommitCommand command) throws IOException { 1202 process(command, location, onSequenceAssignedCallback); 1203 } 1204 1205 @Override 1206 public void visit(KahaRollbackCommand command) throws IOException { 1207 process(command, location); 1208 } 1209 1210 @Override 1211 public void visit(KahaRemoveDestinationCommand command) throws IOException { 1212 process(command, location); 1213 } 1214 1215 @Override 1216 public void visit(KahaSubscriptionCommand command) throws IOException { 1217 process(command, location); 1218 } 1219 1220 @Override 1221 public void visit(KahaProducerAuditCommand command) throws IOException { 1222 processLocation(location); 1223 } 1224 1225 @Override 1226 public void visit(KahaAckMessageFileMapCommand command) throws IOException { 1227 processLocation(location); 1228 } 1229 1230 @Override 1231 public void visit(KahaTraceCommand command) { 1232 processLocation(location); 1233 } 1234 1235 @Override 1236 public void visit(KahaUpdateMessageCommand command) throws IOException { 1237 process(command, location); 1238 } 1239 1240 @Override 1241 public void visit(KahaRewrittenDataFileCommand command) throws IOException { 1242 process(command, location); 1243 } 1244 }); 1245 } 1246 1247 @SuppressWarnings("rawtypes") 1248 protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException { 1249 if (command.hasTransactionInfo()) { 1250 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1251 inflightTx.add(new AddOperation(command, location, runWithIndexLock)); 1252 } else { 1253 this.indexLock.writeLock().lock(); 1254 try { 1255 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1256 @Override 1257 public void execute(Transaction tx) throws IOException { 1258 long assignedIndex = updateIndex(tx, command, location); 1259 if (runWithIndexLock != null) { 1260 runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex); 1261 } 1262 } 1263 }); 1264 1265 } finally { 1266 this.indexLock.writeLock().unlock(); 1267 } 1268 } 1269 } 1270 1271 protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException { 1272 this.indexLock.writeLock().lock(); 1273 try { 1274 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1275 @Override 1276 public void execute(Transaction tx) throws IOException { 1277 updateIndex(tx, command, location); 1278 } 1279 }); 1280 } finally { 1281 this.indexLock.writeLock().unlock(); 1282 } 1283 } 1284 1285 @SuppressWarnings("rawtypes") 1286 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { 1287 if (command.hasTransactionInfo()) { 1288 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1289 inflightTx.add(new RemoveOperation(command, location)); 1290 } else { 1291 this.indexLock.writeLock().lock(); 1292 try { 1293 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1294 @Override 1295 public void execute(Transaction tx) throws IOException { 1296 updateIndex(tx, command, location); 1297 } 1298 }); 1299 } finally { 1300 this.indexLock.writeLock().unlock(); 1301 } 1302 } 1303 } 1304 1305 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException { 1306 this.indexLock.writeLock().lock(); 1307 try { 1308 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1309 @Override 1310 public void execute(Transaction tx) throws IOException { 1311 updateIndex(tx, command, location); 1312 } 1313 }); 1314 } finally { 1315 this.indexLock.writeLock().unlock(); 1316 } 1317 } 1318 1319 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { 1320 this.indexLock.writeLock().lock(); 1321 try { 1322 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1323 @Override 1324 public void execute(Transaction tx) throws IOException { 1325 updateIndex(tx, command, location); 1326 } 1327 }); 1328 } finally { 1329 this.indexLock.writeLock().unlock(); 1330 } 1331 } 1332 1333 protected void processLocation(final Location location) { 1334 this.indexLock.writeLock().lock(); 1335 try { 1336 metadata.lastUpdate = location; 1337 } finally { 1338 this.indexLock.writeLock().unlock(); 1339 } 1340 } 1341 1342 @SuppressWarnings("rawtypes") 1343 protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException { 1344 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1345 List<Operation> inflightTx; 1346 synchronized (inflightTransactions) { 1347 inflightTx = inflightTransactions.remove(key); 1348 if (inflightTx == null) { 1349 inflightTx = preparedTransactions.remove(key); 1350 } 1351 } 1352 if (inflightTx == null) { 1353 // only non persistent messages in this tx 1354 if (before != null) { 1355 before.sequenceAssignedWithIndexLocked(-1); 1356 } 1357 return; 1358 } 1359 1360 final List<Operation> messagingTx = inflightTx; 1361 indexLock.writeLock().lock(); 1362 try { 1363 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1364 @Override 1365 public void execute(Transaction tx) throws IOException { 1366 for (Operation op : messagingTx) { 1367 op.execute(tx); 1368 recordAckMessageReferenceLocation(location, op.getLocation()); 1369 } 1370 } 1371 }); 1372 metadata.lastUpdate = location; 1373 } finally { 1374 indexLock.writeLock().unlock(); 1375 } 1376 } 1377 1378 @SuppressWarnings("rawtypes") 1379 protected void process(KahaPrepareCommand command, Location location) { 1380 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1381 List<Operation> tx = null; 1382 synchronized (inflightTransactions) { 1383 tx = inflightTransactions.remove(key); 1384 if (tx != null) { 1385 preparedTransactions.put(key, tx); 1386 } 1387 } 1388 if (tx != null && !tx.isEmpty()) { 1389 indexLock.writeLock().lock(); 1390 try { 1391 for (Operation op : tx) { 1392 recordAckMessageReferenceLocation(location, op.getLocation()); 1393 } 1394 } finally { 1395 indexLock.writeLock().unlock(); 1396 } 1397 } 1398 } 1399 1400 @SuppressWarnings("rawtypes") 1401 protected void process(KahaRollbackCommand command, Location location) throws IOException { 1402 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1403 List<Operation> updates = null; 1404 synchronized (inflightTransactions) { 1405 updates = inflightTransactions.remove(key); 1406 if (updates == null) { 1407 updates = preparedTransactions.remove(key); 1408 } 1409 } 1410 if (key.isXATransaction() && updates != null && !updates.isEmpty()) { 1411 indexLock.writeLock().lock(); 1412 try { 1413 for (Operation op : updates) { 1414 recordAckMessageReferenceLocation(location, op.getLocation()); 1415 } 1416 } finally { 1417 indexLock.writeLock().unlock(); 1418 } 1419 } 1420 } 1421 1422 protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException { 1423 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1424 1425 // Mark the current journal file as a compacted file so that gc checks can skip 1426 // over logs that are smaller compaction type logs. 1427 DataFile current = journal.getDataFileById(location.getDataFileId()); 1428 current.setTypeCode(command.getRewriteType()); 1429 1430 if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) { 1431 // Move offset so that next location read jumps to next file. 1432 location.setOffset(journalMaxFileLength); 1433 } 1434 } 1435 1436 // ///////////////////////////////////////////////////////////////// 1437 // These methods do the actual index updates. 1438 // ///////////////////////////////////////////////////////////////// 1439 1440 protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); 1441 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>(); 1442 1443 long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { 1444 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1445 1446 // Skip adding the message to the index if this is a topic and there are 1447 // no subscriptions. 1448 if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) { 1449 return -1; 1450 } 1451 1452 // Add the message. 1453 int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY; 1454 long id = sd.orderIndex.getNextMessageId(priority); 1455 Long previous = sd.locationIndex.put(tx, location, id); 1456 if (previous == null) { 1457 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); 1458 if (previous == null) { 1459 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); 1460 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { 1461 addAckLocationForNewMessage(tx, sd, id); 1462 } 1463 metadata.lastUpdate = location; 1464 } else { 1465 1466 MessageKeys messageKeys = sd.orderIndex.get(tx, previous); 1467 if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { 1468 // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt 1469 LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId()); 1470 } 1471 sd.messageIdIndex.put(tx, command.getMessageId(), previous); 1472 sd.locationIndex.remove(tx, location); 1473 id = -1; 1474 } 1475 } else { 1476 // restore the previous value.. Looks like this was a redo of a previously 1477 // added message. We don't want to assign it a new id as the other indexes would 1478 // be wrong.. 1479 sd.locationIndex.put(tx, location, previous); 1480 metadata.lastUpdate = location; 1481 } 1482 // record this id in any event, initial send or recovery 1483 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1484 return id; 1485 } 1486 1487 void trackPendingAdd(KahaDestination destination, Long seq) { 1488 StoredDestination sd = storedDestinations.get(key(destination)); 1489 if (sd != null) { 1490 sd.trackPendingAdd(seq); 1491 } 1492 } 1493 1494 void trackPendingAddComplete(KahaDestination destination, Long seq) { 1495 StoredDestination sd = storedDestinations.get(key(destination)); 1496 if (sd != null) { 1497 sd.trackPendingAddComplete(seq); 1498 } 1499 } 1500 1501 void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException { 1502 KahaAddMessageCommand command = updateMessageCommand.getMessage(); 1503 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1504 1505 Long id = sd.messageIdIndex.get(tx, command.getMessageId()); 1506 if (id != null) { 1507 MessageKeys previousKeys = sd.orderIndex.put( 1508 tx, 1509 command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY, 1510 id, 1511 new MessageKeys(command.getMessageId(), location) 1512 ); 1513 sd.locationIndex.put(tx, location, id); 1514 // on first update previous is original location, on recovery/replay it may be the updated location 1515 if(previousKeys != null && !previousKeys.location.equals(location)) { 1516 sd.locationIndex.remove(tx, previousKeys.location); 1517 } 1518 metadata.lastUpdate = location; 1519 } else { 1520 //Add the message if it can't be found 1521 this.updateIndex(tx, command, location); 1522 } 1523 } 1524 1525 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { 1526 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1527 if (!command.hasSubscriptionKey()) { 1528 1529 // In the queue case we just remove the message from the index.. 1530 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId()); 1531 if (sequenceId != null) { 1532 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 1533 if (keys != null) { 1534 sd.locationIndex.remove(tx, keys.location); 1535 recordAckMessageReferenceLocation(ackLocation, keys.location); 1536 metadata.lastUpdate = ackLocation; 1537 } else if (LOG.isDebugEnabled()) { 1538 LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId()); 1539 } 1540 } else if (LOG.isDebugEnabled()) { 1541 LOG.debug("message not found in sequence id index: " + command.getMessageId()); 1542 } 1543 } else { 1544 // In the topic case we need remove the message once it's been acked 1545 // by all the subs 1546 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId()); 1547 1548 // Make sure it's a valid message id... 1549 if (sequence != null) { 1550 String subscriptionKey = command.getSubscriptionKey(); 1551 if (command.getAck() != UNMATCHED) { 1552 sd.orderIndex.get(tx, sequence); 1553 byte priority = sd.orderIndex.lastGetPriority(); 1554 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority)); 1555 } 1556 1557 MessageKeys keys = sd.orderIndex.get(tx, sequence); 1558 if (keys != null) { 1559 recordAckMessageReferenceLocation(ackLocation, keys.location); 1560 } 1561 // The following method handles deleting un-referenced messages. 1562 removeAckLocation(tx, sd, subscriptionKey, sequence); 1563 metadata.lastUpdate = ackLocation; 1564 } else if (LOG.isDebugEnabled()) { 1565 LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); 1566 } 1567 1568 } 1569 } 1570 1571 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { 1572 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); 1573 if (referenceFileIds == null) { 1574 referenceFileIds = new HashSet<Integer>(); 1575 referenceFileIds.add(messageLocation.getDataFileId()); 1576 metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); 1577 } else { 1578 Integer id = Integer.valueOf(messageLocation.getDataFileId()); 1579 if (!referenceFileIds.contains(id)) { 1580 referenceFileIds.add(id); 1581 } 1582 } 1583 } 1584 1585 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { 1586 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1587 sd.orderIndex.remove(tx); 1588 1589 sd.locationIndex.clear(tx); 1590 sd.locationIndex.unload(tx); 1591 tx.free(sd.locationIndex.getPageId()); 1592 1593 sd.messageIdIndex.clear(tx); 1594 sd.messageIdIndex.unload(tx); 1595 tx.free(sd.messageIdIndex.getPageId()); 1596 1597 if (sd.subscriptions != null) { 1598 sd.subscriptions.clear(tx); 1599 sd.subscriptions.unload(tx); 1600 tx.free(sd.subscriptions.getPageId()); 1601 1602 sd.subscriptionAcks.clear(tx); 1603 sd.subscriptionAcks.unload(tx); 1604 tx.free(sd.subscriptionAcks.getPageId()); 1605 1606 sd.ackPositions.clear(tx); 1607 sd.ackPositions.unload(tx); 1608 tx.free(sd.ackPositions.getHeadPageId()); 1609 1610 sd.subLocations.clear(tx); 1611 sd.subLocations.unload(tx); 1612 tx.free(sd.subLocations.getHeadPageId()); 1613 } 1614 1615 String key = key(command.getDestination()); 1616 storedDestinations.remove(key); 1617 metadata.destinations.remove(tx, key); 1618 storeCache.remove(key(command.getDestination())); 1619 } 1620 1621 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { 1622 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1623 final String subscriptionKey = command.getSubscriptionKey(); 1624 1625 // If set then we are creating it.. otherwise we are destroying the sub 1626 if (command.hasSubscriptionInfo()) { 1627 Location existing = sd.subLocations.get(tx, subscriptionKey); 1628 if (existing != null && existing.compareTo(location) == 0) { 1629 // replay on recovery, ignore 1630 LOG.trace("ignoring journal replay of replay of sub from: " + location); 1631 return; 1632 } 1633 1634 sd.subscriptions.put(tx, subscriptionKey, command); 1635 sd.subLocations.put(tx, subscriptionKey, location); 1636 long ackLocation=NOT_ACKED; 1637 if (!command.getRetroactive()) { 1638 ackLocation = sd.orderIndex.nextMessageId-1; 1639 } else { 1640 addAckLocationForRetroactiveSub(tx, sd, subscriptionKey); 1641 } 1642 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); 1643 sd.subscriptionCache.add(subscriptionKey); 1644 } else { 1645 // delete the sub... 1646 sd.subscriptions.remove(tx, subscriptionKey); 1647 sd.subLocations.remove(tx, subscriptionKey); 1648 sd.subscriptionAcks.remove(tx, subscriptionKey); 1649 sd.subscriptionCache.remove(subscriptionKey); 1650 removeAckLocationsForSub(tx, sd, subscriptionKey); 1651 1652 if (sd.subscriptions.isEmpty(tx)) { 1653 // remove the stored destination 1654 KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand(); 1655 removeDestinationCommand.setDestination(command.getDestination()); 1656 updateIndex(tx, removeDestinationCommand, null); 1657 } 1658 } 1659 } 1660 1661 private void checkpointUpdate(final boolean cleanup) throws IOException { 1662 checkpointLock.writeLock().lock(); 1663 try { 1664 this.indexLock.writeLock().lock(); 1665 try { 1666 Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() { 1667 @Override 1668 public Set<Integer> execute(Transaction tx) throws IOException { 1669 return checkpointUpdate(tx, cleanup); 1670 } 1671 }); 1672 pageFile.flush(); 1673 // after the index update such that partial removal does not leave dangling references in the index. 1674 journal.removeDataFiles(filesToGc); 1675 } finally { 1676 this.indexLock.writeLock().unlock(); 1677 } 1678 1679 } finally { 1680 checkpointLock.writeLock().unlock(); 1681 } 1682 } 1683 1684 /** 1685 * @param tx 1686 * @throws IOException 1687 */ 1688 Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { 1689 MDC.put("activemq.persistenceDir", getDirectory().getName()); 1690 LOG.debug("Checkpoint started."); 1691 1692 // reflect last update exclusive of current checkpoint 1693 Location lastUpdate = metadata.lastUpdate; 1694 1695 metadata.state = OPEN_STATE; 1696 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); 1697 metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap(); 1698 Location[] inProgressTxRange = getInProgressTxLocationRange(); 1699 metadata.firstInProgressTransactionLocation = inProgressTxRange[0]; 1700 tx.store(metadata.page, metadataMarshaller, true); 1701 1702 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(); 1703 if (cleanup) { 1704 1705 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1706 gcCandidateSet.addAll(completeFileSet); 1707 1708 if (LOG.isTraceEnabled()) { 1709 LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); 1710 } 1711 1712 if (lastUpdate != null) { 1713 // we won't delete past the last update, ackCompaction journal can be a candidate in error 1714 gcCandidateSet.removeAll(new TreeSet<Integer>(gcCandidateSet.tailSet(lastUpdate.getDataFileId()))); 1715 } 1716 1717 // Don't GC files under replication 1718 if( journalFilesBeingReplicated!=null ) { 1719 gcCandidateSet.removeAll(journalFilesBeingReplicated); 1720 } 1721 1722 if (metadata.producerSequenceIdTrackerLocation != null) { 1723 int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId(); 1724 if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) { 1725 // rewrite so we don't prevent gc 1726 metadata.producerSequenceIdTracker.setModified(true); 1727 if (LOG.isTraceEnabled()) { 1728 LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation); 1729 } 1730 } 1731 gcCandidateSet.remove(dataFileId); 1732 if (LOG.isTraceEnabled()) { 1733 LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet); 1734 } 1735 } 1736 1737 if (metadata.ackMessageFileMapLocation != null) { 1738 int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId(); 1739 gcCandidateSet.remove(dataFileId); 1740 if (LOG.isTraceEnabled()) { 1741 LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet); 1742 } 1743 } 1744 1745 // Don't GC files referenced by in-progress tx 1746 if (inProgressTxRange[0] != null) { 1747 for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { 1748 gcCandidateSet.remove(pendingTx); 1749 } 1750 } 1751 if (LOG.isTraceEnabled()) { 1752 LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); 1753 } 1754 1755 // Go through all the destinations to see if any of them can remove GC candidates. 1756 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) { 1757 if( gcCandidateSet.isEmpty() ) { 1758 break; 1759 } 1760 1761 // Use a visitor to cut down the number of pages that we load 1762 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 1763 int last=-1; 1764 @Override 1765 public boolean isInterestedInKeysBetween(Location first, Location second) { 1766 if( first==null ) { 1767 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1); 1768 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1769 subset.remove(second.getDataFileId()); 1770 } 1771 return !subset.isEmpty(); 1772 } else if( second==null ) { 1773 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId()); 1774 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1775 subset.remove(first.getDataFileId()); 1776 } 1777 return !subset.isEmpty(); 1778 } else { 1779 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1); 1780 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1781 subset.remove(first.getDataFileId()); 1782 } 1783 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1784 subset.remove(second.getDataFileId()); 1785 } 1786 return !subset.isEmpty(); 1787 } 1788 } 1789 1790 @Override 1791 public void visit(List<Location> keys, List<Long> values) { 1792 for (Location l : keys) { 1793 int fileId = l.getDataFileId(); 1794 if( last != fileId ) { 1795 gcCandidateSet.remove(fileId); 1796 last = fileId; 1797 } 1798 } 1799 } 1800 }); 1801 1802 // Durable Subscription 1803 if (entry.getValue().subLocations != null) { 1804 Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx); 1805 while (iter.hasNext()) { 1806 Entry<String, Location> subscription = iter.next(); 1807 int dataFileId = subscription.getValue().getDataFileId(); 1808 1809 // Move subscription along if it has no outstanding messages that need ack'd 1810 // and its in the last log file in the journal. 1811 if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) { 1812 final StoredDestination destination = entry.getValue(); 1813 final String subscriptionKey = subscription.getKey(); 1814 SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey); 1815 1816 // When pending is size one that is the next message Id meaning there 1817 // are no pending messages currently. 1818 if (pendingAcks == null || pendingAcks.isEmpty() || 1819 (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) { 1820 1821 if (LOG.isTraceEnabled()) { 1822 LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId); 1823 } 1824 1825 final KahaSubscriptionCommand kahaSub = 1826 destination.subscriptions.get(tx, subscriptionKey); 1827 destination.subLocations.put( 1828 tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub)); 1829 1830 // Skips the remove from candidates if we rewrote the subscription 1831 // in order to prevent duplicate subscription commands on recover. 1832 // If another subscription is on the same file and isn't rewritten 1833 // than it will remove the file from the set. 1834 continue; 1835 } 1836 } 1837 1838 gcCandidateSet.remove(dataFileId); 1839 } 1840 } 1841 1842 if (LOG.isTraceEnabled()) { 1843 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); 1844 } 1845 } 1846 1847 // check we are not deleting file with ack for in-use journal files 1848 if (LOG.isTraceEnabled()) { 1849 LOG.trace("gc candidates: " + gcCandidateSet); 1850 LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap); 1851 } 1852 1853 boolean ackMessageFileMapMod = false; 1854 Iterator<Integer> candidates = gcCandidateSet.iterator(); 1855 while (candidates.hasNext()) { 1856 Integer candidate = candidates.next(); 1857 Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate); 1858 if (referencedFileIds != null) { 1859 for (Integer referencedFileId : referencedFileIds) { 1860 if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) { 1861 // active file that is not targeted for deletion is referenced so don't delete 1862 candidates.remove(); 1863 break; 1864 } 1865 } 1866 if (gcCandidateSet.contains(candidate)) { 1867 ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null); 1868 } else { 1869 if (LOG.isTraceEnabled()) { 1870 LOG.trace("not removing data file: " + candidate 1871 + " as contained ack(s) refer to referenced file: " + referencedFileIds); 1872 } 1873 } 1874 } 1875 } 1876 1877 if (!gcCandidateSet.isEmpty()) { 1878 LOG.debug("Cleanup removing the data files: {}", gcCandidateSet); 1879 for (Integer candidate : gcCandidateSet) { 1880 for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) { 1881 ackMessageFileMapMod |= ackFiles.remove(candidate); 1882 } 1883 } 1884 if (ackMessageFileMapMod) { 1885 checkpointUpdate(tx, false); 1886 } 1887 } else if (isEnableAckCompaction()) { 1888 if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) { 1889 // First check length of journal to make sure it makes sense to even try. 1890 // 1891 // If there is only one journal file with Acks in it we don't need to move 1892 // it since it won't be chained to any later logs. 1893 // 1894 // If the logs haven't grown since the last time then we need to compact 1895 // otherwise there seems to still be room for growth and we don't need to incur 1896 // the overhead. Depending on configuration this check can be avoided and 1897 // Ack compaction will run any time the store has not GC'd a journal file in 1898 // the configured amount of cycles. 1899 if (metadata.ackMessageFileMap.size() > 1 && 1900 (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) { 1901 1902 LOG.trace("No files GC'd checking if threshold to ACK compaction has been met."); 1903 try { 1904 scheduler.execute(new AckCompactionRunner()); 1905 } catch (Exception ex) { 1906 LOG.warn("Error on queueing the Ack Compactor", ex); 1907 } 1908 } else { 1909 LOG.trace("Journal activity detected, no Ack compaction scheduled."); 1910 } 1911 1912 checkPointCyclesWithNoGC = 0; 1913 } else { 1914 LOG.trace("Not yet time to check for compaction: {} of {} cycles", 1915 checkPointCyclesWithNoGC, getCompactAcksAfterNoGC()); 1916 } 1917 1918 journalLogOnLastCompactionCheck = journal.getCurrentDataFileId(); 1919 } 1920 } 1921 MDC.remove("activemq.persistenceDir"); 1922 1923 LOG.debug("Checkpoint done."); 1924 return gcCandidateSet; 1925 } 1926 1927 private final class AckCompactionRunner implements Runnable { 1928 1929 @Override 1930 public void run() { 1931 1932 int journalToAdvance = -1; 1933 Set<Integer> journalLogsReferenced = new HashSet<Integer>(); 1934 1935 //flag to know whether the ack forwarding completed without an exception 1936 boolean forwarded = false; 1937 1938 try { 1939 //acquire the checkpoint lock to prevent other threads from 1940 //running a checkpoint while this is running 1941 // 1942 //Normally this task runs on the same executor as the checkpoint task 1943 //so this ack compaction runner wouldn't run at the same time as the checkpoint task. 1944 // 1945 //However, there are two cases where this isn't always true. 1946 //First, the checkpoint() method is public and can be called through the 1947 //PersistenceAdapter interface by someone at the same time this is running. 1948 //Second, a checkpoint is called during shutdown without using the executor. 1949 // 1950 //In the future it might be better to just remove the checkpointLock entirely 1951 //and only use the executor but this would need to be examined for any unintended 1952 //consequences 1953 checkpointLock.readLock().lock(); 1954 1955 try { 1956 1957 // Lock index to capture the ackMessageFileMap data 1958 indexLock.writeLock().lock(); 1959 1960 // Map keys might not be sorted, find the earliest log file to forward acks 1961 // from and move only those, future cycles can chip away at more as needed. 1962 // We won't move files that are themselves rewritten on a previous compaction. 1963 List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet()); 1964 Collections.sort(journalFileIds); 1965 for (Integer journalFileId : journalFileIds) { 1966 DataFile current = journal.getDataFileById(journalFileId); 1967 if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { 1968 journalToAdvance = journalFileId; 1969 break; 1970 } 1971 } 1972 1973 // Check if we found one, or if we only found the current file being written to. 1974 if (journalToAdvance == -1 || blockedFromCompaction(journalToAdvance)) { 1975 return; 1976 } 1977 1978 journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance)); 1979 1980 } finally { 1981 indexLock.writeLock().unlock(); 1982 } 1983 1984 try { 1985 // Background rewrite of the old acks 1986 forwardAllAcks(journalToAdvance, journalLogsReferenced); 1987 forwarded = true; 1988 } catch (IOException ioe) { 1989 LOG.error("Forwarding of acks failed", ioe); 1990 brokerService.handleIOException(ioe); 1991 } catch (Throwable e) { 1992 LOG.error("Forwarding of acks failed", e); 1993 brokerService.handleIOException(IOExceptionSupport.create(e)); 1994 } 1995 } finally { 1996 checkpointLock.readLock().unlock(); 1997 } 1998 1999 try { 2000 if (forwarded) { 2001 // Checkpoint with changes from the ackMessageFileMap 2002 checkpointUpdate(false); 2003 } 2004 } catch (IOException ioe) { 2005 LOG.error("Checkpoint failed", ioe); 2006 brokerService.handleIOException(ioe); 2007 } catch (Throwable e) { 2008 LOG.error("Checkpoint failed", e); 2009 brokerService.handleIOException(IOExceptionSupport.create(e)); 2010 } 2011 } 2012 } 2013 2014 // called with the index lock held 2015 private boolean blockedFromCompaction(int journalToAdvance) { 2016 // don't forward the current data file 2017 if (journalToAdvance == journal.getCurrentDataFileId()) { 2018 return true; 2019 } 2020 // don't forward any data file with inflight transaction records because it will whack the tx - data file link 2021 // in the ack map when all acks are migrated (now that the ack map is not just for acks) 2022 // TODO: prepare records can be dropped but completion records (maybe only commit outcomes) need to be migrated 2023 // as part of the forward work. 2024 Location[] inProgressTxRange = getInProgressTxLocationRange(); 2025 if (inProgressTxRange[0] != null) { 2026 for (int pendingTx = inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { 2027 if (journalToAdvance == pendingTx) { 2028 LOG.trace("Compaction target:{} blocked by inflight transaction records: {}", journalToAdvance, inProgressTxRange); 2029 return true; 2030 } 2031 } 2032 } 2033 return false; 2034 } 2035 2036 private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException { 2037 LOG.trace("Attempting to move all acks in journal:{} to the front. Referenced files:{}", journalToRead, journalLogsReferenced); 2038 2039 DataFile forwardsFile = journal.reserveDataFile(); 2040 forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE); 2041 LOG.trace("Reserved file for forwarded acks: {}", forwardsFile); 2042 2043 Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>(); 2044 2045 try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) { 2046 KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand(); 2047 compactionMarker.setSourceDataFileId(journalToRead); 2048 compactionMarker.setRewriteType(forwardsFile.getTypeCode()); 2049 2050 ByteSequence payload = toByteSequence(compactionMarker); 2051 appender.storeItem(payload, Journal.USER_RECORD_TYPE, false); 2052 LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead); 2053 2054 final Location limit = new Location(journalToRead + 1, 0); 2055 Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0), limit); 2056 while (nextLocation != null) { 2057 JournalCommand<?> command = null; 2058 try { 2059 command = load(nextLocation); 2060 } catch (IOException ex) { 2061 LOG.trace("Error loading command during ack forward: {}", nextLocation); 2062 } 2063 2064 if (shouldForward(command)) { 2065 payload = toByteSequence(command); 2066 Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false); 2067 updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced); 2068 } 2069 2070 nextLocation = getNextLocationForAckForward(nextLocation, limit); 2071 } 2072 } 2073 2074 LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations); 2075 2076 // Lock index while we update the ackMessageFileMap. 2077 indexLock.writeLock().lock(); 2078 2079 // Update the ack map with the new locations of the acks 2080 for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) { 2081 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); 2082 if (referenceFileIds == null) { 2083 referenceFileIds = new HashSet<Integer>(); 2084 referenceFileIds.addAll(entry.getValue()); 2085 metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); 2086 } else { 2087 referenceFileIds.addAll(entry.getValue()); 2088 } 2089 } 2090 2091 // remove the old location data from the ack map so that the old journal log file can 2092 // be removed on next GC. 2093 metadata.ackMessageFileMap.remove(journalToRead); 2094 2095 indexLock.writeLock().unlock(); 2096 2097 LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); 2098 } 2099 2100 private boolean shouldForward(JournalCommand<?> command) { 2101 boolean result = false; 2102 if (command != null) { 2103 if (command instanceof KahaRemoveMessageCommand) { 2104 result = true; 2105 } else if (command instanceof KahaCommitCommand) { 2106 KahaCommitCommand kahaCommitCommand = (KahaCommitCommand) command; 2107 if (kahaCommitCommand.hasTransactionInfo() && kahaCommitCommand.getTransactionInfo().hasXaTransactionId()) { 2108 result = true; 2109 } 2110 } 2111 } 2112 return result; 2113 } 2114 2115 private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) { 2116 //getNextLocation() can throw an IOException, we should handle it and set 2117 //nextLocation to null and abort gracefully 2118 //Should not happen in the normal case 2119 Location location = null; 2120 try { 2121 location = journal.getNextLocation(nextLocation, limit); 2122 } catch (IOException e) { 2123 LOG.warn("Failed to load next journal location after: {}, reason: {}", nextLocation, e); 2124 if (LOG.isDebugEnabled()) { 2125 LOG.debug("Failed to load next journal location after: {}", nextLocation, e); 2126 } 2127 } 2128 return location; 2129 } 2130 2131 final Runnable nullCompletionCallback = new Runnable() { 2132 @Override 2133 public void run() { 2134 } 2135 }; 2136 2137 private Location checkpointProducerAudit() throws IOException { 2138 if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) { 2139 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2140 ObjectOutputStream oout = new ObjectOutputStream(baos); 2141 oout.writeObject(metadata.producerSequenceIdTracker); 2142 oout.flush(); 2143 oout.close(); 2144 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 2145 Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback); 2146 try { 2147 location.getLatch().await(); 2148 if (location.getException().get() != null) { 2149 throw location.getException().get(); 2150 } 2151 } catch (InterruptedException e) { 2152 throw new InterruptedIOException(e.toString()); 2153 } 2154 return location; 2155 } 2156 return metadata.producerSequenceIdTrackerLocation; 2157 } 2158 2159 private Location checkpointAckMessageFileMap() throws IOException { 2160 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2161 ObjectOutputStream oout = new ObjectOutputStream(baos); 2162 oout.writeObject(metadata.ackMessageFileMap); 2163 oout.flush(); 2164 oout.close(); 2165 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 2166 Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback); 2167 try { 2168 location.getLatch().await(); 2169 } catch (InterruptedException e) { 2170 throw new InterruptedIOException(e.toString()); 2171 } 2172 return location; 2173 } 2174 2175 private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException { 2176 2177 ByteSequence sequence = toByteSequence(subscription); 2178 Location location = journal.write(sequence, nullCompletionCallback) ; 2179 2180 try { 2181 location.getLatch().await(); 2182 } catch (InterruptedException e) { 2183 throw new InterruptedIOException(e.toString()); 2184 } 2185 return location; 2186 } 2187 2188 public HashSet<Integer> getJournalFilesBeingReplicated() { 2189 return journalFilesBeingReplicated; 2190 } 2191 2192 // ///////////////////////////////////////////////////////////////// 2193 // StoredDestination related implementation methods. 2194 // ///////////////////////////////////////////////////////////////// 2195 2196 protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); 2197 2198 static class MessageKeys { 2199 final String messageId; 2200 final Location location; 2201 2202 public MessageKeys(String messageId, Location location) { 2203 this.messageId=messageId; 2204 this.location=location; 2205 } 2206 2207 @Override 2208 public String toString() { 2209 return "["+messageId+","+location+"]"; 2210 } 2211 } 2212 2213 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> { 2214 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); 2215 2216 @Override 2217 public MessageKeys readPayload(DataInput dataIn) throws IOException { 2218 return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn)); 2219 } 2220 2221 @Override 2222 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { 2223 dataOut.writeUTF(object.messageId); 2224 LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); 2225 } 2226 } 2227 2228 class LastAck { 2229 long lastAckedSequence; 2230 byte priority; 2231 2232 public LastAck(LastAck source) { 2233 this.lastAckedSequence = source.lastAckedSequence; 2234 this.priority = source.priority; 2235 } 2236 2237 public LastAck() { 2238 this.priority = MessageOrderIndex.HI; 2239 } 2240 2241 public LastAck(long ackLocation) { 2242 this.lastAckedSequence = ackLocation; 2243 this.priority = MessageOrderIndex.LO; 2244 } 2245 2246 public LastAck(long ackLocation, byte priority) { 2247 this.lastAckedSequence = ackLocation; 2248 this.priority = priority; 2249 } 2250 2251 @Override 2252 public String toString() { 2253 return "[" + lastAckedSequence + ":" + priority + "]"; 2254 } 2255 } 2256 2257 protected class LastAckMarshaller implements Marshaller<LastAck> { 2258 2259 @Override 2260 public void writePayload(LastAck object, DataOutput dataOut) throws IOException { 2261 dataOut.writeLong(object.lastAckedSequence); 2262 dataOut.writeByte(object.priority); 2263 } 2264 2265 @Override 2266 public LastAck readPayload(DataInput dataIn) throws IOException { 2267 LastAck lastAcked = new LastAck(); 2268 lastAcked.lastAckedSequence = dataIn.readLong(); 2269 if (metadata.version >= 3) { 2270 lastAcked.priority = dataIn.readByte(); 2271 } 2272 return lastAcked; 2273 } 2274 2275 @Override 2276 public int getFixedSize() { 2277 return 9; 2278 } 2279 2280 @Override 2281 public LastAck deepCopy(LastAck source) { 2282 return new LastAck(source); 2283 } 2284 2285 @Override 2286 public boolean isDeepCopySupported() { 2287 return true; 2288 } 2289 } 2290 2291 class StoredDestination { 2292 2293 MessageOrderIndex orderIndex = new MessageOrderIndex(); 2294 BTreeIndex<Location, Long> locationIndex; 2295 BTreeIndex<String, Long> messageIdIndex; 2296 2297 // These bits are only set for Topics 2298 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 2299 BTreeIndex<String, LastAck> subscriptionAcks; 2300 HashMap<String, MessageOrderCursor> subscriptionCursors; 2301 ListIndex<String, SequenceSet> ackPositions; 2302 ListIndex<String, Location> subLocations; 2303 2304 // Transient data used to track which Messages are no longer needed. 2305 final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>(); 2306 final HashSet<String> subscriptionCache = new LinkedHashSet<String>(); 2307 2308 public void trackPendingAdd(Long seq) { 2309 orderIndex.trackPendingAdd(seq); 2310 } 2311 2312 public void trackPendingAddComplete(Long seq) { 2313 orderIndex.trackPendingAddComplete(seq); 2314 } 2315 2316 @Override 2317 public String toString() { 2318 return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size(); 2319 } 2320 } 2321 2322 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 2323 2324 @Override 2325 public StoredDestination readPayload(final DataInput dataIn) throws IOException { 2326 final StoredDestination value = new StoredDestination(); 2327 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2328 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong()); 2329 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 2330 2331 if (dataIn.readBoolean()) { 2332 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); 2333 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong()); 2334 if (metadata.version >= 4) { 2335 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong()); 2336 } else { 2337 // upgrade 2338 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2339 @Override 2340 public void execute(Transaction tx) throws IOException { 2341 LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>(); 2342 2343 if (metadata.version >= 3) { 2344 // migrate 2345 BTreeIndex<Long, HashSet<String>> oldAckPositions = 2346 new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong()); 2347 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); 2348 oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); 2349 oldAckPositions.load(tx); 2350 2351 2352 // Do the initial build of the data in memory before writing into the store 2353 // based Ack Positions List to avoid a lot of disk thrashing. 2354 Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx); 2355 while (iterator.hasNext()) { 2356 Entry<Long, HashSet<String>> entry = iterator.next(); 2357 2358 for(String subKey : entry.getValue()) { 2359 SequenceSet pendingAcks = temp.get(subKey); 2360 if (pendingAcks == null) { 2361 pendingAcks = new SequenceSet(); 2362 temp.put(subKey, pendingAcks); 2363 } 2364 2365 pendingAcks.add(entry.getKey()); 2366 } 2367 } 2368 } 2369 // Now move the pending messages to ack data into the store backed 2370 // structure. 2371 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2372 value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2373 value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2374 value.ackPositions.load(tx); 2375 for(String subscriptionKey : temp.keySet()) { 2376 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); 2377 } 2378 2379 } 2380 }); 2381 } 2382 2383 if (metadata.version >= 5) { 2384 value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong()); 2385 } else { 2386 // upgrade 2387 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2388 @Override 2389 public void execute(Transaction tx) throws IOException { 2390 value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2391 value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2392 value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2393 value.subLocations.load(tx); 2394 } 2395 }); 2396 } 2397 } 2398 if (metadata.version >= 2) { 2399 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2400 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2401 } else { 2402 // upgrade 2403 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2404 @Override 2405 public void execute(Transaction tx) throws IOException { 2406 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2407 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2408 value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2409 value.orderIndex.lowPriorityIndex.load(tx); 2410 2411 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2412 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2413 value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2414 value.orderIndex.highPriorityIndex.load(tx); 2415 } 2416 }); 2417 } 2418 2419 return value; 2420 } 2421 2422 @Override 2423 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { 2424 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId()); 2425 dataOut.writeLong(value.locationIndex.getPageId()); 2426 dataOut.writeLong(value.messageIdIndex.getPageId()); 2427 if (value.subscriptions != null) { 2428 dataOut.writeBoolean(true); 2429 dataOut.writeLong(value.subscriptions.getPageId()); 2430 dataOut.writeLong(value.subscriptionAcks.getPageId()); 2431 dataOut.writeLong(value.ackPositions.getHeadPageId()); 2432 dataOut.writeLong(value.subLocations.getHeadPageId()); 2433 } else { 2434 dataOut.writeBoolean(false); 2435 } 2436 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); 2437 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); 2438 } 2439 } 2440 2441 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> { 2442 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller(); 2443 2444 @Override 2445 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { 2446 KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); 2447 rc.mergeFramed((InputStream)dataIn); 2448 return rc; 2449 } 2450 2451 @Override 2452 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { 2453 object.writeFramed((OutputStream)dataOut); 2454 } 2455 } 2456 2457 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2458 String key = key(destination); 2459 StoredDestination rc = storedDestinations.get(key); 2460 if (rc == null) { 2461 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; 2462 rc = loadStoredDestination(tx, key, topic); 2463 // Cache it. We may want to remove/unload destinations from the 2464 // cache that are not used for a while 2465 // to reduce memory usage. 2466 storedDestinations.put(key, rc); 2467 } 2468 return rc; 2469 } 2470 2471 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2472 String key = key(destination); 2473 StoredDestination rc = storedDestinations.get(key); 2474 if (rc == null && metadata.destinations.containsKey(tx, key)) { 2475 rc = getStoredDestination(destination, tx); 2476 } 2477 return rc; 2478 } 2479 2480 /** 2481 * @param tx 2482 * @param key 2483 * @param topic 2484 * @return 2485 * @throws IOException 2486 */ 2487 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { 2488 // Try to load the existing indexes.. 2489 StoredDestination rc = metadata.destinations.get(tx, key); 2490 if (rc == null) { 2491 // Brand new destination.. allocate indexes for it. 2492 rc = new StoredDestination(); 2493 rc.orderIndex.allocate(tx); 2494 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate()); 2495 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 2496 2497 if (topic) { 2498 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate()); 2499 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate()); 2500 rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2501 rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2502 } 2503 metadata.destinations.put(tx, key, rc); 2504 } 2505 2506 // Configure the marshalers and load. 2507 rc.orderIndex.load(tx); 2508 2509 // Figure out the next key using the last entry in the destination. 2510 rc.orderIndex.configureLast(tx); 2511 2512 rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE); 2513 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2514 rc.locationIndex.load(tx); 2515 2516 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); 2517 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2518 rc.messageIdIndex.load(tx); 2519 2520 // If it was a topic... 2521 if (topic) { 2522 2523 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE); 2524 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE); 2525 rc.subscriptions.load(tx); 2526 2527 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); 2528 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller()); 2529 rc.subscriptionAcks.load(tx); 2530 2531 rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2532 rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2533 rc.ackPositions.load(tx); 2534 2535 rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2536 rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2537 rc.subLocations.load(tx); 2538 2539 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>(); 2540 2541 if (metadata.version < 3) { 2542 2543 // on upgrade need to fill ackLocation with available messages past last ack 2544 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2545 Entry<String, LastAck> entry = iterator.next(); 2546 for (Iterator<Entry<Long, MessageKeys>> orderIterator = 2547 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { 2548 Long sequence = orderIterator.next().getKey(); 2549 addAckLocation(tx, rc, sequence, entry.getKey()); 2550 } 2551 // modify so it is upgraded 2552 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue()); 2553 } 2554 } 2555 2556 // Configure the message references index 2557 Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx); 2558 while (subscriptions.hasNext()) { 2559 Entry<String, SequenceSet> subscription = subscriptions.next(); 2560 SequenceSet pendingAcks = subscription.getValue(); 2561 if (pendingAcks != null && !pendingAcks.isEmpty()) { 2562 Long lastPendingAck = pendingAcks.getTail().getLast(); 2563 for(Long sequenceId : pendingAcks) { 2564 Long current = rc.messageReferences.get(sequenceId); 2565 if (current == null) { 2566 current = new Long(0); 2567 } 2568 2569 // We always add a trailing empty entry for the next position to start from 2570 // so we need to ensure we don't count that as a message reference on reload. 2571 if (!sequenceId.equals(lastPendingAck)) { 2572 current = current.longValue() + 1; 2573 } 2574 2575 rc.messageReferences.put(sequenceId, current); 2576 } 2577 } 2578 } 2579 2580 // Configure the subscription cache 2581 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2582 Entry<String, LastAck> entry = iterator.next(); 2583 rc.subscriptionCache.add(entry.getKey()); 2584 } 2585 2586 if (rc.orderIndex.nextMessageId == 0) { 2587 // check for existing durable sub all acked out - pull next seq from acks as messages are gone 2588 if (!rc.subscriptionAcks.isEmpty(tx)) { 2589 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { 2590 Entry<String, LastAck> entry = iterator.next(); 2591 rc.orderIndex.nextMessageId = 2592 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1); 2593 } 2594 } 2595 } else { 2596 // update based on ackPositions for unmatched, last entry is always the next 2597 if (!rc.messageReferences.isEmpty()) { 2598 Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1]; 2599 rc.orderIndex.nextMessageId = 2600 Math.max(rc.orderIndex.nextMessageId, nextMessageId); 2601 } 2602 } 2603 } 2604 2605 if (metadata.version < VERSION) { 2606 // store again after upgrade 2607 metadata.destinations.put(tx, key, rc); 2608 } 2609 return rc; 2610 } 2611 2612 /** 2613 * This is a map to cache MessageStores for a specific 2614 * KahaDestination key 2615 */ 2616 protected final ConcurrentMap<String, MessageStore> storeCache = 2617 new ConcurrentHashMap<>(); 2618 2619 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { 2620 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2621 if (sequences == null) { 2622 sequences = new SequenceSet(); 2623 sequences.add(messageSequence); 2624 sd.ackPositions.add(tx, subscriptionKey, sequences); 2625 } else { 2626 sequences.add(messageSequence); 2627 sd.ackPositions.put(tx, subscriptionKey, sequences); 2628 } 2629 2630 Long count = sd.messageReferences.get(messageSequence); 2631 if (count == null) { 2632 count = Long.valueOf(0L); 2633 } 2634 count = count.longValue() + 1; 2635 sd.messageReferences.put(messageSequence, count); 2636 } 2637 2638 // new sub is interested in potentially all existing messages 2639 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2640 SequenceSet allOutstanding = new SequenceSet(); 2641 Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx); 2642 while (iterator.hasNext()) { 2643 SequenceSet set = iterator.next().getValue(); 2644 for (Long entry : set) { 2645 allOutstanding.add(entry); 2646 } 2647 } 2648 sd.ackPositions.put(tx, subscriptionKey, allOutstanding); 2649 2650 for (Long ackPosition : allOutstanding) { 2651 Long count = sd.messageReferences.get(ackPosition); 2652 count = count.longValue() + 1; 2653 sd.messageReferences.put(ackPosition, count); 2654 } 2655 } 2656 2657 // on a new message add, all existing subs are interested in this message 2658 private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { 2659 for(String subscriptionKey : sd.subscriptionCache) { 2660 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2661 if (sequences == null) { 2662 sequences = new SequenceSet(); 2663 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2664 sd.ackPositions.add(tx, subscriptionKey, sequences); 2665 } else { 2666 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2667 sd.ackPositions.put(tx, subscriptionKey, sequences); 2668 } 2669 2670 Long count = sd.messageReferences.get(messageSequence); 2671 if (count == null) { 2672 count = Long.valueOf(0L); 2673 } 2674 count = count.longValue() + 1; 2675 sd.messageReferences.put(messageSequence, count); 2676 sd.messageReferences.put(messageSequence+1, Long.valueOf(0L)); 2677 } 2678 } 2679 2680 private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2681 if (!sd.ackPositions.isEmpty(tx)) { 2682 SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey); 2683 if (sequences == null || sequences.isEmpty()) { 2684 return; 2685 } 2686 2687 ArrayList<Long> unreferenced = new ArrayList<Long>(); 2688 2689 for(Long sequenceId : sequences) { 2690 Long references = sd.messageReferences.get(sequenceId); 2691 if (references != null) { 2692 references = references.longValue() - 1; 2693 2694 if (references.longValue() > 0) { 2695 sd.messageReferences.put(sequenceId, references); 2696 } else { 2697 sd.messageReferences.remove(sequenceId); 2698 unreferenced.add(sequenceId); 2699 } 2700 } 2701 } 2702 2703 for(Long sequenceId : unreferenced) { 2704 // Find all the entries that need to get deleted. 2705 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2706 sd.orderIndex.getDeleteList(tx, deletes, sequenceId); 2707 2708 // Do the actual deletes. 2709 for (Entry<Long, MessageKeys> entry : deletes) { 2710 sd.locationIndex.remove(tx, entry.getValue().location); 2711 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2712 sd.orderIndex.remove(tx, entry.getKey()); 2713 } 2714 } 2715 } 2716 } 2717 2718 /** 2719 * @param tx 2720 * @param sd 2721 * @param subscriptionKey 2722 * @param messageSequence 2723 * @throws IOException 2724 */ 2725 private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException { 2726 // Remove the sub from the previous location set.. 2727 if (messageSequence != null) { 2728 SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); 2729 if (range != null && !range.isEmpty()) { 2730 range.remove(messageSequence); 2731 if (!range.isEmpty()) { 2732 sd.ackPositions.put(tx, subscriptionKey, range); 2733 } else { 2734 sd.ackPositions.remove(tx, subscriptionKey); 2735 } 2736 2737 // Check if the message is reference by any other subscription. 2738 Long count = sd.messageReferences.get(messageSequence); 2739 if (count != null){ 2740 long references = count.longValue() - 1; 2741 if (references > 0) { 2742 sd.messageReferences.put(messageSequence, Long.valueOf(references)); 2743 return; 2744 } else { 2745 sd.messageReferences.remove(messageSequence); 2746 } 2747 } 2748 2749 // Find all the entries that need to get deleted. 2750 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2751 sd.orderIndex.getDeleteList(tx, deletes, messageSequence); 2752 2753 // Do the actual deletes. 2754 for (Entry<Long, MessageKeys> entry : deletes) { 2755 sd.locationIndex.remove(tx, entry.getValue().location); 2756 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2757 sd.orderIndex.remove(tx, entry.getKey()); 2758 } 2759 } 2760 } 2761 } 2762 2763 public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2764 return sd.subscriptionAcks.get(tx, subscriptionKey); 2765 } 2766 2767 public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2768 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); 2769 if (messageSequences != null) { 2770 long result = messageSequences.rangeSize(); 2771 // if there's anything in the range the last value is always the nextMessage marker, so remove 1. 2772 return result > 0 ? result - 1 : 0; 2773 } 2774 2775 return 0; 2776 } 2777 2778 protected String key(KahaDestination destination) { 2779 return destination.getType().getNumber() + ":" + destination.getName(); 2780 } 2781 2782 // ///////////////////////////////////////////////////////////////// 2783 // Transaction related implementation methods. 2784 // ///////////////////////////////////////////////////////////////// 2785 @SuppressWarnings("rawtypes") 2786 private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2787 @SuppressWarnings("rawtypes") 2788 protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2789 2790 @SuppressWarnings("rawtypes") 2791 private List<Operation> getInflightTx(KahaTransactionInfo info) { 2792 TransactionId key = TransactionIdConversion.convert(info); 2793 List<Operation> tx; 2794 synchronized (inflightTransactions) { 2795 tx = inflightTransactions.get(key); 2796 if (tx == null) { 2797 tx = Collections.synchronizedList(new ArrayList<Operation>()); 2798 inflightTransactions.put(key, tx); 2799 } 2800 } 2801 return tx; 2802 } 2803 2804 @SuppressWarnings("unused") 2805 private TransactionId key(KahaTransactionInfo transactionInfo) { 2806 return TransactionIdConversion.convert(transactionInfo); 2807 } 2808 2809 abstract class Operation <T extends JournalCommand<T>> { 2810 final T command; 2811 final Location location; 2812 2813 public Operation(T command, Location location) { 2814 this.command = command; 2815 this.location = location; 2816 } 2817 2818 public Location getLocation() { 2819 return location; 2820 } 2821 2822 public T getCommand() { 2823 return command; 2824 } 2825 2826 abstract public void execute(Transaction tx) throws IOException; 2827 } 2828 2829 class AddOperation extends Operation<KahaAddMessageCommand> { 2830 final IndexAware runWithIndexLock; 2831 public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) { 2832 super(command, location); 2833 this.runWithIndexLock = runWithIndexLock; 2834 } 2835 2836 @Override 2837 public void execute(Transaction tx) throws IOException { 2838 long seq = updateIndex(tx, command, location); 2839 if (runWithIndexLock != null) { 2840 runWithIndexLock.sequenceAssignedWithIndexLocked(seq); 2841 } 2842 } 2843 } 2844 2845 class RemoveOperation extends Operation<KahaRemoveMessageCommand> { 2846 2847 public RemoveOperation(KahaRemoveMessageCommand command, Location location) { 2848 super(command, location); 2849 } 2850 2851 @Override 2852 public void execute(Transaction tx) throws IOException { 2853 updateIndex(tx, command, location); 2854 } 2855 } 2856 2857 // ///////////////////////////////////////////////////////////////// 2858 // Initialization related implementation methods. 2859 // ///////////////////////////////////////////////////////////////// 2860 2861 private PageFile createPageFile() throws IOException { 2862 if (indexDirectory == null) { 2863 indexDirectory = directory; 2864 } 2865 IOHelper.mkdirs(indexDirectory); 2866 PageFile index = new PageFile(indexDirectory, "db"); 2867 index.setEnableWriteThread(isEnableIndexWriteAsync()); 2868 index.setWriteBatchSize(getIndexWriteBatchSize()); 2869 index.setPageCacheSize(indexCacheSize); 2870 index.setUseLFRUEviction(isUseIndexLFRUEviction()); 2871 index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); 2872 index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); 2873 index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); 2874 index.setEnablePageCaching(isEnableIndexPageCaching()); 2875 return index; 2876 } 2877 2878 protected Journal createJournal() throws IOException { 2879 Journal manager = new Journal(); 2880 manager.setDirectory(directory); 2881 manager.setMaxFileLength(getJournalMaxFileLength()); 2882 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); 2883 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); 2884 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 2885 manager.setArchiveDataLogs(isArchiveDataLogs()); 2886 manager.setSizeAccumulator(journalSize); 2887 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); 2888 manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase())); 2889 manager.setPreallocationStrategy( 2890 Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase())); 2891 manager.setJournalDiskSyncStrategy(journalDiskSyncStrategy); 2892 if (getDirectoryArchive() != null) { 2893 IOHelper.mkdirs(getDirectoryArchive()); 2894 manager.setDirectoryArchive(getDirectoryArchive()); 2895 } 2896 return manager; 2897 } 2898 2899 private Metadata createMetadata() { 2900 Metadata md = new Metadata(); 2901 md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth()); 2902 md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack()); 2903 return md; 2904 } 2905 2906 public int getJournalMaxWriteBatchSize() { 2907 return journalMaxWriteBatchSize; 2908 } 2909 2910 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 2911 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 2912 } 2913 2914 public File getDirectory() { 2915 return directory; 2916 } 2917 2918 public void setDirectory(File directory) { 2919 this.directory = directory; 2920 } 2921 2922 public boolean isDeleteAllMessages() { 2923 return deleteAllMessages; 2924 } 2925 2926 public void setDeleteAllMessages(boolean deleteAllMessages) { 2927 this.deleteAllMessages = deleteAllMessages; 2928 } 2929 2930 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { 2931 this.setIndexWriteBatchSize = setIndexWriteBatchSize; 2932 } 2933 2934 public int getIndexWriteBatchSize() { 2935 return setIndexWriteBatchSize; 2936 } 2937 2938 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 2939 this.enableIndexWriteAsync = enableIndexWriteAsync; 2940 } 2941 2942 boolean isEnableIndexWriteAsync() { 2943 return enableIndexWriteAsync; 2944 } 2945 2946 /** 2947 * @deprecated use {@link #getJournalDiskSyncStrategyEnum} or {@link #getJournalDiskSyncStrategy} instead 2948 * @return 2949 */ 2950 public boolean isEnableJournalDiskSyncs() { 2951 return journalDiskSyncStrategy == JournalDiskSyncStrategy.ALWAYS; 2952 } 2953 2954 /** 2955 * @deprecated use {@link #setEnableJournalDiskSyncs} instead 2956 * @param syncWrites 2957 */ 2958 public void setEnableJournalDiskSyncs(boolean syncWrites) { 2959 if (syncWrites) { 2960 journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; 2961 } else { 2962 journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER; 2963 } 2964 } 2965 2966 public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() { 2967 return journalDiskSyncStrategy; 2968 } 2969 2970 public String getJournalDiskSyncStrategy() { 2971 return journalDiskSyncStrategy.name(); 2972 } 2973 2974 public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) { 2975 this.journalDiskSyncStrategy = JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase()); 2976 } 2977 2978 public long getJournalDiskSyncInterval() { 2979 return journalDiskSyncInterval; 2980 } 2981 2982 public void setJournalDiskSyncInterval(long journalDiskSyncInterval) { 2983 this.journalDiskSyncInterval = journalDiskSyncInterval; 2984 } 2985 2986 public long getCheckpointInterval() { 2987 return checkpointInterval; 2988 } 2989 2990 public void setCheckpointInterval(long checkpointInterval) { 2991 this.checkpointInterval = checkpointInterval; 2992 } 2993 2994 public long getCleanupInterval() { 2995 return cleanupInterval; 2996 } 2997 2998 public void setCleanupInterval(long cleanupInterval) { 2999 this.cleanupInterval = cleanupInterval; 3000 } 3001 3002 public boolean getCleanupOnStop() { 3003 return cleanupOnStop; 3004 } 3005 3006 public void setCleanupOnStop(boolean cleanupOnStop) { 3007 this.cleanupOnStop = cleanupOnStop; 3008 } 3009 3010 public void setJournalMaxFileLength(int journalMaxFileLength) { 3011 this.journalMaxFileLength = journalMaxFileLength; 3012 } 3013 3014 public int getJournalMaxFileLength() { 3015 return journalMaxFileLength; 3016 } 3017 3018 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 3019 this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack); 3020 } 3021 3022 public int getMaxFailoverProducersToTrack() { 3023 return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack(); 3024 } 3025 3026 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 3027 this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth); 3028 } 3029 3030 public int getFailoverProducersAuditDepth() { 3031 return this.metadata.producerSequenceIdTracker.getAuditDepth(); 3032 } 3033 3034 public PageFile getPageFile() throws IOException { 3035 if (pageFile == null) { 3036 pageFile = createPageFile(); 3037 } 3038 return pageFile; 3039 } 3040 3041 public Journal getJournal() throws IOException { 3042 if (journal == null) { 3043 journal = createJournal(); 3044 } 3045 return journal; 3046 } 3047 3048 protected Metadata getMetadata() { 3049 return metadata; 3050 } 3051 3052 public boolean isFailIfDatabaseIsLocked() { 3053 return failIfDatabaseIsLocked; 3054 } 3055 3056 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 3057 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 3058 } 3059 3060 public boolean isIgnoreMissingJournalfiles() { 3061 return ignoreMissingJournalfiles; 3062 } 3063 3064 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 3065 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; 3066 } 3067 3068 public int getIndexCacheSize() { 3069 return indexCacheSize; 3070 } 3071 3072 public void setIndexCacheSize(int indexCacheSize) { 3073 this.indexCacheSize = indexCacheSize; 3074 } 3075 3076 public boolean isCheckForCorruptJournalFiles() { 3077 return checkForCorruptJournalFiles; 3078 } 3079 3080 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 3081 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; 3082 } 3083 3084 public boolean isChecksumJournalFiles() { 3085 return checksumJournalFiles; 3086 } 3087 3088 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 3089 this.checksumJournalFiles = checksumJournalFiles; 3090 } 3091 3092 @Override 3093 public void setBrokerService(BrokerService brokerService) { 3094 this.brokerService = brokerService; 3095 } 3096 3097 /** 3098 * @return the archiveDataLogs 3099 */ 3100 public boolean isArchiveDataLogs() { 3101 return this.archiveDataLogs; 3102 } 3103 3104 /** 3105 * @param archiveDataLogs the archiveDataLogs to set 3106 */ 3107 public void setArchiveDataLogs(boolean archiveDataLogs) { 3108 this.archiveDataLogs = archiveDataLogs; 3109 } 3110 3111 /** 3112 * @return the directoryArchive 3113 */ 3114 public File getDirectoryArchive() { 3115 return this.directoryArchive; 3116 } 3117 3118 /** 3119 * @param directoryArchive the directoryArchive to set 3120 */ 3121 public void setDirectoryArchive(File directoryArchive) { 3122 this.directoryArchive = directoryArchive; 3123 } 3124 3125 public boolean isArchiveCorruptedIndex() { 3126 return archiveCorruptedIndex; 3127 } 3128 3129 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 3130 this.archiveCorruptedIndex = archiveCorruptedIndex; 3131 } 3132 3133 public float getIndexLFUEvictionFactor() { 3134 return indexLFUEvictionFactor; 3135 } 3136 3137 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 3138 this.indexLFUEvictionFactor = indexLFUEvictionFactor; 3139 } 3140 3141 public boolean isUseIndexLFRUEviction() { 3142 return useIndexLFRUEviction; 3143 } 3144 3145 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 3146 this.useIndexLFRUEviction = useIndexLFRUEviction; 3147 } 3148 3149 public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) { 3150 this.enableIndexDiskSyncs = enableIndexDiskSyncs; 3151 } 3152 3153 public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) { 3154 this.enableIndexRecoveryFile = enableIndexRecoveryFile; 3155 } 3156 3157 public void setEnableIndexPageCaching(boolean enableIndexPageCaching) { 3158 this.enableIndexPageCaching = enableIndexPageCaching; 3159 } 3160 3161 public boolean isEnableIndexDiskSyncs() { 3162 return enableIndexDiskSyncs; 3163 } 3164 3165 public boolean isEnableIndexRecoveryFile() { 3166 return enableIndexRecoveryFile; 3167 } 3168 3169 public boolean isEnableIndexPageCaching() { 3170 return enableIndexPageCaching; 3171 } 3172 3173 // ///////////////////////////////////////////////////////////////// 3174 // Internal conversion methods. 3175 // ///////////////////////////////////////////////////////////////// 3176 3177 class MessageOrderCursor{ 3178 long defaultCursorPosition; 3179 long lowPriorityCursorPosition; 3180 long highPriorityCursorPosition; 3181 MessageOrderCursor(){ 3182 } 3183 3184 MessageOrderCursor(long position){ 3185 this.defaultCursorPosition=position; 3186 this.lowPriorityCursorPosition=position; 3187 this.highPriorityCursorPosition=position; 3188 } 3189 3190 MessageOrderCursor(MessageOrderCursor other){ 3191 this.defaultCursorPosition=other.defaultCursorPosition; 3192 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 3193 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 3194 } 3195 3196 MessageOrderCursor copy() { 3197 return new MessageOrderCursor(this); 3198 } 3199 3200 void reset() { 3201 this.defaultCursorPosition=0; 3202 this.highPriorityCursorPosition=0; 3203 this.lowPriorityCursorPosition=0; 3204 } 3205 3206 void increment() { 3207 if (defaultCursorPosition!=0) { 3208 defaultCursorPosition++; 3209 } 3210 if (highPriorityCursorPosition!=0) { 3211 highPriorityCursorPosition++; 3212 } 3213 if (lowPriorityCursorPosition!=0) { 3214 lowPriorityCursorPosition++; 3215 } 3216 } 3217 3218 @Override 3219 public String toString() { 3220 return "MessageOrderCursor:[def:" + defaultCursorPosition 3221 + ", low:" + lowPriorityCursorPosition 3222 + ", high:" + highPriorityCursorPosition + "]"; 3223 } 3224 3225 public void sync(MessageOrderCursor other) { 3226 this.defaultCursorPosition=other.defaultCursorPosition; 3227 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 3228 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 3229 } 3230 } 3231 3232 class MessageOrderIndex { 3233 static final byte HI = 9; 3234 static final byte LO = 0; 3235 static final byte DEF = 4; 3236 3237 long nextMessageId; 3238 BTreeIndex<Long, MessageKeys> defaultPriorityIndex; 3239 BTreeIndex<Long, MessageKeys> lowPriorityIndex; 3240 BTreeIndex<Long, MessageKeys> highPriorityIndex; 3241 final MessageOrderCursor cursor = new MessageOrderCursor(); 3242 Long lastDefaultKey; 3243 Long lastHighKey; 3244 Long lastLowKey; 3245 byte lastGetPriority; 3246 final List<Long> pendingAdditions = new LinkedList<Long>(); 3247 3248 MessageKeys remove(Transaction tx, Long key) throws IOException { 3249 MessageKeys result = defaultPriorityIndex.remove(tx, key); 3250 if (result == null && highPriorityIndex!=null) { 3251 result = highPriorityIndex.remove(tx, key); 3252 if (result ==null && lowPriorityIndex!=null) { 3253 result = lowPriorityIndex.remove(tx, key); 3254 } 3255 } 3256 return result; 3257 } 3258 3259 void load(Transaction tx) throws IOException { 3260 defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3261 defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 3262 defaultPriorityIndex.load(tx); 3263 lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3264 lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 3265 lowPriorityIndex.load(tx); 3266 highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3267 highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 3268 highPriorityIndex.load(tx); 3269 } 3270 3271 void allocate(Transaction tx) throws IOException { 3272 defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3273 if (metadata.version >= 2) { 3274 lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3275 highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3276 } 3277 } 3278 3279 void configureLast(Transaction tx) throws IOException { 3280 // Figure out the next key using the last entry in the destination. 3281 TreeSet<Long> orderedSet = new TreeSet<Long>(); 3282 3283 addLast(orderedSet, highPriorityIndex, tx); 3284 addLast(orderedSet, defaultPriorityIndex, tx); 3285 addLast(orderedSet, lowPriorityIndex, tx); 3286 3287 if (!orderedSet.isEmpty()) { 3288 nextMessageId = orderedSet.last() + 1; 3289 } 3290 } 3291 3292 private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException { 3293 if (index != null) { 3294 Entry<Long, MessageKeys> lastEntry = index.getLast(tx); 3295 if (lastEntry != null) { 3296 orderedSet.add(lastEntry.getKey()); 3297 } 3298 } 3299 } 3300 3301 void clear(Transaction tx) throws IOException { 3302 this.remove(tx); 3303 this.resetCursorPosition(); 3304 this.allocate(tx); 3305 this.load(tx); 3306 this.configureLast(tx); 3307 } 3308 3309 void remove(Transaction tx) throws IOException { 3310 defaultPriorityIndex.clear(tx); 3311 defaultPriorityIndex.unload(tx); 3312 tx.free(defaultPriorityIndex.getPageId()); 3313 if (lowPriorityIndex != null) { 3314 lowPriorityIndex.clear(tx); 3315 lowPriorityIndex.unload(tx); 3316 3317 tx.free(lowPriorityIndex.getPageId()); 3318 } 3319 if (highPriorityIndex != null) { 3320 highPriorityIndex.clear(tx); 3321 highPriorityIndex.unload(tx); 3322 tx.free(highPriorityIndex.getPageId()); 3323 } 3324 } 3325 3326 void resetCursorPosition() { 3327 this.cursor.reset(); 3328 lastDefaultKey = null; 3329 lastHighKey = null; 3330 lastLowKey = null; 3331 } 3332 3333 void setBatch(Transaction tx, Long sequence) throws IOException { 3334 if (sequence != null) { 3335 Long nextPosition = new Long(sequence.longValue() + 1); 3336 lastDefaultKey = sequence; 3337 cursor.defaultCursorPosition = nextPosition.longValue(); 3338 lastHighKey = sequence; 3339 cursor.highPriorityCursorPosition = nextPosition.longValue(); 3340 lastLowKey = sequence; 3341 cursor.lowPriorityCursorPosition = nextPosition.longValue(); 3342 } 3343 } 3344 3345 void setBatch(Transaction tx, LastAck last) throws IOException { 3346 setBatch(tx, last.lastAckedSequence); 3347 if (cursor.defaultCursorPosition == 0 3348 && cursor.highPriorityCursorPosition == 0 3349 && cursor.lowPriorityCursorPosition == 0) { 3350 long next = last.lastAckedSequence + 1; 3351 switch (last.priority) { 3352 case DEF: 3353 cursor.defaultCursorPosition = next; 3354 cursor.highPriorityCursorPosition = next; 3355 break; 3356 case HI: 3357 cursor.highPriorityCursorPosition = next; 3358 break; 3359 case LO: 3360 cursor.lowPriorityCursorPosition = next; 3361 cursor.defaultCursorPosition = next; 3362 cursor.highPriorityCursorPosition = next; 3363 break; 3364 } 3365 } 3366 } 3367 3368 void stoppedIterating() { 3369 if (lastDefaultKey!=null) { 3370 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1; 3371 } 3372 if (lastHighKey!=null) { 3373 cursor.highPriorityCursorPosition=lastHighKey.longValue()+1; 3374 } 3375 if (lastLowKey!=null) { 3376 cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1; 3377 } 3378 lastDefaultKey = null; 3379 lastHighKey = null; 3380 lastLowKey = null; 3381 } 3382 3383 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId) 3384 throws IOException { 3385 if (defaultPriorityIndex.containsKey(tx, sequenceId)) { 3386 getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId); 3387 } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) { 3388 getDeleteList(tx, deletes, highPriorityIndex, sequenceId); 3389 } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) { 3390 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId); 3391 } 3392 } 3393 3394 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, 3395 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException { 3396 3397 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null); 3398 deletes.add(iterator.next()); 3399 } 3400 3401 long getNextMessageId(int priority) { 3402 return nextMessageId++; 3403 } 3404 3405 MessageKeys get(Transaction tx, Long key) throws IOException { 3406 MessageKeys result = defaultPriorityIndex.get(tx, key); 3407 if (result == null) { 3408 result = highPriorityIndex.get(tx, key); 3409 if (result == null) { 3410 result = lowPriorityIndex.get(tx, key); 3411 lastGetPriority = LO; 3412 } else { 3413 lastGetPriority = HI; 3414 } 3415 } else { 3416 lastGetPriority = DEF; 3417 } 3418 return result; 3419 } 3420 3421 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException { 3422 if (priority == javax.jms.Message.DEFAULT_PRIORITY) { 3423 return defaultPriorityIndex.put(tx, key, value); 3424 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) { 3425 return highPriorityIndex.put(tx, key, value); 3426 } else { 3427 return lowPriorityIndex.put(tx, key, value); 3428 } 3429 } 3430 3431 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{ 3432 return new MessageOrderIterator(tx,cursor,this); 3433 } 3434 3435 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{ 3436 return new MessageOrderIterator(tx,m,this); 3437 } 3438 3439 public byte lastGetPriority() { 3440 return lastGetPriority; 3441 } 3442 3443 public boolean alreadyDispatched(Long sequence) { 3444 return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) || 3445 (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) || 3446 (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence); 3447 } 3448 3449 public void trackPendingAdd(Long seq) { 3450 synchronized (pendingAdditions) { 3451 pendingAdditions.add(seq); 3452 } 3453 } 3454 3455 public void trackPendingAddComplete(Long seq) { 3456 synchronized (pendingAdditions) { 3457 pendingAdditions.remove(seq); 3458 } 3459 } 3460 3461 public Long minPendingAdd() { 3462 synchronized (pendingAdditions) { 3463 if (!pendingAdditions.isEmpty()) { 3464 return pendingAdditions.get(0); 3465 } else { 3466 return null; 3467 } 3468 } 3469 } 3470 3471 3472 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{ 3473 Iterator<Entry<Long, MessageKeys>>currentIterator; 3474 final Iterator<Entry<Long, MessageKeys>>highIterator; 3475 final Iterator<Entry<Long, MessageKeys>>defaultIterator; 3476 final Iterator<Entry<Long, MessageKeys>>lowIterator; 3477 3478 MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException { 3479 Long pendingAddLimiter = messageOrderIndex.minPendingAdd(); 3480 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter); 3481 if (highPriorityIndex != null) { 3482 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter); 3483 } else { 3484 this.highIterator = null; 3485 } 3486 if (lowPriorityIndex != null) { 3487 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter); 3488 } else { 3489 this.lowIterator = null; 3490 } 3491 } 3492 3493 @Override 3494 public boolean hasNext() { 3495 if (currentIterator == null) { 3496 if (highIterator != null) { 3497 if (highIterator.hasNext()) { 3498 currentIterator = highIterator; 3499 return currentIterator.hasNext(); 3500 } 3501 if (defaultIterator.hasNext()) { 3502 currentIterator = defaultIterator; 3503 return currentIterator.hasNext(); 3504 } 3505 if (lowIterator.hasNext()) { 3506 currentIterator = lowIterator; 3507 return currentIterator.hasNext(); 3508 } 3509 return false; 3510 } else { 3511 currentIterator = defaultIterator; 3512 return currentIterator.hasNext(); 3513 } 3514 } 3515 if (highIterator != null) { 3516 if (currentIterator.hasNext()) { 3517 return true; 3518 } 3519 if (currentIterator == highIterator) { 3520 if (defaultIterator.hasNext()) { 3521 currentIterator = defaultIterator; 3522 return currentIterator.hasNext(); 3523 } 3524 if (lowIterator.hasNext()) { 3525 currentIterator = lowIterator; 3526 return currentIterator.hasNext(); 3527 } 3528 return false; 3529 } 3530 3531 if (currentIterator == defaultIterator) { 3532 if (lowIterator.hasNext()) { 3533 currentIterator = lowIterator; 3534 return currentIterator.hasNext(); 3535 } 3536 return false; 3537 } 3538 } 3539 return currentIterator.hasNext(); 3540 } 3541 3542 @Override 3543 public Entry<Long, MessageKeys> next() { 3544 Entry<Long, MessageKeys> result = currentIterator.next(); 3545 if (result != null) { 3546 Long key = result.getKey(); 3547 if (highIterator != null) { 3548 if (currentIterator == defaultIterator) { 3549 lastDefaultKey = key; 3550 } else if (currentIterator == highIterator) { 3551 lastHighKey = key; 3552 } else { 3553 lastLowKey = key; 3554 } 3555 } else { 3556 lastDefaultKey = key; 3557 } 3558 } 3559 return result; 3560 } 3561 3562 @Override 3563 public void remove() { 3564 throw new UnsupportedOperationException(); 3565 } 3566 3567 } 3568 } 3569 3570 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> { 3571 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller(); 3572 3573 @Override 3574 public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException { 3575 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 3576 ObjectOutputStream oout = new ObjectOutputStream(baos); 3577 oout.writeObject(object); 3578 oout.flush(); 3579 oout.close(); 3580 byte[] data = baos.toByteArray(); 3581 dataOut.writeInt(data.length); 3582 dataOut.write(data); 3583 } 3584 3585 @Override 3586 @SuppressWarnings("unchecked") 3587 public HashSet<String> readPayload(DataInput dataIn) throws IOException { 3588 int dataLen = dataIn.readInt(); 3589 byte[] data = new byte[dataLen]; 3590 dataIn.readFully(data); 3591 ByteArrayInputStream bais = new ByteArrayInputStream(data); 3592 ObjectInputStream oin = new ObjectInputStream(bais); 3593 try { 3594 return (HashSet<String>) oin.readObject(); 3595 } catch (ClassNotFoundException cfe) { 3596 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe); 3597 ioe.initCause(cfe); 3598 throw ioe; 3599 } 3600 } 3601 } 3602 3603 public File getIndexDirectory() { 3604 return indexDirectory; 3605 } 3606 3607 public void setIndexDirectory(File indexDirectory) { 3608 this.indexDirectory = indexDirectory; 3609 } 3610 3611 interface IndexAware { 3612 public void sequenceAssignedWithIndexLocked(long index); 3613 } 3614 3615 public String getPreallocationScope() { 3616 return preallocationScope; 3617 } 3618 3619 public void setPreallocationScope(String preallocationScope) { 3620 this.preallocationScope = preallocationScope; 3621 } 3622 3623 public String getPreallocationStrategy() { 3624 return preallocationStrategy; 3625 } 3626 3627 public void setPreallocationStrategy(String preallocationStrategy) { 3628 this.preallocationStrategy = preallocationStrategy; 3629 } 3630 3631 public int getCompactAcksAfterNoGC() { 3632 return compactAcksAfterNoGC; 3633 } 3634 3635 /** 3636 * Sets the number of GC cycles where no journal logs were removed before an attempt to 3637 * move forward all the acks in the last log that contains them and is otherwise unreferenced. 3638 * <p> 3639 * A value of -1 will disable this feature. 3640 * 3641 * @param compactAcksAfterNoGC 3642 * Number of empty GC cycles before we rewrite old ACKS. 3643 */ 3644 public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) { 3645 this.compactAcksAfterNoGC = compactAcksAfterNoGC; 3646 } 3647 3648 /** 3649 * Returns whether Ack compaction will ignore that the store is still growing 3650 * and run more often. 3651 * 3652 * @return the compactAcksIgnoresStoreGrowth current value. 3653 */ 3654 public boolean isCompactAcksIgnoresStoreGrowth() { 3655 return compactAcksIgnoresStoreGrowth; 3656 } 3657 3658 /** 3659 * Configure if Ack compaction will occur regardless of continued growth of the 3660 * journal logs meaning that the store has not run out of space yet. Because the 3661 * compaction operation can be costly this value is defaulted to off and the Ack 3662 * compaction is only done when it seems that the store cannot grow and larger. 3663 * 3664 * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set 3665 */ 3666 public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { 3667 this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth; 3668 } 3669 3670 /** 3671 * Returns whether Ack compaction is enabled 3672 * 3673 * @return enableAckCompaction 3674 */ 3675 public boolean isEnableAckCompaction() { 3676 return enableAckCompaction; 3677 } 3678 3679 /** 3680 * Configure if the Ack compaction task should be enabled to run 3681 * 3682 * @param enableAckCompaction 3683 */ 3684 public void setEnableAckCompaction(boolean enableAckCompaction) { 3685 this.enableAckCompaction = enableAckCompaction; 3686 } 3687}