/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.activemq.ActiveMQMessageAuditNoSync;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaDestination;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
import org.apache.activemq.store.kahadb.disk.index.ListIndex;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
import org.apache.activemq.store.kahadb.disk.util.Marshaller;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {

    protected BrokerService brokerService;

    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
    protected static final Buffer UNMATCHED;
    static {
        UNMATCHED = new Buffer(new byte[]{});
    }
    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);

    static final int CLOSED_STATE = 1;
    static final int OPEN_STATE = 2;
    static final long NOT_ACKED = -1;

    static final int VERSION = 5;

    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;

    protected class Metadata {
        protected Page<Metadata> page;
        protected int state;
        protected BTreeIndex<String, StoredDestination> destinations;
        protected Location lastUpdate;
        protected Location firstInProgressTransactionLocation;
        protected Location producerSequenceIdTrackerLocation = null;
        protected Location ackMessageFileMapLocation = null;
        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
        protected int version = VERSION;
        protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION;

        public void read(DataInput is) throws IOException {
            state = is.readInt();
            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
            if (is.readBoolean()) {
                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
            } else {
                lastUpdate = null;
            }
            if (is.readBoolean()) {
                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
            } else {
                firstInProgressTransactionLocation = null;
            }
            try {
                if (is.readBoolean()) {
                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
                } else {
                    producerSequenceIdTrackerLocation = null;
                }
            } catch (EOFException expectedOnUpgrade) {
            }
            try {
                version = is.readInt();
            } catch (EOFException expectedOnUpgrade) {
                version = 1;
            }
            if (version >= 5 && is.readBoolean()) {
                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
            } else {
                ackMessageFileMapLocation = null;
            }
            try {
                openwireVersion = is.readInt();
            } catch (EOFException expectedOnUpgrade) {
                openwireVersion = OpenWireFormat.DEFAULT_VERSION;
            }
            LOG.info("KahaDB is version " + version);
        }

        public void write(DataOutput os) throws IOException {
            os.writeInt(state);
            os.writeLong(destinations.getPageId());

            if (lastUpdate != null) {
                os.writeBoolean(true);
                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
            } else {
                os.writeBoolean(false);
            }

            if (firstInProgressTransactionLocation != null) {
                os.writeBoolean(true);
                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
            } else {
                os.writeBoolean(false);
            }

            if (producerSequenceIdTrackerLocation != null) {
                os.writeBoolean(true);
                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
            } else {
                os.writeBoolean(false);
            }
            os.writeInt(VERSION);
            if (ackMessageFileMapLocation != null) {
                os.writeBoolean(true);
                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
            } else {
                os.writeBoolean(false);
            }
            os.writeInt(this.openwireVersion);
        }
    }

    class MetadataMarshaller extends VariableMarshaller<Metadata> {
        @Override
        public Metadata readPayload(DataInput dataIn) throws IOException {
            Metadata rc = createMetadata();
            rc.read(dataIn);
            return rc;
        }

        @Override
        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
            object.write(dataOut);
        }
    }

    protected PageFile pageFile;
    protected Journal journal;
    protected Metadata metadata = new Metadata();

    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();

    protected boolean failIfDatabaseIsLocked;

    protected boolean deleteAllMessages;
    protected File directory = DEFAULT_DIRECTORY;
    protected File indexDirectory = null;
    protected ScheduledExecutorService scheduler;
    private final Object schedulerLock = new Object();

    protected String journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
    protected boolean archiveDataLogs;
    protected File directoryArchive;
    protected AtomicLong journalSize = new AtomicLong(0);
    long journalDiskSyncInterval = 1000;
    long checkpointInterval = 5*1000;
    long cleanupInterval = 30*1000;
    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
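    // The journal/index tuning fields in this block are normally set through the
    // broker's persistence adapter configuration rather than assigned directly.
    // A minimal sketch of such a configuration (attribute names assumed to map
    // onto the usual KahaDB bean properties of the same names):
    //
    //   <persistenceAdapter>
    //     <kahaDB directory="activemq-data/kahadb"
    //             journalMaxFileLength="32mb"
    //             checkpointInterval="5000"
    //             cleanupInterval="30000"/>
    //   </persistenceAdapter>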
    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    boolean enableIndexWriteAsync = false;
    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();

    protected AtomicBoolean opened = new AtomicBoolean();
    private boolean ignoreMissingJournalfiles = false;
    private int indexCacheSize = 10000;
    private boolean checkForCorruptJournalFiles = false;
    private boolean checksumJournalFiles = true;
    protected boolean forceRecoverIndex = false;

    private boolean archiveCorruptedIndex = false;
    private boolean useIndexLFRUEviction = false;
    private float indexLFUEvictionFactor = 0.2f;
    private boolean enableIndexDiskSyncs = true;
    private boolean enableIndexRecoveryFile = true;
    private boolean enableIndexPageCaching = true;
    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();

    private boolean enableAckCompaction = false;
    private int compactAcksAfterNoGC = 10;
    private boolean compactAcksIgnoresStoreGrowth = false;
    private int checkPointCyclesWithNoGC;
    private int journalLogOnLastCompactionCheck;

    // only set when using JournalDiskSyncStrategy.PERIODIC
    protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>();

    @Override
    public void doStart() throws Exception {
        load();
    }

    @Override
    public void doStop(ServiceStopper stopper) throws Exception {
        unload();
    }

    public void allowIOResumption() {
        if (pageFile != null) {
            pageFile.allowIOResumption();
        }
        if (journal != null) {
            journal.allowIOResumption();
        }
    }

    private void loadPageFile() throws IOException {
        this.indexLock.writeLock().lock();
        try {
            final PageFile pageFile = getPageFile();
            pageFile.load();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    if (pageFile.getPageCount() == 0) {
                        // First time this is created.. Initialize the metadata
                        Page<Metadata> page = tx.allocate();
                        assert page.getPageId() == 0;
                        page.set(metadata);
                        metadata.page = page;
                        metadata.state = CLOSED_STATE;
                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());

                        tx.store(metadata.page, metadataMarshaller, true);
                    } else {
                        Page<Metadata> page = tx.load(0, metadataMarshaller);
                        metadata = page.get();
                        metadata.page = page;
                    }
                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
                    metadata.destinations.load(tx);
                }
            });
            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
            // Perhaps we should just keep an index of file
            storedDestinations.clear();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
                        Entry<String, StoredDestination> entry = iterator.next();
                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions != null);
                        storedDestinations.put(entry.getKey(), sd);

                        if (checkForCorruptJournalFiles) {
                            // sanity check the index also
                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
                                }
                            }
                        }
                    }
                }
            });
            pageFile.flush();
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    private void startCheckpoint() {
        if (checkpointInterval == 0 && cleanupInterval == 0) {
            LOG.info("periodic checkpoint/cleanup disabled, will occur on clean shutdown/restart");
            return;
        }
        synchronized (schedulerLock) {
            if (scheduler == null || scheduler.isShutdown()) {
                scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {

                    @Override
                    public Thread newThread(Runnable r) {
                        Thread schedulerThread = new Thread(r);

                        schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
                        schedulerThread.setDaemon(true);

                        return schedulerThread;
                    }
                });

                // Short intervals for check-point and cleanups
                long delay;
                if (journal.isJournalDiskSyncPeriodic()) {
                    delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500);
                } else {
                    delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
                }

                scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
            }
        }
    }

    private final class CheckpointRunner implements Runnable {

        private long lastCheckpoint = System.currentTimeMillis();
        private long lastCleanup = System.currentTimeMillis();
        private long lastSync = System.currentTimeMillis();
        private Location lastAsyncUpdate = null;

        @Override
        public void run() {
            try {
                // Decide on cleanup vs full checkpoint here.
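                // A cleanup pass subsumes a plain checkpoint, so a cleanup resets both
                // timers below. With the PERIODIC disk sync strategy, appending a
                // KahaTraceCommand synchronously forces the journal to disk whenever
                // un-synced async updates have accumulated since the last pass.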
                if (opened.get()) {
                    long now = System.currentTimeMillis();
                    if (journal.isJournalDiskSyncPeriodic() &&
                            journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
                        Location currentUpdate = lastAsyncJournalUpdate.get();
                        if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) {
                            lastAsyncUpdate = currentUpdate;
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("Writing trace command to trigger journal sync");
                            }
                            store(new KahaTraceCommand(), true, null, null);
                        }
                        lastSync = now;
                    }
                    if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
                        checkpointCleanup(true);
                        lastCleanup = now;
                        lastCheckpoint = now;
                    } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
                        checkpointCleanup(false);
                        lastCheckpoint = now;
                    }
                }
            } catch (IOException ioe) {
                LOG.error("Checkpoint failed", ioe);
                brokerService.handleIOException(ioe);
            } catch (Throwable e) {
                LOG.error("Checkpoint failed", e);
                brokerService.handleIOException(IOExceptionSupport.create(e));
            }
        }
    }

    public void open() throws IOException {
        if (opened.compareAndSet(false, true)) {
            getJournal().start();
            try {
                loadPageFile();
            } catch (Throwable t) {
                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Index load failure", t);
                }
                // try to recover index
                try {
                    pageFile.unload();
                } catch (Exception ignore) {}
                if (archiveCorruptedIndex) {
                    pageFile.archive();
                } else {
                    pageFile.delete();
                }
                metadata = createMetadata();
                pageFile = null;
                loadPageFile();
            }
            startCheckpoint();
            recover();
        }
    }

    public void load() throws IOException {
        this.indexLock.writeLock().lock();
        IOHelper.mkdirs(directory);
        try {
            if (deleteAllMessages) {
                getJournal().start();
                getJournal().delete();
                getJournal().close();
                journal = null;
                getPageFile().delete();
                LOG.info("Persistence store purged.");
                deleteAllMessages = false;
            }

            open();
            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    public void close() throws IOException, InterruptedException {
        if (opened.compareAndSet(true, false)) {
            checkpointLock.writeLock().lock();
            try {
                if (metadata.page != null) {
                    checkpointUpdate(true);
                }
                pageFile.unload();
                metadata = createMetadata();
            } finally {
                checkpointLock.writeLock().unlock();
            }
            journal.close();
            synchronized (schedulerLock) {
                if (scheduler != null) {
                    ThreadPoolUtils.shutdownGraceful(scheduler, -1);
                    scheduler = null;
                }
            }
            journalSize.set(0);
        }
    }

    public void unload() throws IOException, InterruptedException {
        this.indexLock.writeLock().lock();
        try {
            if (pageFile != null && pageFile.isLoaded()) {
                metadata.state = CLOSED_STATE;
                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];

                if (metadata.page != null) {
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            tx.store(metadata.page, metadataMarshaller, true);
                        }
                    });
                }
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
        close();
    }

    // public for testing
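    // Returns the [min, max] journal locations covered by the operations of all
    // in-flight and prepared transactions, or [null, null] when there are none.
    // The minimum is persisted as firstInProgressTransactionLocation and bounds
    // both recovery replay and journal GC.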
@SuppressWarnings("rawtypes") 535 public Location[] getInProgressTxLocationRange() { 536 Location[] range = new Location[]{null, null}; 537 synchronized (inflightTransactions) { 538 if (!inflightTransactions.isEmpty()) { 539 for (List<Operation> ops : inflightTransactions.values()) { 540 if (!ops.isEmpty()) { 541 trackMaxAndMin(range, ops); 542 } 543 } 544 } 545 if (!preparedTransactions.isEmpty()) { 546 for (List<Operation> ops : preparedTransactions.values()) { 547 if (!ops.isEmpty()) { 548 trackMaxAndMin(range, ops); 549 } 550 } 551 } 552 } 553 return range; 554 } 555 556 @SuppressWarnings("rawtypes") 557 private void trackMaxAndMin(Location[] range, List<Operation> ops) { 558 Location t = ops.get(0).getLocation(); 559 if (range[0] == null || t.compareTo(range[0]) <= 0) { 560 range[0] = t; 561 } 562 t = ops.get(ops.size() -1).getLocation(); 563 if (range[1] == null || t.compareTo(range[1]) >= 0) { 564 range[1] = t; 565 } 566 } 567 568 class TranInfo { 569 TransactionId id; 570 Location location; 571 572 class opCount { 573 int add; 574 int remove; 575 } 576 HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>(); 577 578 @SuppressWarnings("rawtypes") 579 public void track(Operation operation) { 580 if (location == null ) { 581 location = operation.getLocation(); 582 } 583 KahaDestination destination; 584 boolean isAdd = false; 585 if (operation instanceof AddOperation) { 586 AddOperation add = (AddOperation) operation; 587 destination = add.getCommand().getDestination(); 588 isAdd = true; 589 } else { 590 RemoveOperation removeOpperation = (RemoveOperation) operation; 591 destination = removeOpperation.getCommand().getDestination(); 592 } 593 opCount opCount = destinationOpCount.get(destination); 594 if (opCount == null) { 595 opCount = new opCount(); 596 destinationOpCount.put(destination, opCount); 597 } 598 if (isAdd) { 599 opCount.add++; 600 } else { 601 opCount.remove++; 602 } 603 } 604 605 @Override 606 public String toString() { 607 StringBuffer buffer = new StringBuffer(); 608 buffer.append(location).append(";").append(id).append(";\n"); 609 for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) { 610 buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); 611 } 612 return buffer.toString(); 613 } 614 } 615 616 @SuppressWarnings("rawtypes") 617 public String getTransactions() { 618 619 ArrayList<TranInfo> infos = new ArrayList<TranInfo>(); 620 synchronized (inflightTransactions) { 621 if (!inflightTransactions.isEmpty()) { 622 for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) { 623 TranInfo info = new TranInfo(); 624 info.id = entry.getKey(); 625 for (Operation operation : entry.getValue()) { 626 info.track(operation); 627 } 628 infos.add(info); 629 } 630 } 631 } 632 synchronized (preparedTransactions) { 633 if (!preparedTransactions.isEmpty()) { 634 for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) { 635 TranInfo info = new TranInfo(); 636 info.id = entry.getKey(); 637 for (Operation operation : entry.getValue()) { 638 info.track(operation); 639 } 640 infos.add(info); 641 } 642 } 643 } 644 return infos.toString(); 645 } 646 647 /** 648 * Move all the messages that were in the journal into long term storage. We 649 * just replay and do a checkpoint. 
     *
     * @throws IOException
     * @throws IllegalStateException
     */
    private void recover() throws IllegalStateException, IOException {
        this.indexLock.writeLock().lock();
        try {

            long start = System.currentTimeMillis();
            Location afterProducerAudit = recoverProducerAudit();
            Location afterAckMessageFile = recoverAckMessageFileMap();
            Location lastIndoubtPosition = getRecoveryPosition();

            if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
                // valid checkpoint, possibly recover from afterAckMessageFile
                afterProducerAudit = null;
            }
            Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);

            if (recoveryPosition != null) {
                int redoCounter = 0;
                int dataFileRotationTracker = recoveryPosition.getDataFileId();
                LOG.info("Recovering from the journal @" + recoveryPosition);
                while (recoveryPosition != null) {
                    try {
                        JournalCommand<?> message = load(recoveryPosition);
                        metadata.lastUpdate = recoveryPosition;
                        process(message, recoveryPosition, lastIndoubtPosition);
                        redoCounter++;
                    } catch (IOException failedRecovery) {
                        if (isIgnoreMissingJournalfiles()) {
                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                            // track this dud location
                            journal.corruptRecoveryLocation(recoveryPosition);
                        } else {
                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                        }
                    }
                    recoveryPosition = journal.getNextLocation(recoveryPosition);
                    // hold on to the minimum number of open files during recovery
                    if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
                        dataFileRotationTracker = recoveryPosition.getDataFileId();
                        journal.cleanup();
                    }
                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
                        LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
                    }
                }
                if (LOG.isInfoEnabled()) {
                    long end = System.currentTimeMillis();
                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
                }
            }

            // We may have to undo some index updates.
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    recoverIndex(tx);
                }
            });

            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
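            // A recovered local transaction has no external coordinator, so the only
            // safe outcome is rollback (journaled below, so the decision survives a
            // further restart). XA transactions that reached prepare stay in
            // preparedTransactions for the transaction manager to resolve; those
            // that did not are simply dropped from the in-flight map.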
            Set<TransactionId> toRollback = new HashSet<TransactionId>();
            Set<TransactionId> toDiscard = new HashSet<TransactionId>();
            synchronized (inflightTransactions) {
                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
                    TransactionId id = it.next();
                    if (id.isLocalTransaction()) {
                        toRollback.add(id);
                    } else {
                        toDiscard.add(id);
                    }
                }
                for (TransactionId tx : toRollback) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
                    }
                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
                }
                for (TransactionId tx : toDiscard) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
                    }
                    inflightTransactions.remove(tx);
                }
            }

            synchronized (preparedTransactions) {
                for (TransactionId txId : preparedTransactions.keySet()) {
                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
                }
            }

        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    @SuppressWarnings("unused")
    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
        return TransactionIdConversion.convertToLocal(tx);
    }

    private Location minimum(Location x, Location y) {
        Location min = null;
        if (x != null) {
            min = x;
            if (y != null) {
                int compare = y.compareTo(x);
                if (compare < 0) {
                    min = y;
                }
            }
        } else {
            min = y;
        }
        return min;
    }

    private Location recoverProducerAudit() throws IOException {
        if (metadata.producerSequenceIdTrackerLocation != null) {
            try {
                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
                int maxNumProducers = getMaxFailoverProducersToTrack();
                int maxAuditDepth = getFailoverProducersAuditDepth();
                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
                return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
            } catch (Exception e) {
                LOG.warn("Cannot recover message audit", e);
                return journal.getNextLocation(null);
            }
        } else {
            // no audit stored, so it has to be recreated via replay from the start of the journal
            return journal.getNextLocation(null);
        }
    }

    @SuppressWarnings("unchecked")
    private Location recoverAckMessageFileMap() throws IOException {
        if (metadata.ackMessageFileMapLocation != null) {
            try {
                KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
                return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
            } catch (Exception e) {
                LOG.warn("Cannot recover ackMessageFileMap", e);
                return journal.getNextLocation(null);
            }
        } else {
            // no ackMessageFileMap stored, so it has to be recreated via replay from the start of the journal
            return journal.getNextLocation(null);
        }
    }

    protected void recoverIndex(Transaction tx) throws IOException {
        long start = System.currentTimeMillis();
        // It is possible index updates got applied before the journal updates..
        // in that case we need to remove references to messages that are not in the journal
        final Location lastAppendLocation = journal.getLastAppendLocation();
        long undoCounter = 0;

        // Go through all the destinations to see if they have messages past the lastAppendLocation
        for (StoredDestination sd : storedDestinations.values()) {

            final ArrayList<Long> matches = new ArrayList<Long>();
            // Find all the Locations that are >= than the last Append Location.
            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
                @Override
                protected void matched(Location key, Long value) {
                    matches.add(value);
                }
            });

            for (Long sequenceId : matches) {
                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                sd.locationIndex.remove(tx, keys.location);
                sd.messageIdIndex.remove(tx, keys.messageId);
                metadata.producerSequenceIdTracker.rollback(keys.messageId);
                undoCounter++;
                // TODO: do we need to modify the ack positions for the pub sub case?
            }
        }

        if (undoCounter > 0) {
            // The rolled-back operations are basically in-flight journal writes. To avoid getting
            // these the end user should do sync writes to the journal.
            if (LOG.isInfoEnabled()) {
                long end = System.currentTimeMillis();
                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
            }
        }

        undoCounter = 0;
        start = System.currentTimeMillis();

        // Let's be extra paranoid here and verify that all the datafiles being referenced
        // by the indexes still exist.

        final SequenceSet ss = new SequenceSet();
        for (StoredDestination sd : storedDestinations.values()) {
            // Use a visitor to cut down the number of pages that we load
            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                int last = -1;

                @Override
                public boolean isInterestedInKeysBetween(Location first, Location second) {
                    if (first == null) {
                        return !ss.contains(0, second.getDataFileId());
                    } else if (second == null) {
                        return true;
                    } else {
                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
                    }
                }

                @Override
                public void visit(List<Location> keys, List<Long> values) {
                    for (Location l : keys) {
                        int fileId = l.getDataFileId();
                        if (last != fileId) {
                            ss.add(fileId);
                            last = fileId;
                        }
                    }
                }

            });
        }
        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
        while (!ss.isEmpty()) {
            missingJournalFiles.add((int) ss.removeFirst());
        }

        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
            missingJournalFiles.add(entry.getKey());
            for (Integer i : entry.getValue()) {
                missingJournalFiles.add(i);
            }
        }

        missingJournalFiles.removeAll(journal.getFileMap().keySet());

        if (!missingJournalFiles.isEmpty()) {
            LOG.warn("Some journal files are missing: " + missingJournalFiles);
        }

        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
        for (Integer missing : missingJournalFiles) {
            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
        }

        if (checkForCorruptJournalFiles) {
            Collection<DataFile> dataFiles = journal.getFileMap().values();
            for (DataFile dataFile : dataFiles) {
                int id = dataFile.getDataFileId();
                // eof to next file id
                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
                Sequence seq = dataFile.getCorruptedBlocks().getHead();
                while (seq != null) {
                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
                        new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
                    missingPredicates.add(visitor);
                    knownCorruption.add(visitor);
                    seq = seq.getNext();
                }
            }
        }

        if (!missingPredicates.isEmpty()) {
            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
                final StoredDestination sd = sdEntry.getValue();
                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>();
                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
                    @Override
                    protected void matched(Location key, Long value) {
                        matches.put(value, key);
                    }
                });

                // If some message references are affected by the missing data files...
                if (!matches.isEmpty()) {

                    // We either 'gracefully' recover dropping the missing messages or
                    // we error out.
                    if (ignoreMissingJournalfiles) {
                        // Update the index to remove the references to the missing data
                        for (Long sequenceId : matches.keySet()) {
                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                            sd.locationIndex.remove(tx, keys.location);
                            sd.messageIdIndex.remove(tx, keys.messageId);
                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
                            undoCounter++;
                            // TODO: do we need to modify the ack positions for the pub sub case?
                        }
                    } else {
                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " + matches.size() + " messages affected.");
                    }
                }
            }
        }

        if (!ignoreMissingJournalfiles) {
            if (!knownCorruption.isEmpty()) {
                LOG.error("Detected corrupt journal files. " + knownCorruption);
                throw new IOException("Detected corrupt journal files. " + knownCorruption);
            }

            if (!missingJournalFiles.isEmpty()) {
                LOG.error("Detected missing journal files. " + missingJournalFiles);
                throw new IOException("Detected missing journal files. " + missingJournalFiles);
            }
        }

        if (undoCounter > 0) {
            // The rolled-back operations are basically in-flight journal writes. To avoid getting these the end user
            // should do sync writes to the journal.
            if (LOG.isInfoEnabled()) {
                long end = System.currentTimeMillis();
                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); 979 } 980 } 981 } 982 983 private Location nextRecoveryPosition; 984 private Location lastRecoveryPosition; 985 986 public void incrementalRecover() throws IOException { 987 this.indexLock.writeLock().lock(); 988 try { 989 if( nextRecoveryPosition == null ) { 990 if( lastRecoveryPosition==null ) { 991 nextRecoveryPosition = getRecoveryPosition(); 992 } else { 993 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 994 } 995 } 996 while (nextRecoveryPosition != null) { 997 lastRecoveryPosition = nextRecoveryPosition; 998 metadata.lastUpdate = lastRecoveryPosition; 999 JournalCommand<?> message = load(lastRecoveryPosition); 1000 process(message, lastRecoveryPosition, (IndexAware) null); 1001 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); 1002 } 1003 } finally { 1004 this.indexLock.writeLock().unlock(); 1005 } 1006 } 1007 1008 public Location getLastUpdatePosition() throws IOException { 1009 return metadata.lastUpdate; 1010 } 1011 1012 private Location getRecoveryPosition() throws IOException { 1013 1014 if (!this.forceRecoverIndex) { 1015 1016 // If we need to recover the transactions.. 1017 if (metadata.firstInProgressTransactionLocation != null) { 1018 return metadata.firstInProgressTransactionLocation; 1019 } 1020 1021 // Perhaps there were no transactions... 1022 if( metadata.lastUpdate!=null) { 1023 // Start replay at the record after the last one recorded in the index file. 1024 return getNextInitializedLocation(metadata.lastUpdate); 1025 } 1026 } 1027 // This loads the first position. 1028 return journal.getNextLocation(null); 1029 } 1030 1031 private Location getNextInitializedLocation(Location location) throws IOException { 1032 Location mayNotBeInitialized = journal.getNextLocation(location); 1033 if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) { 1034 // need to init size and type to skip 1035 return journal.getNextLocation(mayNotBeInitialized); 1036 } else { 1037 return mayNotBeInitialized; 1038 } 1039 } 1040 1041 protected void checkpointCleanup(final boolean cleanup) throws IOException { 1042 long start; 1043 this.indexLock.writeLock().lock(); 1044 try { 1045 start = System.currentTimeMillis(); 1046 if( !opened.get() ) { 1047 return; 1048 } 1049 } finally { 1050 this.indexLock.writeLock().unlock(); 1051 } 1052 checkpointUpdate(cleanup); 1053 long end = System.currentTimeMillis(); 1054 if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) { 1055 if (LOG.isInfoEnabled()) { 1056 LOG.info("Slow KahaDB access: cleanup took " + (end - start)); 1057 } 1058 } 1059 } 1060 1061 public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException { 1062 int size = data.serializedSizeFramed(); 1063 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 1064 os.writeByte(data.type().getNumber()); 1065 data.writeFramed(os); 1066 return os.toByteSequence(); 1067 } 1068 1069 // ///////////////////////////////////////////////////////////////// 1070 // Methods call by the broker to update and query the store. 
    public Location store(JournalCommand<?> data) throws IOException {
        return store(data, false, null, null);
    }

    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
        return store(data, false, null, null, onJournalStoreComplete);
    }

    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after) throws IOException {
        return store(data, sync, before, after, null);
    }

    /**
     * All updates are funneled through this method. The updates are converted
     * to a JournalMessage which is logged to the journal and then the data from
     * the JournalMessage is used to update the index just like it would be done
     * during a recovery process.
     */
    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
        try {
            ByteSequence sequence = toByteSequence(data);
            Location location;

            checkpointLock.readLock().lock();
            try {

                long start = System.currentTimeMillis();
                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete);
                long start2 = System.currentTimeMillis();
                // Track the last async update so we know if we need to sync at the next checkpoint
                if (!sync && journal.isJournalDiskSyncPeriodic()) {
                    lastAsyncJournalUpdate.set(location);
                }
                process(data, location, before);

                long end = System.currentTimeMillis();
                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
                    if (LOG.isInfoEnabled()) {
                        LOG.info("Slow KahaDB access: Journal append took: " + (start2 - start) + " ms, Index update took " + (end - start2) + " ms");
                    }
                }
            } finally {
                checkpointLock.readLock().unlock();
            }

            if (after != null) {
                after.run();
            }

            if (scheduler == null && opened.get()) {
                startCheckpoint();
            }
            return location;
        } catch (IOException ioe) {
            LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe);
            brokerService.handleIOException(ioe);
            throw ioe;
        }
    }

    /**
     * Loads a previously stored JournalMessage.
     *
     * @param location the journal location of the stored command
     * @return the journal command stored at that location
     * @throws IOException if the record cannot be read or is not a valid command
     */
    public JournalCommand<?> load(Location location) throws IOException {
        long start = System.currentTimeMillis();
        ByteSequence data = journal.read(location);
        long end = System.currentTimeMillis();
        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Slow KahaDB access: Journal read took: " + (end - start) + " ms");
            }
        }
        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
        byte readByte = is.readByte();
        KahaEntryType type = KahaEntryType.valueOf(readByte);
        if (type == null) {
            try {
                is.close();
            } catch (IOException e) {}
            throw new IOException("Could not load journal record. Invalid location: " + location);
Invalid location: "+location); 1156 } 1157 JournalCommand<?> message = (JournalCommand<?>)type.createMessage(); 1158 message.mergeFramed(is); 1159 return message; 1160 } 1161 1162 /** 1163 * do minimal recovery till we reach the last inDoubtLocation 1164 * @param data 1165 * @param location 1166 * @param inDoubtlocation 1167 * @throws IOException 1168 */ 1169 void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException { 1170 if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) { 1171 process(data, location, (IndexAware) null); 1172 } else { 1173 // just recover producer audit 1174 data.visit(new Visitor() { 1175 @Override 1176 public void visit(KahaAddMessageCommand command) throws IOException { 1177 metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); 1178 } 1179 }); 1180 } 1181 } 1182 1183 // ///////////////////////////////////////////////////////////////// 1184 // Journaled record processing methods. Once the record is journaled, 1185 // these methods handle applying the index updates. These may be called 1186 // from the recovery method too so they need to be idempotent 1187 // ///////////////////////////////////////////////////////////////// 1188 1189 void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException { 1190 data.visit(new Visitor() { 1191 @Override 1192 public void visit(KahaAddMessageCommand command) throws IOException { 1193 process(command, location, onSequenceAssignedCallback); 1194 } 1195 1196 @Override 1197 public void visit(KahaRemoveMessageCommand command) throws IOException { 1198 process(command, location); 1199 } 1200 1201 @Override 1202 public void visit(KahaPrepareCommand command) throws IOException { 1203 process(command, location); 1204 } 1205 1206 @Override 1207 public void visit(KahaCommitCommand command) throws IOException { 1208 process(command, location, onSequenceAssignedCallback); 1209 } 1210 1211 @Override 1212 public void visit(KahaRollbackCommand command) throws IOException { 1213 process(command, location); 1214 } 1215 1216 @Override 1217 public void visit(KahaRemoveDestinationCommand command) throws IOException { 1218 process(command, location); 1219 } 1220 1221 @Override 1222 public void visit(KahaSubscriptionCommand command) throws IOException { 1223 process(command, location); 1224 } 1225 1226 @Override 1227 public void visit(KahaProducerAuditCommand command) throws IOException { 1228 processLocation(location); 1229 } 1230 1231 @Override 1232 public void visit(KahaAckMessageFileMapCommand command) throws IOException { 1233 processLocation(location); 1234 } 1235 1236 @Override 1237 public void visit(KahaTraceCommand command) { 1238 processLocation(location); 1239 } 1240 1241 @Override 1242 public void visit(KahaUpdateMessageCommand command) throws IOException { 1243 process(command, location); 1244 } 1245 1246 @Override 1247 public void visit(KahaRewrittenDataFileCommand command) throws IOException { 1248 process(command, location); 1249 } 1250 }); 1251 } 1252 1253 @SuppressWarnings("rawtypes") 1254 protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException { 1255 if (command.hasTransactionInfo()) { 1256 List<Operation> inflightTx = getInflightTx(command.getTransactionInfo()); 1257 inflightTx.add(new AddOperation(command, location, runWithIndexLock)); 1258 } else { 1259 this.indexLock.writeLock().lock(); 1260 try 
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        long assignedIndex = updateIndex(tx, command, location);
                        if (runWithIndexLock != null) {
                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
                        }
                    }
                });

            } finally {
                this.indexLock.writeLock().unlock();
            }
        }
    }

    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
        this.indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command, location);
                }
            });
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    @SuppressWarnings("rawtypes")
    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
        if (command.hasTransactionInfo()) {
            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
            inflightTx.add(new RemoveOperation(command, location));
        } else {
            this.indexLock.writeLock().lock();
            try {
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        updateIndex(tx, command, location);
                    }
                });
            } finally {
                this.indexLock.writeLock().unlock();
            }
        }
    }

    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
        this.indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command, location);
                }
            });
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
        this.indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    updateIndex(tx, command, location);
                }
            });
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    protected void processLocation(final Location location) {
        this.indexLock.writeLock().lock();
        try {
            metadata.lastUpdate = location;
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    @SuppressWarnings("rawtypes")
    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
        List<Operation> inflightTx;
        synchronized (inflightTransactions) {
            inflightTx = inflightTransactions.remove(key);
            if (inflightTx == null) {
                inflightTx = preparedTransactions.remove(key);
            }
        }
        if (inflightTx == null) {
            // only non persistent messages in this tx
            if (before != null) {
                before.sequenceAssignedWithIndexLocked(-1);
            }
            return;
        }

        final List<Operation> messagingTx = inflightTx;
        indexLock.writeLock().lock();
        try {
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    for (Operation op : messagingTx) {
                        op.execute(tx);
                    }
                }
            });
            metadata.lastUpdate = location;
        } finally {
            indexLock.writeLock().unlock();
        }
    }

    @SuppressWarnings("rawtypes")
    protected void process(KahaPrepareCommand command, Location location) {
        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
        synchronized (inflightTransactions) {
            List<Operation> tx = inflightTransactions.remove(key);
            if (tx != null) {
                preparedTransactions.put(key, tx);
            }
        }
    }

    @SuppressWarnings("rawtypes")
    protected void process(KahaRollbackCommand command, Location location) throws IOException {
        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
        List<Operation> updates = null;
        synchronized (inflightTransactions) {
            updates = inflightTransactions.remove(key);
            if (updates == null) {
                updates = preparedTransactions.remove(key);
            }
        }
    }

    protected void process(KahaRewrittenDataFileCommand command, Location location) throws IOException {
        final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());

        // Mark the current journal file as a compacted file so that gc checks can
        // skip over the smaller compaction-type logs.
        DataFile current = journal.getDataFileById(location.getDataFileId());
        current.setTypeCode(command.getRewriteType());

        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
            // Move offset so that next location read jumps to next file.
            location.setOffset(journalMaxFileLength);
        }
    }

    // /////////////////////////////////////////////////////////////////
    // These methods do the actual index updates.
    // /////////////////////////////////////////////////////////////////

    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();

    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);

        // Skip adding the message to the index if this is a topic and there are
        // no subscriptions.
        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
            return -1;
        }

        // Add the message.
        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
        long id = sd.orderIndex.getNextMessageId(priority);
        Long previous = sd.locationIndex.put(tx, location, id);
        if (previous == null) {
            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
            if (previous == null) {
                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
                    addAckLocationForNewMessage(tx, sd, id);
                }
                metadata.lastUpdate = location;
            } else {

                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
                    // The message ID is already indexed, so the broker asked us to store a
                    // duplicate before the original was dispatched and acked; ignore this add attempt.
                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
                }
                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
                sd.locationIndex.remove(tx, location);
                id = -1;
            }
        } else {
            // restore the previous value.. Looks like this was a redo of a previously
            // added message. We don't want to assign it a new id as the other indexes would
            // be wrong..
            sd.locationIndex.put(tx, location, previous);
            metadata.lastUpdate = location;
        }
        // record this id in any event, initial send or recovery
        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
        return id;
    }

    void trackPendingAdd(KahaDestination destination, Long seq) {
        StoredDestination sd = storedDestinations.get(key(destination));
        if (sd != null) {
            sd.trackPendingAdd(seq);
        }
    }

    void trackPendingAddComplete(KahaDestination destination, Long seq) {
        StoredDestination sd = storedDestinations.get(key(destination));
        if (sd != null) {
            sd.trackPendingAddComplete(seq);
        }
    }

    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
        KahaAddMessageCommand command = updateMessageCommand.getMessage();
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);

        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
        if (id != null) {
            MessageKeys previousKeys = sd.orderIndex.put(
                    tx,
                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
                    id,
                    new MessageKeys(command.getMessageId(), location)
            );
            sd.locationIndex.put(tx, location, id);
            // on first update previous is original location, on recovery/replay it may be the updated location
            if (previousKeys != null && !previousKeys.location.equals(location)) {
                sd.locationIndex.remove(tx, previousKeys.location);
            }
            metadata.lastUpdate = location;
        } else {
            // Add the message if it can't be found
            this.updateIndex(tx, command, location);
        }
    }

    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        if (!command.hasSubscriptionKey()) {

            // In the queue case we just remove the message from the index..
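            // The removal cascades through all three indexes (messageId -> sequence,
            // order, and location), and recordAckMessageReferenceLocation() notes
            // which journal file held the message so the ack's own file is not GC'd
            // ahead of the message data it references.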
            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
            if (sequenceId != null) {
                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
                if (keys != null) {
                    sd.locationIndex.remove(tx, keys.location);
                    recordAckMessageReferenceLocation(ackLocation, keys.location);
                    metadata.lastUpdate = ackLocation;
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId());
                }
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("message not found in sequence id index: " + command.getMessageId());
            }
        } else {
            // In the topic case we need to remove the message once it's been acked
            // by all the subs
            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());

            // Make sure it's a valid message id...
            if (sequence != null) {
                String subscriptionKey = command.getSubscriptionKey();
                if (command.getAck() != UNMATCHED) {
                    sd.orderIndex.get(tx, sequence);
                    byte priority = sd.orderIndex.lastGetPriority();
                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
                }

                MessageKeys keys = sd.orderIndex.get(tx, sequence);
                if (keys != null) {
                    recordAckMessageReferenceLocation(ackLocation, keys.location);
                }
                // The following method handles deleting un-referenced messages.
                removeAckLocation(tx, sd, subscriptionKey, sequence);
                metadata.lastUpdate = ackLocation;
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
            }

        }
    }

    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
        if (referenceFileIds == null) {
            referenceFileIds = new HashSet<Integer>();
            referenceFileIds.add(messageLocation.getDataFileId());
            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
        } else {
            Integer id = Integer.valueOf(messageLocation.getDataFileId());
            if (!referenceFileIds.contains(id)) {
                referenceFileIds.add(id);
            }
        }
    }

    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        sd.orderIndex.remove(tx);

        sd.locationIndex.clear(tx);
        sd.locationIndex.unload(tx);
        tx.free(sd.locationIndex.getPageId());

        sd.messageIdIndex.clear(tx);
        sd.messageIdIndex.unload(tx);
        tx.free(sd.messageIdIndex.getPageId());

        if (sd.subscriptions != null) {
            sd.subscriptions.clear(tx);
            sd.subscriptions.unload(tx);
            tx.free(sd.subscriptions.getPageId());

            sd.subscriptionAcks.clear(tx);
            sd.subscriptionAcks.unload(tx);
            tx.free(sd.subscriptionAcks.getPageId());

            sd.ackPositions.clear(tx);
            sd.ackPositions.unload(tx);
            tx.free(sd.ackPositions.getHeadPageId());

            sd.subLocations.clear(tx);
            sd.subLocations.unload(tx);
            tx.free(sd.subLocations.getHeadPageId());
        }

        String key = key(command.getDestination());
        storedDestinations.remove(key);
        metadata.destinations.remove(tx, key);
    }

    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1604 void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1605 StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1606 final String subscriptionKey = command.getSubscriptionKey();
1607
1608 // If set then we are creating it.. otherwise we are destroying the sub
1609 if (command.hasSubscriptionInfo()) {
1610 Location existing = sd.subLocations.get(tx, subscriptionKey);
1611 if (existing != null && existing.compareTo(location) == 0) {
1612 // replay on recovery, ignore
1613 LOG.trace("ignoring journal replay of sub from: " + location);
1614 return;
1615 }
1616
1617 sd.subscriptions.put(tx, subscriptionKey, command);
1618 sd.subLocations.put(tx, subscriptionKey, location);
1619 long ackLocation=NOT_ACKED;
1620 if (!command.getRetroactive()) {
1621 ackLocation = sd.orderIndex.nextMessageId-1;
1622 } else {
1623 addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1624 }
1625 sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1626 sd.subscriptionCache.add(subscriptionKey);
1627 } else {
1628 // delete the sub...
1629 sd.subscriptions.remove(tx, subscriptionKey);
1630 sd.subLocations.remove(tx, subscriptionKey);
1631 sd.subscriptionAcks.remove(tx, subscriptionKey);
1632 sd.subscriptionCache.remove(subscriptionKey);
1633 removeAckLocationsForSub(tx, sd, subscriptionKey);
1634
1635 if (sd.subscriptions.isEmpty(tx)) {
1636 // remove the stored destination
1637 KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
1638 removeDestinationCommand.setDestination(command.getDestination());
1639 updateIndex(tx, removeDestinationCommand, null);
1640 }
1641 }
1642 }
1643
1644 private void checkpointUpdate(final boolean cleanup) throws IOException {
1645 checkpointLock.writeLock().lock();
1646 try {
1647 this.indexLock.writeLock().lock();
1648 try {
1649 Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() {
1650 @Override
1651 public Set<Integer> execute(Transaction tx) throws IOException {
1652 return checkpointUpdate(tx, cleanup);
1653 }
1654 });
1655 pageFile.flush();
1656 // Remove the journal files only after the index update, so that a partial removal does not leave dangling references in the index.
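// (removeDataFiles deletes the now-unreferenced journal logs, or moves them aside to the
// archive directory when archiveDataLogs is enabled on the journal.)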
1657 journal.removeDataFiles(filesToGc); 1658 } finally { 1659 this.indexLock.writeLock().unlock(); 1660 } 1661 1662 } finally { 1663 checkpointLock.writeLock().unlock(); 1664 } 1665 } 1666 1667 /** 1668 * @param tx 1669 * @throws IOException 1670 */ 1671 Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { 1672 MDC.put("activemq.persistenceDir", getDirectory().getName()); 1673 LOG.debug("Checkpoint started."); 1674 1675 // reflect last update exclusive of current checkpoint 1676 Location lastUpdate = metadata.lastUpdate; 1677 1678 metadata.state = OPEN_STATE; 1679 metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit(); 1680 metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap(); 1681 Location[] inProgressTxRange = getInProgressTxLocationRange(); 1682 metadata.firstInProgressTransactionLocation = inProgressTxRange[0]; 1683 tx.store(metadata.page, metadataMarshaller, true); 1684 1685 final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(); 1686 if (cleanup) { 1687 1688 final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet()); 1689 gcCandidateSet.addAll(completeFileSet); 1690 1691 if (LOG.isTraceEnabled()) { 1692 LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); 1693 } 1694 1695 if (lastUpdate != null) { 1696 gcCandidateSet.remove(lastUpdate.getDataFileId()); 1697 } 1698 1699 // Don't GC files under replication 1700 if( journalFilesBeingReplicated!=null ) { 1701 gcCandidateSet.removeAll(journalFilesBeingReplicated); 1702 } 1703 1704 if (metadata.producerSequenceIdTrackerLocation != null) { 1705 int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId(); 1706 if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) { 1707 // rewrite so we don't prevent gc 1708 metadata.producerSequenceIdTracker.setModified(true); 1709 if (LOG.isTraceEnabled()) { 1710 LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation); 1711 } 1712 } 1713 gcCandidateSet.remove(dataFileId); 1714 if (LOG.isTraceEnabled()) { 1715 LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet); 1716 } 1717 } 1718 1719 if (metadata.ackMessageFileMapLocation != null) { 1720 int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId(); 1721 gcCandidateSet.remove(dataFileId); 1722 if (LOG.isTraceEnabled()) { 1723 LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet); 1724 } 1725 } 1726 1727 // Don't GC files referenced by in-progress tx 1728 if (inProgressTxRange[0] != null) { 1729 for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) { 1730 gcCandidateSet.remove(pendingTx); 1731 } 1732 } 1733 if (LOG.isTraceEnabled()) { 1734 LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); 1735 } 1736 1737 // Go through all the destinations to see if any of them can remove GC candidates. 
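// Any journal data file that still holds a live message for some destination must be kept;
// the visitor below walks each destination's locationIndex and drops those file ids from
// the candidate set without loading every index page.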
1738 for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1739 if( gcCandidateSet.isEmpty() ) {
1740 break;
1741 }
1742
1743 // Use a visitor to cut down the number of pages that we load
1744 entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1745 int last=-1;
1746 @Override
1747 public boolean isInterestedInKeysBetween(Location first, Location second) {
1748 if( first==null ) {
1749 SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1750 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1751 subset.remove(second.getDataFileId());
1752 }
1753 return !subset.isEmpty();
1754 } else if( second==null ) {
1755 SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1756 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1757 subset.remove(first.getDataFileId());
1758 }
1759 return !subset.isEmpty();
1760 } else {
1761 SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1762 if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1763 subset.remove(first.getDataFileId());
1764 }
1765 if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1766 subset.remove(second.getDataFileId());
1767 }
1768 return !subset.isEmpty();
1769 }
1770 }
1771
1772 @Override
1773 public void visit(List<Location> keys, List<Long> values) {
1774 for (Location l : keys) {
1775 int fileId = l.getDataFileId();
1776 if( last != fileId ) {
1777 gcCandidateSet.remove(fileId);
1778 last = fileId;
1779 }
1780 }
1781 }
1782 });
1783
1784 // Durable Subscription
1785 if (entry.getValue().subLocations != null) {
1786 Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
1787 while (iter.hasNext()) {
1788 Entry<String, Location> subscription = iter.next();
1789 int dataFileId = subscription.getValue().getDataFileId();
1790
1791 // Move the subscription along if it has no outstanding messages that need to be acked
1792 // and it sits in the oldest log file in the GC candidate set.
1793 if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
1794 final StoredDestination destination = entry.getValue();
1795 final String subscriptionKey = subscription.getKey();
1796 SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1797
1798 // When the pending set holds a single sequence of range one, that entry is just the
1799 // next-message marker, meaning there are no pending messages currently.
1800 if (pendingAcks == null || pendingAcks.isEmpty() ||
1801 (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
1802
1803 if (LOG.isTraceEnabled()) {
1804 LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
1805 }
1806
1807 final KahaSubscriptionCommand kahaSub =
1808 destination.subscriptions.get(tx, subscriptionKey);
1809 destination.subLocations.put(
1810 tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
1811
1812 // Skip the remove from candidates if we rewrote the subscription,
1813 // in order to prevent duplicate subscription commands on recovery.
1814 // If another subscription is in the same file and isn't rewritten
1815 // then it will remove the file from the set.
1816 continue;
1817 }
1818 }
1819
1820 gcCandidateSet.remove(dataFileId);
1821 }
1822 }
1823
1824 if (LOG.isTraceEnabled()) {
1825 LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1826 }
1827 }
1828
1829 // Check that we are not deleting a file whose acks refer to still in-use journal files
1830 if (LOG.isTraceEnabled()) {
1831 LOG.trace("gc candidates: " + gcCandidateSet);
1832 LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap);
1833 }
1834
1835 boolean ackMessageFileMapMod = false;
1836 Iterator<Integer> candidates = gcCandidateSet.iterator();
1837 while (candidates.hasNext()) {
1838 Integer candidate = candidates.next();
1839 Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
1840 if (referencedFileIds != null) {
1841 for (Integer referencedFileId : referencedFileIds) {
1842 if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
1843 // an active file that is not targeted for deletion is still referenced, so don't delete
1844 candidates.remove();
1845 break;
1846 }
1847 }
1848 if (gcCandidateSet.contains(candidate)) {
1849 ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
1850 } else {
1851 if (LOG.isTraceEnabled()) {
1852 LOG.trace("not removing data file: " + candidate
1853 + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1854 }
1855 }
1856 }
1857 }
1858
1859 if (!gcCandidateSet.isEmpty()) {
1860 LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
1861 for (Integer candidate : gcCandidateSet) {
1862 for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
1863 ackMessageFileMapMod |= ackFiles.remove(candidate);
1864 }
1865 }
1866 if (ackMessageFileMapMod) {
1867 checkpointUpdate(tx, false);
1868 }
1869 } else if (isEnableAckCompaction()) {
1870 if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
1871 // First check the length of the journal to make sure it makes sense to even try.
1872 //
1873 // If there is only one journal file with acks in it we don't need to move
1874 // it since it won't be chained to any later logs.
1875 //
1876 // If the logs haven't grown since the last time, then we need to compact;
1877 // otherwise there seems to still be room for growth and we don't need to incur
1878 // the overhead. Depending on configuration this check can be avoided and
1879 // ack compaction will run any time the store has not GC'd a journal file in
1880 // the configured number of cycles.
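// (Illustrative: with compactAcksAfterNoGC=10, once ten consecutive checkpoint/cleanup cycles
// complete without a file being GC'd, and the journal is still on the same data file as at the
// previous check (or compactAcksIgnoresStoreGrowth is set), an AckCompactionRunner is queued.)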
1881 if (metadata.ackMessageFileMap.size() > 1 &&
1882 (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
1883
1884 LOG.trace("No files GC'd; checking if the threshold for ACK compaction has been met.");
1885 try {
1886 scheduler.execute(new AckCompactionRunner());
1887 } catch (Exception ex) {
1888 LOG.warn("Error on queueing the Ack Compactor", ex);
1889 }
1890 } else {
1891 LOG.trace("Journal activity detected, no Ack compaction scheduled.");
1892 }
1893
1894 checkPointCyclesWithNoGC = 0;
1895 } else {
1896 LOG.trace("Not yet time to check for compaction: {} of {} cycles",
1897 checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
1898 }
1899
1900 journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
1901 }
1902 }
1903 MDC.remove("activemq.persistenceDir");
1904
1905 LOG.debug("Checkpoint done.");
1906 return gcCandidateSet;
1907 }
1908
1909 private final class AckCompactionRunner implements Runnable {
1910
1911 @Override
1912 public void run() {
1913
1914 int journalToAdvance = -1;
1915 Set<Integer> journalLogsReferenced = new HashSet<Integer>();
1916
1917 // flag to know whether the ack forwarding completed without an exception
1918 boolean forwarded = false;
1919
1920 try {
1921 // acquire the checkpoint lock to prevent other threads from
1922 // running a checkpoint while this is running
1923 //
1924 // Normally this task runs on the same executor as the checkpoint task,
1925 // so this ack compaction runner wouldn't run at the same time as the checkpoint task.
1926 //
1927 // However, there are two cases where this isn't always true.
1928 // First, the checkpoint() method is public and can be called through the
1929 // PersistenceAdapter interface by someone at the same time this is running.
1930 // Second, a checkpoint is called during shutdown without using the executor.
1931 //
1932 // In the future it might be better to just remove the checkpointLock entirely
1933 // and only use the executor, but this would need to be examined for any unintended
1934 // consequences.
1935 checkpointLock.readLock().lock();
1936
1937 try {
1938
1939 // Lock index to capture the ackMessageFileMap data
1940 indexLock.writeLock().lock();
1941
1942 // Map keys might not be sorted; find the earliest log file to forward acks
1943 // from and move only those. Future cycles can chip away at more as needed.
1944 // We won't move files that were themselves rewritten by a previous compaction.
1945 List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
1946 Collections.sort(journalFileIds);
1947 for (Integer journalFileId : journalFileIds) {
1948 DataFile current = journal.getDataFileById(journalFileId);
1949 if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
1950 journalToAdvance = journalFileId;
1951 break;
1952 }
1953 }
1954
1955 // Check if we found one, or if we only found the current file being written to.
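// (The current data file is excluded since it is still being appended to; its acks are
// left for a later cycle once the journal rolls to a new file.)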
1956 if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
1957 return;
1958 }
1959
1960 journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
1961
1962 } finally {
1963 indexLock.writeLock().unlock();
1964 }
1965
1966 try {
1967 // Background rewrite of the old acks
1968 forwardAllAcks(journalToAdvance, journalLogsReferenced);
1969 forwarded = true;
1970 } catch (IOException ioe) {
1971 LOG.error("Forwarding of acks failed", ioe);
1972 brokerService.handleIOException(ioe);
1973 } catch (Throwable e) {
1974 LOG.error("Forwarding of acks failed", e);
1975 brokerService.handleIOException(IOExceptionSupport.create(e));
1976 }
1977 } finally {
1978 checkpointLock.readLock().unlock();
1979 }
1980
1981 try {
1982 if (forwarded) {
1983 // Checkpoint with changes from the ackMessageFileMap
1984 checkpointUpdate(false);
1985 }
1986 } catch (IOException ioe) {
1987 LOG.error("Checkpoint failed", ioe);
1988 brokerService.handleIOException(ioe);
1989 } catch (Throwable e) {
1990 LOG.error("Checkpoint failed", e);
1991 brokerService.handleIOException(IOExceptionSupport.create(e));
1992 }
1993 }
1994 }
1995
1996 private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
1997 LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
1998
1999 DataFile forwardsFile = journal.reserveDataFile();
2000 forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
2001 LOG.trace("Reserved new file for forwarded acks: {}", forwardsFile);
2002
2003 Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
2004
2005 try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile)) {
2006 KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
2007 compactionMarker.setSourceDataFileId(journalToRead);
2008 compactionMarker.setRewriteType(forwardsFile.getTypeCode());
2009
2010 ByteSequence payload = toByteSequence(compactionMarker);
2011 appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2012 LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
2013
2014 Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0));
2015 while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) {
2016 JournalCommand<?> command = null;
2017 try {
2018 command = load(nextLocation);
2019 } catch (IOException ex) {
2020 LOG.trace("Error loading command during ack forward: {}", nextLocation);
2021 }
2022
2023 if (command instanceof KahaRemoveMessageCommand) {
2024 payload = toByteSequence(command);
2025 Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2026 updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
2027 }
2028
2029 nextLocation = getNextLocationForAckForward(nextLocation);
2030 }
2031 }
2032
2033 LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
2034
2035 // Lock index while we update the ackMessageFileMap.
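// (updatedAckLocations maps each new ack file id, normally just the reserved compacted file,
// to the set of message file ids that the forwarded acks still reference.)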
2036 indexLock.writeLock().lock();
2037 try {
2038 // Update the ack map with the new locations of the acks
2039 for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
2040 Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
2041 if (referenceFileIds == null) {
2042 referenceFileIds = new HashSet<Integer>();
2043 referenceFileIds.addAll(entry.getValue());
2044 metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
2045 } else {
2046 referenceFileIds.addAll(entry.getValue());
2047 }
2048 }
2049
2050 // remove the old location data from the ack map so that the old journal log file can
2051 // be removed on next GC.
2052 metadata.ackMessageFileMap.remove(journalToRead);
2053 } finally {
2054 indexLock.writeLock().unlock();
2055 }
2056 LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
2057 }
2058
2059 private Location getNextLocationForAckForward(final Location nextLocation) {
2060 // getNextLocation() can throw an IOException; we handle it, return null
2061 // and abort gracefully.
2062 // Should not happen in the normal case.
2063 Location location = null;
2064 try {
2065 location = journal.getNextLocation(nextLocation);
2066 } catch (IOException e) {
2067 LOG.warn("Failed to load next journal location: {}", e.getMessage());
2068 if (LOG.isDebugEnabled()) {
2069 LOG.debug("Failed to load next journal location", e);
2070 }
2071 }
2072 return location;
2073 }
2074
2075 final Runnable nullCompletionCallback = new Runnable() {
2076 @Override
2077 public void run() {
2078 }
2079 };
2080
2081 private Location checkpointProducerAudit() throws IOException {
2082 if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
2083 ByteArrayOutputStream baos = new ByteArrayOutputStream();
2084 ObjectOutputStream oout = new ObjectOutputStream(baos);
2085 oout.writeObject(metadata.producerSequenceIdTracker);
2086 oout.flush();
2087 oout.close();
2088 // using a completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2089 Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
2090 try {
2091 location.getLatch().await();
2092 } catch (InterruptedException e) {
2093 throw new InterruptedIOException(e.toString());
2094 }
2095 return location;
2096 }
2097 return metadata.producerSequenceIdTrackerLocation;
2098 }
2099
2100 private Location checkpointAckMessageFileMap() throws IOException {
2101 ByteArrayOutputStream baos = new ByteArrayOutputStream();
2102 ObjectOutputStream oout = new ObjectOutputStream(baos);
2103 oout.writeObject(metadata.ackMessageFileMap);
2104 oout.flush();
2105 oout.close();
2106 // using a completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2107 Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
2108 try {
2109 location.getLatch().await();
2110 } catch (InterruptedException e) {
2111 throw new InterruptedIOException(e.toString());
2112 }
2113 return location;
2114 }
2115
2116 private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
2117
2118 ByteSequence sequence = toByteSequence(subscription);
2119 Location location = journal.write(sequence, nullCompletionCallback);
2120
2121 try {
2122 location.getLatch().await();
2123 } catch (InterruptedException e) {
2124 throw new
InterruptedIOException(e.toString()); 2125 } 2126 return location; 2127 } 2128 2129 public HashSet<Integer> getJournalFilesBeingReplicated() { 2130 return journalFilesBeingReplicated; 2131 } 2132 2133 // ///////////////////////////////////////////////////////////////// 2134 // StoredDestination related implementation methods. 2135 // ///////////////////////////////////////////////////////////////// 2136 2137 protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>(); 2138 2139 static class MessageKeys { 2140 final String messageId; 2141 final Location location; 2142 2143 public MessageKeys(String messageId, Location location) { 2144 this.messageId=messageId; 2145 this.location=location; 2146 } 2147 2148 @Override 2149 public String toString() { 2150 return "["+messageId+","+location+"]"; 2151 } 2152 } 2153 2154 static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> { 2155 static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); 2156 2157 @Override 2158 public MessageKeys readPayload(DataInput dataIn) throws IOException { 2159 return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn)); 2160 } 2161 2162 @Override 2163 public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { 2164 dataOut.writeUTF(object.messageId); 2165 LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); 2166 } 2167 } 2168 2169 class LastAck { 2170 long lastAckedSequence; 2171 byte priority; 2172 2173 public LastAck(LastAck source) { 2174 this.lastAckedSequence = source.lastAckedSequence; 2175 this.priority = source.priority; 2176 } 2177 2178 public LastAck() { 2179 this.priority = MessageOrderIndex.HI; 2180 } 2181 2182 public LastAck(long ackLocation) { 2183 this.lastAckedSequence = ackLocation; 2184 this.priority = MessageOrderIndex.LO; 2185 } 2186 2187 public LastAck(long ackLocation, byte priority) { 2188 this.lastAckedSequence = ackLocation; 2189 this.priority = priority; 2190 } 2191 2192 @Override 2193 public String toString() { 2194 return "[" + lastAckedSequence + ":" + priority + "]"; 2195 } 2196 } 2197 2198 protected class LastAckMarshaller implements Marshaller<LastAck> { 2199 2200 @Override 2201 public void writePayload(LastAck object, DataOutput dataOut) throws IOException { 2202 dataOut.writeLong(object.lastAckedSequence); 2203 dataOut.writeByte(object.priority); 2204 } 2205 2206 @Override 2207 public LastAck readPayload(DataInput dataIn) throws IOException { 2208 LastAck lastAcked = new LastAck(); 2209 lastAcked.lastAckedSequence = dataIn.readLong(); 2210 if (metadata.version >= 3) { 2211 lastAcked.priority = dataIn.readByte(); 2212 } 2213 return lastAcked; 2214 } 2215 2216 @Override 2217 public int getFixedSize() { 2218 return 9; 2219 } 2220 2221 @Override 2222 public LastAck deepCopy(LastAck source) { 2223 return new LastAck(source); 2224 } 2225 2226 @Override 2227 public boolean isDeepCopySupported() { 2228 return true; 2229 } 2230 } 2231 2232 class StoredDestination { 2233 2234 MessageOrderIndex orderIndex = new MessageOrderIndex(); 2235 BTreeIndex<Location, Long> locationIndex; 2236 BTreeIndex<String, Long> messageIdIndex; 2237 2238 // These bits are only set for Topics 2239 BTreeIndex<String, KahaSubscriptionCommand> subscriptions; 2240 BTreeIndex<String, LastAck> subscriptionAcks; 2241 HashMap<String, MessageOrderCursor> subscriptionCursors; 2242 ListIndex<String, SequenceSet> ackPositions; 2243 ListIndex<String, Location> 
subLocations; 2244 2245 // Transient data used to track which Messages are no longer needed. 2246 final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>(); 2247 final HashSet<String> subscriptionCache = new LinkedHashSet<String>(); 2248 2249 public void trackPendingAdd(Long seq) { 2250 orderIndex.trackPendingAdd(seq); 2251 } 2252 2253 public void trackPendingAddComplete(Long seq) { 2254 orderIndex.trackPendingAddComplete(seq); 2255 } 2256 2257 @Override 2258 public String toString() { 2259 return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size(); 2260 } 2261 } 2262 2263 protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> { 2264 2265 @Override 2266 public StoredDestination readPayload(final DataInput dataIn) throws IOException { 2267 final StoredDestination value = new StoredDestination(); 2268 value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2269 value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong()); 2270 value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong()); 2271 2272 if (dataIn.readBoolean()) { 2273 value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong()); 2274 value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong()); 2275 if (metadata.version >= 4) { 2276 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong()); 2277 } else { 2278 // upgrade 2279 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2280 @Override 2281 public void execute(Transaction tx) throws IOException { 2282 LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>(); 2283 2284 if (metadata.version >= 3) { 2285 // migrate 2286 BTreeIndex<Long, HashSet<String>> oldAckPositions = 2287 new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong()); 2288 oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); 2289 oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); 2290 oldAckPositions.load(tx); 2291 2292 2293 // Do the initial build of the data in memory before writing into the store 2294 // based Ack Positions List to avoid a lot of disk thrashing. 2295 Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx); 2296 while (iterator.hasNext()) { 2297 Entry<Long, HashSet<String>> entry = iterator.next(); 2298 2299 for(String subKey : entry.getValue()) { 2300 SequenceSet pendingAcks = temp.get(subKey); 2301 if (pendingAcks == null) { 2302 pendingAcks = new SequenceSet(); 2303 temp.put(subKey, pendingAcks); 2304 } 2305 2306 pendingAcks.add(entry.getKey()); 2307 } 2308 } 2309 } 2310 // Now move the pending messages to ack data into the store backed 2311 // structure. 
2312 value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2313 value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2314 value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2315 value.ackPositions.load(tx); 2316 for(String subscriptionKey : temp.keySet()) { 2317 value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); 2318 } 2319 2320 } 2321 }); 2322 } 2323 2324 if (metadata.version >= 5) { 2325 value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong()); 2326 } else { 2327 // upgrade 2328 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2329 @Override 2330 public void execute(Transaction tx) throws IOException { 2331 value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2332 value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2333 value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2334 value.subLocations.load(tx); 2335 } 2336 }); 2337 } 2338 } 2339 if (metadata.version >= 2) { 2340 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2341 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong()); 2342 } else { 2343 // upgrade 2344 pageFile.tx().execute(new Transaction.Closure<IOException>() { 2345 @Override 2346 public void execute(Transaction tx) throws IOException { 2347 value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2348 value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2349 value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2350 value.orderIndex.lowPriorityIndex.load(tx); 2351 2352 value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate()); 2353 value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); 2354 value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); 2355 value.orderIndex.highPriorityIndex.load(tx); 2356 } 2357 }); 2358 } 2359 2360 return value; 2361 } 2362 2363 @Override 2364 public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException { 2365 dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId()); 2366 dataOut.writeLong(value.locationIndex.getPageId()); 2367 dataOut.writeLong(value.messageIdIndex.getPageId()); 2368 if (value.subscriptions != null) { 2369 dataOut.writeBoolean(true); 2370 dataOut.writeLong(value.subscriptions.getPageId()); 2371 dataOut.writeLong(value.subscriptionAcks.getPageId()); 2372 dataOut.writeLong(value.ackPositions.getHeadPageId()); 2373 dataOut.writeLong(value.subLocations.getHeadPageId()); 2374 } else { 2375 dataOut.writeBoolean(false); 2376 } 2377 dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); 2378 dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); 2379 } 2380 } 2381 2382 static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> { 2383 final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller(); 2384 2385 @Override 2386 public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException { 2387 KahaSubscriptionCommand rc = new KahaSubscriptionCommand(); 2388 rc.mergeFramed((InputStream)dataIn); 2389 return rc; 2390 } 2391 2392 @Override 2393 public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException { 
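// Framed protobuf write; the matching readPayload above decodes with mergeFramed.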
2394 object.writeFramed((OutputStream)dataOut); 2395 } 2396 } 2397 2398 protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2399 String key = key(destination); 2400 StoredDestination rc = storedDestinations.get(key); 2401 if (rc == null) { 2402 boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC; 2403 rc = loadStoredDestination(tx, key, topic); 2404 // Cache it. We may want to remove/unload destinations from the 2405 // cache that are not used for a while 2406 // to reduce memory usage. 2407 storedDestinations.put(key, rc); 2408 } 2409 return rc; 2410 } 2411 2412 protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { 2413 String key = key(destination); 2414 StoredDestination rc = storedDestinations.get(key); 2415 if (rc == null && metadata.destinations.containsKey(tx, key)) { 2416 rc = getStoredDestination(destination, tx); 2417 } 2418 return rc; 2419 } 2420 2421 /** 2422 * @param tx 2423 * @param key 2424 * @param topic 2425 * @return 2426 * @throws IOException 2427 */ 2428 private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException { 2429 // Try to load the existing indexes.. 2430 StoredDestination rc = metadata.destinations.get(tx, key); 2431 if (rc == null) { 2432 // Brand new destination.. allocate indexes for it. 2433 rc = new StoredDestination(); 2434 rc.orderIndex.allocate(tx); 2435 rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate()); 2436 rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate()); 2437 2438 if (topic) { 2439 rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate()); 2440 rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate()); 2441 rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate()); 2442 rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate()); 2443 } 2444 metadata.destinations.put(tx, key, rc); 2445 } 2446 2447 // Configure the marshalers and load. 2448 rc.orderIndex.load(tx); 2449 2450 // Figure out the next key using the last entry in the destination. 2451 rc.orderIndex.configureLast(tx); 2452 2453 rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE); 2454 rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2455 rc.locationIndex.load(tx); 2456 2457 rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE); 2458 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); 2459 rc.messageIdIndex.load(tx); 2460 2461 // If it was a topic... 
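// Topic destinations carry four extra structures: subscriptions (subscription key ->
// KahaSubscriptionCommand), subscriptionAcks (key -> LastAck), ackPositions (key -> SequenceSet
// of not-yet-acked sequences) and subLocations (key -> journal Location of the sub command).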
2462 if (topic) { 2463 2464 rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE); 2465 rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE); 2466 rc.subscriptions.load(tx); 2467 2468 rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE); 2469 rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller()); 2470 rc.subscriptionAcks.load(tx); 2471 2472 rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); 2473 rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); 2474 rc.ackPositions.load(tx); 2475 2476 rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); 2477 rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); 2478 rc.subLocations.load(tx); 2479 2480 rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>(); 2481 2482 if (metadata.version < 3) { 2483 2484 // on upgrade need to fill ackLocation with available messages past last ack 2485 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2486 Entry<String, LastAck> entry = iterator.next(); 2487 for (Iterator<Entry<Long, MessageKeys>> orderIterator = 2488 rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { 2489 Long sequence = orderIterator.next().getKey(); 2490 addAckLocation(tx, rc, sequence, entry.getKey()); 2491 } 2492 // modify so it is upgraded 2493 rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue()); 2494 } 2495 } 2496 2497 // Configure the message references index 2498 Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx); 2499 while (subscriptions.hasNext()) { 2500 Entry<String, SequenceSet> subscription = subscriptions.next(); 2501 SequenceSet pendingAcks = subscription.getValue(); 2502 if (pendingAcks != null && !pendingAcks.isEmpty()) { 2503 Long lastPendingAck = pendingAcks.getTail().getLast(); 2504 for(Long sequenceId : pendingAcks) { 2505 Long current = rc.messageReferences.get(sequenceId); 2506 if (current == null) { 2507 current = new Long(0); 2508 } 2509 2510 // We always add a trailing empty entry for the next position to start from 2511 // so we need to ensure we don't count that as a message reference on reload. 
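// (Illustrative: pendingAcks {5,6,7} with 7 as the trailing next-position marker yields
// reference counts of 1 for 5 and 6, and a zero-count placeholder entry for 7.)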
2512 if (!sequenceId.equals(lastPendingAck)) { 2513 current = current.longValue() + 1; 2514 } 2515 2516 rc.messageReferences.put(sequenceId, current); 2517 } 2518 } 2519 } 2520 2521 // Configure the subscription cache 2522 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { 2523 Entry<String, LastAck> entry = iterator.next(); 2524 rc.subscriptionCache.add(entry.getKey()); 2525 } 2526 2527 if (rc.orderIndex.nextMessageId == 0) { 2528 // check for existing durable sub all acked out - pull next seq from acks as messages are gone 2529 if (!rc.subscriptionAcks.isEmpty(tx)) { 2530 for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) { 2531 Entry<String, LastAck> entry = iterator.next(); 2532 rc.orderIndex.nextMessageId = 2533 Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1); 2534 } 2535 } 2536 } else { 2537 // update based on ackPositions for unmatched, last entry is always the next 2538 if (!rc.messageReferences.isEmpty()) { 2539 Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1]; 2540 rc.orderIndex.nextMessageId = 2541 Math.max(rc.orderIndex.nextMessageId, nextMessageId); 2542 } 2543 } 2544 } 2545 2546 if (metadata.version < VERSION) { 2547 // store again after upgrade 2548 metadata.destinations.put(tx, key, rc); 2549 } 2550 return rc; 2551 } 2552 2553 private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException { 2554 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2555 if (sequences == null) { 2556 sequences = new SequenceSet(); 2557 sequences.add(messageSequence); 2558 sd.ackPositions.add(tx, subscriptionKey, sequences); 2559 } else { 2560 sequences.add(messageSequence); 2561 sd.ackPositions.put(tx, subscriptionKey, sequences); 2562 } 2563 2564 Long count = sd.messageReferences.get(messageSequence); 2565 if (count == null) { 2566 count = Long.valueOf(0L); 2567 } 2568 count = count.longValue() + 1; 2569 sd.messageReferences.put(messageSequence, count); 2570 } 2571 2572 // new sub is interested in potentially all existing messages 2573 private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2574 SequenceSet allOutstanding = new SequenceSet(); 2575 Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx); 2576 while (iterator.hasNext()) { 2577 SequenceSet set = iterator.next().getValue(); 2578 for (Long entry : set) { 2579 allOutstanding.add(entry); 2580 } 2581 } 2582 sd.ackPositions.put(tx, subscriptionKey, allOutstanding); 2583 2584 for (Long ackPosition : allOutstanding) { 2585 Long count = sd.messageReferences.get(ackPosition); 2586 count = count.longValue() + 1; 2587 sd.messageReferences.put(ackPosition, count); 2588 } 2589 } 2590 2591 // on a new message add, all existing subs are interested in this message 2592 private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException { 2593 for(String subscriptionKey : sd.subscriptionCache) { 2594 SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey); 2595 if (sequences == null) { 2596 sequences = new SequenceSet(); 2597 sequences.add(new Sequence(messageSequence, messageSequence + 1)); 2598 sd.ackPositions.add(tx, subscriptionKey, sequences); 2599 } else { 2600 sequences.add(new Sequence(messageSequence, 
messageSequence + 1)); 2601 sd.ackPositions.put(tx, subscriptionKey, sequences); 2602 } 2603 2604 Long count = sd.messageReferences.get(messageSequence); 2605 if (count == null) { 2606 count = Long.valueOf(0L); 2607 } 2608 count = count.longValue() + 1; 2609 sd.messageReferences.put(messageSequence, count); 2610 sd.messageReferences.put(messageSequence+1, Long.valueOf(0L)); 2611 } 2612 } 2613 2614 private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2615 if (!sd.ackPositions.isEmpty(tx)) { 2616 SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey); 2617 if (sequences == null || sequences.isEmpty()) { 2618 return; 2619 } 2620 2621 ArrayList<Long> unreferenced = new ArrayList<Long>(); 2622 2623 for(Long sequenceId : sequences) { 2624 Long references = sd.messageReferences.get(sequenceId); 2625 if (references != null) { 2626 references = references.longValue() - 1; 2627 2628 if (references.longValue() > 0) { 2629 sd.messageReferences.put(sequenceId, references); 2630 } else { 2631 sd.messageReferences.remove(sequenceId); 2632 unreferenced.add(sequenceId); 2633 } 2634 } 2635 } 2636 2637 for(Long sequenceId : unreferenced) { 2638 // Find all the entries that need to get deleted. 2639 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2640 sd.orderIndex.getDeleteList(tx, deletes, sequenceId); 2641 2642 // Do the actual deletes. 2643 for (Entry<Long, MessageKeys> entry : deletes) { 2644 sd.locationIndex.remove(tx, entry.getValue().location); 2645 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2646 sd.orderIndex.remove(tx, entry.getKey()); 2647 } 2648 } 2649 } 2650 } 2651 2652 /** 2653 * @param tx 2654 * @param sd 2655 * @param subscriptionKey 2656 * @param messageSequence 2657 * @throws IOException 2658 */ 2659 private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException { 2660 // Remove the sub from the previous location set.. 2661 if (messageSequence != null) { 2662 SequenceSet range = sd.ackPositions.get(tx, subscriptionKey); 2663 if (range != null && !range.isEmpty()) { 2664 range.remove(messageSequence); 2665 if (!range.isEmpty()) { 2666 sd.ackPositions.put(tx, subscriptionKey, range); 2667 } else { 2668 sd.ackPositions.remove(tx, subscriptionKey); 2669 } 2670 2671 // Check if the message is reference by any other subscription. 2672 Long count = sd.messageReferences.get(messageSequence); 2673 if (count != null){ 2674 long references = count.longValue() - 1; 2675 if (references > 0) { 2676 sd.messageReferences.put(messageSequence, Long.valueOf(references)); 2677 return; 2678 } else { 2679 sd.messageReferences.remove(messageSequence); 2680 } 2681 } 2682 2683 // Find all the entries that need to get deleted. 2684 ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>(); 2685 sd.orderIndex.getDeleteList(tx, deletes, messageSequence); 2686 2687 // Do the actual deletes. 
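// (A sequence reaches this point only once no subscription references it any more; the
// delete list clears it from the order, location and messageId indexes in one pass.)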
2688 for (Entry<Long, MessageKeys> entry : deletes) { 2689 sd.locationIndex.remove(tx, entry.getValue().location); 2690 sd.messageIdIndex.remove(tx, entry.getValue().messageId); 2691 sd.orderIndex.remove(tx, entry.getKey()); 2692 } 2693 } 2694 } 2695 } 2696 2697 public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2698 return sd.subscriptionAcks.get(tx, subscriptionKey); 2699 } 2700 2701 public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { 2702 SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); 2703 if (messageSequences != null) { 2704 long result = messageSequences.rangeSize(); 2705 // if there's anything in the range the last value is always the nextMessage marker, so remove 1. 2706 return result > 0 ? result - 1 : 0; 2707 } 2708 2709 return 0; 2710 } 2711 2712 protected String key(KahaDestination destination) { 2713 return destination.getType().getNumber() + ":" + destination.getName(); 2714 } 2715 2716 // ///////////////////////////////////////////////////////////////// 2717 // Transaction related implementation methods. 2718 // ///////////////////////////////////////////////////////////////// 2719 @SuppressWarnings("rawtypes") 2720 private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2721 @SuppressWarnings("rawtypes") 2722 protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>(); 2723 protected final Set<String> ackedAndPrepared = new HashSet<String>(); 2724 protected final Set<String> rolledBackAcks = new HashSet<String>(); 2725 2726 // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback, 2727 // till then they are skipped by the store. 
2728 // 'at most once' XA guarantee 2729 public void trackRecoveredAcks(ArrayList<MessageAck> acks) { 2730 this.indexLock.writeLock().lock(); 2731 try { 2732 for (MessageAck ack : acks) { 2733 ackedAndPrepared.add(ack.getLastMessageId().toProducerKey()); 2734 } 2735 } finally { 2736 this.indexLock.writeLock().unlock(); 2737 } 2738 } 2739 2740 public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException { 2741 if (acks != null) { 2742 this.indexLock.writeLock().lock(); 2743 try { 2744 for (MessageAck ack : acks) { 2745 final String id = ack.getLastMessageId().toProducerKey(); 2746 ackedAndPrepared.remove(id); 2747 if (rollback) { 2748 rolledBackAcks.add(id); 2749 } 2750 } 2751 } finally { 2752 this.indexLock.writeLock().unlock(); 2753 } 2754 } 2755 } 2756 2757 @SuppressWarnings("rawtypes") 2758 private List<Operation> getInflightTx(KahaTransactionInfo info) { 2759 TransactionId key = TransactionIdConversion.convert(info); 2760 List<Operation> tx; 2761 synchronized (inflightTransactions) { 2762 tx = inflightTransactions.get(key); 2763 if (tx == null) { 2764 tx = Collections.synchronizedList(new ArrayList<Operation>()); 2765 inflightTransactions.put(key, tx); 2766 } 2767 } 2768 return tx; 2769 } 2770 2771 @SuppressWarnings("unused") 2772 private TransactionId key(KahaTransactionInfo transactionInfo) { 2773 return TransactionIdConversion.convert(transactionInfo); 2774 } 2775 2776 abstract class Operation <T extends JournalCommand<T>> { 2777 final T command; 2778 final Location location; 2779 2780 public Operation(T command, Location location) { 2781 this.command = command; 2782 this.location = location; 2783 } 2784 2785 public Location getLocation() { 2786 return location; 2787 } 2788 2789 public T getCommand() { 2790 return command; 2791 } 2792 2793 abstract public void execute(Transaction tx) throws IOException; 2794 } 2795 2796 class AddOperation extends Operation<KahaAddMessageCommand> { 2797 final IndexAware runWithIndexLock; 2798 public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) { 2799 super(command, location); 2800 this.runWithIndexLock = runWithIndexLock; 2801 } 2802 2803 @Override 2804 public void execute(Transaction tx) throws IOException { 2805 long seq = updateIndex(tx, command, location); 2806 if (runWithIndexLock != null) { 2807 runWithIndexLock.sequenceAssignedWithIndexLocked(seq); 2808 } 2809 } 2810 } 2811 2812 class RemoveOperation extends Operation<KahaRemoveMessageCommand> { 2813 2814 public RemoveOperation(KahaRemoveMessageCommand command, Location location) { 2815 super(command, location); 2816 } 2817 2818 @Override 2819 public void execute(Transaction tx) throws IOException { 2820 updateIndex(tx, command, location); 2821 } 2822 } 2823 2824 // ///////////////////////////////////////////////////////////////// 2825 // Initialization related implementation methods. 
2826 // ///////////////////////////////////////////////////////////////// 2827 2828 private PageFile createPageFile() throws IOException { 2829 if (indexDirectory == null) { 2830 indexDirectory = directory; 2831 } 2832 IOHelper.mkdirs(indexDirectory); 2833 PageFile index = new PageFile(indexDirectory, "db"); 2834 index.setEnableWriteThread(isEnableIndexWriteAsync()); 2835 index.setWriteBatchSize(getIndexWriteBatchSize()); 2836 index.setPageCacheSize(indexCacheSize); 2837 index.setUseLFRUEviction(isUseIndexLFRUEviction()); 2838 index.setLFUEvictionFactor(getIndexLFUEvictionFactor()); 2839 index.setEnableDiskSyncs(isEnableIndexDiskSyncs()); 2840 index.setEnableRecoveryFile(isEnableIndexRecoveryFile()); 2841 index.setEnablePageCaching(isEnableIndexPageCaching()); 2842 return index; 2843 } 2844 2845 private Journal createJournal() throws IOException { 2846 Journal manager = new Journal(); 2847 manager.setDirectory(directory); 2848 manager.setMaxFileLength(getJournalMaxFileLength()); 2849 manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); 2850 manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); 2851 manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); 2852 manager.setArchiveDataLogs(isArchiveDataLogs()); 2853 manager.setSizeAccumulator(journalSize); 2854 manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs()); 2855 manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase())); 2856 manager.setPreallocationStrategy( 2857 Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase())); 2858 manager.setJournalDiskSyncStrategy( 2859 Journal.JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase())); 2860 if (getDirectoryArchive() != null) { 2861 IOHelper.mkdirs(getDirectoryArchive()); 2862 manager.setDirectoryArchive(getDirectoryArchive()); 2863 } 2864 return manager; 2865 } 2866 2867 private Metadata createMetadata() { 2868 Metadata md = new Metadata(); 2869 md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth()); 2870 md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack()); 2871 return md; 2872 } 2873 2874 public int getJournalMaxWriteBatchSize() { 2875 return journalMaxWriteBatchSize; 2876 } 2877 2878 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 2879 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 2880 } 2881 2882 public File getDirectory() { 2883 return directory; 2884 } 2885 2886 public void setDirectory(File directory) { 2887 this.directory = directory; 2888 } 2889 2890 public boolean isDeleteAllMessages() { 2891 return deleteAllMessages; 2892 } 2893 2894 public void setDeleteAllMessages(boolean deleteAllMessages) { 2895 this.deleteAllMessages = deleteAllMessages; 2896 } 2897 2898 public void setIndexWriteBatchSize(int setIndexWriteBatchSize) { 2899 this.setIndexWriteBatchSize = setIndexWriteBatchSize; 2900 } 2901 2902 public int getIndexWriteBatchSize() { 2903 return setIndexWriteBatchSize; 2904 } 2905 2906 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 2907 this.enableIndexWriteAsync = enableIndexWriteAsync; 2908 } 2909 2910 boolean isEnableIndexWriteAsync() { 2911 return enableIndexWriteAsync; 2912 } 2913 2914 /** 2915 * @deprecated use {@link #getJournalDiskSyncStrategy} instead 2916 * @return 2917 */ 2918 public boolean isEnableJournalDiskSyncs() { 2919 return journalDiskSyncStrategy != null && 
JournalDiskSyncStrategy.ALWAYS.name().equals( 2920 journalDiskSyncStrategy.trim().toUpperCase()); 2921 } 2922 2923 /** 2924 * @deprecated use {@link #setEnableJournalDiskSyncs} instead 2925 * @param syncWrites 2926 */ 2927 public void setEnableJournalDiskSyncs(boolean syncWrites) { 2928 if (syncWrites) { 2929 journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name(); 2930 } else { 2931 journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER.name(); 2932 } 2933 } 2934 2935 public String getJournalDiskSyncStrategy() { 2936 return journalDiskSyncStrategy; 2937 } 2938 2939 public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) { 2940 this.journalDiskSyncStrategy = journalDiskSyncStrategy; 2941 } 2942 2943 public long getJournalDiskSyncInterval() { 2944 return journalDiskSyncInterval; 2945 } 2946 2947 public void setJournalDiskSyncInterval(long journalDiskSyncInterval) { 2948 this.journalDiskSyncInterval = journalDiskSyncInterval; 2949 } 2950 2951 public long getCheckpointInterval() { 2952 return checkpointInterval; 2953 } 2954 2955 public void setCheckpointInterval(long checkpointInterval) { 2956 this.checkpointInterval = checkpointInterval; 2957 } 2958 2959 public long getCleanupInterval() { 2960 return cleanupInterval; 2961 } 2962 2963 public void setCleanupInterval(long cleanupInterval) { 2964 this.cleanupInterval = cleanupInterval; 2965 } 2966 2967 public void setJournalMaxFileLength(int journalMaxFileLength) { 2968 this.journalMaxFileLength = journalMaxFileLength; 2969 } 2970 2971 public int getJournalMaxFileLength() { 2972 return journalMaxFileLength; 2973 } 2974 2975 public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) { 2976 this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack); 2977 } 2978 2979 public int getMaxFailoverProducersToTrack() { 2980 return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack(); 2981 } 2982 2983 public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) { 2984 this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth); 2985 } 2986 2987 public int getFailoverProducersAuditDepth() { 2988 return this.metadata.producerSequenceIdTracker.getAuditDepth(); 2989 } 2990 2991 public PageFile getPageFile() throws IOException { 2992 if (pageFile == null) { 2993 pageFile = createPageFile(); 2994 } 2995 return pageFile; 2996 } 2997 2998 public Journal getJournal() throws IOException { 2999 if (journal == null) { 3000 journal = createJournal(); 3001 } 3002 return journal; 3003 } 3004 3005 protected Metadata getMetadata() { 3006 return metadata; 3007 } 3008 3009 public boolean isFailIfDatabaseIsLocked() { 3010 return failIfDatabaseIsLocked; 3011 } 3012 3013 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 3014 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 3015 } 3016 3017 public boolean isIgnoreMissingJournalfiles() { 3018 return ignoreMissingJournalfiles; 3019 } 3020 3021 public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { 3022 this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; 3023 } 3024 3025 public int getIndexCacheSize() { 3026 return indexCacheSize; 3027 } 3028 3029 public void setIndexCacheSize(int indexCacheSize) { 3030 this.indexCacheSize = indexCacheSize; 3031 } 3032 3033 public boolean isCheckForCorruptJournalFiles() { 3034 return checkForCorruptJournalFiles; 3035 } 3036 3037 public void setCheckForCorruptJournalFiles(boolean 
checkForCorruptJournalFiles) { 3038 this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; 3039 } 3040 3041 public boolean isChecksumJournalFiles() { 3042 return checksumJournalFiles; 3043 } 3044 3045 public void setChecksumJournalFiles(boolean checksumJournalFiles) { 3046 this.checksumJournalFiles = checksumJournalFiles; 3047 } 3048 3049 @Override 3050 public void setBrokerService(BrokerService brokerService) { 3051 this.brokerService = brokerService; 3052 } 3053 3054 /** 3055 * @return the archiveDataLogs 3056 */ 3057 public boolean isArchiveDataLogs() { 3058 return this.archiveDataLogs; 3059 } 3060 3061 /** 3062 * @param archiveDataLogs the archiveDataLogs to set 3063 */ 3064 public void setArchiveDataLogs(boolean archiveDataLogs) { 3065 this.archiveDataLogs = archiveDataLogs; 3066 } 3067 3068 /** 3069 * @return the directoryArchive 3070 */ 3071 public File getDirectoryArchive() { 3072 return this.directoryArchive; 3073 } 3074 3075 /** 3076 * @param directoryArchive the directoryArchive to set 3077 */ 3078 public void setDirectoryArchive(File directoryArchive) { 3079 this.directoryArchive = directoryArchive; 3080 } 3081 3082 public boolean isArchiveCorruptedIndex() { 3083 return archiveCorruptedIndex; 3084 } 3085 3086 public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) { 3087 this.archiveCorruptedIndex = archiveCorruptedIndex; 3088 } 3089 3090 public float getIndexLFUEvictionFactor() { 3091 return indexLFUEvictionFactor; 3092 } 3093 3094 public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) { 3095 this.indexLFUEvictionFactor = indexLFUEvictionFactor; 3096 } 3097 3098 public boolean isUseIndexLFRUEviction() { 3099 return useIndexLFRUEviction; 3100 } 3101 3102 public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) { 3103 this.useIndexLFRUEviction = useIndexLFRUEviction; 3104 } 3105 3106 public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) { 3107 this.enableIndexDiskSyncs = enableIndexDiskSyncs; 3108 } 3109 3110 public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) { 3111 this.enableIndexRecoveryFile = enableIndexRecoveryFile; 3112 } 3113 3114 public void setEnableIndexPageCaching(boolean enableIndexPageCaching) { 3115 this.enableIndexPageCaching = enableIndexPageCaching; 3116 } 3117 3118 public boolean isEnableIndexDiskSyncs() { 3119 return enableIndexDiskSyncs; 3120 } 3121 3122 public boolean isEnableIndexRecoveryFile() { 3123 return enableIndexRecoveryFile; 3124 } 3125 3126 public boolean isEnableIndexPageCaching() { 3127 return enableIndexPageCaching; 3128 } 3129 3130 // ///////////////////////////////////////////////////////////////// 3131 // Internal conversion methods. 
    class MessageOrderCursor {
        long defaultCursorPosition;
        long lowPriorityCursorPosition;
        long highPriorityCursorPosition;

        MessageOrderCursor() {
        }

        MessageOrderCursor(long position) {
            this.defaultCursorPosition = position;
            this.lowPriorityCursorPosition = position;
            this.highPriorityCursorPosition = position;
        }

        MessageOrderCursor(MessageOrderCursor other) {
            this.defaultCursorPosition = other.defaultCursorPosition;
            this.lowPriorityCursorPosition = other.lowPriorityCursorPosition;
            this.highPriorityCursorPosition = other.highPriorityCursorPosition;
        }

        MessageOrderCursor copy() {
            return new MessageOrderCursor(this);
        }

        void reset() {
            this.defaultCursorPosition = 0;
            this.highPriorityCursorPosition = 0;
            this.lowPriorityCursorPosition = 0;
        }

        void increment() {
            if (defaultCursorPosition != 0) {
                defaultCursorPosition++;
            }
            if (highPriorityCursorPosition != 0) {
                highPriorityCursorPosition++;
            }
            if (lowPriorityCursorPosition != 0) {
                lowPriorityCursorPosition++;
            }
        }

        @Override
        public String toString() {
            return "MessageOrderCursor:[def:" + defaultCursorPosition
                    + ", low:" + lowPriorityCursorPosition
                    + ", high:" + highPriorityCursorPosition + "]";
        }

        public void sync(MessageOrderCursor other) {
            this.defaultCursorPosition = other.defaultCursorPosition;
            this.lowPriorityCursorPosition = other.lowPriorityCursorPosition;
            this.highPriorityCursorPosition = other.highPriorityCursorPosition;
        }
    }
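    /*
     * Orders a destination's messages with one BTree index per priority band,
     * each keyed by the message sequence id. put() routes entries by comparing
     * the message priority to javax.jms.Message.DEFAULT_PRIORITY (4): greater
     * goes to the high index, lower to the low index, equal to the default
     * index; e.g. a priority 9 message lands in highPriorityIndex and is
     * dispatched ahead of any pending default- and low-priority entries.
     */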
    class MessageOrderIndex {
        static final byte HI = 9;
        static final byte LO = 0;
        static final byte DEF = 4;

        long nextMessageId;
        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
        BTreeIndex<Long, MessageKeys> highPriorityIndex;
        final MessageOrderCursor cursor = new MessageOrderCursor();
        Long lastDefaultKey;
        Long lastHighKey;
        Long lastLowKey;
        byte lastGetPriority;
        final List<Long> pendingAdditions = new LinkedList<Long>();

        MessageKeys remove(Transaction tx, Long key) throws IOException {
            MessageKeys result = defaultPriorityIndex.remove(tx, key);
            if (result == null && highPriorityIndex != null) {
                result = highPriorityIndex.remove(tx, key);
                if (result == null && lowPriorityIndex != null) {
                    result = lowPriorityIndex.remove(tx, key);
                }
            }
            return result;
        }

        void load(Transaction tx) throws IOException {
            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
            defaultPriorityIndex.load(tx);
            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
            lowPriorityIndex.load(tx);
            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
            highPriorityIndex.load(tx);
        }

        void allocate(Transaction tx) throws IOException {
            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
            if (metadata.version >= 2) {
                // Stores older than version 2 only carry the default priority index.
                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
            }
        }

        void configureLast(Transaction tx) throws IOException {
            // Figure out the next key using the last entry in the destination.
            TreeSet<Long> orderedSet = new TreeSet<Long>();

            addLast(orderedSet, highPriorityIndex, tx);
            addLast(orderedSet, defaultPriorityIndex, tx);
            addLast(orderedSet, lowPriorityIndex, tx);

            if (!orderedSet.isEmpty()) {
                nextMessageId = orderedSet.last() + 1;
            }
        }

        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
            if (index != null) {
                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
                if (lastEntry != null) {
                    orderedSet.add(lastEntry.getKey());
                }
            }
        }

        void clear(Transaction tx) throws IOException {
            this.remove(tx);
            this.resetCursorPosition();
            this.allocate(tx);
            this.load(tx);
            this.configureLast(tx);
        }

        void remove(Transaction tx) throws IOException {
            defaultPriorityIndex.clear(tx);
            defaultPriorityIndex.unload(tx);
            tx.free(defaultPriorityIndex.getPageId());
            if (lowPriorityIndex != null) {
                lowPriorityIndex.clear(tx);
                lowPriorityIndex.unload(tx);
                tx.free(lowPriorityIndex.getPageId());
            }
            if (highPriorityIndex != null) {
                highPriorityIndex.clear(tx);
                highPriorityIndex.unload(tx);
                tx.free(highPriorityIndex.getPageId());
            }
        }

        void resetCursorPosition() {
            this.cursor.reset();
            lastDefaultKey = null;
            lastHighKey = null;
            lastLowKey = null;
        }
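        /*
         * Batch positioning: setBatch(tx, sequence) resumes all three cursors
         * just past the given sequence. setBatch(tx, last) additionally covers
         * the case where the cursors are still at 0, advancing only the bands
         * that the priority of the last ack implies are already consumed; e.g.
         * an ack at default priority means no high-priority messages remain,
         * so the high-priority cursor is advanced as well.
         */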
        void setBatch(Transaction tx, Long sequence) throws IOException {
            if (sequence != null) {
                long nextPosition = sequence.longValue() + 1;
                lastDefaultKey = sequence;
                cursor.defaultCursorPosition = nextPosition;
                lastHighKey = sequence;
                cursor.highPriorityCursorPosition = nextPosition;
                lastLowKey = sequence;
                cursor.lowPriorityCursorPosition = nextPosition;
            }
        }

        void setBatch(Transaction tx, LastAck last) throws IOException {
            setBatch(tx, last.lastAckedSequence);
            if (cursor.defaultCursorPosition == 0
                    && cursor.highPriorityCursorPosition == 0
                    && cursor.lowPriorityCursorPosition == 0) {
                long next = last.lastAckedSequence + 1;
                switch (last.priority) {
                    case DEF:
                        cursor.defaultCursorPosition = next;
                        cursor.highPriorityCursorPosition = next;
                        break;
                    case HI:
                        cursor.highPriorityCursorPosition = next;
                        break;
                    case LO:
                        cursor.lowPriorityCursorPosition = next;
                        cursor.defaultCursorPosition = next;
                        cursor.highPriorityCursorPosition = next;
                        break;
                }
            }
        }

        void stoppedIterating() {
            if (lastDefaultKey != null) {
                cursor.defaultCursorPosition = lastDefaultKey.longValue() + 1;
            }
            if (lastHighKey != null) {
                cursor.highPriorityCursorPosition = lastHighKey.longValue() + 1;
            }
            if (lastLowKey != null) {
                cursor.lowPriorityCursorPosition = lastLowKey.longValue() + 1;
            }
            lastDefaultKey = null;
            lastHighKey = null;
            lastLowKey = null;
        }

        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
                throws IOException {
            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
            }
        }

        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {

            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
            deletes.add(iterator.next());
        }

        long getNextMessageId(int priority) {
            return nextMessageId++;
        }

        MessageKeys get(Transaction tx, Long key) throws IOException {
            MessageKeys result = defaultPriorityIndex.get(tx, key);
            if (result == null) {
                result = highPriorityIndex.get(tx, key);
                if (result == null) {
                    result = lowPriorityIndex.get(tx, key);
                    lastGetPriority = LO;
                } else {
                    lastGetPriority = HI;
                }
            } else {
                lastGetPriority = DEF;
            }
            return result;
        }

        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
                return defaultPriorityIndex.put(tx, key, value);
            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
                return highPriorityIndex.put(tx, key, value);
            } else {
                return lowPriorityIndex.put(tx, key, value);
            }
        }

        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException {
            return new MessageOrderIterator(tx, cursor, this);
        }

        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException {
            return new MessageOrderIterator(tx, m, this);
        }

        public byte lastGetPriority() {
            return lastGetPriority;
        }

        public boolean alreadyDispatched(Long sequence) {
            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
        }

        public void trackPendingAdd(Long seq) {
            synchronized (pendingAdditions) {
                pendingAdditions.add(seq);
            }
        }

        public void trackPendingAddComplete(Long seq) {
            synchronized (pendingAdditions) {
                pendingAdditions.remove(seq);
            }
        }

        public Long minPendingAdd() {
            synchronized (pendingAdditions) {
                if (!pendingAdditions.isEmpty()) {
                    return pendingAdditions.get(0);
                } else {
                    return null;
                }
            }
        }
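        /*
         * Walks the destination in strict priority order: the high-priority
         * iterator is drained first, then the default, then the low. When
         * priority support is off (highPriorityIndex == null) only the default
         * iterator is used. minPendingAdd() caps each band's iterator so that
         * sequences belonging to in-flight, not-yet-completed adds are never
         * dispatched ahead of their turn.
         */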
        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>> {
            Iterator<Entry<Long, MessageKeys>> currentIterator;
            final Iterator<Entry<Long, MessageKeys>> highIterator;
            final Iterator<Entry<Long, MessageKeys>> defaultIterator;
            final Iterator<Entry<Long, MessageKeys>> lowIterator;

            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
                if (highPriorityIndex != null) {
                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
                } else {
                    this.highIterator = null;
                }
                if (lowPriorityIndex != null) {
                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
                } else {
                    this.lowIterator = null;
                }
            }

            @Override
            public boolean hasNext() {
                if (currentIterator == null) {
                    if (highIterator != null) {
                        if (highIterator.hasNext()) {
                            currentIterator = highIterator;
                            return currentIterator.hasNext();
                        }
                        if (defaultIterator.hasNext()) {
                            currentIterator = defaultIterator;
                            return currentIterator.hasNext();
                        }
                        if (lowIterator.hasNext()) {
                            currentIterator = lowIterator;
                            return currentIterator.hasNext();
                        }
                        return false;
                    } else {
                        currentIterator = defaultIterator;
                        return currentIterator.hasNext();
                    }
                }
                if (highIterator != null) {
                    if (currentIterator.hasNext()) {
                        return true;
                    }
                    if (currentIterator == highIterator) {
                        if (defaultIterator.hasNext()) {
                            currentIterator = defaultIterator;
                            return currentIterator.hasNext();
                        }
                        if (lowIterator.hasNext()) {
                            currentIterator = lowIterator;
                            return currentIterator.hasNext();
                        }
                        return false;
                    }

                    if (currentIterator == defaultIterator) {
                        if (lowIterator.hasNext()) {
                            currentIterator = lowIterator;
                            return currentIterator.hasNext();
                        }
                        return false;
                    }
                }
                return currentIterator.hasNext();
            }

            @Override
            public Entry<Long, MessageKeys> next() {
                Entry<Long, MessageKeys> result = currentIterator.next();
                if (result != null) {
                    Long key = result.getKey();
                    if (highIterator != null) {
                        // Remember the last key served per band so that
                        // stoppedIterating() can advance the matching cursor.
                        if (currentIterator == defaultIterator) {
                            lastDefaultKey = key;
                        } else if (currentIterator == highIterator) {
                            lastHighKey = key;
                        } else {
                            lastLowKey = key;
                        }
                    } else {
                        lastDefaultKey = key;
                    }
                }
                return result;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        }
    }
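    /*
     * Serializes a HashSet<String> as a 4-byte length prefix followed by the
     * JDK object-serialized form of the set; readPayload() reads exactly the
     * prefixed number of bytes before deserializing, so a corrupt payload
     * cannot over-read into the next record.
     */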
    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();

        @Override
        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oout = new ObjectOutputStream(baos);
            oout.writeObject(object);
            oout.flush();
            oout.close();
            byte[] data = baos.toByteArray();
            dataOut.writeInt(data.length);
            dataOut.write(data);
        }

        @Override
        @SuppressWarnings("unchecked")
        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
            int dataLen = dataIn.readInt();
            byte[] data = new byte[dataLen];
            dataIn.readFully(data);
            ByteArrayInputStream bais = new ByteArrayInputStream(data);
            ObjectInputStream oin = new ObjectInputStream(bais);
            try {
                return (HashSet<String>) oin.readObject();
            } catch (ClassNotFoundException cfe) {
                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
                ioe.initCause(cfe);
                throw ioe;
            }
        }
    }

    public File getIndexDirectory() {
        return indexDirectory;
    }

    public void setIndexDirectory(File indexDirectory) {
        this.indexDirectory = indexDirectory;
    }

    interface IndexAware {
        void sequenceAssignedWithIndexLocked(long index);
    }

    public String getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(String preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public String getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(String preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public int getCompactAcksAfterNoGC() {
        return compactAcksAfterNoGC;
    }

    /**
     * Sets the number of GC cycles in which no journal logs were removed before an
     * attempt is made to move forward all the acks in the last log that contains
     * them and is otherwise unreferenced.
     * <p>
     * A value of -1 disables this feature.
     *
     * @param compactAcksAfterNoGC
     *      Number of empty GC cycles before we rewrite old acks.
     */
    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
        this.compactAcksAfterNoGC = compactAcksAfterNoGC;
    }

    /**
     * Returns whether Ack compaction will ignore that the store is still growing
     * and run more often.
     *
     * @return the current compactAcksIgnoresStoreGrowth value.
     */
    public boolean isCompactAcksIgnoresStoreGrowth() {
        return compactAcksIgnoresStoreGrowth;
    }

    /**
     * Configure whether Ack compaction will occur regardless of continued growth of
     * the journal logs, i.e. even while the store has not yet run out of space.
     * Because the compaction operation can be costly, this defaults to off and Ack
     * compaction only runs when it appears the store cannot grow any larger.
     *
     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
     */
    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
        this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
    }

    /**
     * Returns whether Ack compaction is enabled.
     *
     * @return enableAckCompaction
     */
    public boolean isEnableAckCompaction() {
        return enableAckCompaction;
    }

    /**
     * Configure whether the Ack compaction task should be enabled to run.
     *
     * @param enableAckCompaction
     */
    public void setEnableAckCompaction(boolean enableAckCompaction) {
        this.enableAckCompaction = enableAckCompaction;
    }
}