001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.HashMap; 022import java.util.List; 023 024import javax.jms.JMSException; 025import javax.jms.TransactionInProgressException; 026import javax.jms.TransactionRolledBackException; 027import javax.transaction.xa.XAException; 028import javax.transaction.xa.XAResource; 029import javax.transaction.xa.Xid; 030 031import org.apache.activemq.command.ConnectionId; 032import org.apache.activemq.command.DataArrayResponse; 033import org.apache.activemq.command.DataStructure; 034import org.apache.activemq.command.IntegerResponse; 035import org.apache.activemq.command.LocalTransactionId; 036import org.apache.activemq.command.TransactionId; 037import org.apache.activemq.command.TransactionInfo; 038import org.apache.activemq.command.XATransactionId; 039import org.apache.activemq.transaction.Synchronization; 040import org.apache.activemq.transport.failover.FailoverTransport; 041import org.apache.activemq.util.JMSExceptionSupport; 042import org.apache.activemq.util.LongSequenceGenerator; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * A TransactionContext provides the means to control a JMS transaction. It 048 * provides a local transaction interface and also an XAResource interface. <p/> 049 * An application server controls the transactional assignment of an XASession 050 * by obtaining its XAResource. It uses the XAResource to assign the session to 051 * a transaction, prepare and commit work on the transaction, and so on. <p/> An 052 * XAResource provides some fairly sophisticated facilities for interleaving 053 * work on multiple transactions, recovering a list of transactions in progress, 054 * and so on. A JTA aware JMS provider must fully implement this functionality. 055 * This could be done by using the services of a database that supports XA, or a 056 * JMS provider may choose to implement this functionality from scratch. <p/> 057 * 058 * 059 * @see javax.jms.Session 060 * @see javax.jms.QueueSession 061 * @see javax.jms.TopicSession 062 * @see javax.jms.XASession 063 */ 064public class TransactionContext implements XAResource { 065 066 public static final String xaErrorCodeMarker = "xaErrorCode:"; 067 private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); 068 069 // XATransactionId -> ArrayList of TransactionContext objects 070 private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = 071 new HashMap<TransactionId, List<TransactionContext>>(); 072 073 private ActiveMQConnection connection; 074 private final LongSequenceGenerator localTransactionIdGenerator; 075 private List<Synchronization> synchronizations; 076 077 // To track XA transactions. 078 private Xid associatedXid; 079 private TransactionId transactionId; 080 private LocalTransactionEventListener localTransactionEventListener; 081 private int beforeEndIndex; 082 private volatile boolean rollbackOnly; 083 084 // for RAR recovery 085 public TransactionContext() { 086 localTransactionIdGenerator = null; 087 } 088 089 public TransactionContext(ActiveMQConnection connection) { 090 this.connection = connection; 091 this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator(); 092 } 093 094 public boolean isInXATransaction() { 095 if (transactionId != null && transactionId.isXATransaction()) { 096 return true; 097 } else { 098 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 099 for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) { 100 if (transactions.contains(this)) { 101 return true; 102 } 103 } 104 } 105 } 106 107 return false; 108 } 109 110 public void setRollbackOnly(boolean val) { 111 rollbackOnly = val; 112 } 113 114 public boolean isInLocalTransaction() { 115 return transactionId != null && transactionId.isLocalTransaction(); 116 } 117 118 public boolean isInTransaction() { 119 return transactionId != null; 120 } 121 122 /** 123 * @return Returns the localTransactionEventListener. 124 */ 125 public LocalTransactionEventListener getLocalTransactionEventListener() { 126 return localTransactionEventListener; 127 } 128 129 /** 130 * Used by the resource adapter to listen to transaction events. 131 * 132 * @param localTransactionEventListener The localTransactionEventListener to 133 * set. 134 */ 135 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) { 136 this.localTransactionEventListener = localTransactionEventListener; 137 } 138 139 // /////////////////////////////////////////////////////////// 140 // 141 // Methods that work with the Synchronization objects registered with 142 // the transaction. 143 // 144 // /////////////////////////////////////////////////////////// 145 146 public void addSynchronization(Synchronization s) { 147 if (synchronizations == null) { 148 synchronizations = new ArrayList<Synchronization>(10); 149 } 150 synchronizations.add(s); 151 } 152 153 private void afterRollback() throws JMSException { 154 if (synchronizations == null) { 155 return; 156 } 157 158 Throwable firstException = null; 159 int size = synchronizations.size(); 160 for (int i = 0; i < size; i++) { 161 try { 162 synchronizations.get(i).afterRollback(); 163 } catch (Throwable t) { 164 LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t); 165 if (firstException == null) { 166 firstException = t; 167 } 168 } 169 } 170 synchronizations = null; 171 if (firstException != null) { 172 throw JMSExceptionSupport.create(firstException); 173 } 174 } 175 176 private void afterCommit() throws JMSException { 177 if (synchronizations == null) { 178 return; 179 } 180 181 Throwable firstException = null; 182 int size = synchronizations.size(); 183 for (int i = 0; i < size; i++) { 184 try { 185 synchronizations.get(i).afterCommit(); 186 } catch (Throwable t) { 187 LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t); 188 if (firstException == null) { 189 firstException = t; 190 } 191 } 192 } 193 synchronizations = null; 194 if (firstException != null) { 195 throw JMSExceptionSupport.create(firstException); 196 } 197 } 198 199 private void beforeEnd() throws JMSException { 200 if (synchronizations == null) { 201 return; 202 } 203 204 int size = synchronizations.size(); 205 try { 206 for (;beforeEndIndex < size;) { 207 synchronizations.get(beforeEndIndex++).beforeEnd(); 208 } 209 } catch (JMSException e) { 210 throw e; 211 } catch (Throwable e) { 212 throw JMSExceptionSupport.create(e); 213 } 214 } 215 216 public TransactionId getTransactionId() { 217 return transactionId; 218 } 219 220 // /////////////////////////////////////////////////////////// 221 // 222 // Local transaction interface. 223 // 224 // /////////////////////////////////////////////////////////// 225 226 /** 227 * Start a local transaction. 228 * @throws javax.jms.JMSException on internal error 229 */ 230 public void begin() throws JMSException { 231 232 if (isInXATransaction()) { 233 throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress."); 234 } 235 236 if (transactionId == null) { 237 synchronizations = null; 238 beforeEndIndex = 0; 239 setRollbackOnly(false); 240 this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId()); 241 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 242 this.connection.ensureConnectionInfoSent(); 243 this.connection.asyncSendPacket(info); 244 245 // Notify the listener that the tx was started. 246 if (localTransactionEventListener != null) { 247 localTransactionEventListener.beginEvent(); 248 } 249 if (LOG.isDebugEnabled()) { 250 LOG.debug("Begin:" + transactionId); 251 } 252 } 253 254 } 255 256 /** 257 * Rolls back any work done in this transaction and releases any locks 258 * currently held. 259 * 260 * @throws JMSException if the JMS provider fails to roll back the 261 * transaction due to some internal error. 262 * @throws javax.jms.IllegalStateException if the method is not called by a 263 * transacted session. 264 */ 265 public void rollback() throws JMSException { 266 if (isInXATransaction()) { 267 throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); 268 } 269 270 try { 271 beforeEnd(); 272 } catch (TransactionRolledBackException canOcurrOnFailover) { 273 LOG.warn("rollback processing error", canOcurrOnFailover); 274 } 275 if (transactionId != null) { 276 if (LOG.isDebugEnabled()) { 277 LOG.debug("Rollback: " + transactionId 278 + " syncCount: " 279 + (synchronizations != null ? synchronizations.size() : 0)); 280 } 281 282 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); 283 this.transactionId = null; 284 //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364 285 this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0); 286 // Notify the listener that the tx was rolled back 287 if (localTransactionEventListener != null) { 288 localTransactionEventListener.rollbackEvent(); 289 } 290 } 291 292 afterRollback(); 293 } 294 295 /** 296 * Commits all work done in this transaction and releases any locks 297 * currently held. 298 * 299 * @throws JMSException if the JMS provider fails to commit the transaction 300 * due to some internal error. 301 * @throws javax.jms.IllegalStateException if the method is not called by a 302 * transacted session. 303 */ 304 public void commit() throws JMSException { 305 if (isInXATransaction()) { 306 throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress "); 307 } 308 309 try { 310 beforeEnd(); 311 } catch (JMSException e) { 312 rollback(); 313 throw e; 314 } 315 316 if (transactionId != null && rollbackOnly) { 317 final String message = "Commit of " + transactionId + " failed due to rollback only request; typically due to failover with pending acks"; 318 try { 319 rollback(); 320 } finally { 321 LOG.warn(message); 322 throw new TransactionRolledBackException(message); 323 } 324 } 325 326 // Only send commit if the transaction was started. 327 if (transactionId != null) { 328 if (LOG.isDebugEnabled()) { 329 LOG.debug("Commit: " + transactionId 330 + " syncCount: " 331 + (synchronizations != null ? synchronizations.size() : 0)); 332 } 333 334 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); 335 this.transactionId = null; 336 // Notify the listener that the tx was committed back 337 try { 338 this.connection.syncSendPacket(info); 339 if (localTransactionEventListener != null) { 340 localTransactionEventListener.commitEvent(); 341 } 342 afterCommit(); 343 } catch (JMSException cause) { 344 LOG.info("commit failed for transaction " + info.getTransactionId(), cause); 345 if (localTransactionEventListener != null) { 346 localTransactionEventListener.rollbackEvent(); 347 } 348 afterRollback(); 349 throw cause; 350 } 351 352 } 353 } 354 355 // /////////////////////////////////////////////////////////// 356 // 357 // XAResource Implementation 358 // 359 // /////////////////////////////////////////////////////////// 360 /** 361 * Associates a transaction with the resource. 362 */ 363 public void start(Xid xid, int flags) throws XAException { 364 365 if (LOG.isDebugEnabled()) { 366 LOG.debug("Start: " + xid + ", flags:" + flags); 367 } 368 if (isInLocalTransaction()) { 369 throw new XAException(XAException.XAER_PROTO); 370 } 371 // Are we already associated? 372 if (associatedXid != null) { 373 throw new XAException(XAException.XAER_PROTO); 374 } 375 376 // if ((flags & TMJOIN) == TMJOIN) { 377 // TODO: verify that the server has seen the xid 378 // // } 379 // if ((flags & TMJOIN) == TMRESUME) { 380 // // TODO: verify that the xid was suspended. 381 // } 382 383 // associate 384 synchronizations = null; 385 beforeEndIndex = 0; 386 setRollbackOnly(false); 387 setXid(xid); 388 } 389 390 /** 391 * @return connectionId for connection 392 */ 393 private ConnectionId getConnectionId() { 394 return connection.getConnectionInfo().getConnectionId(); 395 } 396 397 public void end(Xid xid, int flags) throws XAException { 398 399 if (LOG.isDebugEnabled()) { 400 LOG.debug("End: " + xid + ", flags:" + flags); 401 } 402 403 if (isInLocalTransaction()) { 404 throw new XAException(XAException.XAER_PROTO); 405 } 406 407 if ((flags & (TMSUSPEND | TMFAIL)) != 0) { 408 // You can only suspend the associated xid. 409 if (!equals(associatedXid, xid)) { 410 throw new XAException(XAException.XAER_PROTO); 411 } 412 invokeBeforeEnd(); 413 } else if ((flags & TMSUCCESS) == TMSUCCESS) { 414 // set to null if this is the current xid. 415 // otherwise this could be an asynchronous success call 416 if (equals(associatedXid, xid)) { 417 invokeBeforeEnd(); 418 } 419 } else { 420 throw new XAException(XAException.XAER_INVAL); 421 } 422 } 423 424 private void invokeBeforeEnd() throws XAException { 425 boolean throwingException = false; 426 try { 427 beforeEnd(); 428 } catch (JMSException e) { 429 throwingException = true; 430 throw toXAException(e); 431 } finally { 432 try { 433 setXid(null); 434 } catch (XAException ignoreIfWillMask){ 435 if (!throwingException) { 436 throw ignoreIfWillMask; 437 } 438 } 439 } 440 } 441 442 private boolean equals(Xid xid1, Xid xid2) { 443 if (xid1 == xid2) { 444 return true; 445 } 446 if (xid1 == null ^ xid2 == null) { 447 return false; 448 } 449 return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier()) 450 && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId()); 451 } 452 453 public int prepare(Xid xid) throws XAException { 454 if (LOG.isDebugEnabled()) { 455 LOG.debug("Prepare: " + xid); 456 } 457 458 // We allow interleaving multiple transactions, so 459 // we don't limit prepare to the associated xid. 460 XATransactionId x; 461 // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been 462 // called first 463 if (xid == null || (equals(associatedXid, xid))) { 464 throw new XAException(XAException.XAER_PROTO); 465 } else { 466 // TODO: cache the known xids so we don't keep recreating this one?? 467 x = new XATransactionId(xid); 468 } 469 470 if (rollbackOnly) { 471 LOG.warn("prepare of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks"); 472 throw new XAException(XAException.XA_RBINTEGRITY); 473 } 474 475 try { 476 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE); 477 478 // Find out if the server wants to commit or rollback. 479 IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info); 480 if (XAResource.XA_RDONLY == response.getResult()) { 481 // transaction stops now, may be syncs that need a callback 482 List<TransactionContext> l; 483 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 484 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 485 } 486 // After commit may be expensive and can deadlock, do it outside global synch block 487 // No risk for concurrent updates as we own the list now 488 if (l != null) { 489 if(! l.isEmpty()) { 490 LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid); 491 for (TransactionContext ctx : l) { 492 ctx.afterCommit(); 493 } 494 } 495 } 496 } 497 return response.getResult(); 498 499 } catch (JMSException e) { 500 LOG.warn("prepare of: " + x + " failed with: " + e, e); 501 List<TransactionContext> l; 502 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 503 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 504 } 505 // After rollback may be expensive and can deadlock, do it outside global synch block 506 // No risk for concurrent updates as we own the list now 507 if (l != null) { 508 for (TransactionContext ctx : l) { 509 try { 510 ctx.afterRollback(); 511 } catch (Throwable ignored) { 512 LOG.debug("failed to firing afterRollback callbacks on prepare " + 513 "failure, txid: {}, context: {}", x, ctx, ignored); 514 } 515 } 516 } 517 throw toXAException(e); 518 } 519 } 520 521 public void rollback(Xid xid) throws XAException { 522 523 if (LOG.isDebugEnabled()) { 524 LOG.debug("Rollback: " + xid); 525 } 526 527 // We allow interleaving multiple transactions, so 528 // we don't limit rollback to the associated xid. 529 XATransactionId x; 530 if (xid == null) { 531 throw new XAException(XAException.XAER_PROTO); 532 } 533 if (equals(associatedXid, xid)) { 534 // I think this can happen even without an end(xid) call. Need to 535 // check spec. 536 x = (XATransactionId)transactionId; 537 } else { 538 x = new XATransactionId(xid); 539 } 540 541 try { 542 this.connection.checkClosedOrFailed(); 543 this.connection.ensureConnectionInfoSent(); 544 545 // Let the server know that the tx is rollback. 546 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); 547 this.connection.syncSendPacket(info); 548 549 List<TransactionContext> l; 550 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 551 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 552 } 553 // After rollback may be expensive and can deadlock, do it outside global synch block 554 // No risk for concurrent updates as we own the list now 555 if (l != null) { 556 for (TransactionContext ctx : l) { 557 ctx.afterRollback(); 558 } 559 } 560 } catch (JMSException e) { 561 throw toXAException(e); 562 } 563 } 564 565 // XAResource interface 566 public void commit(Xid xid, boolean onePhase) throws XAException { 567 568 if (LOG.isDebugEnabled()) { 569 LOG.debug("Commit: " + xid + ", onePhase=" + onePhase); 570 } 571 572 // We allow interleaving multiple transactions, so 573 // we don't limit commit to the associated xid. 574 XATransactionId x; 575 if (xid == null || (equals(associatedXid, xid))) { 576 // should never happen, end(xid,TMSUCCESS) must have been previously 577 // called 578 throw new XAException(XAException.XAER_PROTO); 579 } else { 580 x = new XATransactionId(xid); 581 } 582 583 if (rollbackOnly) { 584 LOG.warn("commit of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks"); 585 throw new XAException(XAException.XA_RBINTEGRITY); 586 } 587 588 try { 589 this.connection.checkClosedOrFailed(); 590 this.connection.ensureConnectionInfoSent(); 591 592 // Notify the server that the tx was committed back 593 TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE); 594 595 this.connection.syncSendPacket(info); 596 597 List<TransactionContext> l; 598 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 599 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 600 } 601 // After commit may be expensive and can deadlock, do it outside global synch block 602 // No risk for concurrent updates as we own the list now 603 if (l != null) { 604 for (TransactionContext ctx : l) { 605 try { 606 ctx.afterCommit(); 607 } catch (Exception ignored) { 608 LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored); 609 } 610 } 611 } 612 613 } catch (JMSException e) { 614 LOG.warn("commit of: " + x + " failed with: " + e, e); 615 if (onePhase) { 616 List<TransactionContext> l; 617 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 618 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 619 } 620 // After rollback may be expensive and can deadlock, do it outside global synch block 621 // No risk for concurrent updates as we own the list now 622 if (l != null) { 623 for (TransactionContext ctx : l) { 624 try { 625 ctx.afterRollback(); 626 } catch (Throwable ignored) { 627 LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored); 628 } 629 } 630 } 631 } 632 throw toXAException(e); 633 } 634 635 } 636 637 public void forget(Xid xid) throws XAException { 638 if (LOG.isDebugEnabled()) { 639 LOG.debug("Forget: " + xid); 640 } 641 642 // We allow interleaving multiple transactions, so 643 // we don't limit forget to the associated xid. 644 XATransactionId x; 645 if (xid == null) { 646 throw new XAException(XAException.XAER_PROTO); 647 } 648 if (equals(associatedXid, xid)) { 649 // TODO determine if this can happen... I think not. 650 x = (XATransactionId)transactionId; 651 } else { 652 x = new XATransactionId(xid); 653 } 654 655 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET); 656 657 try { 658 // Tell the server to forget the transaction. 659 this.connection.syncSendPacket(info); 660 } catch (JMSException e) { 661 throw toXAException(e); 662 } 663 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 664 ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 665 } 666 } 667 668 public boolean isSameRM(XAResource xaResource) throws XAException { 669 if (xaResource == null) { 670 return false; 671 } 672 if (!(xaResource instanceof TransactionContext)) { 673 return false; 674 } 675 TransactionContext xar = (TransactionContext)xaResource; 676 try { 677 return getResourceManagerId().equals(xar.getResourceManagerId()); 678 } catch (Throwable e) { 679 throw (XAException)new XAException("Could not get resource manager id.").initCause(e); 680 } 681 } 682 683 public Xid[] recover(int flag) throws XAException { 684 LOG.debug("recover({})", flag); 685 XATransactionId[] answer; 686 687 if (XAResource.TMNOFLAGS == flag) { 688 // signal next in cursor scan, which for us is always the end b/c we don't maintain any cursor state 689 // allows looping scan to complete 690 answer = new XATransactionId[0]; 691 } else { 692 TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER); 693 try { 694 this.connection.checkClosedOrFailed(); 695 this.connection.ensureConnectionInfoSent(); 696 697 DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info); 698 DataStructure[] data = receipt.getData(); 699 if (data instanceof XATransactionId[]) { 700 answer = (XATransactionId[]) data; 701 } else { 702 answer = new XATransactionId[data.length]; 703 System.arraycopy(data, 0, answer, 0, data.length); 704 } 705 } catch (JMSException e) { 706 throw toXAException(e); 707 } 708 } 709 LOG.debug("recover({})={}", flag, answer); 710 return answer; 711 } 712 713 public int getTransactionTimeout() throws XAException { 714 return 0; 715 } 716 717 public boolean setTransactionTimeout(int seconds) throws XAException { 718 return false; 719 } 720 721 // /////////////////////////////////////////////////////////// 722 // 723 // Helper methods. 724 // 725 // /////////////////////////////////////////////////////////// 726 protected String getResourceManagerId() throws JMSException { 727 return this.connection.getResourceManagerId(); 728 } 729 730 private void setXid(Xid xid) throws XAException { 731 732 try { 733 this.connection.checkClosedOrFailed(); 734 this.connection.ensureConnectionInfoSent(); 735 } catch (JMSException e) { 736 disassociate(); 737 throw toXAException(e); 738 } 739 740 if (xid != null) { 741 // associate 742 associatedXid = xid; 743 transactionId = new XATransactionId(xid); 744 745 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 746 try { 747 this.connection.asyncSendPacket(info); 748 if (LOG.isDebugEnabled()) { 749 LOG.debug("{} started XA transaction {} ", this, transactionId); 750 } 751 } catch (JMSException e) { 752 disassociate(); 753 throw toXAException(e); 754 } 755 756 } else { 757 758 if (transactionId != null) { 759 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END); 760 try { 761 this.connection.syncSendPacket(info); 762 LOG.debug("{} ended XA transaction {}", this, transactionId); 763 } catch (JMSException e) { 764 disassociate(); 765 throw toXAException(e); 766 } 767 768 // Add our self to the list of contexts that are interested in 769 // post commit/rollback events. 770 List<TransactionContext> l; 771 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 772 l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId); 773 if (l == null) { 774 l = new ArrayList<TransactionContext>(3); 775 ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l); 776 } 777 if (!l.contains(this)) { 778 l.add(this); 779 } 780 } 781 } 782 783 disassociate(); 784 } 785 } 786 787 private void disassociate() { 788 // dis-associate 789 associatedXid = null; 790 transactionId = null; 791 } 792 793 /** 794 * Converts a JMSException from the server to an XAException. if the 795 * JMSException contained a linked XAException that is returned instead. 796 * 797 * @param e JMSException to convert 798 * @return XAException wrapping original exception or its message 799 */ 800 private XAException toXAException(JMSException e) { 801 if (e.getCause() != null && e.getCause() instanceof XAException) { 802 XAException original = (XAException)e.getCause(); 803 XAException xae = new XAException(original.getMessage()); 804 xae.errorCode = original.errorCode; 805 if (xae.errorCode == XA_OK) { 806 // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable 807 xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR); 808 } 809 xae.initCause(original); 810 return xae; 811 } 812 813 XAException xae = new XAException(e.getMessage()); 814 xae.errorCode = XAException.XAER_RMFAIL; 815 xae.initCause(e); 816 return xae; 817 } 818 819 private int parseFromMessageOr(String message, int fallbackCode) { 820 final String marker = "xaErrorCode:"; 821 final int index = message.lastIndexOf(marker); 822 if (index > -1) { 823 try { 824 return Integer.parseInt(message.substring(index + marker.length())); 825 } catch (Exception ignored) {} 826 } 827 return fallbackCode; 828 } 829 830 public ActiveMQConnection getConnection() { 831 return connection; 832 } 833 834 835 // for RAR xa recovery where xaresource connection is per request 836 public ActiveMQConnection setConnection(ActiveMQConnection connection) { 837 ActiveMQConnection existing = this.connection; 838 this.connection = connection; 839 return existing; 840 } 841 842 public void cleanup() { 843 associatedXid = null; 844 transactionId = null; 845 } 846 847 @Override 848 public String toString() { 849 return "TransactionContext{" + 850 "transactionId=" + transactionId + 851 ",connection=" + connection + 852 '}'; 853 } 854}