/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.TransactionInProgressException;
import javax.jms.TransactionRolledBackException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.DataArrayResponse;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.IntegerResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A TransactionContext provides the means to control a JMS transaction. It
 * provides a local transaction interface and also an XAResource interface.
 * <p>
 * An application server controls the transactional assignment of an XASession
 * by obtaining its XAResource. It uses the XAResource to assign the session to
 * a transaction, prepare and commit work on the transaction, and so on.
 * <p>
 * An XAResource provides some fairly sophisticated facilities for interleaving
 * work on multiple transactions, recovering a list of transactions in progress,
 * and so on. A JTA aware JMS provider must fully implement this functionality.
 * This could be done by using the services of a database that supports XA, or a
 * JMS provider may choose to implement this functionality from scratch.
 *
 * @see javax.jms.Session
 * @see javax.jms.QueueSession
 * @see javax.jms.TopicSession
 * @see javax.jms.XASession
 */
public class TransactionContext implements XAResource {

    /** Marker that precedes a numeric XA error code embedded in an exception message. */
    public static final String xaErrorCodeMarker = "xaErrorCode:";
    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);

    // XATransactionId -> list of TransactionContext objects that ended that
    // transaction (see setXid(null)) and still await afterCommit/afterRollback.
    // Every access is guarded by synchronizing on the map itself.
    private static final HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
        new HashMap<TransactionId, List<TransactionContext>>();

    private ActiveMQConnection connection;
    private final LongSequenceGenerator localTransactionIdGenerator;
    private List<Synchronization> synchronizations;

    // To track XA transactions.
    private Xid associatedXid;
    private TransactionId transactionId;
    private LocalTransactionEventListener localTransactionEventListener;
    // Index of the next synchronization whose beforeEnd() has not run yet.
    private int beforeEndIndex;
    private volatile boolean rollbackOnly;

    /**
     * Creates a context without a connection or local transaction id
     * generator (for RAR recovery); a connection can be supplied later via
     * {@link #setConnection(ActiveMQConnection)}.
     */
    public TransactionContext() {
        localTransactionIdGenerator = null;
    }

    /**
     * Creates a context bound to the given connection, using that
     * connection's local transaction id generator.
     *
     * @param connection the connection used to send transaction commands
     */
    public TransactionContext(ActiveMQConnection connection) {
        this.connection = connection;
        this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
    }

    /**
     * @return true if the current transaction id is an XA transaction, or if
     *         this context is registered for any ended XA transaction that has
     *         not been completed yet
     */
    public boolean isInXATransaction() {
        if (transactionId != null && transactionId.isXATransaction()) {
            return true;
        }
        synchronized (ENDED_XA_TRANSACTION_CONTEXTS) {
            for (List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
                if (transactions.contains(this)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Sets the rollback-only flag; while set, {@link #commit()},
     * {@link #prepare(Xid)} and {@link #commit(Xid, boolean)} refuse to proceed.
     *
     * @param val the new flag value
     */
    public void setRollbackOnly(boolean val) {
        rollbackOnly = val;
    }

    /**
     * @return true if a transaction id is set and it is a local transaction
     */
    public boolean isInLocalTransaction() {
        return transactionId != null && transactionId.isLocalTransaction();
    }

    /**
     * @return true if a transaction id is currently set
     */
    public boolean isInTransaction() {
        return transactionId != null;
    }

    /**
     * @return Returns the localTransactionEventListener.
     */
    public LocalTransactionEventListener getLocalTransactionEventListener() {
        return localTransactionEventListener;
    }

    /**
     * Used by the resource adapter to listen to transaction events.
     *
     * @param localTransactionEventListener The localTransactionEventListener to
     *                set.
     */
    public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
        this.localTransactionEventListener = localTransactionEventListener;
    }

    // ///////////////////////////////////////////////////////////
    //
    // Methods that work with the Synchronization objects registered with
    // the transaction.
    //
    // ///////////////////////////////////////////////////////////

    /**
     * Registers a synchronization to be called back around the end of the
     * transaction. The backing list is created lazily.
     *
     * @param s the synchronization to register
     */
    public void addSynchronization(Synchronization s) {
        if (synchronizations == null) {
            synchronizations = new ArrayList<Synchronization>(10);
        }
        synchronizations.add(s);
    }

    /**
     * Calls afterRollback() on every registered synchronization, then clears
     * the registrations. Every callback is attempted even if an earlier one
     * fails.
     *
     * @throws JMSException wrapping the first failure raised by a callback
     */
    private void afterRollback() throws JMSException {
        if (synchronizations == null) {
            return;
        }

        Throwable firstException = null;
        int size = synchronizations.size();
        for (int i = 0; i < size; i++) {
            try {
                synchronizations.get(i).afterRollback();
            } catch (Throwable t) {
                LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t);
                if (firstException == null) {
                    firstException = t;
                }
            }
        }
        synchronizations = null;
        if (firstException != null) {
            throw JMSExceptionSupport.create(firstException);
        }
    }

    /**
     * Calls afterCommit() on every registered synchronization, then clears
     * the registrations. Every callback is attempted even if an earlier one
     * fails.
     *
     * @throws JMSException wrapping the first failure raised by a callback
     */
    private void afterCommit() throws JMSException {
        if (synchronizations == null) {
            return;
        }

        Throwable firstException = null;
        int size = synchronizations.size();
        for (int i = 0; i < size; i++) {
            try {
                synchronizations.get(i).afterCommit();
            } catch (Throwable t) {
                LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t);
                if (firstException == null) {
                    firstException = t;
                }
            }
        }
        synchronizations = null;
        if (firstException != null) {
            throw JMSExceptionSupport.create(firstException);
        }
    }

    /**
     * Calls beforeEnd() on each registered synchronization that has not been
     * called yet. The position is kept in {@code beforeEndIndex} and advanced
     * before each call, so a later invocation resumes after the last
     * synchronization attempted, including one that threw.
     *
     * @throws JMSException the callback's own JMSException, or a wrapper
     *                 around any other failure
     */
    private void beforeEnd() throws JMSException {
        if (synchronizations == null) {
            return;
        }

        int size = synchronizations.size();
        try {
            while (beforeEndIndex < size) {
                synchronizations.get(beforeEndIndex++).beforeEnd();
            }
        } catch (JMSException e) {
            throw e;
        } catch (Throwable e) {
            throw JMSExceptionSupport.create(e);
        }
    }

    /**
     * @return the id of the transaction currently in progress, or null
     */
    public TransactionId getTransactionId() {
        return transactionId;
    }

    // ///////////////////////////////////////////////////////////
    //
    // Local transaction interface.
    //
    // ///////////////////////////////////////////////////////////

    /**
     * Start a local transaction. Does nothing if a transaction id is already
     * set.
     *
     * @throws TransactionInProgressException if an XA transaction is in progress
     * @throws javax.jms.JMSException on internal error
     */
    public void begin() throws JMSException {

        if (isInXATransaction()) {
            throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
        }

        if (transactionId == null) {
            synchronizations = null;
            beforeEndIndex = 0;
            setRollbackOnly(false);
            this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId());
            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
            this.connection.ensureConnectionInfoSent();
            this.connection.asyncSendPacket(info);

            // Notify the listener that the tx was started.
            if (localTransactionEventListener != null) {
                localTransactionEventListener.beginEvent();
            }
            LOG.debug("Begin:{}", transactionId);
        }

    }

    /**
     * Rolls back any work done in this transaction and releases any locks
     * currently held.
     *
     * @throws JMSException if the JMS provider fails to roll back the
     *                 transaction due to some internal error.
     * @throws javax.jms.IllegalStateException if the method is not called by a
     *                 transacted session.
     */
    public void rollback() throws JMSException {
        if (isInXATransaction()) {
            throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
        }

        try {
            beforeEnd();
        } catch (TransactionRolledBackException canOccurOnFailover) {
            LOG.warn("rollback processing error", canOccurOnFailover);
        }
        if (transactionId != null) {
            LOG.debug("Rollback: {} syncCount: {}", transactionId,
                      synchronizations != null ? synchronizations.size() : 0);

            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
            // Cleared before the send, so the context is out of the
            // transaction even if the send fails.
            this.transactionId = null;
            //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
            this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0);
            // Notify the listener that the tx was rolled back
            if (localTransactionEventListener != null) {
                localTransactionEventListener.rollbackEvent();
            }
        }

        afterRollback();
    }

    /**
     * Commits all work done in this transaction and releases any locks
     * currently held.
     *
     * @throws TransactionRolledBackException if the transaction was marked
     *                 rollback only; it is rolled back instead.
     * @throws JMSException if the JMS provider fails to commit the transaction
     *                 due to some internal error.
     * @throws javax.jms.IllegalStateException if the method is not called by a
     *                 transacted session.
     */
    public void commit() throws JMSException {
        if (isInXATransaction()) {
            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
        }

        try {
            beforeEnd();
        } catch (JMSException e) {
            rollback();
            throw e;
        }

        if (transactionId != null && rollbackOnly) {
            final String message = "Commit of " + transactionId + " failed due to rollback only request; typically due to failover with pending acks";
            try {
                rollback();
            } finally {
                // NOTE: throwing from finally replaces any exception raised by
                // rollback(); the caller always sees TransactionRolledBackException.
                LOG.warn(message);
                throw new TransactionRolledBackException(message);
            }
        }

        // Only send commit if the transaction was started.
        if (transactionId != null) {
            LOG.debug("Commit: {} syncCount: {}", transactionId,
                      synchronizations != null ? synchronizations.size() : 0);

            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
            this.transactionId = null;
            // Notify the listener that the tx was committed back
            try {
                this.connection.syncSendPacket(info);
                if (localTransactionEventListener != null) {
                    localTransactionEventListener.commitEvent();
                }
                afterCommit();
            } catch (JMSException cause) {
                LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
                if (localTransactionEventListener != null) {
                    localTransactionEventListener.rollbackEvent();
                }
                afterRollback();
                throw cause;
            }

        }
    }

    // ///////////////////////////////////////////////////////////
    //
    // XAResource Implementation
    //
    // ///////////////////////////////////////////////////////////

    /**
     * Associates a transaction with the resource.
     *
     * @param xid the transaction branch to associate with
     * @param flags logged only; not otherwise interpreted here
     * @throws XAException XAER_PROTO if a local transaction is in progress or
     *                 an xid is already associated; otherwise as thrown while
     *                 beginning the transaction on the connection
     */
    @Override
    public void start(Xid xid, int flags) throws XAException {

        LOG.debug("Start: {}, flags:{}", xid, flags);
        if (isInLocalTransaction()) {
            throw new XAException(XAException.XAER_PROTO);
        }
        // Are we already associated?
        if (associatedXid != null) {
            throw new XAException(XAException.XAER_PROTO);
        }

        // TODO: for TMJOIN verify that the server has seen the xid.
        // TODO: for TMRESUME verify that the xid was suspended.

        // associate
        synchronizations = null;
        beforeEndIndex = 0;
        setRollbackOnly(false);
        setXid(xid);
    }

    /**
     * @return connectionId for connection
     */
    private ConnectionId getConnectionId() {
        return connection.getConnectionInfo().getConnectionId();
    }

    /**
     * Ends the association with the given transaction branch.
     *
     * @param xid the transaction branch being ended
     * @param flags TMSUSPEND or TMFAIL require {@code xid} to be the associated
     *                xid; TMSUCCESS for a non-associated xid is a no-op
     * @throws XAException XAER_PROTO if a local transaction is in progress or
     *                 a suspend/fail names a different xid; XAER_INVAL if none
     *                 of the supported flags is set
     */
    @Override
    public void end(Xid xid, int flags) throws XAException {

        LOG.debug("End: {}, flags:{}", xid, flags);

        if (isInLocalTransaction()) {
            throw new XAException(XAException.XAER_PROTO);
        }

        if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
            // You can only suspend the associated xid.
            if (!equals(associatedXid, xid)) {
                throw new XAException(XAException.XAER_PROTO);
            }
            invokeBeforeEnd();
        } else if ((flags & TMSUCCESS) == TMSUCCESS) {
            // set to null if this is the current xid.
            // otherwise this could be an asynchronous success call
            if (equals(associatedXid, xid)) {
                invokeBeforeEnd();
            }
        } else {
            throw new XAException(XAException.XAER_INVAL);
        }
    }

    /**
     * Runs the beforeEnd callbacks and then always ends the XA association via
     * {@code setXid(null)}. A failure from the callbacks takes precedence over
     * a failure from ending the association.
     *
     * @throws XAException if the callbacks or the disassociation fail
     */
    private void invokeBeforeEnd() throws XAException {
        boolean throwingException = false;
        try {
            beforeEnd();
        } catch (JMSException e) {
            throwingException = true;
            throw toXAException(e);
        } finally {
            try {
                setXid(null);
            } catch (XAException ignoreIfWillMask) {
                // Only surface this failure when it would not mask the
                // exception already propagating from beforeEnd().
                if (!throwingException) {
                    throw ignoreIfWillMask;
                }
            }
        }
    }

    /**
     * Compares two xids by format id, branch qualifier and global transaction
     * id; two nulls are considered equal.
     */
    private boolean equals(Xid xid1, Xid xid2) {
        if (xid1 == xid2) {
            return true;
        }
        if (xid1 == null ^ xid2 == null) {
            return false;
        }
        return xid1.getFormatId() == xid2.getFormatId()
            && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
            && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
    }

    /**
     * Removes and returns the contexts registered for the given ended XA
     * transaction. The caller then owns the returned list and can fire
     * callbacks on it without holding the global lock.
     *
     * @param txId the transaction whose registered contexts are removed
     * @return the removed list, or null if none was registered
     */
    private static List<TransactionContext> removeEndedContexts(TransactionId txId) {
        synchronized (ENDED_XA_TRANSACTION_CONTEXTS) {
            return ENDED_XA_TRANSACTION_CONTEXTS.remove(txId);
        }
    }

    /**
     * Sends a PREPARE for the given transaction branch and returns the
     * server's vote. On an XA_RDONLY vote the afterCommit callbacks of the
     * ended contexts are fired; on failure their afterRollback callbacks are.
     *
     * @param xid the transaction branch to prepare
     * @return the result code carried by the server's response
     * @throws XAException XAER_PROTO if {@code xid} is null or still
     *                 associated; XA_RBINTEGRITY if marked rollback only;
     *                 otherwise the translated send failure
     */
    @Override
    public int prepare(Xid xid) throws XAException {
        LOG.debug("Prepare: {}", xid);

        // We allow interleaving multiple transactions, so
        // we don't limit prepare to the associated xid.
        // An associated xid SHOULD NEVER HAPPEN because end(xid, TMSUCCESS)
        // should have been called first.
        if (xid == null || equals(associatedXid, xid)) {
            throw new XAException(XAException.XAER_PROTO);
        }
        // TODO: cache the known xids so we don't keep recreating this one??
        XATransactionId x = new XATransactionId(xid);

        if (rollbackOnly) {
            LOG.warn("prepare of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
            throw new XAException(XAException.XA_RBINTEGRITY);
        }

        try {
            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);

            // Find out if the server wants to commit or rollback.
            IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
            if (XAResource.XA_RDONLY == response.getResult()) {
                // transaction stops now, may be syncs that need a callback.
                // After commit may be expensive and can deadlock, so it runs
                // outside the global synch block; we own the list now.
                List<TransactionContext> l = removeEndedContexts(x);
                if (l != null && !l.isEmpty()) {
                    LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid);
                    for (TransactionContext ctx : l) {
                        ctx.afterCommit();
                    }
                }
            }
            return response.getResult();

        } catch (JMSException e) {
            LOG.warn("prepare of: " + x + " failed with: " + e, e);
            // After rollback may be expensive and can deadlock, so it runs
            // outside the global synch block; we own the list now.
            List<TransactionContext> l = removeEndedContexts(x);
            if (l != null) {
                for (TransactionContext ctx : l) {
                    try {
                        ctx.afterRollback();
                    } catch (Throwable ignored) {
                        LOG.debug("failed to firing afterRollback callbacks on prepare " +
                                  "failure, txid: {}, context: {}", x, ctx, ignored);
                    }
                }
            }
            throw toXAException(e);
        }
    }

    /**
     * Sends a ROLLBACK for the given transaction branch and fires the
     * afterRollback callbacks of the contexts that ended it.
     *
     * @param xid the transaction branch to roll back
     * @throws XAException XAER_PROTO if {@code xid} is null; otherwise the
     *                 translated connection or send failure
     */
    @Override
    public void rollback(Xid xid) throws XAException {

        LOG.debug("Rollback: {}", xid);

        // We allow interleaving multiple transactions, so
        // we don't limit rollback to the associated xid.
        if (xid == null) {
            throw new XAException(XAException.XAER_PROTO);
        }
        XATransactionId x;
        if (equals(associatedXid, xid)) {
            // I think this can happen even without an end(xid) call. Need to
            // check spec.
            x = (XATransactionId)transactionId;
        } else {
            x = new XATransactionId(xid);
        }

        try {
            this.connection.checkClosedOrFailed();
            this.connection.ensureConnectionInfoSent();

            // Let the server know that the tx is rollback.
            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
            this.connection.syncSendPacket(info);

            // After rollback may be expensive and can deadlock, so it runs
            // outside the global synch block; we own the list now.
            List<TransactionContext> l = removeEndedContexts(x);
            if (l != null) {
                for (TransactionContext ctx : l) {
                    try {
                        ctx.afterRollback();
                    } catch (Exception ignored) {
                        LOG.debug("ignoring exception from after rollback on ended transaction: {}", ignored, ignored);
                    }
                }
            }
        } catch (JMSException e) {
            throw toXAException(e);
        }
    }

    /**
     * Sends a one-phase or two-phase COMMIT for the given transaction branch
     * and fires the afterCommit callbacks of the contexts that ended it. If a
     * one-phase commit fails, their afterRollback callbacks are fired instead.
     *
     * @param xid the transaction branch to commit
     * @param onePhase true to send COMMIT_ONE_PHASE, false for COMMIT_TWO_PHASE
     * @throws XAException XAER_PROTO if {@code xid} is null or still
     *                 associated; XA_RBINTEGRITY if marked rollback only;
     *                 otherwise the translated connection or send failure
     */
    // XAResource interface
    @Override
    public void commit(Xid xid, boolean onePhase) throws XAException {

        LOG.debug("Commit: {}, onePhase={}", xid, onePhase);

        // We allow interleaving multiple transactions, so
        // we don't limit commit to the associated xid.
        if (xid == null || equals(associatedXid, xid)) {
            // should never happen, end(xid,TMSUCCESS) must have been previously
            // called
            throw new XAException(XAException.XAER_PROTO);
        }
        XATransactionId x = new XATransactionId(xid);

        if (rollbackOnly) {
            LOG.warn("commit of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
            throw new XAException(XAException.XA_RBINTEGRITY);
        }

        try {
            this.connection.checkClosedOrFailed();
            this.connection.ensureConnectionInfoSent();

            // Notify the server that the tx was committed back
            TransactionInfo info = new TransactionInfo(getConnectionId(), x,
                onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);

            this.connection.syncSendPacket(info);

            // After commit may be expensive and can deadlock, so it runs
            // outside the global synch block; we own the list now.
            List<TransactionContext> l = removeEndedContexts(x);
            if (l != null) {
                for (TransactionContext ctx : l) {
                    try {
                        ctx.afterCommit();
                    } catch (Exception ignored) {
                        LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored);
                    }
                }
            }

        } catch (JMSException e) {
            LOG.warn("commit of: " + x + " failed with: " + e, e);
            if (onePhase) {
                // After rollback may be expensive and can deadlock, so it runs
                // outside the global synch block; we own the list now.
                List<TransactionContext> l = removeEndedContexts(x);
                if (l != null) {
                    for (TransactionContext ctx : l) {
                        try {
                            ctx.afterRollback();
                        } catch (Throwable ignored) {
                            LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored);
                        }
                    }
                }
            }
            throw toXAException(e);
        }

    }

    /**
     * Tells the server to forget the given transaction branch and drops any
     * contexts registered for it, without firing their callbacks.
     *
     * @param xid the transaction branch to forget
     * @throws XAException XAER_PROTO if {@code xid} is null; otherwise the
     *                 translated send failure
     */
    @Override
    public void forget(Xid xid) throws XAException {
        LOG.debug("Forget: {}", xid);

        // We allow interleaving multiple transactions, so
        // we don't limit forget to the associated xid.
        if (xid == null) {
            throw new XAException(XAException.XAER_PROTO);
        }
        XATransactionId x;
        if (equals(associatedXid, xid)) {
            // TODO determine if this can happen... I think not.
            x = (XATransactionId)transactionId;
        } else {
            x = new XATransactionId(xid);
        }

        TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);

        try {
            // Tell the server to forget the transaction.
            this.connection.syncSendPacket(info);
        } catch (JMSException e) {
            throw toXAException(e);
        }
        removeEndedContexts(x);
    }

    /**
     * @param xaResource the resource to compare against
     * @return true if {@code xaResource} is a TransactionContext reporting the
     *         same resource manager id as this one; false for null or any
     *         other type
     * @throws XAException if a resource manager id cannot be obtained
     */
    @Override
    public boolean isSameRM(XAResource xaResource) throws XAException {
        // instanceof is false for null, so no separate null check is needed.
        if (!(xaResource instanceof TransactionContext)) {
            return false;
        }
        TransactionContext xar = (TransactionContext)xaResource;
        try {
            return getResourceManagerId().equals(xar.getResourceManagerId());
        } catch (Throwable e) {
            throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
        }
    }

    /**
     * Asks the server for the transaction ids it can recover.
     *
     * @param flag TMNOFLAGS yields an empty array without contacting the
     *                server; any other value triggers a RECOVER request
     * @return the xids reported by the server, possibly empty
     * @throws XAException the translated connection or send failure
     */
    @Override
    public Xid[] recover(int flag) throws XAException {
        LOG.debug("recover({})", flag);
        XATransactionId[] answer;

        if (XAResource.TMNOFLAGS == flag) {
            // signal next in cursor scan, which for us is always the end b/c we don't maintain any cursor state
            // allows looping scan to complete
            answer = new XATransactionId[0];
        } else {
            TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
            try {
                this.connection.checkClosedOrFailed();
                this.connection.ensureConnectionInfoSent();

                DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info);
                DataStructure[] data = receipt.getData();
                if (data instanceof XATransactionId[]) {
                    answer = (XATransactionId[]) data;
                } else {
                    // Copy into a correctly typed array so it can be returned as Xid[].
                    answer = new XATransactionId[data.length];
                    System.arraycopy(data, 0, answer, 0, data.length);
                }
            } catch (JMSException e) {
                throw toXAException(e);
            }
        }
        LOG.debug("recover({})={}", flag, answer);
        return answer;
    }

    /**
     * @return always 0
     */
    @Override
    public int getTransactionTimeout() throws XAException {
        return 0;
    }

    /**
     * @param seconds ignored
     * @return always false
     */
    @Override
    public boolean setTransactionTimeout(int seconds) throws XAException {
        return false;
    }

    // ///////////////////////////////////////////////////////////
    //
    // Helper methods.
    //
    // ///////////////////////////////////////////////////////////

    /**
     * @return the resource manager id reported by the connection
     * @throws JMSException if the connection cannot provide it
     */
    protected String getResourceManagerId() throws JMSException {
        return this.connection.getResourceManagerId();
    }

    /**
     * Associates this context with {@code xid} (sending BEGIN), or, when
     * {@code xid} is null, ends the current XA transaction (sending END),
     * registers this context for its completion callbacks and disassociates.
     * Any connection or send failure disassociates the context first.
     *
     * @param xid the xid to associate with, or null to end the association
     * @throws XAException the translated connection or send failure
     */
    private void setXid(Xid xid) throws XAException {

        try {
            this.connection.checkClosedOrFailed();
            this.connection.ensureConnectionInfoSent();
        } catch (JMSException e) {
            disassociate();
            throw toXAException(e);
        }

        if (xid != null) {
            // associate
            associatedXid = xid;
            transactionId = new XATransactionId(xid);

            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
            try {
                this.connection.asyncSendPacket(info);
                LOG.debug("{} started XA transaction {} ", this, transactionId);
            } catch (JMSException e) {
                disassociate();
                throw toXAException(e);
            }

        } else {

            if (transactionId != null) {
                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END);
                try {
                    this.connection.syncSendPacket(info);
                    LOG.debug("{} ended XA transaction {}", this, transactionId);
                } catch (JMSException e) {
                    disassociate();
                    throw toXAException(e);
                }

                // Add our self to the list of contexts that are interested in
                // post commit/rollback events.
                synchronized (ENDED_XA_TRANSACTION_CONTEXTS) {
                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
                    if (l == null) {
                        l = new ArrayList<TransactionContext>(3);
                        ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
                    }
                    if (!l.contains(this)) {
                        l.add(this);
                    }
                }
            }

            disassociate();
        }
    }

    /**
     * Clears the associated xid and the current transaction id.
     */
    private void disassociate() {
        // dis-associate
        associatedXid = null;
        transactionId = null;
    }

    /**
     * Converts a JMSException from the server to an XAException. If the
     * JMSException has an XAException as its cause, a copy of that exception
     * (message and error code) is returned instead, with the original as its
     * cause; when its error code is XA_OK the code is parsed from the message,
     * falling back to XAER_RMERR. Any other exception maps to XAER_RMFAIL.
     *
     * @param e JMSException to convert
     * @return XAException wrapping original exception or its message
     */
    private XAException toXAException(JMSException e) {
        // instanceof is false for a null cause.
        if (e.getCause() instanceof XAException) {
            XAException original = (XAException)e.getCause();
            XAException xae = new XAException(original.getMessage());
            xae.errorCode = original.errorCode;
            if (xae.errorCode == XA_OK) {
                // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable
                xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR);
            }
            xae.initCause(original);
            return xae;
        }

        XAException xae = new XAException(e.getMessage());
        xae.errorCode = XAException.XAER_RMFAIL;
        xae.initCause(e);
        return xae;
    }

    /**
     * Extracts the integer that follows the last {@link #xaErrorCodeMarker} in
     * the message.
     *
     * @param message the exception message to scan; may be null
     * @param fallbackCode returned when the message is null, has no marker, or
     *                the text after the marker is not an integer
     * @return the parsed error code, or {@code fallbackCode}
     */
    private int parseFromMessageOr(String message, int fallbackCode) {
        if (message == null) {
            // An XAException built without a message has none to parse.
            return fallbackCode;
        }
        final int index = message.lastIndexOf(xaErrorCodeMarker);
        if (index > -1) {
            try {
                return Integer.parseInt(message.substring(index + xaErrorCodeMarker.length()));
            } catch (NumberFormatException ignored) {
                // Text after the marker is not an integer; use the fallback.
            }
        }
        return fallbackCode;
    }

    /**
     * @return the connection this context sends its commands on
     */
    public ActiveMQConnection getConnection() {
        return connection;
    }

    /**
     * Replaces the connection (for RAR xa recovery where the xaresource
     * connection is per request).
     *
     * @param connection the new connection
     * @return the connection that was previously set
     */
    public ActiveMQConnection setConnection(ActiveMQConnection connection) {
        ActiveMQConnection existing = this.connection;
        this.connection = connection;
        return existing;
    }

    /**
     * Clears the associated xid and the current transaction id without
     * sending anything on the connection.
     */
    public void cleanup() {
        disassociate();
    }

    @Override
    public String toString() {
        return "TransactionContext{" +
                "transactionId=" + transactionId +
                ",connection=" + connection +
                '}';
    }
}