001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.HashMap; 022import java.util.List; 023 024import javax.jms.JMSException; 025import javax.jms.TransactionInProgressException; 026import javax.jms.TransactionRolledBackException; 027import javax.transaction.xa.XAException; 028import javax.transaction.xa.XAResource; 029import javax.transaction.xa.Xid; 030 031import org.apache.activemq.command.ConnectionId; 032import org.apache.activemq.command.DataArrayResponse; 033import org.apache.activemq.command.DataStructure; 034import org.apache.activemq.command.IntegerResponse; 035import org.apache.activemq.command.LocalTransactionId; 036import org.apache.activemq.command.TransactionId; 037import org.apache.activemq.command.TransactionInfo; 038import org.apache.activemq.command.XATransactionId; 039import org.apache.activemq.transaction.Synchronization; 040import org.apache.activemq.transport.failover.FailoverTransport; 041import org.apache.activemq.util.JMSExceptionSupport; 042import org.apache.activemq.util.LongSequenceGenerator; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * A TransactionContext provides the means to control a JMS transaction. It 048 * provides a local transaction interface and also an XAResource interface. <p/> 049 * An application server controls the transactional assignment of an XASession 050 * by obtaining its XAResource. It uses the XAResource to assign the session to 051 * a transaction, prepare and commit work on the transaction, and so on. <p/> An 052 * XAResource provides some fairly sophisticated facilities for interleaving 053 * work on multiple transactions, recovering a list of transactions in progress, 054 * and so on. A JTA aware JMS provider must fully implement this functionality. 055 * This could be done by using the services of a database that supports XA, or a 056 * JMS provider may choose to implement this functionality from scratch. <p/> 057 * 058 * 059 * @see javax.jms.Session 060 * @see javax.jms.QueueSession 061 * @see javax.jms.TopicSession 062 * @see javax.jms.XASession 063 */ 064public class TransactionContext implements XAResource { 065 066 public static final String xaErrorCodeMarker = "xaErrorCode:"; 067 private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); 068 069 // XATransactionId -> ArrayList of TransactionContext objects 070 private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = 071 new HashMap<TransactionId, List<TransactionContext>>(); 072 073 private ActiveMQConnection connection; 074 private final LongSequenceGenerator localTransactionIdGenerator; 075 private List<Synchronization> synchronizations; 076 077 // To track XA transactions. 078 private Xid associatedXid; 079 private TransactionId transactionId; 080 private LocalTransactionEventListener localTransactionEventListener; 081 private int beforeEndIndex; 082 private volatile boolean rollbackOnly; 083 084 // for RAR recovery 085 public TransactionContext() { 086 localTransactionIdGenerator = null; 087 } 088 089 public TransactionContext(ActiveMQConnection connection) { 090 this.connection = connection; 091 this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator(); 092 } 093 094 public boolean isInXATransaction() { 095 if (transactionId != null && transactionId.isXATransaction()) { 096 return true; 097 } else { 098 if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) { 099 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 100 for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) { 101 if (transactions.contains(this)) { 102 return true; 103 } 104 } 105 } 106 } 107 } 108 109 return false; 110 } 111 112 public void setRollbackOnly(boolean val) { 113 rollbackOnly = val; 114 } 115 116 public boolean isInLocalTransaction() { 117 return transactionId != null && transactionId.isLocalTransaction(); 118 } 119 120 public boolean isInTransaction() { 121 return transactionId != null; 122 } 123 124 /** 125 * @return Returns the localTransactionEventListener. 126 */ 127 public LocalTransactionEventListener getLocalTransactionEventListener() { 128 return localTransactionEventListener; 129 } 130 131 /** 132 * Used by the resource adapter to listen to transaction events. 133 * 134 * @param localTransactionEventListener The localTransactionEventListener to 135 * set. 136 */ 137 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) { 138 this.localTransactionEventListener = localTransactionEventListener; 139 } 140 141 // /////////////////////////////////////////////////////////// 142 // 143 // Methods that work with the Synchronization objects registered with 144 // the transaction. 145 // 146 // /////////////////////////////////////////////////////////// 147 148 public void addSynchronization(Synchronization s) { 149 if (synchronizations == null) { 150 synchronizations = new ArrayList<Synchronization>(10); 151 } 152 synchronizations.add(s); 153 } 154 155 private void afterRollback() throws JMSException { 156 if (synchronizations == null) { 157 return; 158 } 159 160 Throwable firstException = null; 161 int size = synchronizations.size(); 162 for (int i = 0; i < size; i++) { 163 try { 164 synchronizations.get(i).afterRollback(); 165 } catch (Throwable t) { 166 LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t); 167 if (firstException == null) { 168 firstException = t; 169 } 170 } 171 } 172 synchronizations = null; 173 if (firstException != null) { 174 throw JMSExceptionSupport.create(firstException); 175 } 176 } 177 178 private void afterCommit() throws JMSException { 179 if (synchronizations == null) { 180 return; 181 } 182 183 Throwable firstException = null; 184 int size = synchronizations.size(); 185 for (int i = 0; i < size; i++) { 186 try { 187 synchronizations.get(i).afterCommit(); 188 } catch (Throwable t) { 189 LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t); 190 if (firstException == null) { 191 firstException = t; 192 } 193 } 194 } 195 synchronizations = null; 196 if (firstException != null) { 197 throw JMSExceptionSupport.create(firstException); 198 } 199 } 200 201 private void beforeEnd() throws JMSException { 202 if (synchronizations == null) { 203 return; 204 } 205 206 int size = synchronizations.size(); 207 try { 208 for (;beforeEndIndex < size;) { 209 synchronizations.get(beforeEndIndex++).beforeEnd(); 210 } 211 } catch (JMSException e) { 212 throw e; 213 } catch (Throwable e) { 214 throw JMSExceptionSupport.create(e); 215 } 216 } 217 218 public TransactionId getTransactionId() { 219 return transactionId; 220 } 221 222 // /////////////////////////////////////////////////////////// 223 // 224 // Local transaction interface. 225 // 226 // /////////////////////////////////////////////////////////// 227 228 /** 229 * Start a local transaction. 230 * @throws javax.jms.JMSException on internal error 231 */ 232 public void begin() throws JMSException { 233 234 if (isInXATransaction()) { 235 throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress."); 236 } 237 238 if (transactionId == null) { 239 synchronizations = null; 240 beforeEndIndex = 0; 241 setRollbackOnly(false); 242 this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId()); 243 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 244 this.connection.ensureConnectionInfoSent(); 245 this.connection.asyncSendPacket(info); 246 247 // Notify the listener that the tx was started. 248 if (localTransactionEventListener != null) { 249 localTransactionEventListener.beginEvent(); 250 } 251 if (LOG.isDebugEnabled()) { 252 LOG.debug("Begin:" + transactionId); 253 } 254 } 255 256 } 257 258 /** 259 * Rolls back any work done in this transaction and releases any locks 260 * currently held. 261 * 262 * @throws JMSException if the JMS provider fails to roll back the 263 * transaction due to some internal error. 264 * @throws javax.jms.IllegalStateException if the method is not called by a 265 * transacted session. 266 */ 267 public void rollback() throws JMSException { 268 if (isInXATransaction()) { 269 throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); 270 } 271 272 try { 273 beforeEnd(); 274 } catch (TransactionRolledBackException canOcurrOnFailover) { 275 LOG.warn("rollback processing error", canOcurrOnFailover); 276 } 277 if (transactionId != null) { 278 if (LOG.isDebugEnabled()) { 279 LOG.debug("Rollback: " + transactionId 280 + " syncCount: " 281 + (synchronizations != null ? synchronizations.size() : 0)); 282 } 283 284 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); 285 this.transactionId = null; 286 //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364 287 this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0); 288 // Notify the listener that the tx was rolled back 289 if (localTransactionEventListener != null) { 290 localTransactionEventListener.rollbackEvent(); 291 } 292 } 293 294 afterRollback(); 295 } 296 297 /** 298 * Commits all work done in this transaction and releases any locks 299 * currently held. 300 * 301 * @throws JMSException if the JMS provider fails to commit the transaction 302 * due to some internal error. 303 * @throws javax.jms.IllegalStateException if the method is not called by a 304 * transacted session. 305 */ 306 public void commit() throws JMSException { 307 if (isInXATransaction()) { 308 throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress "); 309 } 310 311 try { 312 beforeEnd(); 313 } catch (JMSException e) { 314 rollback(); 315 throw e; 316 } 317 318 if (transactionId != null && rollbackOnly) { 319 final String message = "Commit of " + transactionId + " failed due to rollback only request; typically due to failover with pending acks"; 320 try { 321 rollback(); 322 } finally { 323 LOG.warn(message); 324 throw new TransactionRolledBackException(message); 325 } 326 } 327 328 // Only send commit if the transaction was started. 329 if (transactionId != null) { 330 if (LOG.isDebugEnabled()) { 331 LOG.debug("Commit: " + transactionId 332 + " syncCount: " 333 + (synchronizations != null ? synchronizations.size() : 0)); 334 } 335 336 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); 337 this.transactionId = null; 338 // Notify the listener that the tx was committed back 339 try { 340 this.connection.syncSendPacket(info); 341 if (localTransactionEventListener != null) { 342 localTransactionEventListener.commitEvent(); 343 } 344 afterCommit(); 345 } catch (JMSException cause) { 346 LOG.info("commit failed for transaction " + info.getTransactionId(), cause); 347 if (localTransactionEventListener != null) { 348 localTransactionEventListener.rollbackEvent(); 349 } 350 afterRollback(); 351 throw cause; 352 } 353 354 } 355 } 356 357 // /////////////////////////////////////////////////////////// 358 // 359 // XAResource Implementation 360 // 361 // /////////////////////////////////////////////////////////// 362 /** 363 * Associates a transaction with the resource. 364 */ 365 public void start(Xid xid, int flags) throws XAException { 366 367 if (LOG.isDebugEnabled()) { 368 LOG.debug("Start: " + xid + ", flags:" + flags); 369 } 370 if (isInLocalTransaction()) { 371 throw new XAException(XAException.XAER_PROTO); 372 } 373 // Are we already associated? 374 if (associatedXid != null) { 375 throw new XAException(XAException.XAER_PROTO); 376 } 377 378 // if ((flags & TMJOIN) == TMJOIN) { 379 // TODO: verify that the server has seen the xid 380 // // } 381 // if ((flags & TMJOIN) == TMRESUME) { 382 // // TODO: verify that the xid was suspended. 383 // } 384 385 // associate 386 synchronizations = null; 387 beforeEndIndex = 0; 388 setRollbackOnly(false); 389 setXid(xid); 390 } 391 392 /** 393 * @return connectionId for connection 394 */ 395 private ConnectionId getConnectionId() { 396 return connection.getConnectionInfo().getConnectionId(); 397 } 398 399 public void end(Xid xid, int flags) throws XAException { 400 401 if (LOG.isDebugEnabled()) { 402 LOG.debug("End: " + xid + ", flags:" + flags); 403 } 404 405 if (isInLocalTransaction()) { 406 throw new XAException(XAException.XAER_PROTO); 407 } 408 409 if ((flags & (TMSUSPEND | TMFAIL)) != 0) { 410 // You can only suspend the associated xid. 411 if (!equals(associatedXid, xid)) { 412 throw new XAException(XAException.XAER_PROTO); 413 } 414 invokeBeforeEnd(); 415 } else if ((flags & TMSUCCESS) == TMSUCCESS) { 416 // set to null if this is the current xid. 417 // otherwise this could be an asynchronous success call 418 if (equals(associatedXid, xid)) { 419 invokeBeforeEnd(); 420 } 421 } else { 422 throw new XAException(XAException.XAER_INVAL); 423 } 424 } 425 426 private void invokeBeforeEnd() throws XAException { 427 boolean throwingException = false; 428 try { 429 beforeEnd(); 430 } catch (JMSException e) { 431 throwingException = true; 432 throw toXAException(e); 433 } finally { 434 try { 435 setXid(null); 436 } catch (XAException ignoreIfWillMask){ 437 if (!throwingException) { 438 throw ignoreIfWillMask; 439 } 440 } 441 } 442 } 443 444 private boolean equals(Xid xid1, Xid xid2) { 445 if (xid1 == xid2) { 446 return true; 447 } 448 if (xid1 == null ^ xid2 == null) { 449 return false; 450 } 451 return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier()) 452 && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId()); 453 } 454 455 public int prepare(Xid xid) throws XAException { 456 if (LOG.isDebugEnabled()) { 457 LOG.debug("Prepare: " + xid); 458 } 459 460 // We allow interleaving multiple transactions, so 461 // we don't limit prepare to the associated xid. 462 XATransactionId x; 463 // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been 464 // called first 465 if (xid == null || (equals(associatedXid, xid))) { 466 throw new XAException(XAException.XAER_PROTO); 467 } else { 468 // TODO: cache the known xids so we don't keep recreating this one?? 469 x = new XATransactionId(xid); 470 } 471 472 if (rollbackOnly) { 473 LOG.warn("prepare of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks"); 474 throw new XAException(XAException.XA_RBINTEGRITY); 475 } 476 477 try { 478 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE); 479 480 // Find out if the server wants to commit or rollback. 481 IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info); 482 if (XAResource.XA_RDONLY == response.getResult()) { 483 // transaction stops now, may be syncs that need a callback 484 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 485 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 486 if (l != null && !l.isEmpty()) { 487 if (LOG.isDebugEnabled()) { 488 LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid); 489 } 490 for (TransactionContext ctx : l) { 491 ctx.afterCommit(); 492 } 493 } 494 } 495 } 496 return response.getResult(); 497 498 } catch (JMSException e) { 499 LOG.warn("prepare of: " + x + " failed with: " + e, e); 500 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 501 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 502 if (l != null && !l.isEmpty()) { 503 for (TransactionContext ctx : l) { 504 try { 505 ctx.afterRollback(); 506 } catch (Throwable ignored) { 507 if (LOG.isDebugEnabled()) { 508 LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " + 509 x + ", context: " + ctx, ignored); 510 } 511 } 512 } 513 } 514 } 515 throw toXAException(e); 516 } 517 } 518 519 public void rollback(Xid xid) throws XAException { 520 521 if (LOG.isDebugEnabled()) { 522 LOG.debug("Rollback: " + xid); 523 } 524 525 // We allow interleaving multiple transactions, so 526 // we don't limit rollback to the associated xid. 527 XATransactionId x; 528 if (xid == null) { 529 throw new XAException(XAException.XAER_PROTO); 530 } 531 if (equals(associatedXid, xid)) { 532 // I think this can happen even without an end(xid) call. Need to 533 // check spec. 534 x = (XATransactionId)transactionId; 535 } else { 536 x = new XATransactionId(xid); 537 } 538 539 try { 540 this.connection.checkClosedOrFailed(); 541 this.connection.ensureConnectionInfoSent(); 542 543 // Let the server know that the tx is rollback. 544 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); 545 this.connection.syncSendPacket(info); 546 547 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 548 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 549 if (l != null && !l.isEmpty()) { 550 for (TransactionContext ctx : l) { 551 ctx.afterRollback(); 552 } 553 } 554 } 555 } catch (JMSException e) { 556 throw toXAException(e); 557 } 558 } 559 560 // XAResource interface 561 public void commit(Xid xid, boolean onePhase) throws XAException { 562 563 if (LOG.isDebugEnabled()) { 564 LOG.debug("Commit: " + xid + ", onePhase=" + onePhase); 565 } 566 567 // We allow interleaving multiple transactions, so 568 // we don't limit commit to the associated xid. 569 XATransactionId x; 570 if (xid == null || (equals(associatedXid, xid))) { 571 // should never happen, end(xid,TMSUCCESS) must have been previously 572 // called 573 throw new XAException(XAException.XAER_PROTO); 574 } else { 575 x = new XATransactionId(xid); 576 } 577 578 if (rollbackOnly) { 579 LOG.warn("commit of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks"); 580 throw new XAException(XAException.XA_RBINTEGRITY); 581 } 582 583 try { 584 this.connection.checkClosedOrFailed(); 585 this.connection.ensureConnectionInfoSent(); 586 587 // Notify the server that the tx was committed back 588 TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE); 589 590 this.connection.syncSendPacket(info); 591 592 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 593 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 594 if (l != null && !l.isEmpty()) { 595 for (TransactionContext ctx : l) { 596 try { 597 ctx.afterCommit(); 598 } catch (Exception ignored) { 599 LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored); 600 } 601 } 602 } 603 } 604 605 } catch (JMSException e) { 606 LOG.warn("commit of: " + x + " failed with: " + e, e); 607 if (onePhase) { 608 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 609 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 610 if (l != null && !l.isEmpty()) { 611 for (TransactionContext ctx : l) { 612 try { 613 ctx.afterRollback(); 614 } catch (Throwable ignored) { 615 if (LOG.isDebugEnabled()) { 616 LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored); 617 } 618 } 619 } 620 } 621 } 622 } 623 throw toXAException(e); 624 } 625 626 } 627 628 public void forget(Xid xid) throws XAException { 629 if (LOG.isDebugEnabled()) { 630 LOG.debug("Forget: " + xid); 631 } 632 633 // We allow interleaving multiple transactions, so 634 // we don't limit forget to the associated xid. 635 XATransactionId x; 636 if (xid == null) { 637 throw new XAException(XAException.XAER_PROTO); 638 } 639 if (equals(associatedXid, xid)) { 640 // TODO determine if this can happen... I think not. 641 x = (XATransactionId)transactionId; 642 } else { 643 x = new XATransactionId(xid); 644 } 645 646 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET); 647 648 try { 649 // Tell the server to forget the transaction. 650 this.connection.syncSendPacket(info); 651 } catch (JMSException e) { 652 throw toXAException(e); 653 } 654 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 655 ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 656 } 657 } 658 659 public boolean isSameRM(XAResource xaResource) throws XAException { 660 if (xaResource == null) { 661 return false; 662 } 663 if (!(xaResource instanceof TransactionContext)) { 664 return false; 665 } 666 TransactionContext xar = (TransactionContext)xaResource; 667 try { 668 return getResourceManagerId().equals(xar.getResourceManagerId()); 669 } catch (Throwable e) { 670 throw (XAException)new XAException("Could not get resource manager id.").initCause(e); 671 } 672 } 673 674 public Xid[] recover(int flag) throws XAException { 675 LOG.debug("recover({})", flag); 676 677 TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER); 678 try { 679 this.connection.checkClosedOrFailed(); 680 this.connection.ensureConnectionInfoSent(); 681 682 DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info); 683 DataStructure[] data = receipt.getData(); 684 XATransactionId[] answer; 685 if (data instanceof XATransactionId[]) { 686 answer = (XATransactionId[])data; 687 } else { 688 answer = new XATransactionId[data.length]; 689 System.arraycopy(data, 0, answer, 0, data.length); 690 } 691 LOG.debug("recover({})={}", flag, answer); 692 return answer; 693 } catch (JMSException e) { 694 throw toXAException(e); 695 } 696 } 697 698 public int getTransactionTimeout() throws XAException { 699 return 0; 700 } 701 702 public boolean setTransactionTimeout(int seconds) throws XAException { 703 return false; 704 } 705 706 // /////////////////////////////////////////////////////////// 707 // 708 // Helper methods. 709 // 710 // /////////////////////////////////////////////////////////// 711 protected String getResourceManagerId() throws JMSException { 712 return this.connection.getResourceManagerId(); 713 } 714 715 private void setXid(Xid xid) throws XAException { 716 717 try { 718 this.connection.checkClosedOrFailed(); 719 this.connection.ensureConnectionInfoSent(); 720 } catch (JMSException e) { 721 disassociate(); 722 throw toXAException(e); 723 } 724 725 if (xid != null) { 726 // associate 727 associatedXid = xid; 728 transactionId = new XATransactionId(xid); 729 730 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 731 try { 732 this.connection.asyncSendPacket(info); 733 if (LOG.isDebugEnabled()) { 734 LOG.debug("{} started XA transaction {} ", this, transactionId); 735 } 736 } catch (JMSException e) { 737 disassociate(); 738 throw toXAException(e); 739 } 740 741 } else { 742 743 if (transactionId != null) { 744 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END); 745 try { 746 this.connection.syncSendPacket(info); 747 LOG.debug("{} ended XA transaction {}", this, transactionId); 748 } catch (JMSException e) { 749 disassociate(); 750 throw toXAException(e); 751 } 752 753 // Add our self to the list of contexts that are interested in 754 // post commit/rollback events. 755 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 756 List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId); 757 if (l == null) { 758 l = new ArrayList<TransactionContext>(3); 759 ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l); 760 l.add(this); 761 } else if (!l.contains(this)) { 762 l.add(this); 763 } 764 } 765 } 766 767 disassociate(); 768 } 769 } 770 771 private void disassociate() { 772 // dis-associate 773 associatedXid = null; 774 transactionId = null; 775 } 776 777 /** 778 * Converts a JMSException from the server to an XAException. if the 779 * JMSException contained a linked XAException that is returned instead. 780 * 781 * @param e JMSException to convert 782 * @return XAException wrapping original exception or its message 783 */ 784 private XAException toXAException(JMSException e) { 785 if (e.getCause() != null && e.getCause() instanceof XAException) { 786 XAException original = (XAException)e.getCause(); 787 XAException xae = new XAException(original.getMessage()); 788 xae.errorCode = original.errorCode; 789 if (xae.errorCode == XA_OK) { 790 // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable 791 xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR); 792 } 793 xae.initCause(original); 794 return xae; 795 } 796 797 XAException xae = new XAException(e.getMessage()); 798 xae.errorCode = XAException.XAER_RMFAIL; 799 xae.initCause(e); 800 return xae; 801 } 802 803 private int parseFromMessageOr(String message, int fallbackCode) { 804 final String marker = "xaErrorCode:"; 805 final int index = message.lastIndexOf(marker); 806 if (index > -1) { 807 try { 808 return Integer.parseInt(message.substring(index + marker.length())); 809 } catch (Exception ignored) {} 810 } 811 return fallbackCode; 812 } 813 814 public ActiveMQConnection getConnection() { 815 return connection; 816 } 817 818 819 // for RAR xa recovery where xaresource connection is per request 820 public ActiveMQConnection setConnection(ActiveMQConnection connection) { 821 ActiveMQConnection existing = this.connection; 822 this.connection = connection; 823 return existing; 824 } 825 826 public void cleanup() { 827 associatedXid = null; 828 transactionId = null; 829 } 830 831 @Override 832 public String toString() { 833 return "TransactionContext{" + 834 "transactionId=" + transactionId + 835 ",connection=" + connection + 836 '}'; 837 } 838}