001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb; 018 019import java.io.DataInputStream; 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.Iterator; 024import java.util.List; 025import java.util.Map; 026import java.util.concurrent.CancellationException; 027import java.util.concurrent.ConcurrentHashMap; 028import java.util.concurrent.ConcurrentMap; 029import java.util.concurrent.ExecutionException; 030import java.util.concurrent.Future; 031 032import org.apache.activemq.broker.ConnectionContext; 033import org.apache.activemq.command.Message; 034import org.apache.activemq.command.MessageAck; 035import org.apache.activemq.command.MessageId; 036import org.apache.activemq.command.TransactionId; 037import org.apache.activemq.command.XATransactionId; 038import org.apache.activemq.protobuf.Buffer; 039import org.apache.activemq.store.AbstractMessageStore; 040import org.apache.activemq.store.ListenableFuture; 041import org.apache.activemq.store.MessageStore; 042import org.apache.activemq.store.ProxyMessageStore; 043import org.apache.activemq.store.ProxyTopicMessageStore; 044import org.apache.activemq.store.TopicMessageStore; 045import org.apache.activemq.store.TransactionRecoveryListener; 046import org.apache.activemq.store.TransactionStore; 047import org.apache.activemq.store.kahadb.MessageDatabase.Operation; 048import org.apache.activemq.store.kahadb.data.KahaCommitCommand; 049import org.apache.activemq.store.kahadb.data.KahaPrepareCommand; 050import org.apache.activemq.store.kahadb.data.KahaRollbackCommand; 051import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; 052import org.apache.activemq.wireformat.WireFormat; 053import org.slf4j.Logger; 054import org.slf4j.LoggerFactory; 055 056/** 057 * Provides a TransactionStore implementation that can create transaction aware 058 * MessageStore objects from non transaction aware MessageStore objects. 059 * 060 * 061 */ 062public class KahaDBTransactionStore implements TransactionStore { 063 static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class); 064 ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>(); 065 private final KahaDBStore theStore; 066 067 public KahaDBTransactionStore(KahaDBStore theStore) { 068 this.theStore = theStore; 069 } 070 071 private WireFormat wireFormat(){ 072 return this.theStore.wireFormat; 073 } 074 075 public class Tx { 076 private final List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<AddMessageCommand>()); 077 078 private final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<RemoveMessageCommand>()); 079 080 public void add(AddMessageCommand msg) { 081 messages.add(msg); 082 } 083 084 public void add(RemoveMessageCommand ack) { 085 acks.add(ack); 086 } 087 088 public Message[] getMessages() { 089 Message rc[] = new Message[messages.size()]; 090 int count = 0; 091 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 092 AddMessageCommand cmd = iter.next(); 093 rc[count++] = cmd.getMessage(); 094 } 095 return rc; 096 } 097 098 public MessageAck[] getAcks() { 099 MessageAck rc[] = new MessageAck[acks.size()]; 100 int count = 0; 101 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 102 RemoveMessageCommand cmd = iter.next(); 103 rc[count++] = cmd.getMessageAck(); 104 } 105 return rc; 106 } 107 108 /** 109 * @return true if something to commit 110 * @throws IOException 111 */ 112 public List<Future<Object>> commit() throws IOException { 113 List<Future<Object>> results = new ArrayList<Future<Object>>(); 114 // Do all the message adds. 115 for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) { 116 AddMessageCommand cmd = iter.next(); 117 results.add(cmd.run()); 118 119 } 120 // And removes.. 121 for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) { 122 RemoveMessageCommand cmd = iter.next(); 123 cmd.run(); 124 results.add(cmd.run()); 125 } 126 127 return results; 128 } 129 } 130 131 public abstract class AddMessageCommand { 132 private final ConnectionContext ctx; 133 AddMessageCommand(ConnectionContext ctx) { 134 this.ctx = ctx; 135 } 136 abstract Message getMessage(); 137 Future<Object> run() throws IOException { 138 return run(this.ctx); 139 } 140 abstract Future<Object> run(ConnectionContext ctx) throws IOException; 141 } 142 143 public abstract class RemoveMessageCommand { 144 145 private final ConnectionContext ctx; 146 RemoveMessageCommand(ConnectionContext ctx) { 147 this.ctx = ctx; 148 } 149 abstract MessageAck getMessageAck(); 150 Future<Object> run() throws IOException { 151 return run(this.ctx); 152 } 153 abstract Future<Object> run(ConnectionContext context) throws IOException; 154 } 155 156 public MessageStore proxy(MessageStore messageStore) { 157 return new ProxyMessageStore(messageStore) { 158 @Override 159 public void addMessage(ConnectionContext context, final Message send) throws IOException { 160 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 161 } 162 163 @Override 164 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 165 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 166 } 167 168 @Override 169 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { 170 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message); 171 } 172 173 @Override 174 public ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException { 175 return KahaDBTransactionStore.this.asyncAddQueueMessage(context, getDelegate(), message); 176 } 177 178 @Override 179 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 180 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack); 181 } 182 183 @Override 184 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 185 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack); 186 } 187 }; 188 } 189 190 public TopicMessageStore proxy(TopicMessageStore messageStore) { 191 return new ProxyTopicMessageStore(messageStore) { 192 @Override 193 public void addMessage(ConnectionContext context, final Message send) throws IOException { 194 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 195 } 196 197 @Override 198 public void addMessage(ConnectionContext context, final Message send, boolean canOptimize) throws IOException { 199 KahaDBTransactionStore.this.addMessage(context, getDelegate(), send); 200 } 201 202 @Override 203 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { 204 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message); 205 } 206 207 @Override 208 public ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, Message message, boolean canOptimize) throws IOException { 209 return KahaDBTransactionStore.this.asyncAddTopicMessage(context, getDelegate(), message); 210 } 211 212 @Override 213 public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { 214 KahaDBTransactionStore.this.removeMessage(context, getDelegate(), ack); 215 } 216 217 @Override 218 public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { 219 KahaDBTransactionStore.this.removeAsyncMessage(context, getDelegate(), ack); 220 } 221 222 @Override 223 public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, 224 MessageId messageId, MessageAck ack) throws IOException { 225 KahaDBTransactionStore.this.acknowledge(context, (TopicMessageStore)getDelegate(), clientId, 226 subscriptionName, messageId, ack); 227 } 228 229 }; 230 } 231 232 /** 233 * @throws IOException 234 * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) 235 */ 236 @Override 237 public void prepare(TransactionId txid) throws IOException { 238 KahaTransactionInfo info = getTransactionInfo(txid); 239 if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 240 theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null); 241 } else { 242 Tx tx = inflightTransactions.remove(txid); 243 if (tx != null) { 244 theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null); 245 } 246 } 247 } 248 249 public Tx getTx(Object txid) { 250 Tx tx = inflightTransactions.get(txid); 251 if (tx == null) { 252 synchronized (inflightTransactions) { 253 tx = inflightTransactions.get(txid); 254 if (tx == null) { 255 tx = new Tx(); 256 inflightTransactions.put(txid, tx); 257 } 258 } 259 } 260 return tx; 261 } 262 263 @Override 264 public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit) 265 throws IOException { 266 if (txid != null) { 267 if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) { 268 if (preCommit != null) { 269 preCommit.run(); 270 } 271 Tx tx = inflightTransactions.remove(txid); 272 if (tx != null) { 273 List<Future<Object>> results = tx.commit(); 274 boolean doneSomething = false; 275 for (Future<Object> result : results) { 276 try { 277 result.get(); 278 } catch (InterruptedException e) { 279 theStore.brokerService.handleIOException(new IOException(e.getMessage())); 280 } catch (ExecutionException e) { 281 theStore.brokerService.handleIOException(new IOException(e.getMessage())); 282 }catch(CancellationException e) { 283 } 284 if (!result.isCancelled()) { 285 doneSomething = true; 286 } 287 } 288 if (postCommit != null) { 289 postCommit.run(); 290 } 291 if (doneSomething) { 292 KahaTransactionInfo info = getTransactionInfo(txid); 293 theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null); 294 } 295 }else { 296 //The Tx will be null for failed over clients - lets run their post commits 297 if (postCommit != null) { 298 postCommit.run(); 299 } 300 } 301 302 } else { 303 KahaTransactionInfo info = getTransactionInfo(txid); 304 if (preCommit != null) { 305 preCommit.run(); 306 } 307 theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit); 308 forgetRecoveredAcks(txid, false); 309 } 310 }else { 311 LOG.error("Null transaction passed on commit"); 312 } 313 } 314 315 /** 316 * @throws IOException 317 * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) 318 */ 319 @Override 320 public void rollback(TransactionId txid) throws IOException { 321 if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 322 KahaTransactionInfo info = getTransactionInfo(txid); 323 theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null); 324 forgetRecoveredAcks(txid, true); 325 } else { 326 inflightTransactions.remove(txid); 327 } 328 } 329 330 protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException { 331 if (txid.isXATransaction()) { 332 XATransactionId xaTid = ((XATransactionId) txid); 333 theStore.forgetRecoveredAcks(xaTid.getPreparedAcks(), isRollback); 334 } 335 } 336 337 @Override 338 public void start() throws Exception { 339 } 340 341 @Override 342 public void stop() throws Exception { 343 } 344 345 @Override 346 public synchronized void recover(TransactionRecoveryListener listener) throws IOException { 347 for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) { 348 XATransactionId xid = (XATransactionId) entry.getKey(); 349 ArrayList<Message> messageList = new ArrayList<Message>(); 350 ArrayList<MessageAck> ackList = new ArrayList<MessageAck>(); 351 352 for (Operation op : entry.getValue()) { 353 if (op.getClass() == MessageDatabase.AddOperation.class) { 354 MessageDatabase.AddOperation addOp = (MessageDatabase.AddOperation) op; 355 Message msg = (Message) wireFormat().unmarshal(new DataInputStream(addOp.getCommand().getMessage() 356 .newInput())); 357 messageList.add(msg); 358 } else { 359 MessageDatabase.RemoveOperation rmOp = (MessageDatabase.RemoveOperation) op; 360 Buffer ackb = rmOp.getCommand().getAck(); 361 MessageAck ack = (MessageAck) wireFormat().unmarshal(new DataInputStream(ackb.newInput())); 362 ackList.add(ack); 363 } 364 } 365 366 Message[] addedMessages = new Message[messageList.size()]; 367 MessageAck[] acks = new MessageAck[ackList.size()]; 368 messageList.toArray(addedMessages); 369 ackList.toArray(acks); 370 xid.setPreparedAcks(ackList); 371 theStore.trackRecoveredAcks(ackList); 372 listener.recover(xid, addedMessages, acks); 373 } 374 } 375 376 /** 377 * @param message 378 * @throws IOException 379 */ 380 void addMessage(ConnectionContext context, final MessageStore destination, final Message message) 381 throws IOException { 382 383 if (message.getTransactionId() != null) { 384 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 385 destination.addMessage(context, message); 386 } else { 387 Tx tx = getTx(message.getTransactionId()); 388 tx.add(new AddMessageCommand(context) { 389 @Override 390 public Message getMessage() { 391 return message; 392 } 393 @Override 394 public Future<Object> run(ConnectionContext ctx) throws IOException { 395 destination.addMessage(ctx, message); 396 return AbstractMessageStore.FUTURE; 397 } 398 399 }); 400 } 401 } else { 402 destination.addMessage(context, message); 403 } 404 } 405 406 ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message) 407 throws IOException { 408 409 if (message.getTransactionId() != null) { 410 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) { 411 destination.addMessage(context, message); 412 return AbstractMessageStore.FUTURE; 413 } else { 414 Tx tx = getTx(message.getTransactionId()); 415 tx.add(new AddMessageCommand(context) { 416 @Override 417 public Message getMessage() { 418 return message; 419 } 420 @Override 421 public Future<Object> run(ConnectionContext ctx) throws IOException { 422 return destination.asyncAddQueueMessage(ctx, message); 423 } 424 425 }); 426 return AbstractMessageStore.FUTURE; 427 } 428 } else { 429 return destination.asyncAddQueueMessage(context, message); 430 } 431 } 432 433 ListenableFuture<Object> asyncAddTopicMessage(ConnectionContext context, final MessageStore destination, final Message message) 434 throws IOException { 435 436 if (message.getTransactionId() != null) { 437 if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { 438 destination.addMessage(context, message); 439 return AbstractMessageStore.FUTURE; 440 } else { 441 Tx tx = getTx(message.getTransactionId()); 442 tx.add(new AddMessageCommand(context) { 443 @Override 444 public Message getMessage() { 445 return message; 446 } 447 @Override 448 public Future<Object> run(ConnectionContext ctx) throws IOException { 449 return destination.asyncAddTopicMessage(ctx, message); 450 } 451 452 }); 453 return AbstractMessageStore.FUTURE; 454 } 455 } else { 456 return destination.asyncAddTopicMessage(context, message); 457 } 458 } 459 460 /** 461 * @param ack 462 * @throws IOException 463 */ 464 final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) 465 throws IOException { 466 467 if (ack.isInTransaction()) { 468 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { 469 destination.removeMessage(context, ack); 470 } else { 471 Tx tx = getTx(ack.getTransactionId()); 472 tx.add(new RemoveMessageCommand(context) { 473 @Override 474 public MessageAck getMessageAck() { 475 return ack; 476 } 477 478 @Override 479 public Future<Object> run(ConnectionContext ctx) throws IOException { 480 destination.removeMessage(ctx, ack); 481 return AbstractMessageStore.FUTURE; 482 } 483 }); 484 } 485 } else { 486 destination.removeMessage(context, ack); 487 } 488 } 489 490 final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack) 491 throws IOException { 492 493 if (ack.isInTransaction()) { 494 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) { 495 destination.removeAsyncMessage(context, ack); 496 } else { 497 Tx tx = getTx(ack.getTransactionId()); 498 tx.add(new RemoveMessageCommand(context) { 499 @Override 500 public MessageAck getMessageAck() { 501 return ack; 502 } 503 504 @Override 505 public Future<Object> run(ConnectionContext ctx) throws IOException { 506 destination.removeMessage(ctx, ack); 507 return AbstractMessageStore.FUTURE; 508 } 509 }); 510 } 511 } else { 512 destination.removeAsyncMessage(context, ack); 513 } 514 } 515 516 final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName, 517 final MessageId messageId, final MessageAck ack) throws IOException { 518 519 if (ack.isInTransaction()) { 520 if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) { 521 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 522 } else { 523 Tx tx = getTx(ack.getTransactionId()); 524 tx.add(new RemoveMessageCommand(context) { 525 @Override 526 public MessageAck getMessageAck() { 527 return ack; 528 } 529 530 @Override 531 public Future<Object> run(ConnectionContext ctx) throws IOException { 532 destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack); 533 return AbstractMessageStore.FUTURE; 534 } 535 }); 536 } 537 } else { 538 destination.acknowledge(context, clientId, subscriptionName, messageId, ack); 539 } 540 } 541 542 543 private KahaTransactionInfo getTransactionInfo(TransactionId txid) { 544 return TransactionIdConversion.convert(theStore.getTransactionIdTransformer().transform(txid)); 545 } 546}