001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET; 020 021import java.io.ByteArrayInputStream; 022import java.io.ByteArrayOutputStream; 023import java.io.DataInput; 024import java.io.DataOutput; 025import java.io.EOFException; 026import java.io.File; 027import java.io.IOException; 028import java.io.InputStream; 029import java.io.InterruptedIOException; 030import java.io.ObjectInputStream; 031import java.io.ObjectOutputStream; 032import java.io.OutputStream; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.Collections; 037import java.util.Date; 038import java.util.HashMap; 039import java.util.HashSet; 040import java.util.Iterator; 041import java.util.LinkedHashMap; 042import java.util.LinkedHashSet; 043import java.util.LinkedList; 044import java.util.List; 045import java.util.Map; 046import java.util.Map.Entry; 047import java.util.Set; 048import java.util.SortedSet; 049import java.util.TreeMap; 050import java.util.TreeSet; 051 052import java.util.concurrent.ConcurrentHashMap; 053import java.util.concurrent.ConcurrentMap; 054import java.util.concurrent.Executors; 055import java.util.concurrent.ScheduledExecutorService; 056import java.util.concurrent.ThreadFactory; 057import java.util.concurrent.TimeUnit; 058import java.util.concurrent.atomic.AtomicBoolean; 059import java.util.concurrent.atomic.AtomicLong; 060import java.util.concurrent.atomic.AtomicReference; 061import java.util.concurrent.locks.ReentrantReadWriteLock; 062 063import org.apache.activemq.ActiveMQMessageAuditNoSync; 064import org.apache.activemq.broker.BrokerService; 065import org.apache.activemq.broker.BrokerServiceAware; 066import org.apache.activemq.command.TransactionId; 067import org.apache.activemq.openwire.OpenWireFormat; 068import org.apache.activemq.protobuf.Buffer; 069import org.apache.activemq.store.MessageStore; 070import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand; 071import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; 072import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 073import org.apache.activemq.store.kahadb.data.KahaDestination; 074import org.apache.activemq.store.kahadb.data.KahaEntryType; 075import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 076import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand; 077import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; 078import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; 079import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand; 080import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 081import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; 082import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 083import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 084import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; 085import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 086import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor; 087import org.apache.activemq.store.kahadb.disk.index.ListIndex; 088import org.apache.activemq.store.kahadb.disk.journal.DataFile; 089import org.apache.activemq.store.kahadb.disk.journal.Journal; 090import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy; 091import org.apache.activemq.store.kahadb.disk.journal.Location; 092import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender; 093import org.apache.activemq.store.kahadb.disk.page.Page; 094import org.apache.activemq.store.kahadb.disk.page.PageFile; 095import org.apache.activemq.store.kahadb.disk.page.Transaction; 096import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; 097import org.apache.activemq.store.kahadb.disk.util.LongMarshaller; 098import org.apache.activemq.store.kahadb.disk.util.Marshaller; 099import org.apache.activemq.store.kahadb.disk.util.Sequence; 100import org.apache.activemq.store.kahadb.disk.util.SequenceSet; 101import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 102import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; 103import org.apache.activemq.util.ByteSequence; 104import org.apache.activemq.util.DataByteArrayInputStream; 105import org.apache.activemq.util.DataByteArrayOutputStream; 106import org.apache.activemq.util.IOExceptionSupport; 107import org.apache.activemq.util.IOHelper; 108import org.apache.activemq.util.ServiceStopper; 109import org.apache.activemq.util.ServiceSupport; 110import org.apache.activemq.util.ThreadPoolUtils; 111import org.slf4j.Logger; 112import org.slf4j.LoggerFactory; 113import org.slf4j.MDC; 114 115public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware { 116 117 protected BrokerService brokerService; 118 119 public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME"; 120 public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0); 121 public static final File DEFAULT_DIRECTORY = new File("KahaDB"); 122 protected static final Buffer UNMATCHED; 123 static { 124 UNMATCHED = new Buffer(new byte[]{}); 125 } 126 private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class); 127 128 static final int CLOSED_STATE = 1; 129 static final int OPEN_STATE = 2; 130 static final long NOT_ACKED = -1; 131 132 static final int VERSION = 5; 133 134 static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1; 135 136 protected class Metadata { 137 protected Page<Metadata> page; 138 protected int state; 139 protected BTreeIndex<String, StoredDestination> destinations; 140 protected Location lastUpdate; 141 protected Location firstInProgressTransactionLocation; 142 protected Location producerSequenceIdTrackerLocation = null; 143 protected Location ackMessageFileMapLocation = null; 144 protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync(); 145 protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>(); 146 protected int version = VERSION; 147 protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION; 148 149 public void read(DataInput is) throws IOException { 150 state = is.readInt(); 151 destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong()); 152 if (is.readBoolean()) { 153 lastUpdate = LocationMarshaller.INSTANCE.readPayload(is); 154 } else { 155 lastUpdate = null; 156 } 157 if (is.readBoolean()) { 158 firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is); 159 } else { 160 firstInProgressTransactionLocation = null; 161 } 162 try { 163 if (is.readBoolean()) { 164 producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is); 165 } else { 166 producerSequenceIdTrackerLocation = null; 167 } 168 } catch (EOFException expectedOnUpgrade) { 169 } 170 try { 171 version = is.readInt(); 172 } catch (EOFException expectedOnUpgrade) { 173 version = 1; 174 } 175 if (version >= 5 && is.readBoolean()) { 176 ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is); 177 } else { 178 ackMessageFileMapLocation = null; 179 } 180 try { 181 openwireVersion = is.readInt(); 182 } catch (EOFException expectedOnUpgrade) { 183 openwireVersion = OpenWireFormat.DEFAULT_VERSION; 184 } 185 LOG.info("KahaDB is version " + version); 186 } 187 188 public void write(DataOutput os) throws IOException { 189 os.writeInt(state); 190 os.writeLong(destinations.getPageId()); 191 192 if (lastUpdate != null) { 193 os.writeBoolean(true); 194 LocationMarshaller.INSTANCE.writePayload(lastUpdate, os); 195 } else { 196 os.writeBoolean(false); 197 } 198 199 if (firstInProgressTransactionLocation != null) { 200 os.writeBoolean(true); 201 LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os); 202 } else { 203 os.writeBoolean(false); 204 } 205 206 if (producerSequenceIdTrackerLocation != null) { 207 os.writeBoolean(true); 208 LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os); 209 } else { 210 os.writeBoolean(false); 211 } 212 os.writeInt(VERSION); 213 if (ackMessageFileMapLocation != null) { 214 os.writeBoolean(true); 215 LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os); 216 } else { 217 os.writeBoolean(false); 218 } 219 os.writeInt(this.openwireVersion); 220 } 221 } 222 223 class MetadataMarshaller extends VariableMarshaller<Metadata> { 224 @Override 225 public Metadata readPayload(DataInput dataIn) throws IOException { 226 Metadata rc = createMetadata(); 227 rc.read(dataIn); 228 return rc; 229 } 230 231 @Override 232 public void writePayload(Metadata object, DataOutput dataOut) throws IOException { 233 object.write(dataOut); 234 } 235 } 236 237 protected PageFile pageFile; 238 protected Journal journal; 239 protected Metadata metadata = new Metadata(); 240 241 protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); 242 243 protected boolean failIfDatabaseIsLocked; 244 245 protected boolean deleteAllMessages; 246 protected File directory = DEFAULT_DIRECTORY; 247 protected File indexDirectory = null; 248 protected ScheduledExecutorService scheduler; 249 private final Object schedulerLock = new Object(); 250 251 protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; 252 protected boolean archiveDataLogs; 253 protected File directoryArchive; 254 protected AtomicLong journalSize = new AtomicLong(0); 255 long journalDiskSyncInterval = 1000; 256 long checkpointInterval = 5*1000; 257 long cleanupInterval = 30*1000; 258 boolean cleanupOnStop = true; 259 int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 260 int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 261 boolean enableIndexWriteAsync = false; 262 int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 263 private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name(); 264 private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name(); 265 266 protected AtomicBoolean opened = new AtomicBoolean(); 267 private boolean ignoreMissingJournalfiles = false; 268 private int indexCacheSize = 10000; 269 private boolean checkForCorruptJournalFiles = false; 270 private boolean checksumJournalFiles = true; 271 protected boolean forceRecoverIndex = false; 272 273 private boolean archiveCorruptedIndex = false; 274 private boolean useIndexLFRUEviction = false; 275 private float indexLFUEvictionFactor = 0.2f; 276 private boolean enableIndexDiskSyncs = true; 277 private boolean enableIndexRecoveryFile = true; 278 private boolean enableIndexPageCaching = true; 279 ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock(); 280 281 private boolean enableAckCompaction = false; 282 private int compactAcksAfterNoGC = 10; 283 private boolean compactAcksIgnoresStoreGrowth = false; 284 private int checkPointCyclesWithNoGC; 285 private int journalLogOnLastCompactionCheck; 286 287 //only set when using JournalDiskSyncStrategy.PERIODIC 288 protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>(); 289 290 @Override 291 public void doStart() throws Exception { 292 load(); 293 } 294 295 @Override 296 public void doStop(ServiceStopper stopper) throws Exception { 297 unload(); 298 } 299 300 public void allowIOResumption() { 301 if (pageFile != null) { 302 pageFile.allowIOResumption(); 303 } 304 if (journal != null) { 305 journal.allowIOResumption(); 306 } 307 } 308 309 private void loadPageFile() throws IOException { 310 this.indexLock.writeLock().lock(); 311 try { 312 final PageFile pageFile = getPageFile(); 313 pageFile.load(); 314 pageFile.tx().execute(new Transaction.Closure<IOException>() { 315 @Override 316 public void execute(Transaction tx) throws IOException { 317 if (pageFile.getPageCount() == 0) { 318 // First time this is created.. Initialize the metadata 319 Page<Metadata> page = tx.allocate(); 320 assert page.getPageId() == 0; 321 page.set(metadata); 322 metadata.page = page; 323 metadata.state = CLOSED_STATE; 324 metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId()); 325 326 tx.store(metadata.page, metadataMarshaller, true); 327 } else { 328 Page<Metadata> page = tx.load(0, metadataMarshaller); 329 metadata = page.get(); 330 metadata.page = page; 331 } 332 metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); 333 metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); 334 metadata.destinations.load(tx); 335 } 336 }); 337 // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. 338 // Perhaps we should just keep an index of file 339 storedDestinations.clear(); 340 pageFile.tx().execute(new Transaction.Closure<IOException>() { 341 @Override 342 public void execute(Transaction tx) throws IOException { 343 for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { 344 Entry<String, StoredDestination> entry = iterator.next(); 345 StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); 346 storedDestinations.put(entry.getKey(), sd); 347 348 if (checkForCorruptJournalFiles) { 349 // sanity check the index also 350 if (!entry.getValue().locationIndex.isEmpty(tx)) { 351 if (entry.getValue().orderIndex.nextMessageId <= 0) { 352 throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey()); 353 } 354 } 355 } 356 } 357 } 358 }); 359 pageFile.flush(); 360 } finally { 361 this.indexLock.writeLock().unlock(); 362 } 363 } 364 365 private void startCheckpoint() { 366 if (checkpointInterval == 0 && cleanupInterval == 0) { 367 LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart"); 368 return; 369 } 370 synchronized (schedulerLock) { 371 if (scheduler == null || scheduler.isShutdown()) { 372 scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { 373 374 @Override 375 public Thread newThread(Runnable r) { 376 Thread schedulerThread = new Thread(r); 377 378 schedulerThread.setName("ActiveMQ Journal Checkpoint Worker"); 379 schedulerThread.setDaemon(true); 380 381 return schedulerThread; 382 } 383 }); 384 385 // Short intervals for check-point and cleanups 386 long delay; 387 if (journal.isJournalDiskSyncPeriodic()) { 388 delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500); 389 } else { 390 delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500); 391 } 392 393 scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS); 394 } 395 } 396 } 397 398 private final class CheckpointRunner implements Runnable { 399 400 private long lastCheckpoint = System.currentTimeMillis(); 401 private long lastCleanup = System.currentTimeMillis(); 402 private long lastSync = System.currentTimeMillis(); 403 private Location lastAsyncUpdate = null; 404 405 @Override 406 public void run() { 407 try { 408 // Decide on cleanup vs full checkpoint here. 409 if (opened.get()) { 410 long now = System.currentTimeMillis(); 411 if (journal.isJournalDiskSyncPeriodic() && 412 journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) { 413 Location currentUpdate = lastAsyncJournalUpdate.get(); 414 if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) { 415 lastAsyncUpdate = currentUpdate; 416 if (LOG.isTraceEnabled()) { 417 LOG.trace("Writing trace command to trigger journal sync"); 418 } 419 store(new KahaTraceCommand(), true, null, null); 420 } 421 lastSync = now; 422 } 423 if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) { 424 checkpointCleanup(true); 425 lastCleanup = now; 426 lastCheckpoint = now; 427 } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) { 428 checkpointCleanup(false); 429 lastCheckpoint = now; 430 } 431 } 432 } catch (IOException ioe) { 433 LOG.error("Checkpoint failed", ioe); 434 brokerService.handleIOException(ioe); 435 } catch (Throwable e) { 436 LOG.error("Checkpoint failed", e); 437 brokerService.handleIOException(IOExceptionSupport.create(e)); 438 } 439 } 440 } 441 442 public void open() throws IOException { 443 if( opened.compareAndSet(false, true) ) { 444 getJournal().start(); 445 try { 446 loadPageFile(); 447 } catch (Throwable t) { 448 LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t); 449 if (LOG.isDebugEnabled()) { 450 LOG.debug("Index load failure", t); 451 } 452 // try to recover index 453 try { 454 pageFile.unload(); 455 } catch (Exception ignore) {} 456 if (archiveCorruptedIndex) { 457 pageFile.archive(); 458 } else { 459 pageFile.delete(); 460 } 461 metadata = createMetadata(); 462 pageFile = null; 463 loadPageFile(); 464 } 465 recover(); 466 startCheckpoint(); 467 } 468 } 469 470 public void load() throws IOException { 471 this.indexLock.writeLock().lock(); 472 IOHelper.mkdirs(directory); 473 try { 474 if (deleteAllMessages) { 475 getJournal().setCheckForCorruptionOnStartup(false); 476 getJournal().start(); 477 getJournal().delete(); 478 getJournal().close(); 479 journal = null; 480 getPageFile().delete(); 481 LOG.info("Persistence store purged."); 482 deleteAllMessages = false; 483 } 484 485 open(); 486 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 487 } finally { 488 this.indexLock.writeLock().unlock(); 489 } 490 } 491 492 public void close() throws IOException, InterruptedException { 493 if( opened.compareAndSet(true, false)) { 494 checkpointLock.writeLock().lock(); 495 try { 496 if (metadata.page != null) { 497 checkpointUpdate(getCleanupOnStop()); 498 } 499 pageFile.unload(); 500 metadata = createMetadata(); 501 } finally { 502 checkpointLock.writeLock().unlock(); 503 } 504 journal.close(); 505 synchronized(schedulerLock) { 506 if (scheduler != null) { 507 ThreadPoolUtils.shutdownGraceful(scheduler, -1); 508 scheduler = null; 509 } 510 } 511 // clear the cache and journalSize on shutdown of the store 512 storeCache.clear(); 513 journalSize.set(0); 514 } 515 } 516 517 public void unload() throws IOException, InterruptedException { 518 this.indexLock.writeLock().lock(); 519 try { 520 if( pageFile != null && pageFile.isLoaded() ) { 521 metadata.state = CLOSED_STATE; 522 metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0]; 523 524 if (metadata.page != null) { 525 pageFile.tx().execute(new Transaction.Closure<IOException>() { 526 @Override 527 public void execute(Transaction tx) throws IOException { 528 tx.store(metadata.page, metadataMarshaller, true); 529 } 530 }); 531 } 532 } 533 } finally { 534 this.indexLock.writeLock().unlock(); 535 } 536 close(); 537 } 538 539 // public for testing 540 @SuppressWarnings("rawtypes") 541 public Location[] getInProgressTxLocationRange() { 542 Location[] range = new Location[]{null, null}; 543 synchronized (inflightTransactions) { 544 if (!inflightTransactions.isEmpty()) { 545 for (List<Operation> ops : inflightTransactions.values()) { 546 if (!ops.isEmpty()) { 547 trackMaxAndMin(range, ops); 548 } 549 } 550 } 551 if (!preparedTransactions.isEmpty()) { 552 for (List<Operation> ops : preparedTransactions.values()) { 553 if (!ops.isEmpty()) { 554 trackMaxAndMin(range, ops); 555 } 556 } 557 } 558 } 559 return range; 560 } 561 562 @SuppressWarnings("rawtypes") 563 private void trackMaxAndMin(Location[] range, List<Operation> ops) { 564 Location t = ops.get(0).getLocation(); 565 if (range[0] == null || t.compareTo(range[0]) <= 0) { 566 range[0] = t; 567 } 568 t = ops.get(ops.size() -1).getLocation(); 569 if (range[1] == null || t.compareTo(range[1]) >= 0) { 570 range[1] = t; 571 } 572 } 573 574 class TranInfo { 575 TransactionId id; 576 Location location; 577 578 class opCount { 579 int add; 580 int remove; 581 } 582 HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>(); 583 584 @SuppressWarnings("rawtypes") 585 public void track(Operation operation) { 586 if (location == null ) { 587 location = operation.getLocation(); 588 } 589 KahaDestination destination; 590 boolean isAdd = false; 591 if (operation instanceof AddOperation) { 592 AddOperation add = (AddOperation) operation; 593 destination = add.getCommand().getDestination(); 594 isAdd = true; 595 } else { 596 RemoveOperation removeOpperation = (RemoveOperation) operation; 597 destination = removeOpperation.getCommand().getDestination(); 598 } 599 opCount opCount = destinationOpCount.get(destination); 600 if (opCount == null) { 601 opCount = new opCount(); 602 destinationOpCount.put(destination, opCount); 603 } 604 if (isAdd) { 605 opCount.add++; 606 } else { 607 opCount.remove++; 608 } 609 } 610 611 @Override 612 public String toString() { 613 StringBuffer buffer = new StringBuffer(); 614 buffer.append(location).append(";").append(id).append(";\n"); 615 for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) { 616 buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); 617 } 618 return buffer.toString(); 619 } 620 } 621 622 @SuppressWarnings("rawtypes") 623 public String getTransactions() { 624 625 ArrayList<TranInfo> infos = new ArrayList<TranInfo>(); 626 synchronized (inflightTransactions) { 627 if (!inflightTransactions.isEmpty()) { 628 for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) { 629 TranInfo info = new TranInfo(); 630 info.id = entry.getKey(); 631 for (Operation operation : entry.getValue()) { 632 info.track(operation); 633 } 634 infos.add(info); 635 } 636 } 637 } 638 synchronized (preparedTransactions) { 639 if (!preparedTransactions.isEmpty()) { 640 for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) { 641 TranInfo info = new TranInfo(); 642 info.id = entry.getKey(); 643 for (Operation operation : entry.getValue()) { 644 info.track(operation); 645 } 646 infos.add(info); 647 } 648 } 649 } 650 return infos.toString(); 651 } 652 653 public String getPreparedTransaction(TransactionId transactionId) { 654 String result = ""; 655 synchronized (preparedTransactions) { 656 List<Operation> operations = preparedTransactions.get(transactionId); 657 if (operations != null) { 658 TranInfo info = new TranInfo(); 659 info.id = transactionId; 660 for (Operation operation : preparedTransactions.get(transactionId)) { 661 info.track(operation); 662 } 663 result = info.toString(); 664 } 665 } 666 return result; 667 } 668 669 /** 670 * Move all the messages that were in the journal into long term storage. We 671 * just replay and do a checkpoint. 672 * 673 * @throws IOException 674 * @throws IOException 675 * @throws IllegalStateException 676 */ 677 private void recover() throws IllegalStateException, IOException { 678 this.indexLock.writeLock().lock(); 679 try { 680 681 long start = System.currentTimeMillis(); 682 boolean requiresJournalReplay = recoverProducerAudit(); 683 requiresJournalReplay |= recoverAckMessageFileMap(); 684 Location lastIndoubtPosition = getRecoveryPosition(); 685 Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition; 686 if (recoveryPosition != null) { 687 int redoCounter = 0; 688 int dataFileRotationTracker = recoveryPosition.getDataFileId(); 689 LOG.info("Recovering from the journal @" + recoveryPosition); 690 while (recoveryPosition != null) { 691 try { 692 JournalCommand<?> message = load(recoveryPosition); 693 metadata.lastUpdate = recoveryPosition; 694 process(message, recoveryPosition, lastIndoubtPosition); 695 redoCounter++; 696 } catch (IOException failedRecovery) { 697 if (isIgnoreMissingJournalfiles()) { 698 LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery); 699 // track this dud location 700 journal.corruptRecoveryLocation(recoveryPosition); 701 } else { 702 throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery); 703 } 704 } 705 recoveryPosition = journal.getNextLocation(recoveryPosition); 706 // hold on to the minimum number of open files during recovery 707 if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) { 708 dataFileRotationTracker = recoveryPosition.getDataFileId(); 709 journal.cleanup(); 710 } 711 if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { 712 LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered .."); 713 } 714 } 715 if (LOG.isInfoEnabled()) { 716 long end = System.currentTimeMillis(); 717 LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); 718 } 719 } 720 721 // We may have to undo some index updates. 722 pageFile.tx().execute(new Transaction.Closure<IOException>() { 723 @Override 724 public void execute(Transaction tx) throws IOException { 725 recoverIndex(tx); 726 } 727 }); 728 729 // rollback any recovered inflight local transactions, and discard any inflight XA transactions. 730 Set<TransactionId> toRollback = new HashSet<TransactionId>(); 731 Set<TransactionId> toDiscard = new HashSet<TransactionId>(); 732 synchronized (inflightTransactions) { 733 for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { 734 TransactionId id = it.next(); 735 if (id.isLocalTransaction()) { 736 toRollback.add(id); 737 } else { 738 toDiscard.add(id); 739 } 740 } 741 for (TransactionId tx: toRollback) { 742 if (LOG.isDebugEnabled()) { 743 LOG.debug("rolling back recovered indoubt local transaction " + tx); 744 } 745 store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null); 746 } 747 for (TransactionId tx: toDiscard) { 748 if (LOG.isDebugEnabled()) { 749 LOG.debug("discarding recovered in-flight XA transaction " + tx); 750 } 751 inflightTransactions.remove(tx); 752 } 753 } 754 755 synchronized (preparedTransactions) { 756 for (TransactionId txId : preparedTransactions.keySet()) { 757 LOG.warn("Recovered prepared XA TX: [{}]", txId); 758 } 759 } 760 761 } finally { 762 this.indexLock.writeLock().unlock(); 763 } 764 } 765 766 @SuppressWarnings("unused") 767 private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { 768 return TransactionIdConversion.convertToLocal(tx); 769 } 770 771 private Location minimum(Location x, 772 Location y) { 773 Location min = null; 774 if (x != null) { 775 min = x; 776 if (y != null) { 777 int compare = y.compareTo(x); 778 if (compare < 0) { 779 min = y; 780 } 781 } 782 } else { 783 min = y; 784 } 785 return min; 786 } 787 788 private boolean recoverProducerAudit() throws IOException { 789 boolean requiresReplay = true; 790 if (metadata.producerSequenceIdTrackerLocation != null) { 791 try { 792 KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation); 793 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput()); 794 int maxNumProducers = getMaxFailoverProducersToTrack(); 795 int maxAuditDepth = getFailoverProducersAuditDepth(); 796 metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject(); 797 metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth); 798 metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers); 799 requiresReplay = false; 800 } catch (Exception e) { 801 LOG.warn("Cannot recover message audit", e); 802 } 803 } 804 // got no audit stored so got to recreate via replay from start of the journal 805 return requiresReplay; 806 } 807 808 @SuppressWarnings("unchecked") 809 private boolean recoverAckMessageFileMap() throws IOException { 810 boolean requiresReplay = true; 811 if (metadata.ackMessageFileMapLocation != null) { 812 try { 813 KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation); 814 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput()); 815 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject(); 816 requiresReplay = false; 817 } catch (Exception e) { 818 LOG.warn("Cannot recover ackMessageFileMap", e); 819 } 820 } 821 // got no ackMessageFileMap stored so got to recreate via replay from start of the journal 822 return requiresReplay; 823 } 824 825 protected void recoverIndex(Transaction tx) throws IOException { 826 long start = System.currentTimeMillis(); 827 // It is possible index updates got applied before the journal updates.. 828 // in that case we need to removed references to messages that are not in the journal 829 final Location lastAppendLocation = journal.getLastAppendLocation(); 830 long undoCounter=0; 831 832 // Go through all the destinations to see if they have messages past the lastAppendLocation 833 for (StoredDestination sd : storedDestinations.values()) { 834 835 final ArrayList<Long> matches = new ArrayList<Long>(); 836 // Find all the Locations that are >= than the last Append Location. 837 sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) { 838 @Override 839 protected void matched(Location key, Long value) { 840 matches.add(value); 841 } 842 }); 843 844 for (Long sequenceId : matches) { 845 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 846 if (keys != null) { 847 sd.locationIndex.remove(tx, keys.location); 848 sd.messageIdIndex.remove(tx, keys.messageId); 849 metadata.producerSequenceIdTracker.rollback(keys.messageId); 850 undoCounter++; 851 // TODO: do we need to modify the ack positions for the pub sub case? 852 } 853 } 854 } 855 856 if (undoCounter > 0) { 857 // The rolledback operations are basically in flight journal writes. To avoid getting 858 // these the end user should do sync writes to the journal. 859 if (LOG.isInfoEnabled()) { 860 long end = System.currentTimeMillis(); 861 LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 862 } 863 } 864 865 undoCounter = 0; 866 start = System.currentTimeMillis(); 867 868 // Lets be extra paranoid here and verify that all the datafiles being referenced 869 // by the indexes still exists. 870 871 final SequenceSet ss = new SequenceSet(); 872 for (StoredDestination sd : storedDestinations.values()) { 873 // Use a visitor to cut down the number of pages that we load 874 sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 875 int last=-1; 876 877 @Override 878 public boolean isInterestedInKeysBetween(Location first, Location second) { 879 if( first==null ) { 880 return !ss.contains(0, second.getDataFileId()); 881 } else if( second==null ) { 882 return true; 883 } else { 884 return !ss.contains(first.getDataFileId(), second.getDataFileId()); 885 } 886 } 887 888 @Override 889 public void visit(List<Location> keys, List<Long> values) { 890 for (Location l : keys) { 891 int fileId = l.getDataFileId(); 892 if( last != fileId ) { 893 ss.add(fileId); 894 last = fileId; 895 } 896 } 897 } 898 899 }); 900 } 901 HashSet<Integer> missingJournalFiles = new HashSet<Integer>(); 902 while (!ss.isEmpty()) { 903 missingJournalFiles.add((int) ss.removeFirst()); 904 } 905 906 for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) { 907 missingJournalFiles.add(entry.getKey()); 908 for (Integer i : entry.getValue()) { 909 missingJournalFiles.add(i); 910 } 911 } 912 913 missingJournalFiles.removeAll(journal.getFileMap().keySet()); 914 915 if (!missingJournalFiles.isEmpty()) { 916 LOG.warn("Some journal files are missing: " + missingJournalFiles); 917 } 918 919 ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>(); 920 ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>(); 921 for (Integer missing : missingJournalFiles) { 922 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0))); 923 } 924 925 if (checkForCorruptJournalFiles) { 926 Collection<DataFile> dataFiles = journal.getFileMap().values(); 927 for (DataFile dataFile : dataFiles) { 928 int id = dataFile.getDataFileId(); 929 // eof to next file id 930 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0))); 931 Sequence seq = dataFile.getCorruptedBlocks().getHead(); 932 while (seq != null) { 933 BTreeVisitor.BetweenVisitor<Location, Long> visitor = 934 new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)); 935 missingPredicates.add(visitor); 936 knownCorruption.add(visitor); 937 seq = seq.getNext(); 938 } 939 } 940 } 941 942 if (!missingPredicates.isEmpty()) { 943 for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) { 944 final StoredDestination sd = sdEntry.getValue(); 945 final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>(); 946 sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) { 947 @Override 948 protected void matched(Location key, Long value) { 949 matches.put(value, key); 950 } 951 }); 952 953 // If some message references are affected by the missing data files... 954 if (!matches.isEmpty()) { 955 956 // We either 'gracefully' recover dropping the missing messages or 957 // we error out. 958 if( ignoreMissingJournalfiles ) { 959 // Update the index to remove the references to the missing data 960 for (Long sequenceId : matches.keySet()) { 961 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 962 sd.locationIndex.remove(tx, keys.location); 963 sd.messageIdIndex.remove(tx, keys.messageId); 964 LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location); 965 undoCounter++; 966 // TODO: do we need to modify the ack positions for the pub sub case? 967 } 968 } else { 969 LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches); 970 throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected."); 971 } 972 } 973 } 974 } 975 976 if (!ignoreMissingJournalfiles) { 977 if (!knownCorruption.isEmpty()) { 978 LOG.error("Detected corrupt journal files. " + knownCorruption); 979 throw new IOException("Detected corrupt journal files. " + knownCorruption); 980 } 981 982 if (!missingJournalFiles.isEmpty()) { 983 LOG.error("Detected missing journal files. " + missingJournalFiles); 984 throw new IOException("Detected missing journal files. " + missingJournalFiles); 985 } 986 } 987 988 if (undoCounter > 0) { 989 // The rolledback operations are basically in flight journal writes. To avoid getting these the end user 990 // should do sync writes to the journal. 991 if (LOG.isInfoEnabled()) { 992 long end = System.currentTimeMillis(); 993 LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 994 } 995 } 996 } 997 998 private Location nextRecoveryPosition; 999 private Location lastRecoveryPosition; 1000 1001 public void incrementalRecover() throws IOException { 1002 this.indexLock.writeLock().lock(); 1003 try { 1004 if( nextRecoveryPosition == null ) { 1005 if( lastRecoveryPosition==null ) { 1006 nextRecoveryPosition = getRecoveryPosition(); 1007 } else { 1008 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 1009 } 1010 } 1011 while (nextRecoveryPosition != null) { 1012 lastRecoveryPosition = nextRecoveryPosition; 1013 metadata.lastUpdate = lastRecoveryPosition; 1014 JournalCommand<?> message = load(lastRecoveryPosition); 1015 process(message, lastRecoveryPosition, (IndexAware) null); 1016 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 1017 } 1018 } finally { 1019 this.indexLock.writeLock().unlock(); 1020 } 1021 } 1022 1023 public Location getLastUpdatePosition() throws IOException { 1024 return metadata.lastUpdate; 1025 } 1026 1027 private Location getRecoveryPosition() throws IOException { 1028 1029 if (!this.forceRecoverIndex) { 1030 1031 // If we need to recover the transactions.. 1032 if (metadata.firstInProgressTransactionLocation != null) { 1033 return metadata.firstInProgressTransactionLocation; 1034 } 1035 1036 // Perhaps there were no transactions... 1037 if( metadata.lastUpdate!=null) { 1038 // Start replay at the record after the last one recorded in the index file. 1039 return getNextInitializedLocation(metadata.lastUpdate); 1040 } 1041 } 1042 // This loads the first position. 1043 return journal.getNextLocation(null); 1044 } 1045 1046 private Location getNextInitializedLocation(Location location) throws IOException { 1047 Location mayNotBeInitialized = journal.getNextLocation(location); 1048 if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) { 1049 // need to init size and type to skip 1050 return journal.getNextLocation(mayNotBeInitialized); 1051 } else { 1052 return mayNotBeInitialized; 1053 } 1054 } 1055 1056 protected void checkpointCleanup(final boolean cleanup) throws IOException { 1057 long start; 1058 this.indexLock.writeLock().lock(); 1059 try { 1060 start = System.currentTimeMillis(); 1061 if( !opened.get() ) { 1062 return; 1063 } 1064 } finally { 1065 this.indexLock.writeLock().unlock(); 1066 } 1067 checkpointUpdate(cleanup); 1068 long end = System.currentTimeMillis(); 1069 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1070 if (LOG.isInfoEnabled()) { 1071 LOG.info("Slow KahaDB access: cleanup took " + (end - start)); 1072 } 1073 } 1074 } 1075 1076 public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException { 1077 int size = data.serializedSizeFramed(); 1078 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 1079 os.writeByte(data.type().getNumber()); 1080 data.writeFramed(os); 1081 return os.toByteSequence(); 1082 } 1083 1084 // ///////////////////////////////////////////////////////////////// 1085 // Methods call by the broker to update and query the store. 1086 // ///////////////////////////////////////////////////////////////// 1087 public Location store(JournalCommand<?> data) throws IOException { 1088 return store(data, false, null,null); 1089 } 1090 1091 public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException { 1092 return store(data, false, null, null, onJournalStoreComplete); 1093 } 1094 1095 public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException { 1096 return store(data, sync, before, after, null); 1097 } 1098 1099 /** 1100 * All updated are are funneled through this method. The updates are converted 1101 * to a JournalMessage which is logged to the journal and then the data from 1102 * the JournalMessage is used to update the index just like it would be done 1103 * during a recovery process. 1104 */ 1105 public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException { 1106 try { 1107 ByteSequence sequence = toByteSequence(data); 1108 Location location; 1109 1110 checkpointLock.readLock().lock(); 1111 try { 1112 1113 long start = System.currentTimeMillis(); 1114 location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ; 1115 long start2 = System.currentTimeMillis(); 1116 //Track the last async update so we know if we need to sync at the next checkpoint 1117 if (!sync && journal.isJournalDiskSyncPeriodic()) { 1118 lastAsyncJournalUpdate.set(location); 1119 } 1120 process(data, location, before); 1121 1122 long end = System.currentTimeMillis(); 1123 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1124 if (LOG.isInfoEnabled()) { 1125 LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); 1126 } 1127 } 1128 } finally { 1129 checkpointLock.readLock().unlock(); 1130 } 1131 1132 if (after != null) { 1133 after.run(); 1134 } 1135 1136 return location; 1137 } catch (IOException ioe) { 1138 LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe); 1139 brokerService.handleIOException(ioe); 1140 throw ioe; 1141 } 1142 } 1143 1144 /** 1145 * Loads a previously stored JournalMessage 1146 * 1147 * @param location 1148 * @return 1149 * @throws IOException 1150 */ 1151 public JournalCommand<?> load(Location location) throws IOException { 1152 long start = System.currentTimeMillis(); 1153 ByteSequence data = journal.read(location); 1154 long end = System.currentTimeMillis(); 1155 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) { 1156 if (LOG.isInfoEnabled()) { 1157 LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms"); 1158 } 1159 } 1160 DataByteArrayInputStream is = new DataByteArrayInputStream(data); 1161 byte readByte = is.readByte(); 1162 KahaEntryType type = KahaEntryType.valueOf(readByte); 1163 if( type == null ) { 1164 try { 1165 is.close(); 1166 } catch (IOException e) {} 1167 throw new IOException("Could not load journal record, null type information from: " + readByte + " at location: "+location); 1168 } 1169 JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); 1170 message.mergeFramed(is); 1171 return message; 1172 } 1173 1174 /** 1175 * do minimal recovery till we reach the last inDoubtLocation 1176 * @param data 1177 * @param location 1178 * @param inDoubtlocation 1179 * @throws IOException 1180 */ 1181 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException { 1182 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { 1183 initMessageStore(data); 1184 process(data, location, (IndexAware) null); 1185 } else { 1186 // just recover producer audit 1187 data.visit(new Visitor() { 1188 @Override 1189 public void visit(KahaAddMessageCommand command) throws IOException { 1190 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1191 } 1192 }); 1193 } 1194 } 1195 1196 private void initMessageStore(JournalCommand<?> data) throws IOException { 1197 data.visit(new Visitor() { 1198 @Override 1199 public void visit(KahaAddMessageCommand command) throws IOException { 1200 final KahaDestination destination = command.getDestination(); 1201 if (!storedDestinations.containsKey(key(destination))) { 1202 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1203 @Override 1204 public void execute(Transaction tx) throws IOException { 1205 getStoredDestination(destination, tx); 1206 } 1207 }); 1208 } 1209 } 1210 }); 1211 } 1212 1213 // ///////////////////////////////////////////////////////////////// 1214 // Journaled record processing methods. Once the record is journaled, 1215 // these methods handle applying the index updates. These may be called 1216 // from the recovery method too so they need to be idempotent 1217 // ///////////////////////////////////////////////////////////////// 1218 1219 void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException { 1220 data.visit(new Visitor() { 1221 @Override 1222 public void visit(KahaAddMessageCommand command) throws IOException { 1223 process(command, location, onSequenceAssignedCallback); 1224 } 1225 1226 @Override 1227 public void visit(KahaRemoveMessageCommand command) throws IOException { 1228 process(command, location); 1229 } 1230 1231 @Override 1232 public void visit(KahaPrepareCommand command) throws IOException { 1233 process(command, location); 1234 } 1235 1236 @Override 1237 public void visit(KahaCommitCommand command) throws IOException { 1238 process(command, location, onSequenceAssignedCallback); 1239 } 1240 1241 @Override 1242 public void visit(KahaRollbackCommand command) throws IOException { 1243 process(command, location); 1244 } 1245 1246 @Override 1247 public void visit(KahaRemoveDestinationCommand command) throws IOException { 1248 process(command, location); 1249 } 1250 1251 @Override 1252 public void visit(KahaSubscriptionCommand command) throws IOException { 1253 process(command, location); 1254 } 1255 1256 @Override 1257 public void visit(KahaProducerAuditCommand command) throws IOException { 1258 processLocation(location); 1259 } 1260 1261 @Override 1262 public void visit(KahaAckMessageFileMapCommand command) throws IOException { 1263 processLocation(location); 1264 } 1265 1266 @Override 1267 public void visit(KahaTraceCommand command) { 1268 processLocation(location); 1269 } 1270 1271 @Override 1272 public void visit(KahaUpdateMessageCommand command) throws IOException { 1273 process(command, location); 1274 } 1275 1276 @Override 1277 public void visit(KahaRewrittenDataFileCommand command) throws IOException { 1278 process(command, location); 1279 } 1280 }); 1281 } 1282 1283 @SuppressWarnings("rawtypes") 1284 protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException { 1285 if (command.hasTransactionInfo()) { 1286 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1287 inflightTx.add(new AddOperation(command, location, runWithIndexLock)); 1288 } else { 1289 this.indexLock.writeLock().lock(); 1290 try { 1291 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1292 @Override 1293 public void execute(Transaction tx) throws IOException { 1294 long assignedIndex = updateIndex(tx, command, location); 1295 if (runWithIndexLock != null) { 1296 runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex); 1297 } 1298 } 1299 }); 1300 1301 } finally { 1302 this.indexLock.writeLock().unlock(); 1303 } 1304 } 1305 } 1306 1307 protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException { 1308 this.indexLock.writeLock().lock(); 1309 try { 1310 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1311 @Override 1312 public void execute(Transaction tx) throws IOException { 1313 updateIndex(tx, command, location); 1314 } 1315 }); 1316 } finally { 1317 this.indexLock.writeLock().unlock(); 1318 } 1319 } 1320 1321 @SuppressWarnings("rawtypes") 1322 protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException { 1323 if (command.hasTransactionInfo()) { 1324 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1325 inflightTx.add(new RemoveOperation(command, location)); 1326 } else { 1327 this.indexLock.writeLock().lock(); 1328 try { 1329 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1330 @Override 1331 public void execute(Transaction tx) throws IOException { 1332 updateIndex(tx, command, location); 1333 } 1334 }); 1335 } finally { 1336 this.indexLock.writeLock().unlock(); 1337 } 1338 } 1339 } 1340 1341 protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException { 1342 this.indexLock.writeLock().lock(); 1343 try { 1344 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1345 @Override 1346 public void execute(Transaction tx) throws IOException { 1347 updateIndex(tx, command, location); 1348 } 1349 }); 1350 } finally { 1351 this.indexLock.writeLock().unlock(); 1352 } 1353 } 1354 1355 protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { 1356 this.indexLock.writeLock().lock(); 1357 try { 1358 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1359 @Override 1360 public void execute(Transaction tx) throws IOException { 1361 updateIndex(tx, command, location); 1362 } 1363 }); 1364 } finally { 1365 this.indexLock.writeLock().unlock(); 1366 } 1367 } 1368 1369 protected void processLocation(final Location location) { 1370 this.indexLock.writeLock().lock(); 1371 try { 1372 metadata.lastUpdate = location; 1373 } finally { 1374 this.indexLock.writeLock().unlock(); 1375 } 1376 } 1377 1378 @SuppressWarnings("rawtypes") 1379 protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException { 1380 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1381 List<Operation> inflightTx; 1382 synchronized (inflightTransactions) { 1383 inflightTx = inflightTransactions.remove(key); 1384 if (inflightTx == null) { 1385 inflightTx = preparedTransactions.remove(key); 1386 } 1387 } 1388 if (inflightTx == null) { 1389 // only non persistent messages in this tx 1390 if (before != null) { 1391 before.sequenceAssignedWithIndexLocked(-1); 1392 } 1393 return; 1394 } 1395 1396 final List<Operation> messagingTx = inflightTx; 1397 indexLock.writeLock().lock(); 1398 try { 1399 pageFile.tx().execute(new Transaction.Closure<IOException>() { 1400 @Override 1401 public void execute(Transaction tx) throws IOException { 1402 for (Operation op : messagingTx) { 1403 op.execute(tx); 1404 recordAckMessageReferenceLocation(location, op.getLocation()); 1405 } 1406 } 1407 }); 1408 metadata.lastUpdate = location; 1409 } finally { 1410 indexLock.writeLock().unlock(); 1411 } 1412 } 1413 1414 @SuppressWarnings("rawtypes") 1415 protected void process(KahaPrepareCommand command, Location location) { 1416 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1417 List<Operation> tx = null; 1418 synchronized (inflightTransactions) { 1419 tx = inflightTransactions.remove(key); 1420 if (tx != null) { 1421 preparedTransactions.put(key, tx); 1422 } 1423 } 1424 if (tx != null && !tx.isEmpty()) { 1425 indexLock.writeLock().lock(); 1426 try { 1427 for (Operation op : tx) { 1428 recordAckMessageReferenceLocation(location, op.getLocation()); 1429 } 1430 } finally { 1431 indexLock.writeLock().unlock(); 1432 } 1433 } 1434 } 1435 1436 @SuppressWarnings("rawtypes") 1437 protected void process(KahaRollbackCommand command, Location location) throws IOException { 1438 TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); 1439 List<Operation> updates = null; 1440 synchronized (inflightTransactions) { 1441 updates = inflightTransactions.remove(key); 1442 if (updates == null) { 1443 updates = preparedTransactions.remove(key); 1444 } 1445 } 1446 if (key.isXATransaction() && updates != null && !updates.isEmpty()) { 1447 indexLock.writeLock().lock(); 1448 try { 1449 for (Operation op : updates) { 1450 recordAckMessageReferenceLocation(location, op.getLocation()); 1451 } 1452 } finally { 1453 indexLock.writeLock().unlock(); 1454 } 1455 } 1456 } 1457 1458 protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException { 1459 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1460 1461 // Mark the current journal file as a compacted file so that gc checks can skip 1462 // over logs that are smaller compaction type logs. 1463 DataFile current = journal.getDataFileById(location.getDataFileId()); 1464 current.setTypeCode(command.getRewriteType()); 1465 1466 if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) { 1467 // Move offset so that next location read jumps to next file. 1468 location.setOffset(journalMaxFileLength); 1469 } 1470 } 1471 1472 // ///////////////////////////////////////////////////////////////// 1473 // These methods do the actual index updates. 1474 // ///////////////////////////////////////////////////////////////// 1475 1476 protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); 1477 private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>(); 1478 1479 long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { 1480 StoredDestination sd = getExistingStoredDestination(command.getDestination(), tx); 1481 if (sd == null) { 1482 // if the store no longer exists, skip 1483 return -1; 1484 } 1485 // Skip adding the message to the index if this is a topic and there are 1486 // no subscriptions. 1487 if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) { 1488 return -1; 1489 } 1490 1491 // Add the message. 1492 int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY; 1493 long id = sd.orderIndex.getNextMessageId(); 1494 Long previous = sd.locationIndex.put(tx, location, id); 1495 if (previous == null) { 1496 previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); 1497 if (previous == null) { 1498 sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); 1499 if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { 1500 addAckLocationForNewMessage(tx, sd, id); 1501 } 1502 metadata.lastUpdate = location; 1503 } else { 1504 1505 MessageKeys messageKeys = sd.orderIndex.get(tx, previous); 1506 if (messageKeys != null && messageKeys.location.compareTo(location) < 0) { 1507 // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt 1508 LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId()); 1509 } 1510 sd.messageIdIndex.put(tx, command.getMessageId(), previous); 1511 sd.locationIndex.remove(tx, location); 1512 id = -1; 1513 } 1514 } else { 1515 // restore the previous value.. Looks like this was a redo of a previously 1516 // added message. We don't want to assign it a new id as the other indexes would 1517 // be wrong.. 1518 sd.locationIndex.put(tx, location, previous); 1519 // ensure sequence is not broken 1520 sd.orderIndex.revertNextMessageId(); 1521 metadata.lastUpdate = location; 1522 } 1523 // record this id in any event, initial send or recovery 1524 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1525 return id; 1526 } 1527 1528 void trackPendingAdd(KahaDestination destination, Long seq) { 1529 StoredDestination sd = storedDestinations.get(key(destination)); 1530 if (sd != null) { 1531 sd.trackPendingAdd(seq); 1532 } 1533 } 1534 1535 void trackPendingAddComplete(KahaDestination destination, Long seq) { 1536 StoredDestination sd = storedDestinations.get(key(destination)); 1537 if (sd != null) { 1538 sd.trackPendingAddComplete(seq); 1539 } 1540 } 1541 1542 void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException { 1543 KahaAddMessageCommand command = updateMessageCommand.getMessage(); 1544 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1545 1546 Long id = sd.messageIdIndex.get(tx, command.getMessageId()); 1547 if (id != null) { 1548 MessageKeys previousKeys = sd.orderIndex.put( 1549 tx, 1550 command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY, 1551 id, 1552 new MessageKeys(command.getMessageId(), location) 1553 ); 1554 sd.locationIndex.put(tx, location, id); 1555 // on first update previous is original location, on recovery/replay it may be the updated location 1556 if(previousKeys != null && !previousKeys.location.equals(location)) { 1557 sd.locationIndex.remove(tx, previousKeys.location); 1558 } 1559 metadata.lastUpdate = location; 1560 } else { 1561 //Add the message if it can't be found 1562 this.updateIndex(tx, command, location); 1563 } 1564 } 1565 1566 void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException { 1567 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1568 if (!command.hasSubscriptionKey()) { 1569 1570 // In the queue case we just remove the message from the index.. 1571 Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId()); 1572 if (sequenceId != null) { 1573 MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); 1574 if (keys != null) { 1575 sd.locationIndex.remove(tx, keys.location); 1576 recordAckMessageReferenceLocation(ackLocation, keys.location); 1577 metadata.lastUpdate = ackLocation; 1578 } else if (LOG.isDebugEnabled()) { 1579 LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId()); 1580 } 1581 } else if (LOG.isDebugEnabled()) { 1582 LOG.debug("message not found in sequence id index: " + command.getMessageId()); 1583 } 1584 } else { 1585 // In the topic case we need remove the message once it's been acked 1586 // by all the subs 1587 Long sequence = sd.messageIdIndex.get(tx, command.getMessageId()); 1588 1589 // Make sure it's a valid message id... 1590 if (sequence != null) { 1591 String subscriptionKey = command.getSubscriptionKey(); 1592 if (command.getAck() != UNMATCHED) { 1593 sd.orderIndex.get(tx, sequence); 1594 byte priority = sd.orderIndex.lastGetPriority(); 1595 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority)); 1596 } 1597 1598 MessageKeys keys = sd.orderIndex.get(tx, sequence); 1599 if (keys != null) { 1600 recordAckMessageReferenceLocation(ackLocation, keys.location); 1601 } 1602 // The following method handles deleting un-referenced messages. 1603 removeAckLocation(tx, sd, subscriptionKey, sequence); 1604 metadata.lastUpdate = ackLocation; 1605 } else if (LOG.isDebugEnabled()) { 1606 LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); 1607 } 1608 1609 } 1610 } 1611 1612 private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) { 1613 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId())); 1614 if (referenceFileIds == null) { 1615 referenceFileIds = new HashSet<Integer>(); 1616 referenceFileIds.add(messageLocation.getDataFileId()); 1617 metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds); 1618 } else { 1619 Integer id = Integer.valueOf(messageLocation.getDataFileId()); 1620 if (!referenceFileIds.contains(id)) { 1621 referenceFileIds.add(id); 1622 } 1623 } 1624 } 1625 1626 void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { 1627 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1628 sd.orderIndex.remove(tx); 1629 1630 sd.locationIndex.clear(tx); 1631 sd.locationIndex.unload(tx); 1632 tx.free(sd.locationIndex.getPageId()); 1633 1634 sd.messageIdIndex.clear(tx); 1635 sd.messageIdIndex.unload(tx); 1636 tx.free(sd.messageIdIndex.getPageId()); 1637 1638 if (sd.subscriptions != null) { 1639 sd.subscriptions.clear(tx); 1640 sd.subscriptions.unload(tx); 1641 tx.free(sd.subscriptions.getPageId()); 1642 1643 sd.subscriptionAcks.clear(tx); 1644 sd.subscriptionAcks.unload(tx); 1645 tx.free(sd.subscriptionAcks.getPageId()); 1646 1647 sd.ackPositions.clear(tx); 1648 sd.ackPositions.unload(tx); 1649 tx.free(sd.ackPositions.getHeadPageId()); 1650 1651 sd.subLocations.clear(tx); 1652 sd.subLocations.unload(tx); 1653 tx.free(sd.subLocations.getHeadPageId()); 1654 } 1655 1656 String key = key(command.getDestination()); 1657 storedDestinations.remove(key); 1658 metadata.destinations.remove(tx, key); 1659 storeCache.remove(key(command.getDestination())); 1660 } 1661 1662 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException { 1663 StoredDestination sd = getStoredDestination(command.getDestination(), tx); 1664 final String subscriptionKey = command.getSubscriptionKey(); 1665 1666 // If set then we are creating it.. otherwise we are destroying the sub 1667 if (command.hasSubscriptionInfo()) { 1668 Location existing = sd.subLocations.get(tx, subscriptionKey); 1669 if (existing != null && existing.compareTo(location) == 0) { 1670 // replay on recovery, ignore 1671 LOG.trace("ignoring journal replay of replay of sub from: " + location); 1672 return; 1673 } 1674 1675 sd.subscriptions.put(tx, subscriptionKey, command); 1676 sd.subLocations.put(tx, subscriptionKey, location); 1677 long ackLocation=NOT_ACKED; 1678 if (!command.getRetroactive()) { 1679 ackLocation = sd.orderIndex.nextMessageId-1; 1680 } else { 1681 addAckLocationForRetroactiveSub(tx, sd, subscriptionKey); 1682 } 1683 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation)); 1684 sd.subscriptionCache.add(subscriptionKey); 1685 } else { 1686 // delete the sub... 1687 sd.subscriptions.remove(tx, subscriptionKey); 1688 sd.subLocations.remove(tx, subscriptionKey); 1689 sd.subscriptionAcks.remove(tx, subscriptionKey); 1690 sd.subscriptionCache.remove(subscriptionKey); 1691 removeAckLocationsForSub(tx, sd, subscriptionKey); 1692 1693 if (sd.subscriptions.isEmpty(tx)) { 1694 // remove the stored destination 1695 KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand(); 1696 removeDestinationCommand.setDestination(command.getDestination()); 1697 updateIndex(tx, removeDestinationCommand, null); 1698 } 1699 } 1700 } 1701 1702 private void checkpointUpdate(final boolean cleanup) throws IOException { 1703 checkpointLock.writeLock().lock(); 1704 try { 1705 this.indexLock.writeLock().lock(); 1706 try { 1707 Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() { 1708 @Override 1709 public Set<Integer> execute(Transaction tx) throws IOException { 1710 return checkpointUpdate(tx, cleanup); 1711 } 1712 }); 1713 pageFile.flush(); 1714 // after the index update such that partial removal does not leave dangling references in the index. 1715 journal.removeDataFiles(filesToGc); 1716 } finally { 1717 this.indexLock.writeLock().unlock(); 1718 } 1719 1720 } finally { 1721 checkpointLock.writeLock().unlock(); 1722 } 1723 } 1724 1725 /** 1726 * @param tx 1727 * @throws IOException 1728 */ 1729 Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { 1730 MDC.put("activemq.persistenceDir", getDirectory().getName()); 1731 LOG.debug("Checkpoint started."); 1732 1733 // reflect last update exclusive of current checkpoint 1734 Location lastUpdate = metadata.lastUpdate; 1735 1736 metadata.state = OPEN_STATE; 1737 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); 1738 metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap(); 1739 Location[] inProgressTxRange = getInProgressTxLocationRange(); 1740 metadata.firstInProgressTransactionLocation = inProgressTxRange[0]; 1741 tx.store(metadata.page, metadataMarshaller, true); 1742 1743 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(); 1744 if (cleanup) { 1745 1746 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1747 gcCandidateSet.addAll(completeFileSet); 1748 1749 if (LOG.isTraceEnabled()) { 1750 LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); 1751 } 1752 1753 if (lastUpdate != null) { 1754 // we won't delete past the last update, ackCompaction journal can be a candidate in error 1755 gcCandidateSet.removeAll(new TreeSet<Integer>(gcCandidateSet.tailSet(lastUpdate.getDataFileId()))); 1756 } 1757 1758 // Don't GC files under replication 1759 if( journalFilesBeingReplicated!=null ) { 1760 gcCandidateSet.removeAll(journalFilesBeingReplicated); 1761 } 1762 1763 if (metadata.producerSequenceIdTrackerLocation != null) { 1764 int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId(); 1765 if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) { 1766 // rewrite so we don't prevent gc 1767 metadata.producerSequenceIdTracker.setModified(true); 1768 if (LOG.isTraceEnabled()) { 1769 LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation); 1770 } 1771 } 1772 gcCandidateSet.remove(dataFileId); 1773 if (LOG.isTraceEnabled()) { 1774 LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet); 1775 } 1776 } 1777 1778 if (metadata.ackMessageFileMapLocation != null) { 1779 int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId(); 1780 gcCandidateSet.remove(dataFileId); 1781 if (LOG.isTraceEnabled()) { 1782 LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet); 1783 } 1784 } 1785 1786 // Don't GC files referenced by in-progress tx 1787 if (inProgressTxRange[0] != null) { 1788 for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { 1789 gcCandidateSet.remove(pendingTx); 1790 } 1791 } 1792 if (LOG.isTraceEnabled()) { 1793 LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); 1794 } 1795 1796 // Go through all the destinations to see if any of them can remove GC candidates. 1797 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) { 1798 if( gcCandidateSet.isEmpty() ) { 1799 break; 1800 } 1801 1802 // Use a visitor to cut down the number of pages that we load 1803 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() { 1804 int last=-1; 1805 @Override 1806 public boolean isInterestedInKeysBetween(Location first, Location second) { 1807 if( first==null ) { 1808 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1); 1809 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1810 subset.remove(second.getDataFileId()); 1811 } 1812 return !subset.isEmpty(); 1813 } else if( second==null ) { 1814 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId()); 1815 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1816 subset.remove(first.getDataFileId()); 1817 } 1818 return !subset.isEmpty(); 1819 } else { 1820 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1); 1821 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { 1822 subset.remove(first.getDataFileId()); 1823 } 1824 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { 1825 subset.remove(second.getDataFileId()); 1826 } 1827 return !subset.isEmpty(); 1828 } 1829 } 1830 1831 @Override 1832 public void visit(List<Location> keys, List<Long> values) { 1833 for (Location l : keys) { 1834 int fileId = l.getDataFileId(); 1835 if( last != fileId ) { 1836 gcCandidateSet.remove(fileId); 1837 last = fileId; 1838 } 1839 } 1840 } 1841 }); 1842 1843 // Durable Subscription 1844 if (entry.getValue().subLocations != null) { 1845 Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx); 1846 while (iter.hasNext()) { 1847 Entry<String, Location> subscription = iter.next(); 1848 int dataFileId = subscription.getValue().getDataFileId(); 1849 1850 // Move subscription along if it has no outstanding messages that need ack'd 1851 // and its in the last log file in the journal. 1852 if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) { 1853 final StoredDestination destination = entry.getValue(); 1854 final String subscriptionKey = subscription.getKey(); 1855 SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey); 1856 1857 // When pending is size one that is the next message Id meaning there 1858 // are no pending messages currently. 1859 if (pendingAcks == null || pendingAcks.isEmpty() || 1860 (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) { 1861 1862 if (LOG.isTraceEnabled()) { 1863 LOG.trace("Found candidate for rewrite: sub {} on {} from file {}", subscriptionKey, entry.getKey(), dataFileId); 1864 } 1865 1866 final KahaSubscriptionCommand kahaSub = 1867 destination.subscriptions.get(tx, subscriptionKey); 1868 destination.subLocations.put( 1869 tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub)); 1870 1871 // Skips the remove from candidates if we rewrote the subscription 1872 // in order to prevent duplicate subscription commands on recover. 1873 // If another subscription is on the same file and isn't rewritten 1874 // than it will remove the file from the set. 1875 continue; 1876 } 1877 } 1878 1879 if (LOG.isTraceEnabled()) { 1880 final StoredDestination destination = entry.getValue(); 1881 final String subscriptionKey = subscription.getKey(); 1882 final SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey); 1883 LOG.trace("sub {} on {} in dataFile {} has pendingCount {}", subscriptionKey, entry.getKey(), dataFileId, pendingAcks.rangeSize()-1); 1884 } 1885 gcCandidateSet.remove(dataFileId); 1886 } 1887 } 1888 1889 if (LOG.isTraceEnabled()) { 1890 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); 1891 } 1892 } 1893 1894 // check we are not deleting file with ack for in-use journal files 1895 if (LOG.isTraceEnabled()) { 1896 LOG.trace("gc candidates: " + gcCandidateSet); 1897 LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap); 1898 } 1899 1900 boolean ackMessageFileMapMod = false; 1901 Iterator<Integer> candidates = gcCandidateSet.iterator(); 1902 while (candidates.hasNext()) { 1903 Integer candidate = candidates.next(); 1904 Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate); 1905 if (referencedFileIds != null) { 1906 for (Integer referencedFileId : referencedFileIds) { 1907 if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) { 1908 // active file that is not targeted for deletion is referenced so don't delete 1909 candidates.remove(); 1910 break; 1911 } 1912 } 1913 if (gcCandidateSet.contains(candidate)) { 1914 ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null); 1915 } else { 1916 if (LOG.isTraceEnabled()) { 1917 LOG.trace("not removing data file: " + candidate 1918 + " as contained ack(s) refer to referenced file: " + referencedFileIds); 1919 } 1920 } 1921 } 1922 } 1923 1924 if (!gcCandidateSet.isEmpty()) { 1925 LOG.debug("Cleanup removing the data files: {}", gcCandidateSet); 1926 for (Integer candidate : gcCandidateSet) { 1927 for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) { 1928 ackMessageFileMapMod |= ackFiles.remove(candidate); 1929 } 1930 } 1931 if (ackMessageFileMapMod) { 1932 checkpointUpdate(tx, false); 1933 } 1934 } else if (isEnableAckCompaction()) { 1935 if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) { 1936 // First check length of journal to make sure it makes sense to even try. 1937 // 1938 // If there is only one journal file with Acks in it we don't need to move 1939 // it since it won't be chained to any later logs. 1940 // 1941 // If the logs haven't grown since the last time then we need to compact 1942 // otherwise there seems to still be room for growth and we don't need to incur 1943 // the overhead. Depending on configuration this check can be avoided and 1944 // Ack compaction will run any time the store has not GC'd a journal file in 1945 // the configured amount of cycles. 1946 if (metadata.ackMessageFileMap.size() > 1 && 1947 (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) { 1948 1949 LOG.trace("No files GC'd checking if threshold to ACK compaction has been met."); 1950 try { 1951 scheduler.execute(new AckCompactionRunner()); 1952 } catch (Exception ex) { 1953 LOG.warn("Error on queueing the Ack Compactor", ex); 1954 } 1955 } else { 1956 LOG.trace("Journal activity detected, no Ack compaction scheduled."); 1957 } 1958 1959 checkPointCyclesWithNoGC = 0; 1960 } else { 1961 LOG.trace("Not yet time to check for compaction: {} of {} cycles", 1962 checkPointCyclesWithNoGC, getCompactAcksAfterNoGC()); 1963 } 1964 1965 journalLogOnLastCompactionCheck = journal.getCurrentDataFileId(); 1966 } 1967 } 1968 MDC.remove("activemq.persistenceDir"); 1969 1970 LOG.debug("Checkpoint done."); 1971 return gcCandidateSet; 1972 } 1973 1974 private final class AckCompactionRunner implements Runnable { 1975 1976 @Override 1977 public void run() { 1978 1979 int journalToAdvance = -1; 1980 Set<Integer> journalLogsReferenced = new HashSet<Integer>(); 1981 1982 //flag to know whether the ack forwarding completed without an exception 1983 boolean forwarded = false; 1984 1985 try { 1986 //acquire the checkpoint lock to prevent other threads from 1987 //running a checkpoint while this is running 1988 // 1989 //Normally this task runs on the same executor as the checkpoint task 1990 //so this ack compaction runner wouldn't run at the same time as the checkpoint task. 1991 // 1992 //However, there are two cases where this isn't always true. 1993 //First, the checkpoint() method is public and can be called through the 1994 //PersistenceAdapter interface by someone at the same time this is running. 1995 //Second, a checkpoint is called during shutdown without using the executor. 1996 // 1997 //In the future it might be better to just remove the checkpointLock entirely 1998 //and only use the executor but this would need to be examined for any unintended 1999 //consequences 2000 checkpointLock.readLock().lock(); 2001 2002 try { 2003 2004 // Lock index to capture the ackMessageFileMap data 2005 indexLock.writeLock().lock(); 2006 2007 // Map keys might not be sorted, find the earliest log file to forward acks 2008 // from and move only those, future cycles can chip away at more as needed. 2009 // We won't move files that are themselves rewritten on a previous compaction. 2010 List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet()); 2011 Collections.sort(journalFileIds); 2012 for (Integer journalFileId : journalFileIds) { 2013 DataFile current = journal.getDataFileById(journalFileId); 2014 if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) { 2015 journalToAdvance = journalFileId; 2016 break; 2017 } 2018 } 2019 2020 // Check if we found one, or if we only found the current file being written to. 2021 if (journalToAdvance == -1 || blockedFromCompaction(journalToAdvance)) { 2022 return; 2023 } 2024 2025 journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance)); 2026 2027 } finally { 2028 indexLock.writeLock().unlock(); 2029 } 2030 2031 try { 2032 // Background rewrite of the old acks 2033 forwardAllAcks(journalToAdvance, journalLogsReferenced); 2034 forwarded = true; 2035 } catch (IOException ioe) { 2036 LOG.error("Forwarding of acks failed", ioe); 2037 brokerService.handleIOException(ioe); 2038 } catch (Throwable e) { 2039 LOG.error("Forwarding of acks failed", e); 2040 brokerService.handleIOException(IOExceptionSupport.create(e)); 2041 } 2042 } finally { 2043 checkpointLock.readLock().unlock(); 2044 } 2045 2046 try { 2047 if (forwarded) { 2048 // Checkpoint with changes from the ackMessageFileMap 2049 checkpointUpdate(false); 2050 } 2051 } catch (IOException ioe) { 2052 LOG.error("Checkpoint failed", ioe); 2053 brokerService.handleIOException(ioe); 2054 } catch (Throwable e) { 2055 LOG.error("Checkpoint failed", e); 2056 brokerService.handleIOException(IOExceptionSupport.create(e)); 2057 } 2058 } 2059 } 2060 2061 // called with the index lock held 2062 private boolean blockedFromCompaction(int journalToAdvance) { 2063 // don't forward the current data file 2064 if (journalToAdvance == journal.getCurrentDataFileId()) { 2065 return true; 2066 } 2067 // don't forward any data file with inflight transaction records because it will whack the tx - data file link 2068 // in the ack map when all acks are migrated (now that the ack map is not just for acks) 2069 // TODO: prepare records can be dropped but completion records (maybe only commit outcomes) need to be migrated 2070 // as part of the forward work. 2071 Location[] inProgressTxRange = getInProgressTxLocationRange(); 2072 if (inProgressTxRange[0] != null) { 2073 for (int pendingTx = inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { 2074 if (journalToAdvance == pendingTx) { 2075 LOG.trace("Compaction target:{} blocked by inflight transaction records: {}", journalToAdvance, inProgressTxRange); 2076 return true; 2077 } 2078 } 2079 } 2080 return false; 2081 } 2082 2083 private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException { 2084 LOG.trace("Attempting to move all acks in journal:{} to the front. Referenced files:{}", journalToRead, journalLogsReferenced); 2085 2086 DataFile forwardsFile = journal.reserveDataFile(); 2087 forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE); 2088 LOG.trace("Reserved file for forwarded acks: {}", forwardsFile); 2089 2090 Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>(); 2091 2092 try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) { 2093 KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand(); 2094 compactionMarker.setSourceDataFileId(journalToRead); 2095 compactionMarker.setRewriteType(forwardsFile.getTypeCode()); 2096 2097 ByteSequence payload = toByteSequence(compactionMarker); 2098 appender.storeItem(payload, Journal.USER_RECORD_TYPE, false); 2099 LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead); 2100 2101 final Location limit = new Location(journalToRead + 1, 0); 2102 Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0), limit); 2103 while (nextLocation != null) { 2104 JournalCommand<?> command = null; 2105 try { 2106 command = load(nextLocation); 2107 } catch (IOException ex) { 2108 LOG.trace("Error loading command during ack forward: {}", nextLocation); 2109 } 2110 2111 if (shouldForward(command)) { 2112 payload = toByteSequence(command); 2113 Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false); 2114 updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced); 2115 } 2116 2117 nextLocation = getNextLocationForAckForward(nextLocation, limit); 2118 } 2119 } 2120 2121 LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations); 2122 2123 // Lock index while we update the ackMessageFileMap. 2124 indexLock.writeLock().lock(); 2125 2126 // Update the ack map with the new locations of the acks 2127 for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) { 2128 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); 2129 if (referenceFileIds == null) { 2130 referenceFileIds = new HashSet<Integer>(); 2131 referenceFileIds.addAll(entry.getValue()); 2132 metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); 2133 } else { 2134 referenceFileIds.addAll(entry.getValue()); 2135 } 2136 } 2137 2138 // remove the old location data from the ack map so that the old journal log file can 2139 // be removed on next GC. 2140 metadata.ackMessageFileMap.remove(journalToRead); 2141 2142 indexLock.writeLock().unlock(); 2143 2144 LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); 2145 } 2146 2147 private boolean shouldForward(JournalCommand<?> command) { 2148 boolean result = false; 2149 if (command != null) { 2150 if (command instanceof KahaRemoveMessageCommand) { 2151 result = true; 2152 } else if (command instanceof KahaCommitCommand) { 2153 KahaCommitCommand kahaCommitCommand = (KahaCommitCommand) command; 2154 if (kahaCommitCommand.hasTransactionInfo() && kahaCommitCommand.getTransactionInfo().hasXaTransactionId()) { 2155 result = true; 2156 } 2157 } 2158 } 2159 return result; 2160 } 2161 2162 private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) { 2163 //getNextLocation() can throw an IOException, we should handle it and set 2164 //nextLocation to null and abort gracefully 2165 //Should not happen in the normal case 2166 Location location = null; 2167 try { 2168 location = journal.getNextLocation(nextLocation, limit); 2169 } catch (IOException e) { 2170 LOG.warn("Failed to load next journal location after: {}, reason: {}", nextLocation, e); 2171 if (LOG.isDebugEnabled()) { 2172 LOG.debug("Failed to load next journal location after: {}", nextLocation, e); 2173 } 2174 } 2175 return location; 2176 } 2177 2178 final Runnable nullCompletionCallback = new Runnable() { 2179 @Override 2180 public void run() { 2181 } 2182 }; 2183 2184 private Location checkpointProducerAudit() throws IOException { 2185 if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) { 2186 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2187 ObjectOutputStream oout = new ObjectOutputStream(baos); 2188 oout.writeObject(metadata.producerSequenceIdTracker); 2189 oout.flush(); 2190 oout.close(); 2191 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 2192 Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback); 2193 try { 2194 location.getLatch().await(); 2195 if (location.getException().get() != null) { 2196 throw location.getException().get(); 2197 } 2198 } catch (InterruptedException e) { 2199 throw new InterruptedIOException(e.toString()); 2200 } 2201 return location; 2202 } 2203 return metadata.producerSequenceIdTrackerLocation; 2204 } 2205 2206 private Location checkpointAckMessageFileMap() throws IOException { 2207 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 2208 ObjectOutputStream oout = new ObjectOutputStream(baos); 2209 oout.writeObject(metadata.ackMessageFileMap); 2210 oout.flush(); 2211 oout.close(); 2212 // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false 2213 Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback); 2214 try { 2215 location.getLatch().await(); 2216 } catch (InterruptedException e) { 2217 throw new InterruptedIOException(e.toString()); 2218 } 2219 return location; 2220 } 2221 2222 private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException { 2223 2224 ByteSequence sequence = toByteSequence(subscription); 2225 Location location = journal.write(sequence, nullCompletionCallback) ; 2226 2227 try { 2228 location.getLatch().await(); 2229 } catch (InterruptedException e) { 2230 throw new InterruptedIOException(e.toString()); 2231 } 2232 return location; 2233 } 2234 2235 public HashSet<Integer> getJournalFilesBeingReplicated() { 2236 return journalFilesBeingReplicated; 2237 } 2238 2239 // ///////////////////////////////////////////////////////////////// 2240 // StoredDestination related implementation methods. 2241 // ///////////////////////////////////////////////////////////////// 2242 2243 protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); 2244 2245 static class MessageKeys { 2246 final String messageId; 2247 final Location location; 2248 2249 public MessageKeys(String messageId, Location location) { 2250 this.messageId=messageId; 2251 this.location=location; 2252 } 2253 2254 @Override 2255 public String toString() { 2256 return "["+messageId+","+location+"]"; 2257 } 2258 } 2259 2260 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> { 2261 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); 2262 2263 @Override 2264 public MessageKeys readPayload(DataInput dataIn) throws IOException { 2265 return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn)); 2266 } 2267 2268 @Override 2269 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { 2270 dataOut.writeUTF(object.messageId); 2271 LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); 2272 } 2273 } 2274 2275 class LastAck { 2276 long lastAckedSequence; 2277 byte priority; 2278 2279 public LastAck(LastAck source) { 2280 this.lastAckedSequence = source.lastAckedSequence; 2281 this.priority = source.priority; 2282 } 2283 2284 public LastAck() { 2285 this.priority = MessageOrderIndex.HI; 2286 } 2287 2288 public LastAck(long ackLocation) { 2289 this.lastAckedSequence = ackLocation; 2290 this.priority = MessageOrderIndex.LO; 2291 } 2292 2293 public LastAck(long ackLocation, byte priority) { 2294 this.lastAckedSequence = ackLocation; 2295 this.priority = priority; 2296 } 2297 2298 @Override 2299 public String toString() { 2300 return "[" + lastAckedSequence + ":" + priority + "]"; 2301 } 2302 } 2303 2304 protected class LastAckMarshaller implements Marshaller<LastAck> { 2305 2306 @Override 2307 public void writePayload(LastAck object, DataOutput dataOut) throws IOException { 2308 dataOut.writeLong(object.lastAckedSequence); 2309 dataOut.writeByte(object.priority); 2310 } 2311 2312 @Override 2313 public LastAck readPayload(DataInput dataIn) throws IOException { 2314 LastAck lastAcked = new LastAck(); 2315 lastAcked.lastAckedSequence = dataIn.readLong(); 2316 if (metadata.version >= 3) { 2317 lastAcked.priority = dataIn.readByte(); 2318 } 2319 return lastAcked; 2320 } 2321 2322 @Override 2323 public int getFixedSize() { 2324 return 9; 2325 } 2326 2327 @Override 2328 public LastAck deepCopy(LastAck source) { 2329 return new LastAck(source); 2330 } 2331 2332 @Override 2333 public boolean isDeepCopySupported() { 2334 return true; 2335 } 2336 } 2337 2338 class StoredDestination { 2339 2340 MessageOrderIndex orderIndex = new MessageOrderIndex(); 2341 BTreeIndex<Location, Long> locationIndex; 2342 BTreeIndex<String, Long> messageIdIndex; 2343 2344 // These bits are only set for Topics 2345 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 2346 BTreeIndex<String, LastAck> subscriptionAcks; 2347 HashMap<String, MessageOrderCursor> subscriptionCursors; 2348 ListIndex<String, SequenceSet> ackPositions; 2349 ListIndex<String, Location> subLocations; 2350 2351 // Transient data used to track which Messages are no longer needed. 2352 final HashSet<String> subscriptionCache = new LinkedHashSet<String>(); 2353 2354 public void trackPendingAdd(Long seq) { 2355 orderIndex.trackPendingAdd(seq); 2356 } 2357 2358 public void trackPendingAddComplete(Long seq) { 2359 orderIndex.trackPendingAddComplete(seq); 2360 } 2361 2362 @Override 2363 public String toString() { 2364 return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size(); 2365 } 2366 } 2367 2368 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 2369 2370 @Override 2371 public StoredDestination readPayload(final DataInput dataIn) throws IOException { 2372 final StoredDestination value = new StoredDestination(); 2373 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2374 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong()); 2375 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 2376 2377 if (dataIn.readBoolean()) { 2378 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); 2379 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong()); 2380 if (metadata.version >= 4) { 2381 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong()); 2382 } else { 2383 // upgrade 2384 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2385 @Override 2386 public void execute(Transaction tx) throws IOException { 2387 LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>(); 2388 2389 if (metadata.version >= 3) { 2390 // migrate 2391 BTreeIndex<Long, HashSet<String>> oldAckPositions = 2392 new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong()); 2393 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); 2394 oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); 2395 oldAckPositions.load(tx); 2396 2397 2398 // Do the initial build of the data in memory before writing into the store 2399 // based Ack Positions List to avoid a lot of disk thrashing. 2400 Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx); 2401 while (iterator.hasNext()) { 2402 Entry<Long, HashSet<String>> entry = iterator.next(); 2403 2404 for(String subKey : entry.getValue()) { 2405 SequenceSet pendingAcks = temp.get(subKey); 2406 if (pendingAcks == null) { 2407 pendingAcks = new SequenceSet(); 2408 temp.put(subKey, pendingAcks); 2409 } 2410 2411 pendingAcks.add(entry.getKey()); 2412 } 2413 } 2414 } 2415 // Now move the pending messages to ack data into the store backed 2416 // structure. 2417 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2418 value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2419 value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2420 value.ackPositions.load(tx); 2421 for(String subscriptionKey : temp.keySet()) { 2422 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); 2423 } 2424 2425 } 2426 }); 2427 } 2428 2429 if (metadata.version >= 5) { 2430 value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong()); 2431 } else { 2432 // upgrade 2433 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2434 @Override 2435 public void execute(Transaction tx) throws IOException { 2436 value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2437 value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2438 value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2439 value.subLocations.load(tx); 2440 } 2441 }); 2442 } 2443 } 2444 if (metadata.version >= 2) { 2445 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2446 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2447 } else { 2448 // upgrade 2449 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2450 @Override 2451 public void execute(Transaction tx) throws IOException { 2452 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2453 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2454 value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2455 value.orderIndex.lowPriorityIndex.load(tx); 2456 2457 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2458 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2459 value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2460 value.orderIndex.highPriorityIndex.load(tx); 2461 } 2462 }); 2463 } 2464 2465 return value; 2466 } 2467 2468 @Override 2469 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { 2470 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId()); 2471 dataOut.writeLong(value.locationIndex.getPageId()); 2472 dataOut.writeLong(value.messageIdIndex.getPageId()); 2473 if (value.subscriptions != null) { 2474 dataOut.writeBoolean(true); 2475 dataOut.writeLong(value.subscriptions.getPageId()); 2476 dataOut.writeLong(value.subscriptionAcks.getPageId()); 2477 dataOut.writeLong(value.ackPositions.getHeadPageId()); 2478 dataOut.writeLong(value.subLocations.getHeadPageId()); 2479 } else { 2480 dataOut.writeBoolean(false); 2481 } 2482 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); 2483 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); 2484 } 2485 } 2486 2487 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> { 2488 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller(); 2489 2490 @Override 2491 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { 2492 KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); 2493 rc.mergeFramed((InputStream)dataIn); 2494 return rc; 2495 } 2496 2497 @Override 2498 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { 2499 object.writeFramed((OutputStream)dataOut); 2500 } 2501 } 2502 2503 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2504 String key = key(destination); 2505 StoredDestination rc = storedDestinations.get(key); 2506 if (rc == null) { 2507 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; 2508 rc = loadStoredDestination(tx, key, topic); 2509 // Cache it. We may want to remove/unload destinations from the 2510 // cache that are not used for a while 2511 // to reduce memory usage. 2512 storedDestinations.put(key, rc); 2513 } 2514 return rc; 2515 } 2516 2517 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2518 String key = key(destination); 2519 StoredDestination rc = storedDestinations.get(key); 2520 if (rc == null && metadata.destinations.containsKey(tx, key)) { 2521 rc = getStoredDestination(destination, tx); 2522 } 2523 return rc; 2524 } 2525 2526 /** 2527 * @param tx 2528 * @param key 2529 * @param topic 2530 * @return 2531 * @throws IOException 2532 */ 2533 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { 2534 // Try to load the existing indexes.. 2535 StoredDestination rc = metadata.destinations.get(tx, key); 2536 if (rc == null) { 2537 // Brand new destination.. allocate indexes for it. 2538 rc = new StoredDestination(); 2539 rc.orderIndex.allocate(tx); 2540 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate()); 2541 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 2542 2543 if (topic) { 2544 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate()); 2545 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate()); 2546 rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2547 rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2548 } 2549 metadata.destinations.put(tx, key, rc); 2550 } 2551 2552 // Configure the marshalers and load. 2553 rc.orderIndex.load(tx); 2554 2555 // Figure out the next key using the last entry in the destination. 2556 rc.orderIndex.configureLast(tx); 2557 2558 rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE); 2559 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2560 rc.locationIndex.load(tx); 2561 2562 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); 2563 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2564 rc.messageIdIndex.load(tx); 2565 2566 // If it was a topic... 2567 if (topic) { 2568 2569 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE); 2570 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE); 2571 rc.subscriptions.load(tx); 2572 2573 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); 2574 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller()); 2575 rc.subscriptionAcks.load(tx); 2576 2577 rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2578 rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2579 rc.ackPositions.load(tx); 2580 2581 rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2582 rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2583 rc.subLocations.load(tx); 2584 2585 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>(); 2586 2587 if (metadata.version < 3) { 2588 2589 // on upgrade need to fill ackLocation with available messages past last ack 2590 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2591 Entry<String, LastAck> entry = iterator.next(); 2592 for (Iterator<Entry<Long, MessageKeys>> orderIterator = 2593 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { 2594 Long sequence = orderIterator.next().getKey(); 2595 addAckLocation(tx, rc, sequence, entry.getKey()); 2596 } 2597 // modify so it is upgraded 2598 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue()); 2599 } 2600 } 2601 2602 2603 // Configure the subscription cache 2604 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2605 Entry<String, LastAck> entry = iterator.next(); 2606 rc.subscriptionCache.add(entry.getKey()); 2607 } 2608 2609 if (rc.orderIndex.nextMessageId == 0) { 2610 // check for existing durable sub all acked out - pull next seq from acks as messages are gone 2611 if (!rc.subscriptionAcks.isEmpty(tx)) { 2612 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { 2613 Entry<String, LastAck> entry = iterator.next(); 2614 rc.orderIndex.nextMessageId = 2615 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1); 2616 } 2617 } 2618 } else { 2619 // update based on ackPositions for unmatched, last entry is always the next 2620 Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx); 2621 while (subscriptions.hasNext()) { 2622 Entry<String, SequenceSet> subscription = subscriptions.next(); 2623 SequenceSet pendingAcks = subscription.getValue(); 2624 if (pendingAcks != null && !pendingAcks.isEmpty()) { 2625 for (Long sequenceId : pendingAcks) { 2626 rc.orderIndex.nextMessageId = Math.max(rc.orderIndex.nextMessageId, sequenceId); 2627 } 2628 } 2629 } 2630 } 2631 } 2632 2633 if (metadata.version < VERSION) { 2634 // store again after upgrade 2635 metadata.destinations.put(tx, key, rc); 2636 } 2637 return rc; 2638 } 2639 2640 /** 2641 * This is a map to cache MessageStores for a specific 2642 * KahaDestination key 2643 */ 2644 protected final ConcurrentMap<String, MessageStore> storeCache = 2645 new ConcurrentHashMap<>(); 2646 2647 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { 2648 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2649 if (sequences == null) { 2650 sequences = new SequenceSet(); 2651 sequences.add(messageSequence); 2652 sd.ackPositions.add(tx, subscriptionKey, sequences); 2653 } else { 2654 sequences.add(messageSequence); 2655 sd.ackPositions.put(tx, subscriptionKey, sequences); 2656 } 2657 } 2658 2659 // new sub is interested in potentially all existing messages 2660 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2661 SequenceSet allOutstanding = new SequenceSet(); 2662 Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx); 2663 while (iterator.hasNext()) { 2664 SequenceSet set = iterator.next().getValue(); 2665 for (Long entry : set) { 2666 allOutstanding.add(entry); 2667 } 2668 } 2669 sd.ackPositions.put(tx, subscriptionKey, allOutstanding); 2670 } 2671 2672 // on a new message add, all existing subs are interested in this message 2673 private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { 2674 for(String subscriptionKey : sd.subscriptionCache) { 2675 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2676 if (sequences == null) { 2677 sequences = new SequenceSet(); 2678 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2679 sd.ackPositions.add(tx, subscriptionKey, sequences); 2680 } else { 2681 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2682 sd.ackPositions.put(tx, subscriptionKey, sequences); 2683 } 2684 } 2685 } 2686 2687 private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2688 if (!sd.ackPositions.isEmpty(tx)) { 2689 SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey); 2690 if (sequences == null || sequences.isEmpty()) { 2691 return; 2692 } 2693 2694 ArrayList<Long> unreferenced = new ArrayList<Long>(); 2695 2696 for(Long sequenceId : sequences) { 2697 if(!isSequenceReferenced(tx, sd, sequenceId)) { 2698 unreferenced.add(sequenceId); 2699 } 2700 } 2701 2702 for(Long sequenceId : unreferenced) { 2703 // Find all the entries that need to get deleted. 2704 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2705 sd.orderIndex.getDeleteList(tx, deletes, sequenceId); 2706 2707 // Do the actual deletes. 2708 for (Entry<Long, MessageKeys> entry : deletes) { 2709 sd.locationIndex.remove(tx, entry.getValue().location); 2710 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2711 sd.orderIndex.remove(tx, entry.getKey()); 2712 } 2713 } 2714 } 2715 } 2716 2717 private boolean isSequenceReferenced(final Transaction tx, final StoredDestination sd, final Long sequenceId) throws IOException { 2718 for(String subscriptionKey : sd.subscriptionCache) { 2719 SequenceSet sequence = sd.ackPositions.get(tx, subscriptionKey); 2720 if (sequence != null && sequence.contains(sequenceId)) { 2721 return true; 2722 } 2723 } 2724 return false; 2725 } 2726 2727 /** 2728 * @param tx 2729 * @param sd 2730 * @param subscriptionKey 2731 * @param messageSequence 2732 * @throws IOException 2733 */ 2734 private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException { 2735 // Remove the sub from the previous location set.. 2736 if (messageSequence != null) { 2737 SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); 2738 if (range != null && !range.isEmpty()) { 2739 range.remove(messageSequence); 2740 if (!range.isEmpty()) { 2741 sd.ackPositions.put(tx, subscriptionKey, range); 2742 } else { 2743 sd.ackPositions.remove(tx, subscriptionKey); 2744 } 2745 2746 // Check if the message is reference by any other subscription. 2747 if (isSequenceReferenced(tx, sd, messageSequence)) { 2748 return; 2749 } 2750 // Find all the entries that need to get deleted. 2751 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2752 sd.orderIndex.getDeleteList(tx, deletes, messageSequence); 2753 2754 // Do the actual deletes. 2755 for (Entry<Long, MessageKeys> entry : deletes) { 2756 sd.locationIndex.remove(tx, entry.getValue().location); 2757 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2758 sd.orderIndex.remove(tx, entry.getKey()); 2759 } 2760 } 2761 } 2762 } 2763 2764 public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2765 return sd.subscriptionAcks.get(tx, subscriptionKey); 2766 } 2767 2768 protected SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2769 if (sd.ackPositions != null) { 2770 final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); 2771 return messageSequences; 2772 } 2773 2774 return null; 2775 } 2776 2777 protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2778 if (sd.ackPositions != null) { 2779 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); 2780 if (messageSequences != null) { 2781 long result = messageSequences.rangeSize(); 2782 // if there's anything in the range the last value is always the nextMessage marker, so remove 1. 2783 return result > 0 ? result - 1 : 0; 2784 } 2785 } 2786 2787 return 0; 2788 } 2789 2790 protected String key(KahaDestination destination) { 2791 return destination.getType().getNumber() + ":" + destination.getName(); 2792 } 2793 2794 // ///////////////////////////////////////////////////////////////// 2795 // Transaction related implementation methods. 2796 // ///////////////////////////////////////////////////////////////// 2797 @SuppressWarnings("rawtypes") 2798 private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2799 @SuppressWarnings("rawtypes") 2800 protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2801 2802 @SuppressWarnings("rawtypes") 2803 private List<Operation> getInflightTx(KahaTransactionInfo info) { 2804 TransactionId key = TransactionIdConversion.convert(info); 2805 List<Operation> tx; 2806 synchronized (inflightTransactions) { 2807 tx = inflightTransactions.get(key); 2808 if (tx == null) { 2809 tx = Collections.synchronizedList(new ArrayList<Operation>()); 2810 inflightTransactions.put(key, tx); 2811 } 2812 } 2813 return tx; 2814 } 2815 2816 @SuppressWarnings("unused") 2817 private TransactionId key(KahaTransactionInfo transactionInfo) { 2818 return TransactionIdConversion.convert(transactionInfo); 2819 } 2820 2821 abstract class Operation <T extends JournalCommand<T>> { 2822 final T command; 2823 final Location location; 2824 2825 public Operation(T command, Location location) { 2826 this.command = command; 2827 this.location = location; 2828 } 2829 2830 public Location getLocation() { 2831 return location; 2832 } 2833 2834 public T getCommand() { 2835 return command; 2836 } 2837 2838 abstract public void execute(Transaction tx) throws IOException; 2839 } 2840 2841 class AddOperation extends Operation<KahaAddMessageCommand> { 2842 final IndexAware runWithIndexLock; 2843 public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) { 2844 super(command, location); 2845 this.runWithIndexLock = runWithIndexLock; 2846 } 2847 2848 @Override 2849 public void execute(Transaction tx) throws IOException { 2850 long seq = updateIndex(tx, command, location); 2851 if (runWithIndexLock != null) { 2852 runWithIndexLock.sequenceAssignedWithIndexLocked(seq); 2853 } 2854 } 2855 } 2856 2857 class RemoveOperation extends Operation<KahaRemoveMessageCommand> { 2858 2859 public RemoveOperation(KahaRemoveMessageCommand command, Location location) { 2860 super(command, location); 2861 } 2862 2863 @Override 2864 public void execute(Transaction tx) throws IOException { 2865 updateIndex(tx, command, location); 2866 } 2867 } 2868 2869 // ///////////////////////////////////////////////////////////////// 2870 // Initialization related implementation methods. 2871 // ///////////////////////////////////////////////////////////////// 2872 2873 private PageFile createPageFile() throws IOException { 2874 if (indexDirectory == null) { 2875 indexDirectory = directory; 2876 } 2877 IOHelper.mkdirs(indexDirectory); 2878 PageFile index = new PageFile(indexDirectory, "db"); 2879 index.setEnableWriteThread(isEnableIndexWriteAsync()); 2880 index.setWriteBatchSize(getIndexWriteBatchSize()); 2881 index.setPageCacheSize(indexCacheSize); 2882 index.setUseLFRUEviction(isUseIndexLFRUEviction()); 2883 index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); 2884 index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); 2885 index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); 2886 index.setEnablePageCaching(isEnableIndexPageCaching()); 2887 return index; 2888 } 2889 2890 protected Journal createJournal() throws IOException { 2891 Journal manager = new Journal(); 2892 manager.setDirectory(directory); 2893 manager.setMaxFileLength(getJournalMaxFileLength()); 2894 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); 2895 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); 2896 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 2897 manager.setArchiveDataLogs(isArchiveDataLogs()); 2898 manager.setSizeAccumulator(journalSize); 2899 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); 2900 manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase())); 2901 manager.setPreallocationStrategy( 2902 Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase())); 2903 manager.setJournalDiskSyncStrategy(journalDiskSyncStrategy); 2904 if (getDirectoryArchive() != null) { 2905 IOHelper.mkdirs(getDirectoryArchive()); 2906 manager.setDirectoryArchive(getDirectoryArchive()); 2907 } 2908 return manager; 2909 } 2910 2911 private Metadata createMetadata() { 2912 Metadata md = new Metadata(); 2913 md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth()); 2914 md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack()); 2915 return md; 2916 } 2917 2918 public int getJournalMaxWriteBatchSize() { 2919 return journalMaxWriteBatchSize; 2920 } 2921 2922 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 2923 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 2924 } 2925 2926 public File getDirectory() { 2927 return directory; 2928 } 2929 2930 public void setDirectory(File directory) { 2931 this.directory = directory; 2932 } 2933 2934 public boolean isDeleteAllMessages() { 2935 return deleteAllMessages; 2936 } 2937 2938 public void setDeleteAllMessages(boolean deleteAllMessages) { 2939 this.deleteAllMessages = deleteAllMessages; 2940 } 2941 2942 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { 2943 this.setIndexWriteBatchSize = setIndexWriteBatchSize; 2944 } 2945 2946 public int getIndexWriteBatchSize() { 2947 return setIndexWriteBatchSize; 2948 } 2949 2950 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 2951 this.enableIndexWriteAsync = enableIndexWriteAsync; 2952 } 2953 2954 boolean isEnableIndexWriteAsync() { 2955 return enableIndexWriteAsync; 2956 } 2957 2958 /** 2959 * @deprecated use {@link #getJournalDiskSyncStrategyEnum} or {@link #getJournalDiskSyncStrategy} instead 2960 * @return 2961 */ 2962 public boolean isEnableJournalDiskSyncs() { 2963 return journalDiskSyncStrategy == JournalDiskSyncStrategy.ALWAYS; 2964 } 2965 2966 /** 2967 * @deprecated use {@link #setEnableJournalDiskSyncs} instead 2968 * @param syncWrites 2969 */ 2970 public void setEnableJournalDiskSyncs(boolean syncWrites) { 2971 if (syncWrites) { 2972 journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS; 2973 } else { 2974 journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER; 2975 } 2976 } 2977 2978 public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() { 2979 return journalDiskSyncStrategy; 2980 } 2981 2982 public String getJournalDiskSyncStrategy() { 2983 return journalDiskSyncStrategy.name(); 2984 } 2985 2986 public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) { 2987 this.journalDiskSyncStrategy = JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase()); 2988 } 2989 2990 public long getJournalDiskSyncInterval() { 2991 return journalDiskSyncInterval; 2992 } 2993 2994 public void setJournalDiskSyncInterval(long journalDiskSyncInterval) { 2995 this.journalDiskSyncInterval = journalDiskSyncInterval; 2996 } 2997 2998 public long getCheckpointInterval() { 2999 return checkpointInterval; 3000 } 3001 3002 public void setCheckpointInterval(long checkpointInterval) { 3003 this.checkpointInterval = checkpointInterval; 3004 } 3005 3006 public long getCleanupInterval() { 3007 return cleanupInterval; 3008 } 3009 3010 public void setCleanupInterval(long cleanupInterval) { 3011 this.cleanupInterval = cleanupInterval; 3012 } 3013 3014 public boolean getCleanupOnStop() { 3015 return cleanupOnStop; 3016 } 3017 3018 public void setCleanupOnStop(boolean cleanupOnStop) { 3019 this.cleanupOnStop = cleanupOnStop; 3020 } 3021 3022 public void setJournalMaxFileLength(int journalMaxFileLength) { 3023 this.journalMaxFileLength = journalMaxFileLength; 3024 } 3025 3026 public int getJournalMaxFileLength() { 3027 return journalMaxFileLength; 3028 } 3029 3030 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 3031 this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack); 3032 } 3033 3034 public int getMaxFailoverProducersToTrack() { 3035 return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack(); 3036 } 3037 3038 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 3039 this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth); 3040 } 3041 3042 public int getFailoverProducersAuditDepth() { 3043 return this.metadata.producerSequenceIdTracker.getAuditDepth(); 3044 } 3045 3046 public PageFile getPageFile() throws IOException { 3047 if (pageFile == null) { 3048 pageFile = createPageFile(); 3049 } 3050 return pageFile; 3051 } 3052 3053 public Journal getJournal() throws IOException { 3054 if (journal == null) { 3055 journal = createJournal(); 3056 } 3057 return journal; 3058 } 3059 3060 protected Metadata getMetadata() { 3061 return metadata; 3062 } 3063 3064 public boolean isFailIfDatabaseIsLocked() { 3065 return failIfDatabaseIsLocked; 3066 } 3067 3068 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 3069 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 3070 } 3071 3072 public boolean isIgnoreMissingJournalfiles() { 3073 return ignoreMissingJournalfiles; 3074 } 3075 3076 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 3077 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; 3078 } 3079 3080 public int getIndexCacheSize() { 3081 return indexCacheSize; 3082 } 3083 3084 public void setIndexCacheSize(int indexCacheSize) { 3085 this.indexCacheSize = indexCacheSize; 3086 } 3087 3088 public boolean isCheckForCorruptJournalFiles() { 3089 return checkForCorruptJournalFiles; 3090 } 3091 3092 public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { 3093 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; 3094 } 3095 3096 public boolean isChecksumJournalFiles() { 3097 return checksumJournalFiles; 3098 } 3099 3100 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 3101 this.checksumJournalFiles = checksumJournalFiles; 3102 } 3103 3104 @Override 3105 public void setBrokerService(BrokerService brokerService) { 3106 this.brokerService = brokerService; 3107 } 3108 3109 /** 3110 * @return the archiveDataLogs 3111 */ 3112 public boolean isArchiveDataLogs() { 3113 return this.archiveDataLogs; 3114 } 3115 3116 /** 3117 * @param archiveDataLogs the archiveDataLogs to set 3118 */ 3119 public void setArchiveDataLogs(boolean archiveDataLogs) { 3120 this.archiveDataLogs = archiveDataLogs; 3121 } 3122 3123 /** 3124 * @return the directoryArchive 3125 */ 3126 public File getDirectoryArchive() { 3127 return this.directoryArchive; 3128 } 3129 3130 /** 3131 * @param directoryArchive the directoryArchive to set 3132 */ 3133 public void setDirectoryArchive(File directoryArchive) { 3134 this.directoryArchive = directoryArchive; 3135 } 3136 3137 public boolean isArchiveCorruptedIndex() { 3138 return archiveCorruptedIndex; 3139 } 3140 3141 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 3142 this.archiveCorruptedIndex = archiveCorruptedIndex; 3143 } 3144 3145 public float getIndexLFUEvictionFactor() { 3146 return indexLFUEvictionFactor; 3147 } 3148 3149 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 3150 this.indexLFUEvictionFactor = indexLFUEvictionFactor; 3151 } 3152 3153 public boolean isUseIndexLFRUEviction() { 3154 return useIndexLFRUEviction; 3155 } 3156 3157 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 3158 this.useIndexLFRUEviction = useIndexLFRUEviction; 3159 } 3160 3161 public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) { 3162 this.enableIndexDiskSyncs = enableIndexDiskSyncs; 3163 } 3164 3165 public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) { 3166 this.enableIndexRecoveryFile = enableIndexRecoveryFile; 3167 } 3168 3169 public void setEnableIndexPageCaching(boolean enableIndexPageCaching) { 3170 this.enableIndexPageCaching = enableIndexPageCaching; 3171 } 3172 3173 public boolean isEnableIndexDiskSyncs() { 3174 return enableIndexDiskSyncs; 3175 } 3176 3177 public boolean isEnableIndexRecoveryFile() { 3178 return enableIndexRecoveryFile; 3179 } 3180 3181 public boolean isEnableIndexPageCaching() { 3182 return enableIndexPageCaching; 3183 } 3184 3185 // ///////////////////////////////////////////////////////////////// 3186 // Internal conversion methods. 3187 // ///////////////////////////////////////////////////////////////// 3188 3189 class MessageOrderCursor{ 3190 long defaultCursorPosition; 3191 long lowPriorityCursorPosition; 3192 long highPriorityCursorPosition; 3193 MessageOrderCursor(){ 3194 } 3195 3196 MessageOrderCursor(long position){ 3197 this.defaultCursorPosition=position; 3198 this.lowPriorityCursorPosition=position; 3199 this.highPriorityCursorPosition=position; 3200 } 3201 3202 MessageOrderCursor(MessageOrderCursor other){ 3203 this.defaultCursorPosition=other.defaultCursorPosition; 3204 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 3205 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 3206 } 3207 3208 MessageOrderCursor copy() { 3209 return new MessageOrderCursor(this); 3210 } 3211 3212 void reset() { 3213 this.defaultCursorPosition=0; 3214 this.highPriorityCursorPosition=0; 3215 this.lowPriorityCursorPosition=0; 3216 } 3217 3218 void increment() { 3219 if (defaultCursorPosition!=0) { 3220 defaultCursorPosition++; 3221 } 3222 if (highPriorityCursorPosition!=0) { 3223 highPriorityCursorPosition++; 3224 } 3225 if (lowPriorityCursorPosition!=0) { 3226 lowPriorityCursorPosition++; 3227 } 3228 } 3229 3230 @Override 3231 public String toString() { 3232 return "MessageOrderCursor:[def:" + defaultCursorPosition 3233 + ", low:" + lowPriorityCursorPosition 3234 + ", high:" + highPriorityCursorPosition + "]"; 3235 } 3236 3237 public void sync(MessageOrderCursor other) { 3238 this.defaultCursorPosition=other.defaultCursorPosition; 3239 this.lowPriorityCursorPosition=other.lowPriorityCursorPosition; 3240 this.highPriorityCursorPosition=other.highPriorityCursorPosition; 3241 } 3242 } 3243 3244 class MessageOrderIndex { 3245 static final byte HI = 9; 3246 static final byte LO = 0; 3247 static final byte DEF = 4; 3248 3249 long nextMessageId; 3250 BTreeIndex<Long, MessageKeys> defaultPriorityIndex; 3251 BTreeIndex<Long, MessageKeys> lowPriorityIndex; 3252 BTreeIndex<Long, MessageKeys> highPriorityIndex; 3253 final MessageOrderCursor cursor = new MessageOrderCursor(); 3254 Long lastDefaultKey; 3255 Long lastHighKey; 3256 Long lastLowKey; 3257 byte lastGetPriority; 3258 final List<Long> pendingAdditions = new LinkedList<Long>(); 3259 3260 MessageKeys remove(Transaction tx, Long key) throws IOException { 3261 MessageKeys result = defaultPriorityIndex.remove(tx, key); 3262 if (result == null && highPriorityIndex!=null) { 3263 result = highPriorityIndex.remove(tx, key); 3264 if (result ==null && lowPriorityIndex!=null) { 3265 result = lowPriorityIndex.remove(tx, key); 3266 } 3267 } 3268 return result; 3269 } 3270 3271 void load(Transaction tx) throws IOException { 3272 defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3273 defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 3274 defaultPriorityIndex.load(tx); 3275 lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3276 lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 3277 lowPriorityIndex.load(tx); 3278 highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 3279 highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 3280 highPriorityIndex.load(tx); 3281 } 3282 3283 void allocate(Transaction tx) throws IOException { 3284 defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3285 if (metadata.version >= 2) { 3286 lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3287 highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 3288 } 3289 } 3290 3291 void configureLast(Transaction tx) throws IOException { 3292 // Figure out the next key using the last entry in the destination. 3293 TreeSet<Long> orderedSet = new TreeSet<Long>(); 3294 3295 addLast(orderedSet, highPriorityIndex, tx); 3296 addLast(orderedSet, defaultPriorityIndex, tx); 3297 addLast(orderedSet, lowPriorityIndex, tx); 3298 3299 if (!orderedSet.isEmpty()) { 3300 nextMessageId = orderedSet.last() + 1; 3301 } 3302 } 3303 3304 private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException { 3305 if (index != null) { 3306 Entry<Long, MessageKeys> lastEntry = index.getLast(tx); 3307 if (lastEntry != null) { 3308 orderedSet.add(lastEntry.getKey()); 3309 } 3310 } 3311 } 3312 3313 void clear(Transaction tx) throws IOException { 3314 this.remove(tx); 3315 this.resetCursorPosition(); 3316 this.allocate(tx); 3317 this.load(tx); 3318 this.configureLast(tx); 3319 } 3320 3321 void remove(Transaction tx) throws IOException { 3322 defaultPriorityIndex.clear(tx); 3323 defaultPriorityIndex.unload(tx); 3324 tx.free(defaultPriorityIndex.getPageId()); 3325 if (lowPriorityIndex != null) { 3326 lowPriorityIndex.clear(tx); 3327 lowPriorityIndex.unload(tx); 3328 3329 tx.free(lowPriorityIndex.getPageId()); 3330 } 3331 if (highPriorityIndex != null) { 3332 highPriorityIndex.clear(tx); 3333 highPriorityIndex.unload(tx); 3334 tx.free(highPriorityIndex.getPageId()); 3335 } 3336 } 3337 3338 void resetCursorPosition() { 3339 this.cursor.reset(); 3340 lastDefaultKey = null; 3341 lastHighKey = null; 3342 lastLowKey = null; 3343 } 3344 3345 void setBatch(Transaction tx, Long sequence) throws IOException { 3346 if (sequence != null) { 3347 Long nextPosition = new Long(sequence.longValue() + 1); 3348 lastDefaultKey = sequence; 3349 cursor.defaultCursorPosition = nextPosition.longValue(); 3350 lastHighKey = sequence; 3351 cursor.highPriorityCursorPosition = nextPosition.longValue(); 3352 lastLowKey = sequence; 3353 cursor.lowPriorityCursorPosition = nextPosition.longValue(); 3354 } 3355 } 3356 3357 void setBatch(Transaction tx, LastAck last) throws IOException { 3358 setBatch(tx, last.lastAckedSequence); 3359 if (cursor.defaultCursorPosition == 0 3360 && cursor.highPriorityCursorPosition == 0 3361 && cursor.lowPriorityCursorPosition == 0) { 3362 long next = last.lastAckedSequence + 1; 3363 switch (last.priority) { 3364 case DEF: 3365 cursor.defaultCursorPosition = next; 3366 cursor.highPriorityCursorPosition = next; 3367 break; 3368 case HI: 3369 cursor.highPriorityCursorPosition = next; 3370 break; 3371 case LO: 3372 cursor.lowPriorityCursorPosition = next; 3373 cursor.defaultCursorPosition = next; 3374 cursor.highPriorityCursorPosition = next; 3375 break; 3376 } 3377 } 3378 } 3379 3380 void stoppedIterating() { 3381 if (lastDefaultKey!=null) { 3382 cursor.defaultCursorPosition=lastDefaultKey.longValue()+1; 3383 } 3384 if (lastHighKey!=null) { 3385 cursor.highPriorityCursorPosition=lastHighKey.longValue()+1; 3386 } 3387 if (lastLowKey!=null) { 3388 cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1; 3389 } 3390 lastDefaultKey = null; 3391 lastHighKey = null; 3392 lastLowKey = null; 3393 } 3394 3395 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId) 3396 throws IOException { 3397 if (defaultPriorityIndex.containsKey(tx, sequenceId)) { 3398 getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId); 3399 } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) { 3400 getDeleteList(tx, deletes, highPriorityIndex, sequenceId); 3401 } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) { 3402 getDeleteList(tx, deletes, lowPriorityIndex, sequenceId); 3403 } 3404 } 3405 3406 void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, 3407 BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException { 3408 3409 Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null); 3410 deletes.add(iterator.next()); 3411 } 3412 3413 long getNextMessageId() { 3414 return nextMessageId++; 3415 } 3416 3417 void revertNextMessageId() { 3418 nextMessageId--; 3419 } 3420 3421 MessageKeys get(Transaction tx, Long key) throws IOException { 3422 MessageKeys result = defaultPriorityIndex.get(tx, key); 3423 if (result == null) { 3424 result = highPriorityIndex.get(tx, key); 3425 if (result == null) { 3426 result = lowPriorityIndex.get(tx, key); 3427 lastGetPriority = LO; 3428 } else { 3429 lastGetPriority = HI; 3430 } 3431 } else { 3432 lastGetPriority = DEF; 3433 } 3434 return result; 3435 } 3436 3437 MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException { 3438 if (priority == javax.jms.Message.DEFAULT_PRIORITY) { 3439 return defaultPriorityIndex.put(tx, key, value); 3440 } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) { 3441 return highPriorityIndex.put(tx, key, value); 3442 } else { 3443 return lowPriorityIndex.put(tx, key, value); 3444 } 3445 } 3446 3447 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{ 3448 return new MessageOrderIterator(tx,cursor,this); 3449 } 3450 3451 Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{ 3452 return new MessageOrderIterator(tx,m,this); 3453 } 3454 3455 public byte lastGetPriority() { 3456 return lastGetPriority; 3457 } 3458 3459 public boolean alreadyDispatched(Long sequence) { 3460 return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) || 3461 (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) || 3462 (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence); 3463 } 3464 3465 public void trackPendingAdd(Long seq) { 3466 synchronized (pendingAdditions) { 3467 pendingAdditions.add(seq); 3468 } 3469 } 3470 3471 public void trackPendingAddComplete(Long seq) { 3472 synchronized (pendingAdditions) { 3473 pendingAdditions.remove(seq); 3474 } 3475 } 3476 3477 public Long minPendingAdd() { 3478 synchronized (pendingAdditions) { 3479 if (!pendingAdditions.isEmpty()) { 3480 return pendingAdditions.get(0); 3481 } else { 3482 return null; 3483 } 3484 } 3485 } 3486 3487 3488 class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{ 3489 Iterator<Entry<Long, MessageKeys>>currentIterator; 3490 final Iterator<Entry<Long, MessageKeys>>highIterator; 3491 final Iterator<Entry<Long, MessageKeys>>defaultIterator; 3492 final Iterator<Entry<Long, MessageKeys>>lowIterator; 3493 3494 MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException { 3495 Long pendingAddLimiter = messageOrderIndex.minPendingAdd(); 3496 this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter); 3497 if (highPriorityIndex != null) { 3498 this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter); 3499 } else { 3500 this.highIterator = null; 3501 } 3502 if (lowPriorityIndex != null) { 3503 this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter); 3504 } else { 3505 this.lowIterator = null; 3506 } 3507 } 3508 3509 @Override 3510 public boolean hasNext() { 3511 if (currentIterator == null) { 3512 if (highIterator != null) { 3513 if (highIterator.hasNext()) { 3514 currentIterator = highIterator; 3515 return currentIterator.hasNext(); 3516 } 3517 if (defaultIterator.hasNext()) { 3518 currentIterator = defaultIterator; 3519 return currentIterator.hasNext(); 3520 } 3521 if (lowIterator.hasNext()) { 3522 currentIterator = lowIterator; 3523 return currentIterator.hasNext(); 3524 } 3525 return false; 3526 } else { 3527 currentIterator = defaultIterator; 3528 return currentIterator.hasNext(); 3529 } 3530 } 3531 if (highIterator != null) { 3532 if (currentIterator.hasNext()) { 3533 return true; 3534 } 3535 if (currentIterator == highIterator) { 3536 if (defaultIterator.hasNext()) { 3537 currentIterator = defaultIterator; 3538 return currentIterator.hasNext(); 3539 } 3540 if (lowIterator.hasNext()) { 3541 currentIterator = lowIterator; 3542 return currentIterator.hasNext(); 3543 } 3544 return false; 3545 } 3546 3547 if (currentIterator == defaultIterator) { 3548 if (lowIterator.hasNext()) { 3549 currentIterator = lowIterator; 3550 return currentIterator.hasNext(); 3551 } 3552 return false; 3553 } 3554 } 3555 return currentIterator.hasNext(); 3556 } 3557 3558 @Override 3559 public Entry<Long, MessageKeys> next() { 3560 Entry<Long, MessageKeys> result = currentIterator.next(); 3561 if (result != null) { 3562 Long key = result.getKey(); 3563 if (highIterator != null) { 3564 if (currentIterator == defaultIterator) { 3565 lastDefaultKey = key; 3566 } else if (currentIterator == highIterator) { 3567 lastHighKey = key; 3568 } else { 3569 lastLowKey = key; 3570 } 3571 } else { 3572 lastDefaultKey = key; 3573 } 3574 } 3575 return result; 3576 } 3577 3578 @Override 3579 public void remove() { 3580 throw new UnsupportedOperationException(); 3581 } 3582 3583 } 3584 } 3585 3586 private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> { 3587 final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller(); 3588 3589 @Override 3590 public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException { 3591 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 3592 ObjectOutputStream oout = new ObjectOutputStream(baos); 3593 oout.writeObject(object); 3594 oout.flush(); 3595 oout.close(); 3596 byte[] data = baos.toByteArray(); 3597 dataOut.writeInt(data.length); 3598 dataOut.write(data); 3599 } 3600 3601 @Override 3602 @SuppressWarnings("unchecked") 3603 public HashSet<String> readPayload(DataInput dataIn) throws IOException { 3604 int dataLen = dataIn.readInt(); 3605 byte[] data = new byte[dataLen]; 3606 dataIn.readFully(data); 3607 ByteArrayInputStream bais = new ByteArrayInputStream(data); 3608 ObjectInputStream oin = new ObjectInputStream(bais); 3609 try { 3610 return (HashSet<String>) oin.readObject(); 3611 } catch (ClassNotFoundException cfe) { 3612 IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe); 3613 ioe.initCause(cfe); 3614 throw ioe; 3615 } 3616 } 3617 } 3618 3619 public File getIndexDirectory() { 3620 return indexDirectory; 3621 } 3622 3623 public void setIndexDirectory(File indexDirectory) { 3624 this.indexDirectory = indexDirectory; 3625 } 3626 3627 interface IndexAware { 3628 public void sequenceAssignedWithIndexLocked(long index); 3629 } 3630 3631 public String getPreallocationScope() { 3632 return preallocationScope; 3633 } 3634 3635 public void setPreallocationScope(String preallocationScope) { 3636 this.preallocationScope = preallocationScope; 3637 } 3638 3639 public String getPreallocationStrategy() { 3640 return preallocationStrategy; 3641 } 3642 3643 public void setPreallocationStrategy(String preallocationStrategy) { 3644 this.preallocationStrategy = preallocationStrategy; 3645 } 3646 3647 public int getCompactAcksAfterNoGC() { 3648 return compactAcksAfterNoGC; 3649 } 3650 3651 /** 3652 * Sets the number of GC cycles where no journal logs were removed before an attempt to 3653 * move forward all the acks in the last log that contains them and is otherwise unreferenced. 3654 * <p> 3655 * A value of -1 will disable this feature. 3656 * 3657 * @param compactAcksAfterNoGC 3658 * Number of empty GC cycles before we rewrite old ACKS. 3659 */ 3660 public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) { 3661 this.compactAcksAfterNoGC = compactAcksAfterNoGC; 3662 } 3663 3664 /** 3665 * Returns whether Ack compaction will ignore that the store is still growing 3666 * and run more often. 3667 * 3668 * @return the compactAcksIgnoresStoreGrowth current value. 3669 */ 3670 public boolean isCompactAcksIgnoresStoreGrowth() { 3671 return compactAcksIgnoresStoreGrowth; 3672 } 3673 3674 /** 3675 * Configure if Ack compaction will occur regardless of continued growth of the 3676 * journal logs meaning that the store has not run out of space yet. Because the 3677 * compaction operation can be costly this value is defaulted to off and the Ack 3678 * compaction is only done when it seems that the store cannot grow and larger. 3679 * 3680 * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set 3681 */ 3682 public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) { 3683 this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth; 3684 } 3685 3686 /** 3687 * Returns whether Ack compaction is enabled 3688 * 3689 * @return enableAckCompaction 3690 */ 3691 public boolean isEnableAckCompaction() { 3692 return enableAckCompaction; 3693 } 3694 3695 /** 3696 * Configure if the Ack compaction task should be enabled to run 3697 * 3698 * @param enableAckCompaction 3699 */ 3700 public void setEnableAckCompaction(boolean enableAckCompaction) { 3701 this.enableAckCompaction = enableAckCompaction; 3702 } 3703}