/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Transaction store that sits in front of the per-destination transaction
 * stores of a {@link MultiKahaDBPersistenceAdapter}.
 * <p>
 * It remembers which delegate stores each transaction has touched and, when a
 * non-prepared transaction spans more than one store, drives a local two phase
 * commit: every store is prepared, the outcome is written to this store's own
 * journal (in the {@code txStore} sub directory), every store is committed and
 * finally a completion record is written. On {@link #start()} that journal is
 * replayed so that outcomes without a completion record can be finished in
 * {@link #recover(TransactionRecoveryListener)}.
 */
public class MultiKahaDBTransactionStore implements TransactionStore {
    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
    final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
    /** Transactions that have touched at least one store and are not yet completed. */
    final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
    /** Transactions whose commit outcome is journaled but whose completion record is not. */
    final ConcurrentMap<TransactionId, Tx> pendingCommit = new ConcurrentHashMap<TransactionId, Tx>();
    private Journal journal;
    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicBoolean recovered = new AtomicBoolean(false);
    private long journalCleanupInterval = Journal.DEFAULT_CLEANUP_INTERVAL;
    private boolean checkForCorruption = true;
    private final AtomicBoolean corruptJournalDetected = new AtomicBoolean(false);

    /**
     * @param multiKahaDBPersistenceAdapter the owning adapter; supplies the base directory,
     *        the delegate adapters, the destination map and the broker service
     */
    public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
        this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
    }

    /**
     * Wraps a message store so that every transactional add/remove first records
     * {@code transactionStore} as a participant of the message's transaction.
     * <p>
     * The {@code canOptimizeHint} variants delegate to the same code path as the plain
     * variants; the hint itself is not forwarded.
     *
     * @param transactionStore the transaction store that backs {@code messageStore}
     * @param messageStore the store to wrap
     * @return a proxy around {@code messageStore}
     */
    public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
        return new ProxyMessageStore(messageStore) {
            @Override
            public void addMessage(ConnectionContext context, final Message send) throws IOException {
                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
            }

            @Override
            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
            }

            @Override
            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
            }

            @Override
            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
            }

            @Override
            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
            }

            @Override
            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
            }

            @Override
            public void registerIndexListener(IndexListener indexListener) {
                getDelegate().registerIndexListener(indexListener);
                try {
                    if (indexListener instanceof BaseDestination) {
                        // update queue storeUsage
                        Object matchingPersistenceAdapter = multiKahaDBPersistenceAdapter.destinationMap.chooseValue(getDelegate().getDestination());
                        if (matchingPersistenceAdapter instanceof FilteredKahaDBPersistenceAdapter) {
                            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) matchingPersistenceAdapter;
                            if (filteredAdapter.getUsage() != null && filteredAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
                                StoreUsage storeUsage = filteredAdapter.getUsage();
                                storeUsage.setStore(filteredAdapter.getPersistenceAdapter());
                                storeUsage.setParent(multiKahaDBPersistenceAdapter.getBrokerService().getSystemUsage().getStoreUsage());
                                ((BaseDestination) indexListener).getSystemUsage().setStoreUsage(storeUsage);
                            }
                        }
                    }
                } catch (Exception ignored) {
                    // best effort: the listener is already registered, only the usage wiring failed
                    LOG.warn("Failed to set mKahaDB destination store usage", ignored);
                }
            }
        };
    }

    /**
     * Topic flavour of {@link #proxy(TransactionStore, MessageStore)}; additionally
     * intercepts durable subscription acknowledgements.
     *
     * @param transactionStore the transaction store that backs {@code messageStore}
     * @param messageStore the topic store to wrap
     * @return a proxy around {@code messageStore}
     */
    public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
        return new ProxyTopicMessageStore(messageStore) {
            @Override
            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
            }

            @Override
            public void addMessage(ConnectionContext context, final Message send) throws IOException {
                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
            }

            @Override
            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
            }

            @Override
            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
            }

            @Override
            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
            }

            @Override
            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
            }

            @Override
            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                                    MessageId messageId, MessageAck ack) throws IOException {
                MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
                        subscriptionName, messageId, ack);
            }
        };
    }

    /** Deletes everything below this store's {@code txStore} directory. */
    public void deleteAllMessages() {
        IOHelper.deleteChildren(getDirectory());
    }

    public int getJournalMaxFileLength() {
        return journalMaxFileLength;
    }

    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.journalMaxFileLength = journalMaxFileLength;
    }

    public int getJournalMaxWriteBatchSize() {
        return journalWriteBatchSize;
    }

    public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
        this.journalWriteBatchSize = journalWriteBatchSize;
    }

    public void setJournalCleanupInterval(long journalCleanupInterval) {
        this.journalCleanupInterval = journalCleanupInterval;
    }

    public long getJournalCleanupInterval() {
        return journalCleanupInterval;
    }

    /**
     * @param checkForCorruption when {@code true} the journal is started with both
     *        corruption checking on startup and checksums enabled (see {@link #start()})
     */
    public void setCheckForCorruption(boolean checkForCorruption) {
        this.checkForCorruption = checkForCorruption;
    }

    public boolean isCheckForCorruption() {
        return checkForCorruption;
    }

    /**
     * Book-keeping for one transaction: the delegate stores it has touched and the
     * journal data file that holds its prepare (outcome) record.
     */
    public class Tx {
        // value is the recovered XA id to use against that store, or null to use the caller's id
        private final HashMap<TransactionStore, TransactionId> stores = new HashMap<TransactionStore, TransactionId>();
        private int prepareLocationId = 0;

        /** Registers a store together with the XA id under which that store knows the transaction. */
        public void trackStore(TransactionStore store, XATransactionId xid) {
            stores.put(store, xid);
        }

        /** Registers a store with no store-specific id; note this overwrites any id tracked earlier. */
        public void trackStore(TransactionStore store) {
            stores.put(store, null);
        }

        public HashMap<TransactionStore, TransactionId> getStoresMap() {
            return stores;
        }

        public Set<TransactionStore> getStores() {
            return stores.keySet();
        }

        /** Remembers the data file id of the journal location holding the prepare record. */
        public void trackPrepareLocation(Location location) {
            this.prepareLocationId = location.getDataFileId();
        }

        /** @return the tracked data file id, or {@code 0} if no prepare location was tracked */
        public int getPreparedLocationId() {
            return prepareLocationId;
        }
    }

    /**
     * Returns the in-flight entry for {@code txid}, creating and registering one if none exists.
     * <p>
     * Registration uses {@code putIfAbsent} so that concurrent callers for the same id all
     * receive the same {@link Tx}; a plain get-then-put could hand out two instances and
     * lose the stores tracked on the one that was overwritten.
     *
     * @param txid the transaction id
     * @return the registered entry, never {@code null}
     */
    public Tx getTx(TransactionId txid) {
        Tx tx = inflightTransactions.get(txid);
        if (tx == null) {
            Tx created = new Tx();
            tx = inflightTransactions.putIfAbsent(txid, created);
            if (tx == null) {
                // we won the race, our instance is the registered one
                tx = created;
            }
        }
        return tx;
    }

    /**
     * @param txid the transaction id
     * @return the entry that was registered for {@code txid}, or {@code null} if there was none
     */
    public Tx removeTx(TransactionId txid) {
        return inflightTransactions.remove(txid);
    }

    /** Prepares {@code txid} on every store tracked for it. */
    @Override
    public void prepare(TransactionId txid) throws IOException {
        Tx tx = getTx(txid);
        for (TransactionStore store : tx.getStores()) {
            store.prepare(txid);
        }
    }

    /**
     * Commits {@code txid} on every tracked store.
     * <ul>
     * <li>already prepared: each store is committed, using its recovered id when one was tracked;</li>
     * <li>not prepared, exactly one store: a one phase commit on that store;</li>
     * <li>not prepared, any other number of stores: prepare all, journal the outcome,
     *     commit all, journal the completion.</li>
     * </ul>
     * The delegate stores are always invoked with {@code null} callbacks; {@code preCommit}
     * and {@code postCommit} are run here, before and after the whole sequence.
     */
    @Override
    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
            throws IOException {

        if (preCommit != null) {
            preCommit.run();
        }

        Tx tx = getTx(txid);
        if (wasPrepared) {
            for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
                TransactionId recovered = storeTx.getValue();
                if (recovered != null) {
                    storeTx.getKey().commit(recovered, true, null, null);
                } else {
                    storeTx.getKey().commit(txid, true, null, null);
                }
            }
        } else {
            // can only do 1pc on a single store
            if (tx.getStores().size() == 1) {
                for (TransactionStore store : tx.getStores()) {
                    store.commit(txid, false, null, null);
                }
            } else {
                // need to do local 2pc
                for (TransactionStore store : tx.getStores()) {
                    store.prepare(txid);
                }
                persistOutcome(tx, txid);
                for (TransactionStore store : tx.getStores()) {
                    store.commit(txid, true, null, null);
                }
                persistCompletion(txid);
            }
        }
        removeTx(txid);
        if (postCommit != null) {
            postCommit.run();
        }
    }

    /**
     * Journals a prepare command for {@code txid}, remembers its location on {@code tx}
     * and registers the transaction as pending commit.
     */
    public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(
                TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
        pendingCommit.put(txid, tx);
    }

    /** Journals a commit command for {@code txid} and drops it from the pending commit set. */
    public void persistCompletion(TransactionId txid) throws IOException {
        store(new KahaCommitCommand().setTransactionInfo(
                TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
        pendingCommit.remove(txid);
    }

    /**
     * Appends a command to the journal as one type byte followed by the framed command.
     *
     * @return the journal location of the written record
     */
    private Location store(JournalCommand<?> data) throws IOException {
        int size = data.serializedSizeFramed();
        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
        os.writeByte(data.type().getNumber());
        data.writeFramed(os);
        Location location = journal.write(os.toByteSequence(), true);
        journal.setLastAppendLocation(location);
        return location;
    }

    /**
     * Rolls {@code txid} back on every tracked store, using the store's recovered id when
     * one was tracked. Does nothing when the transaction is unknown.
     */
    @Override
    public void rollback(TransactionId txid) throws IOException {
        Tx tx = removeTx(txid);
        if (tx != null) {
            for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
                TransactionId recovered = storeTx.getValue();
                if (recovered != null) {
                    storeTx.getKey().rollback(recovered);
                } else {
                    storeTx.getKey().rollback(txid);
                }
            }
        }
    }

    /**
     * Creates, configures and starts the journal, replays it and writes a LOADED trace
     * record. Only the first call after construction or {@link #stop()} has any effect.
     */
    @Override
    public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            journal = new Journal() {
                @Override
                public void cleanup() {
                    super.cleanup();
                    // piggy-back our own data file gc on the journal's cleanup
                    txStoreCleanup();
                }
            };
            journal.setDirectory(getDirectory());
            journal.setMaxFileLength(journalMaxFileLength);
            journal.setWriteBatchSize(journalWriteBatchSize);
            journal.setCleanupInterval(journalCleanupInterval);
            journal.setCheckForCorruptionOnStartup(checkForCorruption);
            journal.setChecksum(checkForCorruption);
            IOHelper.mkdirs(journal.getDirectory());
            journal.start();
            recoverPendingLocalTransactions();
            recovered.set(true);
            loaded();
        }
    }

    /** Writes a trace record carrying the current date to the journal. */
    private void loaded() throws IOException {
        store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
    }

    /**
     * Asks the journal to remove every known data file that is not referenced by the
     * prepare location of an in-flight or pending-commit transaction. Skipped until
     * recovery has completed and while journal corruption is flagged.
     */
    private void txStoreCleanup() {
        if (!recovered.get() || corruptJournalDetected.get()) {
            return;
        }
        Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
        for (Tx tx : inflightTransactions.values()) {
            knownDataFileIds.remove(tx.getPreparedLocationId());
        }
        for (Tx tx : pendingCommit.values()) {
            knownDataFileIds.remove(tx.getPreparedLocationId());
        }
        try {
            journal.removeDataFiles(knownDataFileIds);
        } catch (Exception e) {
            // keep the cause: without it a failed gc cannot be diagnosed from the log
            LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds, e);
        }
    }

    private File getDirectory() {
        return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
    }

    /** Closes and releases the journal; a no-op unless the store is currently started. */
    @Override
    public void stop() throws Exception {
        if (started.compareAndSet(true, false) && journal != null) {
            journal.close();
            journal = null;
        }
    }

    /**
     * Replays the journal to rebuild the set of transactions whose outcome was journaled
     * without a completion record. When corruption checking is enabled and any data file
     * reports corrupted blocks, or when the replay itself fails, the corruption flag is
     * raised instead of propagating the error.
     */
    private void recoverPendingLocalTransactions() throws IOException {

        if (checkForCorruption) {
            for (DataFile dataFile : journal.getFileMap().values()) {
                if (!dataFile.getCorruptedBlocks().isEmpty()) {
                    LOG.error("Corrupt Transaction journal records found in db-{}.log at {}", dataFile.getDataFileId(), dataFile.getCorruptedBlocks());
                    corruptJournalDetected.set(true);
                }
            }
        }
        if (!corruptJournalDetected.get()) {
            Location location = null;
            try {
                location = journal.getNextLocation(null);
                while (location != null) {
                    process(location, load(location));
                    location = journal.getNextLocation(location);
                }
            } catch (Exception oops) {
                LOG.error("Corrupt journal record; unexpected exception on transaction journal replay of location:" + location, oops);
                corruptJournalDetected.set(true);
            }
            // whatever replay left in-flight has a prepare record but no commit record
            pendingCommit.putAll(inflightTransactions);
            LOG.info("pending local transactions: " + pendingCommit.keySet());
        }
    }

    /**
     * Reads and decodes the journal record at {@code location}.
     *
     * @throws IOException if the record's type byte does not map to a known entry type
     */
    public JournalCommand<?> load(Location location) throws IOException {
        DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
        byte readByte = is.readByte();
        KahaEntryType type = KahaEntryType.valueOf(readByte);
        if (type == null) {
            throw new IOException("Could not load journal record. Invalid location: " + location);
        }
        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
        message.mergeFramed(is);
        return message;
    }

    /**
     * Applies one replayed journal record: a prepare registers the transaction and its
     * location, a commit unregisters it, a trace is ignored.
     *
     * @throws IOException for any other command type
     */
    public void process(final Location location, JournalCommand<?> command) throws IOException {
        switch (command.type()) {
            case KAHA_PREPARE_COMMAND:
                KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())).trackPrepareLocation(location);
                break;
            case KAHA_COMMIT_COMMAND:
                KahaCommitCommand commitCommand = (KahaCommitCommand) command;
                removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
                break;
            case KAHA_TRACE_COMMAND:
                break;
            default:
                throw new IOException("Unexpected command in transaction journal: " + command);
        }
    }


    /**
     * Recovers prepared XA transactions from every delegate adapter, forwarding each to
     * {@code listener}, then resolves prepared transactions that are local to this
     * multi-store: those with a journaled outcome are committed through the broker, the
     * rest are forgotten, and a completion record is written in both cases.
     * <p>
     * If the journal was flagged corrupt, pending local transactions are only logged as
     * requiring a manual heuristic outcome. If it was flagged corrupt and no local
     * transaction is pending, the write file is rotated and the flag is cleared.
     */
    @Override
    public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {

        for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
            adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
                @Override
                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
                    try {
                        getTx(xid).trackStore(adapter.createTransactionStore(), xid);
                    } catch (IOException e) {
                        LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
                    }
                    listener.recover(xid, addedMessages, acks);
                }
            });
        }

        boolean recoveryWorkPending = false;
        try {
            Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
            // force completion of local xa
            for (TransactionId txid : broker.getPreparedTransactions(null)) {
                if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
                    recoveryWorkPending = true;
                    if (corruptJournalDetected.get()) {
                        // not having a record is meaningless once our tx store is corrupt; we need a heuristic decision
                        LOG.warn("Pending multi store local transaction {} requires manual heuristic outcome via JMX", txid);
                        logSomeContext(txid);
                    } else {
                        try {
                            if (pendingCommit.containsKey(txid)) {
                                // we recorded the commit outcome, finish the job
                                LOG.info("delivering pending commit outcome for tid: " + txid);
                                broker.commitTransaction(null, txid, false);
                            } else {
                                // we have not recorded an outcome, and would have reported a commit failure, so we must rollback
                                LOG.info("delivering rollback outcome to store for tid: " + txid);
                                broker.forgetTransaction(null, txid);
                            }
                            persistCompletion(txid);
                        } catch (Exception ex) {
                            LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
                        }
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("failed to resolve pending local transactions", e);
        }
        // can we ignore corruption and resume
        if (corruptJournalDetected.get() && !recoveryWorkPending) {
            // move to new write file, gc will cleanup
            journal.rotateWriteFile();
            loaded();
            corruptJournalDetected.set(false);
            LOG.info("No heuristics outcome pending after corrupt tx store detection, auto resolving");
        }
    }

    /**
     * Logs, for each KahaDB adapter whose transaction store participates in {@code txid},
     * the prepared transaction data that adapter holds. Purely diagnostic: it looks the
     * transaction up without registering a new in-flight entry.
     */
    private void logSomeContext(TransactionId txid) throws IOException {
        Tx tx = inflightTransactions.get(txid);
        if (tx != null) {
            for (TransactionStore store : tx.getStores()) {
                for (PersistenceAdapter persistenceAdapter : multiKahaDBPersistenceAdapter.adapters) {
                    if (persistenceAdapter.createTransactionStore() == store) {
                        if (persistenceAdapter instanceof KahaDBPersistenceAdapter) {
                            LOG.warn("Heuristic data in: " + persistenceAdapter + ", " + ((KahaDBPersistenceAdapter) persistenceAdapter).getStore().getPreparedTransaction(txid));
                        }
                    }
                }
            }
        }
    }

    /** Tracks {@code transactionStore} for a transactional message, then adds it to {@code destination}. */
    void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
            throws IOException {
        if (message.getTransactionId() != null) {
            getTx(message.getTransactionId()).trackStore(transactionStore);
        }
        destination.addMessage(context, message);
    }

    /**
     * Transactional messages are tracked and added through the synchronous
     * {@code addMessage} path, returning the shared {@link AbstractMessageStore#FUTURE};
     * non-transactional messages use the delegate's async queue add.
     */
    ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
            throws IOException {
        if (message.getTransactionId() != null) {
            getTx(message.getTransactionId()).trackStore(transactionStore);
            destination.addMessage(context, message);
            return AbstractMessageStore.FUTURE;
        } else {
            return destination.asyncAddQueueMessage(context, message);
        }
    }

    /**
     * Transactional messages are tracked and added through the synchronous
     * {@code addMessage} path, returning the shared {@link AbstractMessageStore#FUTURE};
     * non-transactional messages use the delegate's async topic add.
     */
    ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
            throws IOException {

        if (message.getTransactionId() != null) {
            getTx(message.getTransactionId()).trackStore(transactionStore);
            destination.addMessage(context, message);
            return AbstractMessageStore.FUTURE;
        } else {
            return destination.asyncAddTopicMessage(context, message);
        }
    }

    /** Tracks {@code transactionStore} for a transactional ack, then removes the message from {@code destination}. */
    final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
            throws IOException {
        if (ack.getTransactionId() != null) {
            getTx(ack.getTransactionId()).trackStore(transactionStore);
        }
        destination.removeMessage(context, ack);
    }

    /** Tracks {@code transactionStore} for a transactional ack, then delegates the async remove to {@code destination}. */
    final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
            throws IOException {
        if (ack.getTransactionId() != null) {
            getTx(ack.getTransactionId()).trackStore(transactionStore);
        }
        destination.removeAsyncMessage(context, ack);
    }

    /** Tracks {@code transactionStore} for a transactional ack, then delegates the subscription acknowledge to {@code destination}. */
    final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
                           final String clientId, final String subscriptionName,
                           final MessageId messageId, final MessageAck ack) throws IOException {
        if (ack.getTransactionId() != null) {
            getTx(ack.getTransactionId()).trackStore(transactionStore);
        }
        destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
    }

}