/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb;

import java.io.File;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.AbstractMessageStore;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.ListenableFuture;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
import org.apache.activemq.store.kahadb.data.KahaEntryType;
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.disk.journal.Journal;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TransactionStore} that fans transaction operations out to the
 * individual transaction stores touched by a transaction.
 *
 * <p>Message stores are wrapped via the {@code proxy(...)} methods so that every
 * transactional add/remove/acknowledge registers the owning store against the
 * in-flight transaction. When a non-prepared commit involves anything other
 * than exactly one store, the commit is driven as a local two-phase commit and
 * its outcome is recorded in a dedicated journal kept in the {@code txStore}
 * sub-directory of the owning {@link MultiKahaDBPersistenceAdapter}.
 */
public class MultiKahaDBTransactionStore implements TransactionStore {
    static final Logger LOG = LoggerFactory.getLogger(MultiKahaDBTransactionStore.class);
    final MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter;
    /** Transactions currently known to this store, keyed by transaction id. */
    final ConcurrentMap<TransactionId, Tx> inflightTransactions = new ConcurrentHashMap<TransactionId, Tx>();
    /** Ids found in the journal on start with a prepare record but no matching commit record. */
    final Set<TransactionId> recoveredPendingCommit = new HashSet<TransactionId>();
    private Journal journal;
    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
    private int journalWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
    private final AtomicBoolean started = new AtomicBoolean(false);

    /**
     * @param multiKahaDBPersistenceAdapter the adapter whose directory, adapters and
     *                                      broker service this store works against
     */
    public MultiKahaDBTransactionStore(MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter) {
        this.multiKahaDBPersistenceAdapter = multiKahaDBPersistenceAdapter;
    }

    /**
     * Wraps a message store so that transactional operations register
     * {@code transactionStore} with the matching in-flight transaction before
     * being passed on to the wrapped store.
     *
     * <p>The {@code canOptimizeHint} overloads route through the same helpers as
     * the plain overloads; the hint itself is not forwarded.
     *
     * @param transactionStore the transaction store that owns {@code messageStore}
     * @param messageStore     the store to wrap
     * @return the wrapping store
     */
    public MessageStore proxy(final TransactionStore transactionStore, MessageStore messageStore) {
        return new ProxyMessageStore(messageStore) {
            @Override
            public void addMessage(ConnectionContext context, final Message send) throws IOException {
                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
            }

            @Override
            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
            }

            @Override
            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException {
                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
            }

            @Override
            public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
                return MultiKahaDBTransactionStore.this.asyncAddQueueMessage(transactionStore, context, getDelegate(), message);
            }

