/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.jdbc;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;

import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.IndexListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.memory.MemoryTransactionStore;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;

/**
 * Transaction store that respects the two-phase-commit prepare step.
 * <ul>
 * <li>Local transactions are used to maintain the prepared state.</li>
 * <li>The xid column provides the transaction flag for additions and removals.</li>
 * <li>A commit clears that context and completes the work.</li>
 * <li>A rollback clears the flag and removes the additions.</li>
 * </ul>
 * Essentially a prepare is an insert &amp;| update transaction, and a
 * commit|rollback is an update &amp;| remove.
 */
public class JdbcMemoryTransactionStore extends MemoryTransactionStore {

    /**
     * Creates the store on top of the given JDBC persistence adapter.
     *
     * @param jdbcPersistenceAdapter adapter handed to the superclass; the rest of this
     *                               class casts {@code persistenceAdapter} back to
     *                               {@link JDBCPersistenceAdapter}
     */
    public JdbcMemoryTransactionStore(JDBCPersistenceAdapter jdbcPersistenceAdapter) {
        super(jdbcPersistenceAdapter);
    }

    /**
     * Moves an in-flight transaction into the prepared state.
     * <p>
     * All queued adds and removes are executed inside one local transaction whose
     * connection context carries the xid. The adds are then wrapped in
     * {@link CommitAddOutcome} commands and the transaction is registered in
     * {@code preparedTransactions}. Nothing happens when the id is not in-flight.
     *
     * @param txid transaction to prepare; cast to {@link XATransactionId}
     * @throws IOException if a command or the local commit fails; the local
     *                     transaction is rolled back before rethrowing
     */
    @Override
    public void prepare(TransactionId txid) throws IOException {
        final Tx tx = inflightTransactions.remove(txid);
        if (tx == null) {
            return;
        }

        final ConnectionContext prepareContext = new ConnectionContext();
        // setting the xid modifies the add/remove to be pending transaction outcome
        prepareContext.setXid((XATransactionId) txid);
        persistenceAdapter.beginTransaction(prepareContext);
        try {
            // Message adds first ...
            for (AddMessageCommand add : tx.messages) {
                add.run(prepareContext);
            }
            // ... then the removes.
            for (RemoveMessageCommand remove : tx.acks) {
                remove.run(prepareContext);
            }
            persistenceAdapter.commitTransaction(prepareContext);
        } catch (IOException e) {
            persistenceAdapter.rollbackTransaction(prepareContext);
            throw e;
        }

        prepareContext.setXid(null);
        // setup for commit outcome
        final ArrayList<AddMessageCommand> commitOutcomes = new ArrayList<AddMessageCommand>();
        for (AddMessageCommand prepared : tx.messages) {
            commitOutcomes.add(new CommitAddOutcome(prepared));
        }
        tx.messages = commitOutcomes;
        preparedTransactions.put(txid, tx);
    }

    /**
     * Add command used for the commit outcome of a prepared (or recovered) add:
     * running it allocates a new sequence id and calls
     * {@code commitAdd} on the JDBC adapter for the message.
     */
    class CommitAddOutcome implements AddMessageCommand {
        final Message message;
        JDBCMessageStore jdbcMessageStore;

        /**
         * @param jdbcMessageStore store the message belongs to; may be {@code null}
         *                         and supplied later via {@link #setMessageStore}
         * @param message          the message whose add is to be completed
         */
        public CommitAddOutcome(JDBCMessageStore jdbcMessageStore, Message message) {
            this.jdbcMessageStore = jdbcMessageStore;
            this.message = message;
        }

        /**
         * Copies store and message from an existing add command.
         *
         * @param addMessageCommand source command; its store is cast to {@link JDBCMessageStore}
         */
        public CommitAddOutcome(AddMessageCommand addMessageCommand) {
            this((JDBCMessageStore) addMessageCommand.getMessageStore(), addMessageCommand.getMessage());
        }

        @Override
        public Message getMessage() {
            return message;
        }

        @Override
        public MessageStore getMessageStore() {
            return jdbcMessageStore;
        }

