001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.util.ArrayList;
020import java.util.Arrays;
021import java.util.HashMap;
022import java.util.List;
023
024import javax.jms.JMSException;
025import javax.jms.TransactionInProgressException;
026import javax.jms.TransactionRolledBackException;
027import javax.transaction.xa.XAException;
028import javax.transaction.xa.XAResource;
029import javax.transaction.xa.Xid;
030
031import org.apache.activemq.command.ConnectionId;
032import org.apache.activemq.command.DataArrayResponse;
033import org.apache.activemq.command.DataStructure;
034import org.apache.activemq.command.IntegerResponse;
035import org.apache.activemq.command.LocalTransactionId;
036import org.apache.activemq.command.TransactionId;
037import org.apache.activemq.command.TransactionInfo;
038import org.apache.activemq.command.XATransactionId;
039import org.apache.activemq.transaction.Synchronization;
040import org.apache.activemq.transport.failover.FailoverTransport;
041import org.apache.activemq.util.JMSExceptionSupport;
042import org.apache.activemq.util.LongSequenceGenerator;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * A TransactionContext provides the means to control a JMS transaction. It
048 * provides a local transaction interface and also an XAResource interface. <p/>
049 * An application server controls the transactional assignment of an XASession
050 * by obtaining its XAResource. It uses the XAResource to assign the session to
051 * a transaction, prepare and commit work on the transaction, and so on. <p/> An
052 * XAResource provides some fairly sophisticated facilities for interleaving
053 * work on multiple transactions, recovering a list of transactions in progress,
054 * and so on. A JTA aware JMS provider must fully implement this functionality.
055 * This could be done by using the services of a database that supports XA, or a
056 * JMS provider may choose to implement this functionality from scratch. <p/>
057 *
058 *
059 * @see javax.jms.Session
060 * @see javax.jms.QueueSession
061 * @see javax.jms.TopicSession
062 * @see javax.jms.XASession
063 */
064public class TransactionContext implements XAResource {
065
066    public static final String xaErrorCodeMarker = "xaErrorCode:";
067    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
068
069    // XATransactionId -> ArrayList of TransactionContext objects
070    private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
071                new HashMap<TransactionId, List<TransactionContext>>();
072
073    private ActiveMQConnection connection;
074    private final LongSequenceGenerator localTransactionIdGenerator;
075    private List<Synchronization> synchronizations;
076
077    // To track XA transactions.
078    private Xid associatedXid;
079    private TransactionId transactionId;
080    private LocalTransactionEventListener localTransactionEventListener;
081    private int beforeEndIndex;
082    private volatile boolean rollbackOnly;
083
/**
 * Creates a context without a connection (for RAR recovery). No local
 * transaction id generator is available on such an instance.
 */
public TransactionContext() {
    this.localTransactionIdGenerator = null;
}
088
/**
 * Creates a context bound to the given connection and sharing that
 * connection's local transaction id generator.
 *
 * @param connection the connection used to send transaction commands
 */
public TransactionContext(ActiveMQConnection connection) {
    this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
    this.connection = connection;
}
093
094    public boolean isInXATransaction() {
095        if (transactionId != null && transactionId.isXATransaction()) {
096                return true;
097        } else {
098            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
099                for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
100                      if (transactions.contains(this)) {
101                          return true;
102                      }
103                }
104            }
105        }
106
107        return false;
108    }
109
110    public void setRollbackOnly(boolean val) {
111        rollbackOnly = val;
112    }
113
114    public boolean isRollbackOnly() {
115        return rollbackOnly;
116    }
117
118    public boolean isInLocalTransaction() {
119        return transactionId != null && transactionId.isLocalTransaction();
120    }
121
122    public boolean isInTransaction() {
123        return transactionId != null;
124    }
125
126    /**
127     * @return Returns the localTransactionEventListener.
128     */
129    public LocalTransactionEventListener getLocalTransactionEventListener() {
130        return localTransactionEventListener;
131    }
132
133    /**
134     * Used by the resource adapter to listen to transaction events.
135     *
136     * @param localTransactionEventListener The localTransactionEventListener to
137     *                set.
138     */
139    public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
140        this.localTransactionEventListener = localTransactionEventListener;
141    }
142
143    // ///////////////////////////////////////////////////////////
144    //
145    // Methods that work with the Synchronization objects registered with
146    // the transaction.
147    //
148    // ///////////////////////////////////////////////////////////
149
150    public void addSynchronization(Synchronization s) {
151        if (synchronizations == null) {
152            synchronizations = new ArrayList<Synchronization>(10);
153        }
154        synchronizations.add(s);
155    }
156
157    private void afterRollback() throws JMSException {
158        if (synchronizations == null) {
159            return;
160        }
161
162        Throwable firstException = null;
163        int size = synchronizations.size();
164        for (int i = 0; i < size; i++) {
165            try {
166                synchronizations.get(i).afterRollback();
167            } catch (Throwable t) {
168                LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t);
169                if (firstException == null) {
170                    firstException = t;
171                }
172            }
173        }
174        synchronizations = null;
175        if (firstException != null) {
176            throw JMSExceptionSupport.create(firstException);
177        }
178    }
179
180    private void afterCommit() throws JMSException {
181        if (synchronizations == null) {
182            return;
183        }
184
185        Throwable firstException = null;
186        int size = synchronizations.size();
187        for (int i = 0; i < size; i++) {
188            try {
189                synchronizations.get(i).afterCommit();
190            } catch (Throwable t) {
191                LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t);
192                if (firstException == null) {
193                    firstException = t;
194                }
195            }
196        }
197        synchronizations = null;
198        if (firstException != null) {
199            throw JMSExceptionSupport.create(firstException);
200        }
201    }
202
203    private void beforeEnd() throws JMSException {
204        if (synchronizations == null) {
205            return;
206        }
207
208        int size = synchronizations.size();
209        try {
210            for (;beforeEndIndex < size;) {
211                synchronizations.get(beforeEndIndex++).beforeEnd();
212            }
213        } catch (JMSException e) {
214            throw e;
215        } catch (Throwable e) {
216            throw JMSExceptionSupport.create(e);
217        }
218    }
219
220    public TransactionId getTransactionId() {
221        return transactionId;
222    }
223
224    // ///////////////////////////////////////////////////////////
225    //
226    // Local transaction interface.
227    //
228    // ///////////////////////////////////////////////////////////
229
230    /**
231     * Start a local transaction.
232     * @throws javax.jms.JMSException on internal error
233     */
234    public void begin() throws JMSException {
235
236        if (isInXATransaction()) {
237            throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
238        }
239
240        if (transactionId == null) {
241            synchronizations = null;
242            beforeEndIndex = 0;
243            setRollbackOnly(false);
244            this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId());
245            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
246            this.connection.ensureConnectionInfoSent();
247            this.connection.asyncSendPacket(info);
248
249            // Notify the listener that the tx was started.
250            if (localTransactionEventListener != null) {
251                localTransactionEventListener.beginEvent();
252            }
253            if (LOG.isDebugEnabled()) {
254                LOG.debug("Begin:" + transactionId);
255            }
256        }
257
258    }
259
260    /**
261     * Rolls back any work done in this transaction and releases any locks
262     * currently held.
263     *
264     * @throws JMSException if the JMS provider fails to roll back the
265     *                 transaction due to some internal error.
266     * @throws javax.jms.IllegalStateException if the method is not called by a
267     *                 transacted session.
268     */
269    public void rollback() throws JMSException {
270        if (isInXATransaction()) {
271            throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
272        }
273
274        try {
275            beforeEnd();
276        } catch (TransactionRolledBackException canOcurrOnFailover) {
277            LOG.warn("rollback processing error", canOcurrOnFailover);
278        }
279        if (transactionId != null) {
280            if (LOG.isDebugEnabled()) {
281                LOG.debug("Rollback: "  + transactionId
282                + " syncCount: "
283                + (synchronizations != null ? synchronizations.size() : 0));
284            }
285
286            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
287            this.transactionId = null;
288            //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
289            this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0);
290            // Notify the listener that the tx was rolled back
291            if (localTransactionEventListener != null) {
292                localTransactionEventListener.rollbackEvent();
293            }
294        }
295
296        afterRollback();
297    }
298
299    /**
300     * Commits all work done in this transaction and releases any locks
301     * currently held.
302     *
303     * @throws JMSException if the JMS provider fails to commit the transaction
304     *                 due to some internal error.
305     * @throws javax.jms.IllegalStateException if the method is not called by a
306     *                 transacted session.
307     */
308    public void commit() throws JMSException {
309        if (isInXATransaction()) {
310            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
311        }
312
313        try {
314            beforeEnd();
315        } catch (JMSException e) {
316            rollback();
317            throw e;
318        }
319
320        if (transactionId != null && rollbackOnly) {
321            final String message = "Commit of " + transactionId + "  failed due to rollback only request; typically due to failover with pending acks";
322            try {
323                rollback();
324            } finally {
325                LOG.warn(message);
326                throw new TransactionRolledBackException(message);
327            }
328        }
329
330        // Only send commit if the transaction was started.
331        if (transactionId != null) {
332            if (LOG.isDebugEnabled()) {
333                LOG.debug("Commit: "  + transactionId
334                        + " syncCount: "
335                        + (synchronizations != null ? synchronizations.size() : 0));
336            }
337
338            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
339            this.transactionId = null;
340            // Notify the listener that the tx was committed back
341            try {
342                this.connection.syncSendPacket(info);
343                if (localTransactionEventListener != null) {
344                    localTransactionEventListener.commitEvent();
345                }
346                afterCommit();
347            } catch (JMSException cause) {
348                LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
349                if (localTransactionEventListener != null) {
350                    localTransactionEventListener.rollbackEvent();
351                }
352                afterRollback();
353                throw cause;
354            }
355
356        }
357    }
358
359    // ///////////////////////////////////////////////////////////
360    //
361    // XAResource Implementation
362    //
363    // ///////////////////////////////////////////////////////////
364    /**
365     * Associates a transaction with the resource.
366     */
367    public void start(Xid xid, int flags) throws XAException {
368
369        if (LOG.isDebugEnabled()) {
370            LOG.debug("Start: " + xid + ", flags:" + flags);
371        }
372        if (isInLocalTransaction()) {
373            throw new XAException(XAException.XAER_PROTO);
374        }
375        // Are we already associated?
376        if (associatedXid != null) {
377            throw new XAException(XAException.XAER_PROTO);
378        }
379
380        // if ((flags & TMJOIN) == TMJOIN) {
381        // TODO: verify that the server has seen the xid
382        // // }
383        // if ((flags & TMJOIN) == TMRESUME) {
384        // // TODO: verify that the xid was suspended.
385        // }
386
387        // associate
388        synchronizations = null;
389        beforeEndIndex = 0;
390        setRollbackOnly(false);
391        setXid(xid);
392    }
393
394    /**
395     * @return connectionId for connection
396     */
397    private ConnectionId getConnectionId() {
398        return connection.getConnectionInfo().getConnectionId();
399    }
400
401    public void end(Xid xid, int flags) throws XAException {
402
403        if (LOG.isDebugEnabled()) {
404            LOG.debug("End: " + xid + ", flags:" + flags);
405        }
406
407        if (isInLocalTransaction()) {
408            throw new XAException(XAException.XAER_PROTO);
409        }
410
411        if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
412            // You can only suspend the associated xid.
413            if (!equals(associatedXid, xid)) {
414                throw new XAException(XAException.XAER_PROTO);
415            }
416            invokeBeforeEnd();
417        } else if ((flags & TMSUCCESS) == TMSUCCESS) {
418            // set to null if this is the current xid.
419            // otherwise this could be an asynchronous success call
420            if (equals(associatedXid, xid)) {
421                invokeBeforeEnd();
422            }
423        } else {
424            throw new XAException(XAException.XAER_INVAL);
425        }
426    }
427
428    private void invokeBeforeEnd() throws XAException {
429        boolean throwingException = false;
430        try {
431            beforeEnd();
432        } catch (JMSException e) {
433            throwingException = true;
434            throw toXAException(e);
435        } finally {
436            try {
437                setXid(null);
438            } catch (XAException ignoreIfWillMask){
439                if (!throwingException) {
440                    throw ignoreIfWillMask;
441                }
442            }
443        }
444    }
445
446    private boolean equals(Xid xid1, Xid xid2) {
447        if (xid1 == xid2) {
448            return true;
449        }
450        if (xid1 == null ^ xid2 == null) {
451            return false;
452        }
453        return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
454               && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
455    }
456
457    public int prepare(Xid xid) throws XAException {
458        if (LOG.isDebugEnabled()) {
459            LOG.debug("Prepare: " + xid);
460        }
461
462        // We allow interleaving multiple transactions, so
463        // we don't limit prepare to the associated xid.
464        XATransactionId x;
465        // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
466        // called first
467        if (xid == null || (equals(associatedXid, xid))) {
468            throw new XAException(XAException.XAER_PROTO);
469        } else {
470            // TODO: cache the known xids so we don't keep recreating this one??
471            x = new XATransactionId(xid);
472        }
473
474        if (rollbackOnly) {
475            LOG.warn("prepare of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
476            throw new XAException(XAException.XA_RBINTEGRITY);
477        }
478
479        try {
480            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
481
482            // Find out if the server wants to commit or rollback.
483            IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
484            if (XAResource.XA_RDONLY == response.getResult()) {
485                // transaction stops now, may be syncs that need a callback
486                List<TransactionContext> l;
487                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
488                    l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
489                }
490                // After commit may be expensive and can deadlock, do it outside global synch block
491                // No risk for concurrent updates as we own the list now
492                if (l != null) {
493                    if(! l.isEmpty()) {
494                        LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid);
495                        for (TransactionContext ctx : l) {
496                            ctx.afterCommit();
497                        }
498                    }
499                }
500            }
501            return response.getResult();
502
503        } catch (JMSException e) {
504            LOG.warn("prepare of: " + x + " failed with: " + e, e);
505            List<TransactionContext> l;
506            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
507                l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
508            }
509            // After rollback may be expensive and can deadlock, do it outside global synch block
510            // No risk for concurrent updates as we own the list now
511            if (l != null) {
512                for (TransactionContext ctx : l) {
513                    try {
514                        ctx.afterRollback();
515                    } catch (Throwable ignored) {
516                        LOG.debug("failed to firing afterRollback callbacks on prepare " +
517                                  "failure, txid: {}, context: {}", x, ctx, ignored);
518                    }
519                }
520            }
521            throw toXAException(e);
522        }
523    }
524
525    public void rollback(Xid xid) throws XAException {
526
527        if (LOG.isDebugEnabled()) {
528            LOG.debug("Rollback: " + xid);
529        }
530
531        // We allow interleaving multiple transactions, so
532        // we don't limit rollback to the associated xid.
533        XATransactionId x;
534        if (xid == null) {
535            throw new XAException(XAException.XAER_PROTO);
536        }
537        if (equals(associatedXid, xid)) {
538            // I think this can happen even without an end(xid) call. Need to
539            // check spec.
540            x = (XATransactionId)transactionId;
541        } else {
542            x = new XATransactionId(xid);
543        }
544
545        try {
546            this.connection.checkClosedOrFailed();
547            this.connection.ensureConnectionInfoSent();
548
549            // Let the server know that the tx is rollback.
550            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
551            this.connection.syncSendPacket(info);
552
553            List<TransactionContext> l;
554            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
555                l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
556            }
557            // After rollback may be expensive and can deadlock, do it outside global synch block
558            // No risk for concurrent updates as we own the list now
559            if (l != null) {
560                for (TransactionContext ctx : l) {
561                    try {
562                        ctx.afterRollback();
563                    } catch (Exception ignored) {
564                        LOG.debug("ignoring exception from after rollback on ended transaction: {}", ignored, ignored);
565                    }
566                }                  
567            }
568        } catch (JMSException e) {
569            throw toXAException(e);
570        }
571    }
572
573    // XAResource interface
574    public void commit(Xid xid, boolean onePhase) throws XAException {
575
576        if (LOG.isDebugEnabled()) {
577            LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
578        }
579
580        // We allow interleaving multiple transactions, so
581        // we don't limit commit to the associated xid.
582        XATransactionId x;
583        if (xid == null || (equals(associatedXid, xid))) {
584            // should never happen, end(xid,TMSUCCESS) must have been previously
585            // called
586            throw new XAException(XAException.XAER_PROTO);
587        } else {
588            x = new XATransactionId(xid);
589        }
590
591        if (rollbackOnly) {
592             LOG.warn("commit of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
593             throw new XAException(XAException.XA_RBINTEGRITY);
594         }
595
596        try {
597            this.connection.checkClosedOrFailed();
598            this.connection.ensureConnectionInfoSent();
599
600            // Notify the server that the tx was committed back
601            TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
602
603            this.connection.syncSendPacket(info);
604
605            List<TransactionContext> l;
606            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
607                l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
608            }
609            // After commit may be expensive and can deadlock, do it outside global synch block
610            // No risk for concurrent updates as we own the list now
611            if (l != null) {
612                for (TransactionContext ctx : l) {
613                    try {
614                        ctx.afterCommit();
615                    } catch (Exception ignored) {
616                        LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored);
617                    }
618                }
619            }
620
621        } catch (JMSException e) {
622            LOG.warn("commit of: " + x + " failed with: " + e, e);
623            if (onePhase) {
624                List<TransactionContext> l;
625                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
626                    l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
627                }
628                // After rollback may be expensive and can deadlock, do it outside global synch block
629                // No risk for concurrent updates as we own the list now
630                if (l != null) {
631                    for (TransactionContext ctx : l) {
632                        try {
633                            ctx.afterRollback();
634                        } catch (Throwable ignored) {
635                            LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored);
636                        }
637                    }
638                }
639            }
640            throw toXAException(e);
641        }
642
643    }
644
645    public void forget(Xid xid) throws XAException {
646        if (LOG.isDebugEnabled()) {
647            LOG.debug("Forget: " + xid);
648        }
649
650        // We allow interleaving multiple transactions, so
651        // we don't limit forget to the associated xid.
652        XATransactionId x;
653        if (xid == null) {
654            throw new XAException(XAException.XAER_PROTO);
655        }
656        if (equals(associatedXid, xid)) {
657            // TODO determine if this can happen... I think not.
658            x = (XATransactionId)transactionId;
659        } else {
660            x = new XATransactionId(xid);
661        }
662
663        TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
664
665        try {
666            // Tell the server to forget the transaction.
667            this.connection.syncSendPacket(info);
668        } catch (JMSException e) {
669            throw toXAException(e);
670        }
671        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
672                ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
673        }
674    }
675
676    public boolean isSameRM(XAResource xaResource) throws XAException {
677        if (xaResource == null) {
678            return false;
679        }
680        if (!(xaResource instanceof TransactionContext)) {
681            return false;
682        }
683        TransactionContext xar = (TransactionContext)xaResource;
684        try {
685            return getResourceManagerId().equals(xar.getResourceManagerId());
686        } catch (Throwable e) {
687            throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
688        }
689    }
690
691    public Xid[] recover(int flag) throws XAException {
692        LOG.debug("recover({})", flag);
693        XATransactionId[] answer;
694
695        if (XAResource.TMNOFLAGS == flag) {
696            // signal next in cursor scan, which for us is always the end b/c we don't maintain any cursor state
697            // allows looping scan to complete
698            answer = new XATransactionId[0];
699        } else {
700            TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
701            try {
702                this.connection.checkClosedOrFailed();
703                this.connection.ensureConnectionInfoSent();
704
705                DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info);
706                DataStructure[] data = receipt.getData();
707                if (data instanceof XATransactionId[]) {
708                    answer = (XATransactionId[]) data;
709                } else {
710                    answer = new XATransactionId[data.length];
711                    System.arraycopy(data, 0, answer, 0, data.length);
712                }
713            } catch (JMSException e) {
714                throw toXAException(e);
715            }
716        }
717        LOG.debug("recover({})={}", flag, answer);
718        return answer;
719    }
720
721    public int getTransactionTimeout() throws XAException {
722        return 0;
723    }
724
725    public boolean setTransactionTimeout(int seconds) throws XAException {
726        return false;
727    }
728
729    // ///////////////////////////////////////////////////////////
730    //
731    // Helper methods.
732    //
733    // ///////////////////////////////////////////////////////////
734    protected String getResourceManagerId() throws JMSException {
735        return this.connection.getResourceManagerId();
736    }
737
738    private void setXid(Xid xid) throws XAException {
739
740        try {
741            this.connection.checkClosedOrFailed();
742            this.connection.ensureConnectionInfoSent();
743        } catch (JMSException e) {
744            disassociate();
745            throw toXAException(e);
746        }
747
748        if (xid != null) {
749            // associate
750            associatedXid = xid;
751            transactionId = new XATransactionId(xid);
752
753            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
754            try {
755                this.connection.asyncSendPacket(info);
756                if (LOG.isDebugEnabled()) {
757                    LOG.debug("{} started XA transaction {} ", this, transactionId);
758                }
759            } catch (JMSException e) {
760                disassociate();
761                throw toXAException(e);
762            }
763
764        } else {
765
766            if (transactionId != null) {
767                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END);
768                try {
769                    this.connection.syncSendPacket(info);
770                    LOG.debug("{} ended XA transaction {}", this, transactionId);
771                } catch (JMSException e) {
772                    disassociate();
773                    throw toXAException(e);
774                }
775
776                // Add our self to the list of contexts that are interested in
777                // post commit/rollback events.
778                List<TransactionContext> l;
779                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
780                    l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
781                    if (l == null) {
782                        l = new ArrayList<TransactionContext>(3);
783                        ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
784                    }
785                    if (!l.contains(this)) {
786                        l.add(this);
787                    }
788                }
789            }
790
791            disassociate();
792        }
793    }
794
795    private void disassociate() {
796         // dis-associate
797         associatedXid = null;
798         transactionId = null;
799    }
800
801    /**
802     * Converts a JMSException from the server to an XAException. if the
803     * JMSException contained a linked XAException that is returned instead.
804     *
805     * @param e JMSException to convert
806     * @return XAException wrapping original exception or its message
807     */
808    public static XAException toXAException(JMSException e) {
809        if (e.getCause() != null && e.getCause() instanceof XAException) {
810            XAException original = (XAException)e.getCause();
811            XAException xae = new XAException(original.getMessage());
812            xae.errorCode = original.errorCode;
813            if (xae.errorCode == XA_OK) {
814                // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable
815                xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR);
816            }
817            xae.initCause(original);
818            return xae;
819        }
820
821        XAException xae = new XAException(e.getMessage());
822        xae.errorCode = XAException.XAER_RMFAIL;
823        xae.initCause(e);
824        return xae;
825    }
826
827    private static int parseFromMessageOr(String message, int fallbackCode) {
828        final String marker = "xaErrorCode:";
829        final int index = message.lastIndexOf(marker);
830        if (index > -1) {
831            try {
832                return Integer.parseInt(message.substring(index + marker.length()));
833            } catch (Exception ignored) {}
834        }
835        return fallbackCode;
836    }
837
838    public ActiveMQConnection getConnection() {
839        return connection;
840    }
841
842
843    // for RAR xa recovery where xaresource connection is per request
844    public ActiveMQConnection setConnection(ActiveMQConnection connection) {
845        ActiveMQConnection existing = this.connection;
846        this.connection = connection;
847        return existing;
848    }
849
850    public void cleanup() {
851        associatedXid = null;
852        transactionId = null;
853    }
854
855    @Override
856    public String toString() {
857        return "TransactionContext{" +
858                "transactionId=" + transactionId +
859                ",connection=" + connection +
860                '}';
861    }
862}