001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.File; 020import java.io.IOException; 021import java.util.Date; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.Map; 025import java.util.Set; 026import java.util.TreeSet; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.atomic.AtomicBoolean; 030 031import org.apache.activemq.broker.Broker; 032import org.apache.activemq.broker.ConnectionContext; 033import org.apache.activemq.broker.region.BaseDestination; 034import org.apache.activemq.command.Message; 035import org.apache.activemq.command.MessageAck; 036import org.apache.activemq.command.MessageId; 037import org.apache.activemq.command.TransactionId; 038import org.apache.activemq.command.XATransactionId; 039import org.apache.activemq.store.AbstractMessageStore; 040import org.apache.activemq.store.IndexListener; 041import org.apache.activemq.store.ListenableFuture; 042import org.apache.activemq.store.MessageStore; 043import org.apache.activemq.store.PersistenceAdapter; 044import org.apache.activemq.store.ProxyMessageStore; 045import org.apache.activemq.store.ProxyTopicMessageStore; 046import org.apache.activemq.store.TopicMessageStore; 047import org.apache.activemq.store.TransactionRecoveryListener; 048import org.apache.activemq.store.TransactionStore; 049import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 050import org.apache.activemq.store.kahadb.data.KahaEntryType; 051import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 052import org.apache.activemq.store.kahadb.data.KahaTraceCommand; 053import org.apache.activemq.store.kahadb.disk.journal.Journal; 054import org.apache.activemq.store.kahadb.disk.journal.Location; 055import org.apache.activemq.usage.StoreUsage; 056import org.apache.activemq.util.DataByteArrayInputStream; 057import org.apache.activemq.util.DataByteArrayOutputStream; 058import org.apache.activemq.util.IOHelper; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061 062public class MultiKahaDBTransactionStore implements TransactionStore { 063 static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class); 064 final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter; 065 final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>(); 066 final ConcurrentMap<TransactionId, Tx> pendingCommit = new ConcurrentHashMap<TransactionId, Tx>(); 067 private Journal journal; 068 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 069 private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 070 private final AtomicBoolean started = new AtomicBoolean(false); 071 private final AtomicBoolean recovered = new AtomicBoolean(false); 072 private long journalCleanupInterval = Journal.DEFAULT_CLEANUP_INTERVAL; 073 074 public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) { 075 this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter; 076 } 077 078 public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) { 079 return new ProxyMessageStore(messageStore) { 080 @Override 081 public void addMessage(ConnectionContext context, final Message send) throws IOException { 082 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 083 } 084 085 @Override 086 public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException { 087 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 088 } 089 090 @Override 091 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 092 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message); 093 } 094 095 @Override 096 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 097 return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message); 098 } 099 100 @Override 101 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 102 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack); 103 } 104 105 @Override 106 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 107 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack); 108 } 109 110 @Override 111 public void registerIndexListener(IndexListener indexListener) { 112 getDelegate().registerIndexListener(indexListener); 113 try { 114 if (indexListener instanceof BaseDestination) { 115 // update queue storeUsage 116 Object matchingPersistenceAdapter = multiKahaDBPersistenceAdapter.destinationMap.chooseValue(getDelegate().getDestination()); 117 if (matchingPersistenceAdapter instanceof FilteredKahaDBPersistenceAdapter) { 118 FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) matchingPersistenceAdapter; 119 if (filteredAdapter.getUsage() != null && filteredAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { 120 StoreUsage storeUsage = filteredAdapter.getUsage(); 121 storeUsage.setStore(filteredAdapter.getPersistenceAdapter()); 122 storeUsage.setParent(multiKahaDBPersistenceAdapter.getBrokerService().getSystemUsage().getStoreUsage()); 123 ((BaseDestination) indexListener).getSystemUsage().setStoreUsage(storeUsage); 124 } 125 } 126 } 127 } catch (Exception ignored) { 128 LOG.warn("Failed to set mKahaDB destination store usage", ignored); 129 } 130 } 131 }; 132 } 133 134 public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) { 135 return new ProxyTopicMessageStore(messageStore) { 136 @Override 137 public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException { 138 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 139 } 140 141 @Override 142 public void addMessage(ConnectionContext context, final Message send) throws IOException { 143 MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send); 144 } 145 146 @Override 147 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException { 148 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message); 149 } 150 151 @Override 152 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 153 return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message); 154 } 155 156 @Override 157 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 158 MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack); 159 } 160 161 @Override 162 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 163 MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack); 164 } 165 166 @Override 167 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 168 MessageId messageId, MessageAck ack) throws IOException { 169 MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId, 170 subscriptionName, messageId, ack); 171 } 172 }; 173 } 174 175 public void deleteAllMessages() { 176 IOHelper.deleteChildren(getDirectory()); 177 } 178 179 public int getJournalMaxFileLength() { 180 return journalMaxFileLength; 181 } 182 183 public void setJournalMaxFileLength(int journalMaxFileLength) { 184 this.journalMaxFileLength = journalMaxFileLength; 185 } 186 187 public int getJournalMaxWriteBatchSize() { 188 return journalWriteBatchSize; 189 } 190 191 public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) { 192 this.journalWriteBatchSize = journalWriteBatchSize; 193 } 194 195 public void setJournalCleanupInterval(long journalCleanupInterval) { 196 this.journalCleanupInterval = journalCleanupInterval; 197 } 198 199 public long getJournalCleanupInterval() { 200 return journalCleanupInterval; 201 } 202 203 public class Tx { 204 private final HashMap<TransactionStore, TransactionId> stores = new HashMap<TransactionStore, TransactionId>(); 205 private int prepareLocationId = 0; 206 207 public void trackStore(TransactionStore store, XATransactionId xid) { 208 stores.put(store, xid); 209 } 210 211 public void trackStore(TransactionStore store) { 212 stores.put(store, null); 213 } 214 215 public HashMap<TransactionStore, TransactionId> getStoresMap() { 216 return stores; 217 } 218 219 public Set<TransactionStore> getStores() { 220 return stores.keySet(); 221 } 222 223 public void trackPrepareLocation(Location location) { 224 this.prepareLocationId = location.getDataFileId(); 225 } 226 227 public int getPreparedLocationId() { 228 return prepareLocationId; 229 } 230 } 231 232 public Tx getTx(TransactionId txid) { 233 Tx tx = inflightTransactions.get(txid); 234 if (tx == null) { 235 tx = new Tx(); 236 inflightTransactions.put(txid, tx); 237 } 238 return tx; 239 } 240 241 public Tx removeTx(TransactionId txid) { 242 return inflightTransactions.remove(txid); 243 } 244 245 @Override 246 public void prepare(TransactionId txid) throws IOException { 247 Tx tx = getTx(txid); 248 for (TransactionStore store : tx.getStores()) { 249 store.prepare(txid); 250 } 251 } 252 253 @Override 254 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit) 255 throws IOException { 256 257 if (preCommit != null) { 258 preCommit.run(); 259 } 260 261 Tx tx = getTx(txid); 262 if (wasPrepared) { 263 for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) { 264 TransactionId recovered = storeTx.getValue(); 265 if (recovered != null) { 266 storeTx.getKey().commit(recovered, true, null, null); 267 } else { 268 storeTx.getKey().commit(txid, true, null, null); 269 } 270 } 271 } else { 272 // can only do 1pc on a single store 273 if (tx.getStores().size() == 1) { 274 for (TransactionStore store : tx.getStores()) { 275 store.commit(txid, false, null, null); 276 } 277 } else { 278 // need to do local 2pc 279 for (TransactionStore store : tx.getStores()) { 280 store.prepare(txid); 281 } 282 persistOutcome(tx, txid); 283 for (TransactionStore store : tx.getStores()) { 284 store.commit(txid, true, null, null); 285 } 286 persistCompletion(txid); 287 } 288 } 289 removeTx(txid); 290 if (postCommit != null) { 291 postCommit.run(); 292 } 293 } 294 295 public void persistOutcome(Tx tx, TransactionId txid) throws IOException { 296 tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))))); 297 pendingCommit.put(txid, tx); 298 } 299 300 public void persistCompletion(TransactionId txid) throws IOException { 301 store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))); 302 pendingCommit.remove(txid); 303 } 304 305 private Location store(JournalCommand<?> data) throws IOException { 306 int size = data.serializedSizeFramed(); 307 DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1); 308 os.writeByte(data.type().getNumber()); 309 data.writeFramed(os); 310 Location location = journal.write(os.toByteSequence(), true); 311 journal.setLastAppendLocation(location); 312 return location; 313 } 314 315 @Override 316 public void rollback(TransactionId txid) throws IOException { 317 Tx tx = removeTx(txid); 318 if (tx != null) { 319 for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) { 320 TransactionId recovered = storeTx.getValue(); 321 if (recovered != null) { 322 storeTx.getKey().rollback(recovered); 323 } else { 324 storeTx.getKey().rollback(txid); 325 } 326 } 327 } 328 } 329 330 @Override 331 public void start() throws Exception { 332 if (started.compareAndSet(false, true)) { 333 journal = new Journal() { 334 @Override 335 public void cleanup() { 336 super.cleanup(); 337 txStoreCleanup(); 338 } 339 }; 340 journal.setDirectory(getDirectory()); 341 journal.setMaxFileLength(journalMaxFileLength); 342 journal.setWriteBatchSize(journalWriteBatchSize); 343 journal.setCleanupInterval(journalCleanupInterval); 344 IOHelper.mkdirs(journal.getDirectory()); 345 journal.start(); 346 recoverPendingLocalTransactions(); 347 recovered.set(true); 348 store(new KahaTraceCommand().setMessage("LOADED " + new Date())); 349 } 350 } 351 352 private void txStoreCleanup() { 353 if (!recovered.get()) { 354 return; 355 } 356 Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet()); 357 for (Tx tx : inflightTransactions.values()) { 358 knownDataFileIds.remove(tx.getPreparedLocationId()); 359 } 360 for (Tx tx : pendingCommit.values()) { 361 knownDataFileIds.remove(tx.getPreparedLocationId()); 362 } 363 try { 364 journal.removeDataFiles(knownDataFileIds); 365 } catch (Exception e) { 366 LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds); 367 } 368 } 369 370 private File getDirectory() { 371 return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore"); 372 } 373 374 @Override 375 public void stop() throws Exception { 376 if (started.compareAndSet(true, false) && journal != null) { 377 journal.close(); 378 journal = null; 379 } 380 } 381 382 private void recoverPendingLocalTransactions() throws IOException { 383 Location location = journal.getNextLocation(null); 384 while (location != null) { 385 process(location, load(location)); 386 location = journal.getNextLocation(location); 387 } 388 pendingCommit.putAll(inflightTransactions); 389 LOG.info("pending local transactions: " + pendingCommit.keySet()); 390 } 391 392 public JournalCommand<?> load(Location location) throws IOException { 393 DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location)); 394 byte readByte = is.readByte(); 395 KahaEntryType type = KahaEntryType.valueOf(readByte); 396 if (type == null) { 397 throw new IOException("Could not load journal record. Invalid location: " + location); 398 } 399 JournalCommand<?> message = (JournalCommand<?>) type.createMessage(); 400 message.mergeFramed(is); 401 return message; 402 } 403 404 public void process(final Location location, JournalCommand<?> command) throws IOException { 405 switch (command.type()) { 406 case KAHA_PREPARE_COMMAND: 407 KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command; 408 getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo())).trackPrepareLocation(location); 409 break; 410 case KAHA_COMMIT_COMMAND: 411 KahaCommitCommand commitCommand = (KahaCommitCommand) command; 412 removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo())); 413 break; 414 case KAHA_TRACE_COMMAND: 415 break; 416 default: 417 throw new IOException("Unexpected command in transaction journal: " + command); 418 } 419 } 420 421 422 @Override 423 public synchronized void recover(final TransactionRecoveryListener listener) throws IOException { 424 425 for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) { 426 adapter.createTransactionStore().recover(new TransactionRecoveryListener() { 427 @Override 428 public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) { 429 try { 430 getTx(xid).trackStore(adapter.createTransactionStore(), xid); 431 } catch (IOException e) { 432 LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e); 433 } 434 listener.recover(xid, addedMessages, acks); 435 } 436 }); 437 } 438 439 try { 440 Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker(); 441 // force completion of local xa 442 for (TransactionId txid : broker.getPreparedTransactions(null)) { 443 if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) { 444 try { 445 if (pendingCommit.keySet().contains(txid)) { 446 LOG.info("delivering pending commit outcome for tid: " + txid); 447 broker.commitTransaction(null, txid, false); 448 } else { 449 LOG.info("delivering rollback outcome to store for tid: " + txid); 450 broker.forgetTransaction(null, txid); 451 } 452 persistCompletion(txid); 453 } catch (Exception ex) { 454 LOG.error("failed to deliver pending outcome for tid: " + txid, ex); 455 } 456 } 457 } 458 } catch (Exception e) { 459 LOG.error("failed to resolve pending local transactions", e); 460 } 461 } 462 463 void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 464 throws IOException { 465 if (message.getTransactionId() != null) { 466 getTx(message.getTransactionId()).trackStore(transactionStore); 467 } 468 destination.addMessage(context, message); 469 } 470 471 ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 472 throws IOException { 473 if (message.getTransactionId() != null) { 474 getTx(message.getTransactionId()).trackStore(transactionStore); 475 destination.addMessage(context, message); 476 return AbstractMessageStore.FUTURE; 477 } else { 478 return destination.asyncAddQueueMessage(context, message); 479 } 480 } 481 482 ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message) 483 throws IOException { 484 485 if (message.getTransactionId() != null) { 486 getTx(message.getTransactionId()).trackStore(transactionStore); 487 destination.addMessage(context, message); 488 return AbstractMessageStore.FUTURE; 489 } else { 490 return destination.asyncAddTopicMessage(context, message); 491 } 492 } 493 494 final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack) 495 throws IOException { 496 if (ack.getTransactionId() != null) { 497 getTx(ack.getTransactionId()).trackStore(transactionStore); 498 } 499 destination.removeMessage(context, ack); 500 } 501 502 final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack) 503 throws IOException { 504 if (ack.getTransactionId() != null) { 505 getTx(ack.getTransactionId()).trackStore(transactionStore); 506 } 507 destination.removeAsyncMessage(context, ack); 508 } 509 510 final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination, 511 final String clientId, final String subscriptionName, 512 final MessageId messageId, final MessageAck ack) throws IOException { 513 if (ack.getTransactionId() != null) { 514 getTx(ack.getTransactionId()).trackStore(transactionStore); 515 } 516 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 517 } 518 519}