001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.HashMap; 022import java.util.List; 023 024import javax.jms.JMSException; 025import javax.jms.TransactionInProgressException; 026import javax.jms.TransactionRolledBackException; 027import javax.transaction.xa.XAException; 028import javax.transaction.xa.XAResource; 029import javax.transaction.xa.Xid; 030 031import org.apache.activemq.command.ConnectionId; 032import org.apache.activemq.command.DataArrayResponse; 033import org.apache.activemq.command.DataStructure; 034import org.apache.activemq.command.IntegerResponse; 035import org.apache.activemq.command.LocalTransactionId; 036import org.apache.activemq.command.TransactionId; 037import org.apache.activemq.command.TransactionInfo; 038import org.apache.activemq.command.XATransactionId; 039import org.apache.activemq.transaction.Synchronization; 040import org.apache.activemq.transport.failover.FailoverTransport; 041import org.apache.activemq.util.JMSExceptionSupport; 042import org.apache.activemq.util.LongSequenceGenerator; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * A TransactionContext provides the means to control a JMS transaction. It 048 * provides a local transaction interface and also an XAResource interface. <p/> 049 * An application server controls the transactional assignment of an XASession 050 * by obtaining its XAResource. It uses the XAResource to assign the session to 051 * a transaction, prepare and commit work on the transaction, and so on. <p/> An 052 * XAResource provides some fairly sophisticated facilities for interleaving 053 * work on multiple transactions, recovering a list of transactions in progress, 054 * and so on. A JTA aware JMS provider must fully implement this functionality. 055 * This could be done by using the services of a database that supports XA, or a 056 * JMS provider may choose to implement this functionality from scratch. <p/> 057 * 058 * 059 * @see javax.jms.Session 060 * @see javax.jms.QueueSession 061 * @see javax.jms.TopicSession 062 * @see javax.jms.XASession 063 */ 064public class TransactionContext implements XAResource { 065 066 public static final String xaErrorCodeMarker = "xaErrorCode:"; 067 private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); 068 069 // XATransactionId -> ArrayList of TransactionContext objects 070 private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS = 071 new HashMap<TransactionId, List<TransactionContext>>(); 072 073 private ActiveMQConnection connection; 074 private final LongSequenceGenerator localTransactionIdGenerator; 075 private List<Synchronization> synchronizations; 076 077 // To track XA transactions. 078 private Xid associatedXid; 079 private TransactionId transactionId; 080 private LocalTransactionEventListener localTransactionEventListener; 081 private int beforeEndIndex; 082 private volatile boolean rollbackOnly; 083 084 // for RAR recovery 085 public TransactionContext() { 086 localTransactionIdGenerator = null; 087 } 088 089 public TransactionContext(ActiveMQConnection connection) { 090 this.connection = connection; 091 this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator(); 092 } 093 094 public boolean isInXATransaction() { 095 if (transactionId != null && transactionId.isXATransaction()) { 096 return true; 097 } else { 098 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 099 for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) { 100 if (transactions.contains(this)) { 101 return true; 102 } 103 } 104 } 105 } 106 107 return false; 108 } 109 110 public void setRollbackOnly(boolean val) { 111 rollbackOnly = val; 112 } 113 114 public boolean isRollbackOnly() { 115 return rollbackOnly; 116 } 117 118 public boolean isInLocalTransaction() { 119 return transactionId != null && transactionId.isLocalTransaction(); 120 } 121 122 public boolean isInTransaction() { 123 return transactionId != null; 124 } 125 126 /** 127 * @return Returns the localTransactionEventListener. 128 */ 129 public LocalTransactionEventListener getLocalTransactionEventListener() { 130 return localTransactionEventListener; 131 } 132 133 /** 134 * Used by the resource adapter to listen to transaction events. 135 * 136 * @param localTransactionEventListener The localTransactionEventListener to 137 * set. 138 */ 139 public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) { 140 this.localTransactionEventListener = localTransactionEventListener; 141 } 142 143 // /////////////////////////////////////////////////////////// 144 // 145 // Methods that work with the Synchronization objects registered with 146 // the transaction. 147 // 148 // /////////////////////////////////////////////////////////// 149 150 public void addSynchronization(Synchronization s) { 151 if (synchronizations == null) { 152 synchronizations = new ArrayList<Synchronization>(10); 153 } 154 synchronizations.add(s); 155 } 156 157 private void afterRollback() throws JMSException { 158 if (synchronizations == null) { 159 return; 160 } 161 162 Throwable firstException = null; 163 int size = synchronizations.size(); 164 for (int i = 0; i < size; i++) { 165 try { 166 synchronizations.get(i).afterRollback(); 167 } catch (Throwable t) { 168 LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t); 169 if (firstException == null) { 170 firstException = t; 171 } 172 } 173 } 174 synchronizations = null; 175 if (firstException != null) { 176 throw JMSExceptionSupport.create(firstException); 177 } 178 } 179 180 private void afterCommit() throws JMSException { 181 if (synchronizations == null) { 182 return; 183 } 184 185 Throwable firstException = null; 186 int size = synchronizations.size(); 187 for (int i = 0; i < size; i++) { 188 try { 189 synchronizations.get(i).afterCommit(); 190 } catch (Throwable t) { 191 LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t); 192 if (firstException == null) { 193 firstException = t; 194 } 195 } 196 } 197 synchronizations = null; 198 if (firstException != null) { 199 throw JMSExceptionSupport.create(firstException); 200 } 201 } 202 203 private void beforeEnd() throws JMSException { 204 if (synchronizations == null) { 205 return; 206 } 207 208 int size = synchronizations.size(); 209 try { 210 for (;beforeEndIndex < size;) { 211 synchronizations.get(beforeEndIndex++).beforeEnd(); 212 } 213 } catch (JMSException e) { 214 throw e; 215 } catch (Throwable e) { 216 throw JMSExceptionSupport.create(e); 217 } 218 } 219 220 public TransactionId getTransactionId() { 221 return transactionId; 222 } 223 224 // /////////////////////////////////////////////////////////// 225 // 226 // Local transaction interface. 227 // 228 // /////////////////////////////////////////////////////////// 229 230 /** 231 * Start a local transaction. 232 * @throws javax.jms.JMSException on internal error 233 */ 234 public void begin() throws JMSException { 235 236 if (isInXATransaction()) { 237 throw new TransactionInProgressException("Cannot start local transaction. XA transaction is already in progress."); 238 } 239 240 if (transactionId == null) { 241 synchronizations = null; 242 beforeEndIndex = 0; 243 setRollbackOnly(false); 244 this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId()); 245 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 246 this.connection.ensureConnectionInfoSent(); 247 this.connection.asyncSendPacket(info); 248 249 // Notify the listener that the tx was started. 250 if (localTransactionEventListener != null) { 251 localTransactionEventListener.beginEvent(); 252 } 253 if (LOG.isDebugEnabled()) { 254 LOG.debug("Begin:" + transactionId); 255 } 256 } 257 258 } 259 260 /** 261 * Rolls back any work done in this transaction and releases any locks 262 * currently held. 263 * 264 * @throws JMSException if the JMS provider fails to roll back the 265 * transaction due to some internal error. 266 * @throws javax.jms.IllegalStateException if the method is not called by a 267 * transacted session. 268 */ 269 public void rollback() throws JMSException { 270 if (isInXATransaction()) { 271 throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress "); 272 } 273 274 try { 275 beforeEnd(); 276 } catch (TransactionRolledBackException canOcurrOnFailover) { 277 LOG.warn("rollback processing error", canOcurrOnFailover); 278 } 279 if (transactionId != null) { 280 if (LOG.isDebugEnabled()) { 281 LOG.debug("Rollback: " + transactionId 282 + " syncCount: " 283 + (synchronizations != null ? synchronizations.size() : 0)); 284 } 285 286 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK); 287 this.transactionId = null; 288 //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364 289 this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0); 290 // Notify the listener that the tx was rolled back 291 if (localTransactionEventListener != null) { 292 localTransactionEventListener.rollbackEvent(); 293 } 294 } 295 296 afterRollback(); 297 } 298 299 /** 300 * Commits all work done in this transaction and releases any locks 301 * currently held. 302 * 303 * @throws JMSException if the JMS provider fails to commit the transaction 304 * due to some internal error. 305 * @throws javax.jms.IllegalStateException if the method is not called by a 306 * transacted session. 307 */ 308 public void commit() throws JMSException { 309 if (isInXATransaction()) { 310 throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress "); 311 } 312 313 try { 314 beforeEnd(); 315 } catch (JMSException e) { 316 rollback(); 317 throw e; 318 } 319 320 if (transactionId != null && rollbackOnly) { 321 final String message = "Commit of " + transactionId + " failed due to rollback only request; typically due to failover with pending acks"; 322 try { 323 rollback(); 324 } finally { 325 LOG.warn(message); 326 throw new TransactionRolledBackException(message); 327 } 328 } 329 330 // Only send commit if the transaction was started. 331 if (transactionId != null) { 332 if (LOG.isDebugEnabled()) { 333 LOG.debug("Commit: " + transactionId 334 + " syncCount: " 335 + (synchronizations != null ? synchronizations.size() : 0)); 336 } 337 338 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE); 339 this.transactionId = null; 340 // Notify the listener that the tx was committed back 341 try { 342 this.connection.syncSendPacket(info); 343 if (localTransactionEventListener != null) { 344 localTransactionEventListener.commitEvent(); 345 } 346 afterCommit(); 347 } catch (JMSException cause) { 348 LOG.info("commit failed for transaction " + info.getTransactionId(), cause); 349 if (localTransactionEventListener != null) { 350 localTransactionEventListener.rollbackEvent(); 351 } 352 afterRollback(); 353 throw cause; 354 } 355 356 } 357 } 358 359 // /////////////////////////////////////////////////////////// 360 // 361 // XAResource Implementation 362 // 363 // /////////////////////////////////////////////////////////// 364 /** 365 * Associates a transaction with the resource. 366 */ 367 public void start(Xid xid, int flags) throws XAException { 368 369 if (LOG.isDebugEnabled()) { 370 LOG.debug("Start: " + xid + ", flags:" + flags); 371 } 372 if (isInLocalTransaction()) { 373 throw new XAException(XAException.XAER_PROTO); 374 } 375 // Are we already associated? 376 if (associatedXid != null) { 377 throw new XAException(XAException.XAER_PROTO); 378 } 379 380 // if ((flags & TMJOIN) == TMJOIN) { 381 // TODO: verify that the server has seen the xid 382 // // } 383 // if ((flags & TMJOIN) == TMRESUME) { 384 // // TODO: verify that the xid was suspended. 385 // } 386 387 // associate 388 synchronizations = null; 389 beforeEndIndex = 0; 390 setRollbackOnly(false); 391 setXid(xid); 392 } 393 394 /** 395 * @return connectionId for connection 396 */ 397 private ConnectionId getConnectionId() { 398 return connection.getConnectionInfo().getConnectionId(); 399 } 400 401 public void end(Xid xid, int flags) throws XAException { 402 403 if (LOG.isDebugEnabled()) { 404 LOG.debug("End: " + xid + ", flags:" + flags); 405 } 406 407 if (isInLocalTransaction()) { 408 throw new XAException(XAException.XAER_PROTO); 409 } 410 411 if ((flags & (TMSUSPEND | TMFAIL)) != 0) { 412 // You can only suspend the associated xid. 413 if (!equals(associatedXid, xid)) { 414 throw new XAException(XAException.XAER_PROTO); 415 } 416 invokeBeforeEnd(); 417 } else if ((flags & TMSUCCESS) == TMSUCCESS) { 418 // set to null if this is the current xid. 419 // otherwise this could be an asynchronous success call 420 if (equals(associatedXid, xid)) { 421 invokeBeforeEnd(); 422 } 423 } else { 424 throw new XAException(XAException.XAER_INVAL); 425 } 426 } 427 428 private void invokeBeforeEnd() throws XAException { 429 boolean throwingException = false; 430 try { 431 beforeEnd(); 432 } catch (JMSException e) { 433 throwingException = true; 434 throw toXAException(e); 435 } finally { 436 try { 437 setXid(null); 438 } catch (XAException ignoreIfWillMask){ 439 if (!throwingException) { 440 throw ignoreIfWillMask; 441 } 442 } 443 } 444 } 445 446 private boolean equals(Xid xid1, Xid xid2) { 447 if (xid1 == xid2) { 448 return true; 449 } 450 if (xid1 == null ^ xid2 == null) { 451 return false; 452 } 453 return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier()) 454 && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId()); 455 } 456 457 public int prepare(Xid xid) throws XAException { 458 if (LOG.isDebugEnabled()) { 459 LOG.debug("Prepare: " + xid); 460 } 461 462 // We allow interleaving multiple transactions, so 463 // we don't limit prepare to the associated xid. 464 XATransactionId x; 465 // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been 466 // called first 467 if (xid == null || (equals(associatedXid, xid))) { 468 throw new XAException(XAException.XAER_PROTO); 469 } else { 470 // TODO: cache the known xids so we don't keep recreating this one?? 471 x = new XATransactionId(xid); 472 } 473 474 if (rollbackOnly) { 475 LOG.warn("prepare of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks"); 476 throw new XAException(XAException.XA_RBINTEGRITY); 477 } 478 479 try { 480 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE); 481 482 // Find out if the server wants to commit or rollback. 483 IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info); 484 if (XAResource.XA_RDONLY == response.getResult()) { 485 // transaction stops now, may be syncs that need a callback 486 List<TransactionContext> l; 487 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 488 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 489 } 490 // After commit may be expensive and can deadlock, do it outside global synch block 491 // No risk for concurrent updates as we own the list now 492 if (l != null) { 493 if(! l.isEmpty()) { 494 LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid); 495 for (TransactionContext ctx : l) { 496 ctx.afterCommit(); 497 } 498 } 499 } 500 } 501 return response.getResult(); 502 503 } catch (JMSException e) { 504 LOG.warn("prepare of: " + x + " failed with: " + e, e); 505 List<TransactionContext> l; 506 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 507 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 508 } 509 // After rollback may be expensive and can deadlock, do it outside global synch block 510 // No risk for concurrent updates as we own the list now 511 if (l != null) { 512 for (TransactionContext ctx : l) { 513 try { 514 ctx.afterRollback(); 515 } catch (Throwable ignored) { 516 LOG.debug("failed to firing afterRollback callbacks on prepare " + 517 "failure, txid: {}, context: {}", x, ctx, ignored); 518 } 519 } 520 } 521 throw toXAException(e); 522 } 523 } 524 525 public void rollback(Xid xid) throws XAException { 526 527 if (LOG.isDebugEnabled()) { 528 LOG.debug("Rollback: " + xid); 529 } 530 531 // We allow interleaving multiple transactions, so 532 // we don't limit rollback to the associated xid. 533 XATransactionId x; 534 if (xid == null) { 535 throw new XAException(XAException.XAER_PROTO); 536 } 537 if (equals(associatedXid, xid)) { 538 // I think this can happen even without an end(xid) call. Need to 539 // check spec. 540 x = (XATransactionId)transactionId; 541 } else { 542 x = new XATransactionId(xid); 543 } 544 545 try { 546 this.connection.checkClosedOrFailed(); 547 this.connection.ensureConnectionInfoSent(); 548 549 // Let the server know that the tx is rollback. 550 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK); 551 this.connection.syncSendPacket(info); 552 553 List<TransactionContext> l; 554 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 555 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 556 } 557 // After rollback may be expensive and can deadlock, do it outside global synch block 558 // No risk for concurrent updates as we own the list now 559 if (l != null) { 560 for (TransactionContext ctx : l) { 561 try { 562 ctx.afterRollback(); 563 } catch (Exception ignored) { 564 LOG.debug("ignoring exception from after rollback on ended transaction: {}", ignored, ignored); 565 } 566 } 567 } 568 } catch (JMSException e) { 569 throw toXAException(e); 570 } 571 } 572 573 // XAResource interface 574 public void commit(Xid xid, boolean onePhase) throws XAException { 575 576 if (LOG.isDebugEnabled()) { 577 LOG.debug("Commit: " + xid + ", onePhase=" + onePhase); 578 } 579 580 // We allow interleaving multiple transactions, so 581 // we don't limit commit to the associated xid. 582 XATransactionId x; 583 if (xid == null || (equals(associatedXid, xid))) { 584 // should never happen, end(xid,TMSUCCESS) must have been previously 585 // called 586 throw new XAException(XAException.XAER_PROTO); 587 } else { 588 x = new XATransactionId(xid); 589 } 590 591 if (rollbackOnly) { 592 LOG.warn("commit of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks"); 593 throw new XAException(XAException.XA_RBINTEGRITY); 594 } 595 596 try { 597 this.connection.checkClosedOrFailed(); 598 this.connection.ensureConnectionInfoSent(); 599 600 // Notify the server that the tx was committed back 601 TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE); 602 603 this.connection.syncSendPacket(info); 604 605 List<TransactionContext> l; 606 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 607 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 608 } 609 // After commit may be expensive and can deadlock, do it outside global synch block 610 // No risk for concurrent updates as we own the list now 611 if (l != null) { 612 for (TransactionContext ctx : l) { 613 try { 614 ctx.afterCommit(); 615 } catch (Exception ignored) { 616 LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored); 617 } 618 } 619 } 620 621 } catch (JMSException e) { 622 LOG.warn("commit of: " + x + " failed with: " + e, e); 623 if (onePhase) { 624 List<TransactionContext> l; 625 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 626 l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 627 } 628 // After rollback may be expensive and can deadlock, do it outside global synch block 629 // No risk for concurrent updates as we own the list now 630 if (l != null) { 631 for (TransactionContext ctx : l) { 632 try { 633 ctx.afterRollback(); 634 } catch (Throwable ignored) { 635 LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored); 636 } 637 } 638 } 639 } 640 throw toXAException(e); 641 } 642 643 } 644 645 public void forget(Xid xid) throws XAException { 646 if (LOG.isDebugEnabled()) { 647 LOG.debug("Forget: " + xid); 648 } 649 650 // We allow interleaving multiple transactions, so 651 // we don't limit forget to the associated xid. 652 XATransactionId x; 653 if (xid == null) { 654 throw new XAException(XAException.XAER_PROTO); 655 } 656 if (equals(associatedXid, xid)) { 657 // TODO determine if this can happen... I think not. 658 x = (XATransactionId)transactionId; 659 } else { 660 x = new XATransactionId(xid); 661 } 662 663 TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET); 664 665 try { 666 // Tell the server to forget the transaction. 667 this.connection.syncSendPacket(info); 668 } catch (JMSException e) { 669 throw toXAException(e); 670 } 671 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 672 ENDED_XA_TRANSACTION_CONTEXTS.remove(x); 673 } 674 } 675 676 public boolean isSameRM(XAResource xaResource) throws XAException { 677 if (xaResource == null) { 678 return false; 679 } 680 if (!(xaResource instanceof TransactionContext)) { 681 return false; 682 } 683 TransactionContext xar = (TransactionContext)xaResource; 684 try { 685 return getResourceManagerId().equals(xar.getResourceManagerId()); 686 } catch (Throwable e) { 687 throw (XAException)new XAException("Could not get resource manager id.").initCause(e); 688 } 689 } 690 691 public Xid[] recover(int flag) throws XAException { 692 LOG.debug("recover({})", flag); 693 XATransactionId[] answer; 694 695 if (XAResource.TMNOFLAGS == flag) { 696 // signal next in cursor scan, which for us is always the end b/c we don't maintain any cursor state 697 // allows looping scan to complete 698 answer = new XATransactionId[0]; 699 } else { 700 TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER); 701 try { 702 this.connection.checkClosedOrFailed(); 703 this.connection.ensureConnectionInfoSent(); 704 705 DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info); 706 DataStructure[] data = receipt.getData(); 707 if (data instanceof XATransactionId[]) { 708 answer = (XATransactionId[]) data; 709 } else { 710 answer = new XATransactionId[data.length]; 711 System.arraycopy(data, 0, answer, 0, data.length); 712 } 713 } catch (JMSException e) { 714 throw toXAException(e); 715 } 716 } 717 LOG.debug("recover({})={}", flag, answer); 718 return answer; 719 } 720 721 public int getTransactionTimeout() throws XAException { 722 return 0; 723 } 724 725 public boolean setTransactionTimeout(int seconds) throws XAException { 726 return false; 727 } 728 729 // /////////////////////////////////////////////////////////// 730 // 731 // Helper methods. 732 // 733 // /////////////////////////////////////////////////////////// 734 protected String getResourceManagerId() throws JMSException { 735 return this.connection.getResourceManagerId(); 736 } 737 738 private void setXid(Xid xid) throws XAException { 739 740 try { 741 this.connection.checkClosedOrFailed(); 742 this.connection.ensureConnectionInfoSent(); 743 } catch (JMSException e) { 744 disassociate(); 745 throw toXAException(e); 746 } 747 748 if (xid != null) { 749 // associate 750 associatedXid = xid; 751 transactionId = new XATransactionId(xid); 752 753 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN); 754 try { 755 this.connection.asyncSendPacket(info); 756 if (LOG.isDebugEnabled()) { 757 LOG.debug("{} started XA transaction {} ", this, transactionId); 758 } 759 } catch (JMSException e) { 760 disassociate(); 761 throw toXAException(e); 762 } 763 764 } else { 765 766 if (transactionId != null) { 767 TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END); 768 try { 769 this.connection.syncSendPacket(info); 770 LOG.debug("{} ended XA transaction {}", this, transactionId); 771 } catch (JMSException e) { 772 disassociate(); 773 throw toXAException(e); 774 } 775 776 // Add our self to the list of contexts that are interested in 777 // post commit/rollback events. 778 List<TransactionContext> l; 779 synchronized(ENDED_XA_TRANSACTION_CONTEXTS) { 780 l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId); 781 if (l == null) { 782 l = new ArrayList<TransactionContext>(3); 783 ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l); 784 } 785 if (!l.contains(this)) { 786 l.add(this); 787 } 788 } 789 } 790 791 disassociate(); 792 } 793 } 794 795 private void disassociate() { 796 // dis-associate 797 associatedXid = null; 798 transactionId = null; 799 } 800 801 /** 802 * Converts a JMSException from the server to an XAException. if the 803 * JMSException contained a linked XAException that is returned instead. 804 * 805 * @param e JMSException to convert 806 * @return XAException wrapping original exception or its message 807 */ 808 public static XAException toXAException(JMSException e) { 809 if (e.getCause() != null && e.getCause() instanceof XAException) { 810 XAException original = (XAException)e.getCause(); 811 XAException xae = new XAException(original.getMessage()); 812 xae.errorCode = original.errorCode; 813 if (xae.errorCode == XA_OK) { 814 // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable 815 xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR); 816 } 817 xae.initCause(original); 818 return xae; 819 } 820 821 XAException xae = new XAException(e.getMessage()); 822 xae.errorCode = XAException.XAER_RMFAIL; 823 xae.initCause(e); 824 return xae; 825 } 826 827 private static int parseFromMessageOr(String message, int fallbackCode) { 828 final String marker = "xaErrorCode:"; 829 final int index = message.lastIndexOf(marker); 830 if (index > -1) { 831 try { 832 return Integer.parseInt(message.substring(index + marker.length())); 833 } catch (Exception ignored) {} 834 } 835 return fallbackCode; 836 } 837 838 public ActiveMQConnection getConnection() { 839 return connection; 840 } 841 842 843 // for RAR xa recovery where xaresource connection is per request 844 public ActiveMQConnection setConnection(ActiveMQConnection connection) { 845 ActiveMQConnection existing = this.connection; 846 this.connection = connection; 847 return existing; 848 } 849 850 public void cleanup() { 851 associatedXid = null; 852 transactionId = null; 853 } 854 855 @Override 856 public String toString() { 857 return "TransactionContext{" + 858 "transactionId=" + transactionId + 859 ",connection=" + connection + 860 '}'; 861 } 862}