        /**
         * Completes the add: takes the next sequence id while holding the store's
         * {@code pendingAdditions} monitor, registers a completion callback that
         * stamps that sequence onto the message id, notifies the index listener
         * (if any), then calls {@code commitAdd} and the store's {@code onAdd}.
         *
         * @param context connection context of the committing transaction
         * @throws IOException propagated from the persistence adapter
         */
        @Override
        public void run(final ConnectionContext context) throws IOException {
            final JDBCPersistenceAdapter adapter = (JDBCPersistenceAdapter) persistenceAdapter;
            // sequence the row was stored under at prepare/recovery time
            final Long preparedEntrySequence = (Long) message.getMessageId().getEntryLocator();
            final TransactionContext transactionContext = adapter.getTransactionContext(context);

            final long newSequence;
            synchronized (jdbcMessageStore.pendingAdditions) {
                newSequence = adapter.getNextSequenceId();
                transactionContext.onCompletion(new Runnable() {
                    @Override
                    public void run() {
                        message.getMessageId().setEntryLocator(newSequence);
                        message.getMessageId().setFutureOrSequenceLong(newSequence);
                    }
                });

                if (jdbcMessageStore.getIndexListener() != null) {
                    jdbcMessageStore.getIndexListener().onAdd(new IndexListener.MessageContext(context, message, null));
                }
            }

            adapter.commitAdd(context, message.getMessageId(), preparedEntrySequence, newSequence);
            jdbcMessageStore.onAdd(message, (Long) message.getMessageId().getEntryLocator(), message.getPriority());
        }

        @Override
        public void setMessageStore(MessageStore messageStore) {
            jdbcMessageStore = (JDBCMessageStore) messageStore;
        }
    }

