001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.CancellationException; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.Future; 031 032import org.apache.activemq.broker.ConnectionContext; 033import org.apache.activemq.command.ConsumerId; 034import org.apache.activemq.command.Message; 035import org.apache.activemq.command.MessageAck; 036import org.apache.activemq.command.MessageId; 037import org.apache.activemq.command.TransactionId; 038import org.apache.activemq.command.XATransactionId; 039import org.apache.activemq.protobuf.Buffer; 040import org.apache.activemq.store.AbstractMessageStore; 041import org.apache.activemq.store.ListenableFuture; 042import org.apache.activemq.store.MessageStore; 043import org.apache.activemq.store.ProxyMessageStore; 044import org.apache.activemq.store.ProxyTopicMessageStore; 045import org.apache.activemq.store.TopicMessageStore; 046import org.apache.activemq.store.TransactionRecoveryListener; 047import org.apache.activemq.store.TransactionStore; 048import org.apache.activemq.store.kahadb.MessageDatabase.Operation; 049import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 050import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 051import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 052import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 053import org.apache.activemq.wireformat.WireFormat; 054import org.slf4j.Logger; 055import org.slf4j.LoggerFactory; 056 057/** 058 * Provides a TransactionStore implementation that can create transaction aware 059 * MessageStore objects from non transaction aware MessageStore objects. 060 * 061 * 062 */ 063public class KahaDBTransactionStore implements TransactionStore { 064 static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class); 065 ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); 066 private final KahaDBStore theStore; 067 068 public KahaDBTransactionStore(KahaDBStore theStore) { 069 this.theStore = theStore; 070 } 071 072 private WireFormat wireFormat(){ 073 return this.theStore.wireFormat; 074 } 075 076 public class Tx { 077 private final List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<AddMessageCommand>()); 078 079 private final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<RemoveMessageCommand>()); 080 081 public void add(AddMessageCommand msg) { 082 messages.add(msg); 083 } 084 085 public void add(RemoveMessageCommand ack) { 086 acks.add(ack); 087 } 088 089 public Message[] getMessages() { 090 Message rc[] = new Message[messages.size()]; 091 int count = 0; 092 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 093 AddMessageCommand cmd = iter.next(); 094 rc[count++] = cmd.getMessage(); 095 } 096 return rc; 097 } 098 099 public MessageAck[] getAcks() { 100 MessageAck rc[] = new MessageAck[acks.size()]; 101 int count = 0; 102 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 103 RemoveMessageCommand cmd = iter.next(); 104 rc[count++] = cmd.getMessageAck(); 105 } 106 return rc; 107 } 108 109 /** 110 * @return true if something to commit 111 * @throws IOException 112 */ 113 public List<Future<Object>> commit() throws IOException { 114 List<Future<Object>> results = new ArrayList<Future<Object>>(); 115 // Do all the message adds. 116 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 117 AddMessageCommand cmd = iter.next(); 118 results.add(cmd.run()); 119 120 } 121 // And removes.. 122 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 123 RemoveMessageCommand cmd = iter.next(); 124 cmd.run(); 125 results.add(cmd.run()); 126 } 127 128 return results; 129 } 130 } 131 132 public abstract class AddMessageCommand { 133 private final ConnectionContext ctx; 134 AddMessageCommand(ConnectionContext ctx) { 135 this.ctx = ctx; 136 } 137 abstract Message getMessage(); 138 Future<Object> run() throws IOException { 139 return run(this.ctx); 140 } 141 abstract Future<Object> run(ConnectionContext ctx) throws IOException; 142 } 143 144 public abstract class RemoveMessageCommand { 145 146 private final ConnectionContext ctx; 147 RemoveMessageCommand(ConnectionContext ctx) { 148 this.ctx = ctx; 149 } 150 abstract MessageAck getMessageAck(); 151 Future<Object> run() throws IOException { 152 return run(this.ctx); 153 } 154 abstract Future<Object> run(ConnectionContext context) throws IOException; 155 } 156 157 public MessageStore proxy(MessageStore messageStore) { 158 return new ProxyMessageStore(messageStore) { 159 @Override 160 public void addMessage(ConnectionContext context, final Message send) throws IOException { 161 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 162 } 163 164 @Override 165 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 166 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 167 } 168 169 @Override 170 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 171 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message); 172 } 173 174 @Override 175 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException { 176 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message); 177 } 178 179 @Override 180 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 181 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack); 182 } 183 184 @Override 185 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 186 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack); 187 } 188 }; 189 } 190 191 public TopicMessageStore proxy(TopicMessageStore messageStore) { 192 return new ProxyTopicMessageStore(messageStore) { 193 @Override 194 public void addMessage(ConnectionContext context, final Message send) throws IOException { 195 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 196 } 197 198 @Override 199 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 200 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 201 } 202 203 @Override 204 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 205 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message); 206 } 207 208 @Override 209 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException { 210 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message); 211 } 212 213 @Override 214 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 215 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack); 216 } 217 218 @Override 219 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 220 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack); 221 } 222 223 @Override 224 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 225 MessageId messageId, MessageAck ack) throws IOException { 226 KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId, 227 subscriptionName, messageId, ack); 228 } 229 230 }; 231 } 232 233 /** 234 * @throws IOException 235 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 236 */ 237 @Override 238 public void prepare(TransactionId txid) throws IOException { 239 KahaTransactionInfo info = getTransactionInfo(txid); 240 if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 241 theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null); 242 } else { 243 Tx tx = inflightTransactions.remove(txid); 244 if (tx != null) { 245 theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null); 246 } 247 } 248 } 249 250 public Tx getTx(Object txid) { 251 Tx tx = inflightTransactions.get(txid); 252 if (tx == null) { 253 synchronized (inflightTransactions) { 254 tx = inflightTransactions.get(txid); 255 if (tx == null) { 256 tx = new Tx(); 257 inflightTransactions.put(txid, tx); 258 } 259 } 260 } 261 return tx; 262 } 263 264 @Override 265 public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit) 266 throws IOException { 267 if (txid != null) { 268 if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) { 269 if (preCommit != null) { 270 preCommit.run(); 271 } 272 Tx tx = inflightTransactions.remove(txid); 273 if (tx != null) { 274 List<Future<Object>> results = tx.commit(); 275 boolean doneSomething = false; 276 for (Future<Object> result : results) { 277 try { 278 result.get(); 279 } catch (InterruptedException e) { 280 theStore.brokerService.handleIOException(new IOException(e.getMessage())); 281 } catch (ExecutionException e) { 282 theStore.brokerService.handleIOException(new IOException(e.getMessage())); 283 }catch(CancellationException e) { 284 } 285 if (!result.isCancelled()) { 286 doneSomething = true; 287 } 288 } 289 if (postCommit != null) { 290 postCommit.run(); 291 } 292 if (doneSomething) { 293 KahaTransactionInfo info = getTransactionInfo(txid); 294 theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null); 295 } 296 }else { 297 //The Tx will be null for failed over clients - lets run their post commits 298 if (postCommit != null) { 299 postCommit.run(); 300 } 301 } 302 303 } else { 304 KahaTransactionInfo info = getTransactionInfo(txid); 305 if (preCommit != null) { 306 preCommit.run(); 307 } 308 theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit); 309 forgetRecoveredAcks(txid, false); 310 } 311 }else { 312 LOG.error("Null transaction passed on commit"); 313 } 314 } 315 316 /** 317 * @throws IOException 318 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 319 */ 320 @Override 321 public void rollback(TransactionId txid) throws IOException { 322 if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 323 KahaTransactionInfo info = getTransactionInfo(txid); 324 theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null); 325 forgetRecoveredAcks(txid, true); 326 } else { 327 inflightTransactions.remove(txid); 328 } 329 } 330 331 protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException { 332 if (txid.isXATransaction()) { 333 XATransactionId xaTid = ((XATransactionId) txid); 334 theStore.forgetRecoveredAcks(xaTid.getPreparedAcks(), isRollback); 335 } 336 } 337 338 @Override 339 public void start() throws Exception { 340 } 341 342 @Override 343 public void stop() throws Exception { 344 } 345 346 @Override 347 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 348 for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) { 349 XATransactionId xid = (XATransactionId) entry.getKey(); 350 ArrayList<Message> messageList = new ArrayList<Message>(); 351 ArrayList<MessageAck> ackList = new ArrayList<MessageAck>(); 352 353 for (Operation op : entry.getValue()) { 354 if (op.getClass() == MessageDatabase.AddOperation.class) { 355 MessageDatabase.AddOperation addOp = (MessageDatabase.AddOperation) op; 356 Message msg = (Message) wireFormat().unmarshal(new DataInputStream(addOp.getCommand().getMessage() 357 .newInput())); 358 messageList.add(msg); 359 } else { 360 MessageDatabase.RemoveOperation rmOp = (MessageDatabase.RemoveOperation) op; 361 Buffer ackb = rmOp.getCommand().getAck(); 362 MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput())); 363 // allow the ack to be tracked back to its durable sub 364 ConsumerId subKey = new ConsumerId(); 365 subKey.setConnectionId(rmOp.getCommand().getSubscriptionKey()); 366 ack.setConsumerId(subKey); 367 ackList.add(ack); 368 } 369 } 370 371 Message[] addedMessages = new Message[messageList.size()]; 372 MessageAck[] acks = new MessageAck[ackList.size()]; 373 messageList.toArray(addedMessages); 374 ackList.toArray(acks); 375 xid.setPreparedAcks(ackList); 376 theStore.trackRecoveredAcks(ackList); 377 listener.recover(xid, addedMessages, acks); 378 } 379 } 380 381 /** 382 * @param message 383 * @throws IOException 384 */ 385 void addMessage(ConnectionContext context, final MessageStore destination, final Message message) 386 throws IOException { 387 388 if (message.getTransactionId() != null) { 389 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 390 destination.addMessage(context, message); 391 } else { 392 Tx tx = getTx(message.getTransactionId()); 393 tx.add(new AddMessageCommand(context) { 394 @Override 395 public Message getMessage() { 396 return message; 397 } 398 @Override 399 public Future<Object> run(ConnectionContext ctx) throws IOException { 400 destination.addMessage(ctx, message); 401 return AbstractMessageStore.FUTURE; 402 } 403 404 }); 405 } 406 } else { 407 destination.addMessage(context, message); 408 } 409 } 410 411 ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message) 412 throws IOException { 413 414 if (message.getTransactionId() != null) { 415 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 416 destination.addMessage(context, message); 417 return AbstractMessageStore.FUTURE; 418 } else { 419 Tx tx = getTx(message.getTransactionId()); 420 tx.add(new AddMessageCommand(context) { 421 @Override 422 public Message getMessage() { 423 return message; 424 } 425 @Override 426 public Future<Object> run(ConnectionContext ctx) throws IOException { 427 return destination.asyncAddQueueMessage(ctx, message); 428 } 429 430 }); 431 return AbstractMessageStore.FUTURE; 432 } 433 } else { 434 return destination.asyncAddQueueMessage(context, message); 435 } 436 } 437 438 ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message) 439 throws IOException { 440 441 if (message.getTransactionId() != null) { 442 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { 443 destination.addMessage(context, message); 444 return AbstractMessageStore.FUTURE; 445 } else { 446 Tx tx = getTx(message.getTransactionId()); 447 tx.add(new AddMessageCommand(context) { 448 @Override 449 public Message getMessage() { 450 return message; 451 } 452 @Override 453 public Future<Object> run(ConnectionContext ctx) throws IOException { 454 return destination.asyncAddTopicMessage(ctx, message); 455 } 456 457 }); 458 return AbstractMessageStore.FUTURE; 459 } 460 } else { 461 return destination.asyncAddTopicMessage(context, message); 462 } 463 } 464 465 /** 466 * @param ack 467 * @throws IOException 468 */ 469 final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) 470 throws IOException { 471 472 if (ack.isInTransaction()) { 473 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { 474 destination.removeMessage(context, ack); 475 } else { 476 Tx tx = getTx(ack.getTransactionId()); 477 tx.add(new RemoveMessageCommand(context) { 478 @Override 479 public MessageAck getMessageAck() { 480 return ack; 481 } 482 483 @Override 484 public Future<Object> run(ConnectionContext ctx) throws IOException { 485 destination.removeMessage(ctx, ack); 486 return AbstractMessageStore.FUTURE; 487 } 488 }); 489 } 490 } else { 491 destination.removeMessage(context, ack); 492 } 493 } 494 495 final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) 496 throws IOException { 497 498 if (ack.isInTransaction()) { 499 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { 500 destination.removeAsyncMessage(context, ack); 501 } else { 502 Tx tx = getTx(ack.getTransactionId()); 503 tx.add(new RemoveMessageCommand(context) { 504 @Override 505 public MessageAck getMessageAck() { 506 return ack; 507 } 508 509 @Override 510 public Future<Object> run(ConnectionContext ctx) throws IOException { 511 destination.removeMessage(ctx, ack); 512 return AbstractMessageStore.FUTURE; 513 } 514 }); 515 } 516 } else { 517 destination.removeAsyncMessage(context, ack); 518 } 519 } 520 521 final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName, 522 final MessageId messageId, final MessageAck ack) throws IOException { 523 524 if (ack.isInTransaction()) { 525 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { 526 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 527 } else { 528 Tx tx = getTx(ack.getTransactionId()); 529 tx.add(new RemoveMessageCommand(context) { 530 @Override 531 public MessageAck getMessageAck() { 532 return ack; 533 } 534 535 @Override 536 public Future<Object> run(ConnectionContext ctx) throws IOException { 537 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack); 538 return AbstractMessageStore.FUTURE; 539 } 540 }); 541 } 542 } else { 543 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 544 } 545 } 546 547 548 private KahaTransactionInfo getTransactionInfo(TransactionId txid) { 549 return TransactionIdConversion.convert(theStore.getTransactionIdTransformer().transform(txid)); 550 } 551}