001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.memory; 018 019import java.io.IOException; 020import java.util.ArrayList; 021import java.util.Collections; 022import java.util.Iterator; 023import java.util.LinkedHashMap; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.ConcurrentHashMap; 027import java.util.concurrent.ConcurrentMap; 028 029import org.apache.activemq.broker.ConnectionContext; 030import org.apache.activemq.command.Message; 031import org.apache.activemq.command.MessageAck; 032import org.apache.activemq.command.MessageId; 033import org.apache.activemq.command.TransactionId; 034import org.apache.activemq.command.XATransactionId; 035import org.apache.activemq.store.InlineListenableFuture; 036import org.apache.activemq.store.ListenableFuture; 037import org.apache.activemq.store.MessageStore; 038import org.apache.activemq.store.PersistenceAdapter; 039import org.apache.activemq.store.ProxyMessageStore; 040import org.apache.activemq.store.ProxyTopicMessageStore; 041import org.apache.activemq.store.TopicMessageStore; 042import org.apache.activemq.store.TransactionRecoveryListener; 043import org.apache.activemq.store.TransactionStore; 044 045/** 046 * Provides a TransactionStore implementation that can create transaction aware 047 * MessageStore objects from non transaction aware MessageStore objects. 048 * 049 * 050 */ 051public class MemoryTransactionStore implements TransactionStore { 052 053 protected ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); 054 protected Map<TransactionId, Tx> preparedTransactions = Collections.synchronizedMap(new LinkedHashMap<TransactionId, Tx>()); 055 protected final PersistenceAdapter persistenceAdapter; 056 057 private boolean doingRecover; 058 059 public class Tx { 060 061 public List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<AddMessageCommand>()); 062 063 public final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<RemoveMessageCommand>()); 064 065 public void add(AddMessageCommand msg) { 066 messages.add(msg); 067 } 068 069 public void add(RemoveMessageCommand ack) { 070 acks.add(ack); 071 } 072 073 public Message[] getMessages() { 074 Message rc[] = new Message[messages.size()]; 075 int count = 0; 076 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 077 AddMessageCommand cmd = iter.next(); 078 rc[count++] = cmd.getMessage(); 079 } 080 return rc; 081 } 082 083 public MessageAck[] getAcks() { 084 MessageAck rc[] = new MessageAck[acks.size()]; 085 int count = 0; 086 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 087 RemoveMessageCommand cmd = iter.next(); 088 rc[count++] = cmd.getMessageAck(); 089 } 090 return rc; 091 } 092 093 /** 094 * @throws IOException 095 */ 096 public void commit() throws IOException { 097 ConnectionContext ctx = new ConnectionContext(); 098 persistenceAdapter.beginTransaction(ctx); 099 try { 100 101 // Do all the message adds. 102 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 103 AddMessageCommand cmd = iter.next(); 104 cmd.run(ctx); 105 } 106 // And removes.. 107 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 108 RemoveMessageCommand cmd = iter.next(); 109 cmd.run(ctx); 110 } 111 112 persistenceAdapter.commitTransaction(ctx); 113 114 } catch (IOException e) { 115 persistenceAdapter.rollbackTransaction(ctx); 116 throw e; 117 } 118 } 119 } 120 121 public interface AddMessageCommand { 122 Message getMessage(); 123 124 MessageStore getMessageStore(); 125 126 void run(ConnectionContext context) throws IOException; 127 128 void setMessageStore(MessageStore messageStore); 129 } 130 131 public interface RemoveMessageCommand { 132 MessageAck getMessageAck(); 133 134 void run(ConnectionContext context) throws IOException; 135 136 MessageStore getMessageStore(); 137 } 138 139 public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) { 140 this.persistenceAdapter=persistenceAdapter; 141 } 142 143 public MessageStore proxy(MessageStore messageStore) { 144 ProxyMessageStore proxyMessageStore = new ProxyMessageStore(messageStore) { 145 @Override 146 public void addMessage(ConnectionContext context, final Message send) throws IOException { 147 MemoryTransactionStore.this.addMessage(context, getDelegate(), send); 148 } 149 150 @Override 151 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 152 MemoryTransactionStore.this.addMessage(context, getDelegate(), send); 153 } 154 155 @Override 156 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 157 MemoryTransactionStore.this.addMessage(context, getDelegate(), message); 158 return new InlineListenableFuture(); 159 } 160 161 @Override 162 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canoptimize) throws IOException { 163 MemoryTransactionStore.this.addMessage(context, getDelegate(), message); 164 return new InlineListenableFuture(); 165 } 166 167 @Override 168 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 169 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 170 } 171 172 @Override 173 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 174 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 175 } 176 }; 177 onProxyQueueStore(proxyMessageStore); 178 return proxyMessageStore; 179 } 180 181 protected void onProxyQueueStore(ProxyMessageStore proxyMessageStore) { 182 } 183 184 public TopicMessageStore proxy(TopicMessageStore messageStore) { 185 ProxyTopicMessageStore proxyTopicMessageStore = new ProxyTopicMessageStore(messageStore) { 186 @Override 187 public void addMessage(ConnectionContext context, final Message send) throws IOException { 188 MemoryTransactionStore.this.addMessage(context, getDelegate(), send); 189 } 190 191 @Override 192 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 193 MemoryTransactionStore.this.addMessage(context, getDelegate(), send); 194 } 195 196 @Override 197 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 198 MemoryTransactionStore.this.addMessage(context, getDelegate(), message); 199 return new InlineListenableFuture(); 200 } 201 202 @Override 203 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException { 204 MemoryTransactionStore.this.addMessage(context, getDelegate(), message); 205 return new InlineListenableFuture(); 206 } 207 208 @Override 209 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 210 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 211 } 212 213 @Override 214 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 215 MemoryTransactionStore.this.removeMessage(getDelegate(), ack); 216 } 217 218 @Override 219 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 220 MessageId messageId, MessageAck ack) throws IOException { 221 MemoryTransactionStore.this.acknowledge((TopicMessageStore)getDelegate(), clientId, 222 subscriptionName, messageId, ack); 223 } 224 }; 225 onProxyTopicStore(proxyTopicMessageStore); 226 return proxyTopicMessageStore; 227 } 228 229 protected void onProxyTopicStore(ProxyTopicMessageStore proxyTopicMessageStore) { 230 } 231 232 /** 233 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 234 */ 235 @Override 236 public void prepare(TransactionId txid) throws IOException { 237 Tx tx = inflightTransactions.remove(txid); 238 if (tx == null) { 239 return; 240 } 241 preparedTransactions.put(txid, tx); 242 } 243 244 public Tx getTx(Object txid) { 245 Tx tx = inflightTransactions.get(txid); 246 if (tx == null) { 247 synchronized (inflightTransactions) { 248 tx = inflightTransactions.get(txid); 249 if ( tx == null) { 250 tx = new Tx(); 251 inflightTransactions.put(txid, tx); 252 } 253 } 254 } 255 return tx; 256 } 257 258 public Tx getPreparedTx(TransactionId txid) { 259 Tx tx = preparedTransactions.get(txid); 260 if (tx == null) { 261 tx = new Tx(); 262 preparedTransactions.put(txid, tx); 263 } 264 return tx; 265 } 266 267 @Override 268 public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { 269 if (preCommit != null) { 270 preCommit.run(); 271 } 272 Tx tx; 273 if (wasPrepared) { 274 tx = preparedTransactions.get(txid); 275 } else { 276 tx = inflightTransactions.remove(txid); 277 } 278 279 if (tx != null) { 280 tx.commit(); 281 } 282 if (wasPrepared) { 283 preparedTransactions.remove(txid); 284 } 285 if (postCommit != null) { 286 postCommit.run(); 287 } 288 } 289 290 /** 291 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 292 */ 293 @Override 294 public void rollback(TransactionId txid) throws IOException { 295 preparedTransactions.remove(txid); 296 inflightTransactions.remove(txid); 297 } 298 299 @Override 300 public void start() throws Exception { 301 } 302 303 @Override 304 public void stop() throws Exception { 305 } 306 307 @Override 308 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 309 // All the inflight transactions get rolled back.. 310 inflightTransactions.clear(); 311 this.doingRecover = true; 312 try { 313 for (Iterator<TransactionId> iter = preparedTransactions.keySet().iterator(); iter.hasNext();) { 314 Object txid = iter.next(); 315 Tx tx = preparedTransactions.get(txid); 316 listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks()); 317 onRecovered(tx); 318 } 319 } finally { 320 this.doingRecover = false; 321 } 322 } 323 324 protected void onRecovered(Tx tx) { 325 } 326 327 /** 328 * @param message 329 * @throws IOException 330 */ 331 void addMessage(final ConnectionContext context, final MessageStore destination, final Message message) throws IOException { 332 333 if (doingRecover) { 334 return; 335 } 336 337 if (message.getTransactionId() != null) { 338 Tx tx = getTx(message.getTransactionId()); 339 tx.add(new AddMessageCommand() { 340 MessageStore messageStore = destination; 341 @Override 342 public Message getMessage() { 343 return message; 344 } 345 346 @Override 347 public MessageStore getMessageStore() { 348 return destination; 349 } 350 351 @Override 352 public void run(ConnectionContext ctx) throws IOException { 353 destination.addMessage(ctx, message); 354 } 355 356 @Override 357 public void setMessageStore(MessageStore messageStore) { 358 this.messageStore = messageStore; 359 } 360 361 }); 362 } else { 363 destination.addMessage(context, message); 364 } 365 } 366 367 /** 368 * @param ack 369 * @throws IOException 370 */ 371 final void removeMessage(final MessageStore destination, final MessageAck ack) throws IOException { 372 if (doingRecover) { 373 return; 374 } 375 376 if (ack.isInTransaction()) { 377 Tx tx = getTx(ack.getTransactionId()); 378 tx.add(new RemoveMessageCommand() { 379 @Override 380 public MessageAck getMessageAck() { 381 return ack; 382 } 383 384 @Override 385 public void run(ConnectionContext ctx) throws IOException { 386 destination.removeMessage(ctx, ack); 387 } 388 389 @Override 390 public MessageStore getMessageStore() { 391 return destination; 392 } 393 }); 394 } else { 395 destination.removeMessage(null, ack); 396 } 397 } 398 399 public void acknowledge(final TopicMessageStore destination, final String clientId, final String subscriptionName, 400 final MessageId messageId, final MessageAck ack) throws IOException { 401 if (doingRecover) { 402 return; 403 } 404 405 if (ack.isInTransaction()) { 406 Tx tx = getTx(ack.getTransactionId()); 407 tx.add(new RemoveMessageCommand() { 408 @Override 409 public MessageAck getMessageAck() { 410 return ack; 411 } 412 413 @Override 414 public void run(ConnectionContext ctx) throws IOException { 415 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack); 416 } 417 418 @Override 419 public MessageStore getMessageStore() { 420 return destination; 421 } 422 }); 423 } else { 424 destination.acknowledge(null, clientId, subscriptionName, messageId, ack); 425 } 426 } 427 428 429 public void delete() { 430 inflightTransactions.clear(); 431 preparedTransactions.clear(); 432 doingRecover = false; 433 } 434 435}