    /**
     * Rolls a transaction back.
     * <p>
     * An in-flight transaction is simply dropped. A prepared transaction has its
     * prepare work undone inside a new local transaction: {@code commitRemove} is
     * called for every prepared add, and for every prepared ack either the
     * {@link LastAckCommand#rollback} hook runs or {@code commitAdd} is called with
     * the unchanged sequence. Unknown ids are ignored.
     *
     * @param txid transaction to roll back
     * @throws IOException if undoing the prepare work fails; the local transaction
     *                     is rolled back before rethrowing
     */
    @Override
    public void rollback(TransactionId txid) throws IOException {
        if (inflightTransactions.remove(txid) != null) {
            // was only in-flight: no further work is done for it here
            return;
        }
        final Tx tx = preparedTransactions.remove(txid);
        if (tx == null) {
            return;
        }

        // undo prepare work
        final ConnectionContext undoContext = new ConnectionContext();
        persistenceAdapter.beginTransaction(undoContext);
        try {
            for (AddMessageCommand preparedAdd : tx.messages) {
                final Message message = preparedAdd.getMessage();
                // need to delete the row
                ((JDBCPersistenceAdapter) persistenceAdapter).commitRemove(undoContext, new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, 1));
            }

            for (RemoveMessageCommand preparedAck : tx.acks) {
                if (preparedAck instanceof LastAckCommand) {
                    ((LastAckCommand) preparedAck).rollback(undoContext);
                    continue;
                }
                final MessageId messageId = preparedAck.getMessageAck().getLastMessageId();
                final long sequence = (Long) messageId.getEntryLocator();
                // need to unset the txid flag on the existing row
                ((JDBCPersistenceAdapter) persistenceAdapter).commitAdd(undoContext, messageId, sequence, sequence);

                if (preparedAck instanceof RecoveredRemoveMessageCommand) {
                    final RecoveredRemoveMessageCommand recovered = (RecoveredRemoveMessageCommand) preparedAck;
                    ((JDBCMessageStore) preparedAck.getMessageStore()).trackRollbackAck(recovered.getMessage());
                }
            }
        } catch (IOException e) {
            persistenceAdapter.rollbackTransaction(undoContext);
            throw e;
        }
        persistenceAdapter.commitTransaction(undoContext);
    }

    /**
     * Asks the JDBC adapter to recover into this store, then delegates to the
     * superclass recovery.
     *
     * @param listener recovery listener passed on to the superclass
     * @throws IOException propagated from the adapter or the superclass
     */
    @Override
    public void recover(TransactionRecoveryListener listener) throws IOException {
        final JDBCPersistenceAdapter adapter = (JDBCPersistenceAdapter) persistenceAdapter;
        adapter.recover(this);
        super.recover(listener);
    }

    /**
     * Registers a recovered add with the prepared transaction named by the
     * message's own transaction id.
     *
     * @param id           sequence id, stamped onto the unmarshalled message id
     * @param messageBytes marshalled message
     * @throws IOException if unmarshalling fails
     */
    public void recoverAdd(long id, byte[] messageBytes) throws IOException {
        final Message recovered = unmarshalWithSequence(id, messageBytes);
        final Tx tx = getPreparedTx(recovered.getTransactionId());
        // the message store is not known yet; onRecovered(Tx) supplies it
        tx.add(new CommitAddOutcome(null, recovered));
    }

    /**
     * Registers a recovered ack with the prepared transaction identified by the
     * encoded xid. Running the resulting command calls {@code commitRemove} with a
     * standard ack built for the message.
     *
     * @param id      sequence id, stamped onto the unmarshalled message id
     * @param xid     encoded XA transaction id
     * @param message marshalled message being acknowledged
     * @throws IOException if unmarshalling fails
     */
    public void recoverAck(long id, byte[] xid, byte[] message) throws IOException {
        final Message msg = unmarshalWithSequence(id, message);
        final Tx tx = getPreparedTx(new XATransactionId(xid));
        final MessageAck ack = new MessageAck(msg, MessageAck.STANDARD_ACK_TYPE, 1);
        tx.add(new RecoveredRemoveMessageCommand() {
            MessageStore messageStore = null;

            @Override
            public MessageAck getMessageAck() {
                return ack;
            }

            @Override
            public void run(ConnectionContext context) throws IOException {
                ((JDBCPersistenceAdapter) persistenceAdapter).commitRemove(context, ack);
            }

            @Override
            public Message getMessage() {
                return msg;
            }

            @Override
            public void setMessageStore(MessageStore messageStore) {
                this.messageStore = messageStore;
            }

            @Override
            public MessageStore getMessageStore() {
                return messageStore;
            }
        });
    }

    /**
     * Unmarshals a message with the adapter's wire format and sets both the
     * future-or-sequence value and the entry locator of its id to {@code id}.
     */
    private Message unmarshalWithSequence(long id, byte[] marshalled) throws IOException {
        final JDBCPersistenceAdapter adapter = (JDBCPersistenceAdapter) persistenceAdapter;
        final Message unmarshalled = (Message) adapter.getWireFormat().unmarshal(new ByteSequence(marshalled));
        unmarshalled.getMessageId().setFutureOrSequenceLong(id);
        unmarshalled.getMessageId().setEntryLocator(id);
        return unmarshalled;
    }

    /**
     * Remove command rebuilt during recovery; it additionally exposes the message
     * and lets the owning store be assigned after construction.
     */
    interface RecoveredRemoveMessageCommand extends RemoveMessageCommand {
        Message getMessage();

        void setMessageStore(MessageStore messageStore);
    }

    /**
     * Remove command for a subscription ack that can also be rolled back and that
     * exposes the subscription key (client id, subscription name) plus sequence
     * and priority.
     */
    interface LastAckCommand extends RemoveMessageCommand {
        void rollback(ConnectionContext context) throws IOException;

        String getClientId();

        String getSubName();

        long getSequence();

        byte getPriority();

        void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore);
    }

    /**
     * Registers a recovered last-ack with the prepared transaction identified by
     * {@code encodedXid}.
     * <p>
     * The same byte array is also decoded directly: one byte is skipped, then a
     * long (the last-ack sequence) and a byte (the priority) are read.
     *
     * @param encodedXid  encoded transaction id, also carrying sequence and priority
     * @param destination destination set on the synthesized ack
     * @param subName     subscription name
     * @param clientId    client id of the subscription
     * @throws IOException if decoding {@code encodedXid} fails
     */
    public void recoverLastAck(byte[] encodedXid, final ActiveMQDestination destination, final String subName, final String clientId) throws IOException {
        final Tx tx = getPreparedTx(new XATransactionId(encodedXid));
        final DataByteArrayInputStream encoded = new DataByteArrayInputStream(encodedXid);
        encoded.skipBytes(1); // +|-
        final long lastAck = encoded.readLong();
        final byte priority = encoded.readByte();
        final MessageAck ack = new MessageAck();
        ack.setDestination(destination);
        tx.add(new LastAckCommand() {
            // assigned through setMessageStore(...), see onRecovered(Tx)
            JDBCTopicMessageStore jdbcTopicMessageStore;

            @Override
            public MessageAck getMessageAck() {
                return ack;
            }

            @Override
            public MessageStore getMessageStore() {
                return jdbcTopicMessageStore;
            }

            @Override
            public void run(ConnectionContext context) throws IOException {
                ((JDBCPersistenceAdapter) persistenceAdapter).commitLastAck(context, lastAck, priority, destination, subName, clientId);
                jdbcTopicMessageStore.complete(clientId, subName);
            }

            @Override
            public void rollback(ConnectionContext context) throws IOException {
                ((JDBCPersistenceAdapter) persistenceAdapter).rollbackLastAck(
                        context,
                        jdbcTopicMessageStore.isPrioritizedMessages() ? priority : 0,
                        jdbcTopicMessageStore.getDestination(),
                        subName,
                        clientId);
                jdbcTopicMessageStore.complete(clientId, subName);
            }

            @Override
            public String getClientId() {
                return clientId;
            }

            @Override
            public String getSubName() {
                return subName;
            }

            @Override
            public long getSequence() {
                return lastAck;
            }

            @Override
            public byte getPriority() {
                return priority;
            }

            @Override
            public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
                this.jdbcTopicMessageStore = jdbcTopicMessageStore;
            }
        });
    }

    /**
     * Wires message stores into the commands of a recovered transaction.
     * Last-ack commands also register a pending completion with their topic store;
     * every other ack is treated as a {@link RecoveredRemoveMessageCommand}.
     *
     * @param tx the recovered transaction
     */
    @Override
    protected void onRecovered(Tx tx) {
        for (RemoveMessageCommand ackCommand : tx.acks) {
            if (ackCommand instanceof LastAckCommand) {
                final LastAckCommand lastAck = (LastAckCommand) ackCommand;
                final JDBCTopicMessageStore topicStore =
                        (JDBCTopicMessageStore) findMessageStore(lastAck.getMessageAck().getDestination());
                topicStore.pendingCompletion(lastAck.getClientId(), lastAck.getSubName(), lastAck.getSequence(), lastAck.getPriority());
                lastAck.setMessageStore(topicStore);
            } else {
                final RecoveredRemoveMessageCommand recoveredAck = (RecoveredRemoveMessageCommand) ackCommand;
                recoveredAck.setMessageStore(findMessageStore(ackCommand.getMessageAck().getDestination()));
            }
        }
        for (AddMessageCommand addCommand : tx.messages) {
            addCommand.setMessageStore(findMessageStore(addCommand.getMessage().getDestination()));
        }
    }

    /**
     * Obtains the queue or topic store for a destination from the persistence
     * adapter and returns the delegate of the resulting proxy store.
     *
     * @throws RuntimeException wrapping any {@link IOException} from the adapter
     */
    private MessageStore findMessageStore(ActiveMQDestination destination) {
        final ProxyMessageStore proxy;
        try {
            proxy = destination.isQueue()
                    ? (ProxyMessageStore) persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination)
                    : (ProxyMessageStore) persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination);
        } catch (IOException error) {
            throw new RuntimeException("Failed to find/create message store for destination: " + destination, error);
        }
        return proxy.getDelegate();
    }

    /**
     * Acknowledges a message for a durable subscription.
     * <p>
     * Outside a transaction the ack goes straight to the topic store with a
     * {@code null} context. Inside a transaction it is queued on the transaction
     * as a {@link LastAckCommand}.
     *
     * @param topicMessageStore store holding the subscription
     * @param clientId          client id of the subscription
     * @param subscriptionName  subscription name
     * @param messageId         id of the acknowledged message
     * @param ack               the acknowledgement
     * @throws IOException propagated from the topic store
     */
    @Override
    public void acknowledge(final TopicMessageStore topicMessageStore, final String clientId, final String subscriptionName,
                            final MessageId messageId, final MessageAck ack) throws IOException {

        if (!ack.isInTransaction()) {
            topicMessageStore.acknowledge(null, clientId, subscriptionName, messageId, ack);
            return;
        }

        final Tx tx = getTx(ack.getTransactionId());
        tx.add(new LastAckCommand() {
            @Override
            public MessageAck getMessageAck() {
                return ack;
            }

            @Override
            public void run(ConnectionContext ctx) throws IOException {
                topicMessageStore.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
            }

            @Override
            public MessageStore getMessageStore() {
                return topicMessageStore;
            }

            @Override
            public void rollback(ConnectionContext context) throws IOException {
                final JDBCTopicMessageStore jdbcTopicMessageStore = (JDBCTopicMessageStore) topicMessageStore;
                ((JDBCPersistenceAdapter) persistenceAdapter).rollbackLastAck(
                        context, jdbcTopicMessageStore, ack, subscriptionName, clientId);
                jdbcTopicMessageStore.complete(clientId, subscriptionName);
            }

            @Override
            public String getClientId() {
                return clientId;
            }

            @Override
            public String getSubName() {
                return subscriptionName;
            }

            @Override
            public long getSequence() {
                throw new IllegalStateException("Sequence id must be inferred from ack");
            }

            @Override
            public byte getPriority() {
                throw new IllegalStateException("Priority must be inferred from ack or row");
            }

            @Override
            public void setMessageStore(JDBCTopicMessageStore jdbcTopicMessageStore) {
                throw new IllegalStateException("message store already known!");
            }
        });
    }

}