            @Override
            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
            }

            @Override
            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
            }

            /**
             * Registers the listener with the wrapped store and, when the listener
             * is a {@link BaseDestination} whose matching adapter carries its own
             * usage, installs that usage on the destination. Any failure while doing
             * so is logged as a warning and not propagated.
             */
            @Override
            public void registerIndexListener(IndexListener indexListener) {
                getDelegate().registerIndexListener(indexListener);
                try {
                    if (indexListener instanceof BaseDestination) {
                        // update queue storeUsage
                        Object matchingPersistenceAdapter = multiKahaDBPersistenceAdapter.destinationMap.chooseValue(getDelegate().getDestination());
                        if (matchingPersistenceAdapter instanceof FilteredKahaDBPersistenceAdapter) {
                            FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) matchingPersistenceAdapter;
                            if (filteredAdapter.getUsage() != null && filteredAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) {
                                StoreUsage storeUsage = filteredAdapter.getUsage();
                                storeUsage.setStore(filteredAdapter.getPersistenceAdapter());
                                storeUsage.setParent(multiKahaDBPersistenceAdapter.getBrokerService().getSystemUsage().getStoreUsage());
                                ((BaseDestination) indexListener).getSystemUsage().setStoreUsage(storeUsage);
                            }
                        }
                    }
                } catch (Exception e) {
                    // best effort: the listener is already registered with the delegate
                    LOG.warn("Failed to set mKahaDB destination store usage", e);
                }
            }
        };
    }

    /**
     * Topic flavour of {@link #proxy(TransactionStore, MessageStore)}; additionally
     * routes durable-subscription acknowledgements through this store.
     *
     * @param transactionStore the transaction store that owns {@code messageStore}
     * @param messageStore     the topic store to wrap
     * @return the wrapping topic store
     */
    public TopicMessageStore proxy(final TransactionStore transactionStore, final TopicMessageStore messageStore) {
        return new ProxyTopicMessageStore(messageStore) {
            @Override
            public void addMessage(ConnectionContext context, final Message send, boolean canOptimizeHint) throws IOException {
                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
            }

            @Override
            public void addMessage(ConnectionContext context, final Message send) throws IOException {
                MultiKahaDBTransactionStore.this.addMessage(transactionStore, context, getDelegate(), send);
            }

            @Override
            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimizeHint) throws IOException {
                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
            }

            @Override
            public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException {
                return MultiKahaDBTransactionStore.this.asyncAddTopicMessage(transactionStore, context, getDelegate(), message);
            }

            @Override
            public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
                MultiKahaDBTransactionStore.this.removeMessage(transactionStore, context, getDelegate(), ack);
            }

            @Override
            public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException {
                MultiKahaDBTransactionStore.this.removeAsyncMessage(transactionStore, context, getDelegate(), ack);
            }

            @Override
            public void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
                                    MessageId messageId, MessageAck ack) throws IOException {
                MultiKahaDBTransactionStore.this.acknowledge(transactionStore, context, (TopicMessageStore) getDelegate(), clientId,
                        subscriptionName, messageId, ack);
            }
        };
    }

    /** Deletes everything beneath this store's journal directory. */
    public void deleteAllMessages() {
        IOHelper.deleteChildren(getDirectory());
    }

    public int getJournalMaxFileLength() {
        return journalMaxFileLength;
    }

    /** Takes effect on the journal created by the next {@link #start()}. */
    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.journalMaxFileLength = journalMaxFileLength;
    }

    public int getJournalMaxWriteBatchSize() {
        return journalWriteBatchSize;
    }

    /** Takes effect on the journal created by the next {@link #start()}. */
    public void setJournalMaxWriteBatchSize(int journalWriteBatchSize) {
        this.journalWriteBatchSize = journalWriteBatchSize;
    }

    /**
     * Book-keeping for one in-flight transaction: which transaction stores it has
     * touched and which journal data file holds its prepare record.
     */
    public class Tx {
        /** Store -> recovered XA id for that store, or {@code null} when the caller's id applies. */
        private final HashMap<TransactionStore, TransactionId> stores = new HashMap<TransactionStore, TransactionId>();
        private int prepareLocationId = 0;

        /** Registers a store together with the XA id under which it recovered the transaction. */
        public void trackStore(TransactionStore store, XATransactionId xid) {
            stores.put(store, xid);
        }

        /** Registers a store with no store-specific id (maps the store to {@code null}). */
        public void trackStore(TransactionStore store) {
            stores.put(store, null);
        }

        public HashMap<TransactionStore, TransactionId> getStoresMap() {
            return stores;
        }

        public Set<TransactionStore> getStores() {
            return stores.keySet();
        }

        /** Remembers the data file id of the journal location holding the prepare record. */
        public void trackPrepareLocation(Location location) {
            this.prepareLocationId = location.getDataFileId();
        }

        /** @return the tracked data file id, or {@code 0} when none has been tracked */
        public int getPreparedLocationId() {
            return prepareLocationId;
        }
    }

    /**
     * Returns the in-flight state for {@code txid}, creating it on first use.
     *
     * <p>Creation goes through {@code putIfAbsent} so that concurrent callers for
     * the same id always observe one shared {@link Tx}; a plain get-then-put could
     * replace an instance another thread had already started tracking stores on.
     *
     * @param txid the transaction id
     * @return the existing or newly registered {@link Tx}, never {@code null}
     */
    public Tx getTx(TransactionId txid) {
        Tx tx = inflightTransactions.get(txid);
        if (tx == null) {
            Tx created = new Tx();
            Tx raced = inflightTransactions.putIfAbsent(txid, created);
            tx = raced != null ? raced : created;
        }
        return tx;
    }

    /**
     * Drops the in-flight state for {@code txid}.
     *
     * @return the removed {@link Tx}, or {@code null} if none was registered
     */
    public Tx removeTx(TransactionId txid) {
        return inflightTransactions.remove(txid);
    }

    /** Prepares {@code txid} on every store tracked for it. */
    @Override
    public void prepare(TransactionId txid) throws IOException {
        Tx tx = getTx(txid);
        for (TransactionStore store : tx.getStores()) {
            store.prepare(txid);
        }
    }

    /**
     * Commits {@code txid} across every store tracked for it.
     *
     * <ul>
     * <li>Already prepared: each store is committed as prepared, using the id it
     *     recovered the transaction under when one was tracked, else {@code txid}.</li>
     * <li>Not prepared, exactly one store: a one-phase commit on that store.</li>
     * <li>Not prepared, otherwise: every store is prepared, a prepare record is
     *     journaled, every store is committed as prepared, then a commit record
     *     is journaled.</li>
     * </ul>
     *
     * @param txid        the transaction to commit
     * @param wasPrepared whether the transaction was previously prepared
     * @param preCommit   run before anything else when non-null
     * @param postCommit  run after the in-flight state is removed when non-null
     * @throws IOException if a store or the journal fails
     */
    @Override
    public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, Runnable postCommit)
            throws IOException {

        if (preCommit != null) {
            preCommit.run();
        }

        Tx tx = getTx(txid);
        if (wasPrepared) {
            for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
                TransactionId recovered = storeTx.getValue();
                if (recovered != null) {
                    storeTx.getKey().commit(recovered, true, null, null);
                } else {
                    storeTx.getKey().commit(txid, true, null, null);
                }
            }
        } else {
            // can only do 1pc on a single store
            if (tx.getStores().size() == 1) {
                for (TransactionStore store : tx.getStores()) {
                    store.commit(txid, false, null, null);
                }
            } else {
                // need to do local 2pc
                for (TransactionStore store : tx.getStores()) {
                    store.prepare(txid);
                }
                persistOutcome(tx, txid);
                for (TransactionStore store : tx.getStores()) {
                    store.commit(txid, true, null, null);
                }
                persistCompletion(txid);
            }
        }
        removeTx(txid);
        if (postCommit != null) {
            postCommit.run();
        }
    }

    /**
     * Journals a prepare record for {@code txid} (after applying the adapter's
     * transaction id transformer) and records its location on {@code tx}.
     */
    public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
        tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
    }

    /**
     * Journals a commit record for {@code txid} (after applying the adapter's
     * transaction id transformer).
     */
    public void persistCompletion(TransactionId txid) throws IOException {
        store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
    }

    /**
     * Writes a command to the journal as one type byte followed by the framed
     * command, passing {@code true} for the journal's second write argument,
     * and updates the journal's last append location.
     *
     * @return the location the record was written to
     */
    private Location store(JournalCommand<?> data) throws IOException {
        int size = data.serializedSizeFramed();
        // +1 for the leading entry-type byte
        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
        os.writeByte(data.type().getNumber());
        data.writeFramed(os);
        Location location = journal.write(os.toByteSequence(), true);
        journal.setLastAppendLocation(location);
        return location;
    }

    /**
     * Rolls back {@code txid} on every store tracked for it, using the id a store
     * recovered the transaction under when one was tracked. Does nothing when the
     * transaction is not in flight.
     */
    @Override
    public void rollback(TransactionId txid) throws IOException {
        Tx tx = removeTx(txid);
        if (tx != null) {
            for (Map.Entry<TransactionStore, TransactionId> storeTx : tx.getStoresMap().entrySet()) {
                TransactionId recovered = storeTx.getValue();
                if (recovered != null) {
                    storeTx.getKey().rollback(recovered);
                } else {
                    storeTx.getKey().rollback(txid);
                }
            }
        }
    }

    /**
     * Opens the transaction journal, replays it to find transactions with a
     * prepare record but no commit record, and appends a trace record. Only the
     * first call after construction or after {@link #stop()} does any work.
     */
    @Override
    public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            journal = new Journal() {
                @Override
                public void cleanup() {
                    super.cleanup();
                    txStoreCleanup();
                }
            };
            journal.setDirectory(getDirectory());
            journal.setMaxFileLength(journalMaxFileLength);
            journal.setWriteBatchSize(journalWriteBatchSize);
            IOHelper.mkdirs(journal.getDirectory());
            journal.start();
            recoverPendingLocalTransactions();
            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
        }
    }

    /**
     * Asks the journal to remove every data file except those recorded as the
     * prepare location of an in-flight transaction. Invoked from the journal's
     * {@code cleanup()} override; failures are logged, not propagated.
     */
    private void txStoreCleanup() {
        Set<Integer> knownDataFileIds = new TreeSet<Integer>(journal.getFileMap().keySet());
        for (Tx tx : inflightTransactions.values()) {
            knownDataFileIds.remove(tx.getPreparedLocationId());
        }
        try {
            journal.removeDataFiles(knownDataFileIds);
        } catch (Exception e) {
            // keep the cause: without it the failure cannot be diagnosed from the log
            LOG.error(this + ", Failed to remove tx journal datafiles " + knownDataFileIds, e);
        }
    }

    /** @return the {@code txStore} directory beneath the owning adapter's directory */
    private File getDirectory() {
        return new File(multiKahaDBPersistenceAdapter.getDirectory(), "txStore");
    }

    /** Closes and releases the journal if this store is currently started. */
    @Override
    public void stop() throws Exception {
        if (started.compareAndSet(true, false) && journal != null) {
            journal.close();
            journal = null;
        }
    }

    /**
     * Replays every journal record in order. Whatever remains in
     * {@link #inflightTransactions} afterwards is added to
     * {@link #recoveredPendingCommit}.
     */
    private void recoverPendingLocalTransactions() throws IOException {
        Location location = journal.getNextLocation(null);
        while (location != null) {
            process(load(location));
            location = journal.getNextLocation(location);
        }
        recoveredPendingCommit.addAll(inflightTransactions.keySet());
        LOG.info("pending local transactions: " + recoveredPendingCommit);
    }

    /**
     * Reads and decodes the journal record at {@code location}.
     *
     * @return the decoded command
     * @throws IOException if the leading type byte does not map to a known entry type,
     *                     or the read/decode fails
     */
    public JournalCommand<?> load(Location location) throws IOException {
        DataByteArrayInputStream is = new DataByteArrayInputStream(journal.read(location));
        byte readByte = is.readByte();
        KahaEntryType type = KahaEntryType.valueOf(readByte);
        if (type == null) {
            throw new IOException("Could not load journal record. Invalid location: " + location);
        }
        JournalCommand<?> message = (JournalCommand<?>) type.createMessage();
        message.mergeFramed(is);
        return message;
    }

    /**
     * Applies a replayed journal command to the in-flight map: a prepare record
     * registers the transaction, a commit record removes it, a trace record is
     * ignored.
     *
     * @throws IOException for any other command type
     */
    public void process(JournalCommand<?> command) throws IOException {
        switch (command.type()) {
            case KAHA_PREPARE_COMMAND:
                KahaPrepareCommand prepareCommand = (KahaPrepareCommand) command;
                getTx(TransactionIdConversion.convert(prepareCommand.getTransactionInfo()));
                break;
            case KAHA_COMMIT_COMMAND:
                KahaCommitCommand commitCommand = (KahaCommitCommand) command;
                removeTx(TransactionIdConversion.convert(commitCommand.getTransactionInfo()));
                break;
            case KAHA_TRACE_COMMAND:
                break;
            default:
                throw new IOException("Unexpected command in transaction journal: " + command);
        }
    }


    /**
     * Recovers prepared transactions from every adapter, then resolves the local
     * ones.
     *
     * <p>Each XA transaction an adapter reports is tracked against that adapter's
     * transaction store and forwarded to {@code listener}. Afterwards, every
     * prepared transaction the broker reports for which the adapter's
     * {@code isLocalXid} returns true is either committed through the broker (when
     * it is in {@link #recoveredPendingCommit}) or forgotten, and a commit record
     * is journaled for it. Failures in this second phase are logged and not
     * propagated.
     *
     * @param listener receives every recovered XA transaction
     * @throws IOException if an adapter's transaction store cannot be created or recovered
     */
    @Override
    public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {

        for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
            adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
                @Override
                public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
                    try {
                        getTx(xid).trackStore(adapter.createTransactionStore(), xid);
                    } catch (IOException e) {
                        LOG.error("Failed to access transaction store: " + adapter + " for prepared xa tid: " + xid, e);
                    }
                    listener.recover(xid, addedMessages, acks);
                }
            });
        }

        try {
            Broker broker = multiKahaDBPersistenceAdapter.getBrokerService().getBroker();
            // force completion of local xa
            for (TransactionId txid : broker.getPreparedTransactions(null)) {
                if (multiKahaDBPersistenceAdapter.isLocalXid(txid)) {
                    try {
                        if (recoveredPendingCommit.contains(txid)) {
                            LOG.info("delivering pending commit outcome for tid: " + txid);
                            broker.commitTransaction(null, txid, false);

                        } else {
                            LOG.info("delivering rollback outcome to store for tid: " + txid);
                            broker.forgetTransaction(null, txid);
                        }
                        persistCompletion(txid);
                    } catch (Exception ex) {
                        LOG.error("failed to deliver pending outcome for tid: " + txid, ex);
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("failed to resolve pending local transactions", e);
        }
    }

    /**
     * Tracks {@code transactionStore} against the message's transaction (if any)
     * and then adds the message to {@code destination}.
     */
    void addMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
            throws IOException {
        if (message.getTransactionId() != null) {
            getTx(message.getTransactionId()).trackStore(transactionStore);
        }
        destination.addMessage(context, message);
    }

    /**
     * Queue add. A transactional message is tracked, added through the
     * non-async {@code addMessage} call, and {@link AbstractMessageStore#FUTURE}
     * is returned; a non-transactional message is handed to the destination's
     * async add.
     */
    ListenableFuture<Object> asyncAddQueueMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
            throws IOException {
        if (message.getTransactionId() != null) {
            getTx(message.getTransactionId()).trackStore(transactionStore);
            destination.addMessage(context, message);
            return AbstractMessageStore.FUTURE;
        } else {
            return destination.asyncAddQueueMessage(context, message);
        }
    }

    /**
     * Topic add. A transactional message is tracked, added through the
     * non-async {@code addMessage} call, and {@link AbstractMessageStore#FUTURE}
     * is returned; a non-transactional message is handed to the destination's
     * async add.
     */
    ListenableFuture<Object> asyncAddTopicMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final Message message)
            throws IOException {

        if (message.getTransactionId() != null) {
            getTx(message.getTransactionId()).trackStore(transactionStore);
            destination.addMessage(context, message);
            return AbstractMessageStore.FUTURE;
        } else {
            return destination.asyncAddTopicMessage(context, message);
        }
    }

    /**
     * Tracks {@code transactionStore} against the ack's transaction (if any) and
     * then removes the message from {@code destination}.
     */
    final void removeMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
            throws IOException {
        if (ack.getTransactionId() != null) {
            getTx(ack.getTransactionId()).trackStore(transactionStore);
        }
        destination.removeMessage(context, ack);
    }

    /**
     * Tracks {@code transactionStore} against the ack's transaction (if any) and
     * then calls {@code removeAsyncMessage} on {@code destination}.
     */
    final void removeAsyncMessage(final TransactionStore transactionStore, ConnectionContext context, final MessageStore destination, final MessageAck ack)
            throws IOException {
        if (ack.getTransactionId() != null) {
            getTx(ack.getTransactionId()).trackStore(transactionStore);
        }
        destination.removeAsyncMessage(context, ack);
    }

    /**
     * Tracks {@code transactionStore} against the ack's transaction (if any) and
     * then forwards the durable-subscription acknowledgement to {@code destination}.
     */
    final void acknowledge(final TransactionStore transactionStore, ConnectionContext context, final TopicMessageStore destination,
                           final String clientId, final String subscriptionName,
                           final MessageId messageId, final MessageAck ack) throws IOException {
        if (ack.getTransactionId() != null) {
            getTx(ack.getTransactionId()).trackStore(transactionStore);
        }
        destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
    }

}