001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.util.ArrayList;
020import java.util.Arrays;
021import java.util.HashMap;
022import java.util.List;
023
024import javax.jms.JMSException;
025import javax.jms.TransactionInProgressException;
026import javax.jms.TransactionRolledBackException;
027import javax.transaction.xa.XAException;
028import javax.transaction.xa.XAResource;
029import javax.transaction.xa.Xid;
030
031import org.apache.activemq.command.ConnectionId;
032import org.apache.activemq.command.DataArrayResponse;
033import org.apache.activemq.command.DataStructure;
034import org.apache.activemq.command.IntegerResponse;
035import org.apache.activemq.command.LocalTransactionId;
036import org.apache.activemq.command.TransactionId;
037import org.apache.activemq.command.TransactionInfo;
038import org.apache.activemq.command.XATransactionId;
039import org.apache.activemq.transaction.Synchronization;
040import org.apache.activemq.transport.failover.FailoverTransport;
041import org.apache.activemq.util.JMSExceptionSupport;
042import org.apache.activemq.util.LongSequenceGenerator;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * A TransactionContext provides the means to control a JMS transaction. It
048 * provides a local transaction interface and also an XAResource interface. <p/>
049 * An application server controls the transactional assignment of an XASession
050 * by obtaining its XAResource. It uses the XAResource to assign the session to
051 * a transaction, prepare and commit work on the transaction, and so on. <p/> An
052 * XAResource provides some fairly sophisticated facilities for interleaving
053 * work on multiple transactions, recovering a list of transactions in progress,
054 * and so on. A JTA aware JMS provider must fully implement this functionality.
055 * This could be done by using the services of a database that supports XA, or a
056 * JMS provider may choose to implement this functionality from scratch. <p/>
057 *
058 *
059 * @see javax.jms.Session
060 * @see javax.jms.QueueSession
061 * @see javax.jms.TopicSession
062 * @see javax.jms.XASession
063 */
064public class TransactionContext implements XAResource {
065
066    public static final String xaErrorCodeMarker = "xaErrorCode:";
067    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
068
069    // XATransactionId -> ArrayList of TransactionContext objects
070    private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
071                new HashMap<TransactionId, List<TransactionContext>>();
072
073    private ActiveMQConnection connection;
074    private final LongSequenceGenerator localTransactionIdGenerator;
075    private List<Synchronization> synchronizations;
076
077    // To track XA transactions.
078    private Xid associatedXid;
079    private TransactionId transactionId;
080    private LocalTransactionEventListener localTransactionEventListener;
081    private int beforeEndIndex;
082    private volatile boolean rollbackOnly;
083
084    // for RAR recovery
085    public TransactionContext() {
086        localTransactionIdGenerator = null;
087    }
088
089    public TransactionContext(ActiveMQConnection connection) {
090        this.connection = connection;
091        this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
092    }
093
094    public boolean isInXATransaction() {
095        if (transactionId != null && transactionId.isXATransaction()) {
096                return true;
097        } else {
098                if (!ENDED_XA_TRANSACTION_CONTEXTS.isEmpty()) {
099                        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
100                                for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
101                                        if (transactions.contains(this)) {
102                                                return true;
103                                        }
104                                }
105                        }
106                }
107        }
108
109        return false;
110    }
111
112    public void setRollbackOnly(boolean val) {
113        rollbackOnly = val;
114    }
115
116    public boolean isInLocalTransaction() {
117        return transactionId != null && transactionId.isLocalTransaction();
118    }
119
120    public boolean isInTransaction() {
121        return transactionId != null;
122    }
123
124    /**
125     * @return Returns the localTransactionEventListener.
126     */
127    public LocalTransactionEventListener getLocalTransactionEventListener() {
128        return localTransactionEventListener;
129    }
130
131    /**
132     * Used by the resource adapter to listen to transaction events.
133     *
134     * @param localTransactionEventListener The localTransactionEventListener to
135     *                set.
136     */
137    public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
138        this.localTransactionEventListener = localTransactionEventListener;
139    }
140
141    // ///////////////////////////////////////////////////////////
142    //
143    // Methods that work with the Synchronization objects registered with
144    // the transaction.
145    //
146    // ///////////////////////////////////////////////////////////
147
148    public void addSynchronization(Synchronization s) {
149        if (synchronizations == null) {
150            synchronizations = new ArrayList<Synchronization>(10);
151        }
152        synchronizations.add(s);
153    }
154
155    private void afterRollback() throws JMSException {
156        if (synchronizations == null) {
157            return;
158        }
159
160        Throwable firstException = null;
161        int size = synchronizations.size();
162        for (int i = 0; i < size; i++) {
163            try {
164                synchronizations.get(i).afterRollback();
165            } catch (Throwable t) {
166                LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t);
167                if (firstException == null) {
168                    firstException = t;
169                }
170            }
171        }
172        synchronizations = null;
173        if (firstException != null) {
174            throw JMSExceptionSupport.create(firstException);
175        }
176    }
177
178    private void afterCommit() throws JMSException {
179        if (synchronizations == null) {
180            return;
181        }
182
183        Throwable firstException = null;
184        int size = synchronizations.size();
185        for (int i = 0; i < size; i++) {
186            try {
187                synchronizations.get(i).afterCommit();
188            } catch (Throwable t) {
189                LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t);
190                if (firstException == null) {
191                    firstException = t;
192                }
193            }
194        }
195        synchronizations = null;
196        if (firstException != null) {
197            throw JMSExceptionSupport.create(firstException);
198        }
199    }
200
201    private void beforeEnd() throws JMSException {
202        if (synchronizations == null) {
203            return;
204        }
205
206        int size = synchronizations.size();
207        try {
208            for (;beforeEndIndex < size;) {
209                synchronizations.get(beforeEndIndex++).beforeEnd();
210            }
211        } catch (JMSException e) {
212            throw e;
213        } catch (Throwable e) {
214            throw JMSExceptionSupport.create(e);
215        }
216    }
217
218    public TransactionId getTransactionId() {
219        return transactionId;
220    }
221
222    // ///////////////////////////////////////////////////////////
223    //
224    // Local transaction interface.
225    //
226    // ///////////////////////////////////////////////////////////
227
228    /**
229     * Start a local transaction.
230     * @throws javax.jms.JMSException on internal error
231     */
232    public void begin() throws JMSException {
233
234        if (isInXATransaction()) {
235            throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
236        }
237
238        if (transactionId == null) {
239            synchronizations = null;
240            beforeEndIndex = 0;
241            setRollbackOnly(false);
242            this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId());
243            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
244            this.connection.ensureConnectionInfoSent();
245            this.connection.asyncSendPacket(info);
246
247            // Notify the listener that the tx was started.
248            if (localTransactionEventListener != null) {
249                localTransactionEventListener.beginEvent();
250            }
251            if (LOG.isDebugEnabled()) {
252                LOG.debug("Begin:" + transactionId);
253            }
254        }
255
256    }
257
258    /**
259     * Rolls back any work done in this transaction and releases any locks
260     * currently held.
261     *
262     * @throws JMSException if the JMS provider fails to roll back the
263     *                 transaction due to some internal error.
264     * @throws javax.jms.IllegalStateException if the method is not called by a
265     *                 transacted session.
266     */
267    public void rollback() throws JMSException {
268        if (isInXATransaction()) {
269            throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
270        }
271
272        try {
273            beforeEnd();
274        } catch (TransactionRolledBackException canOcurrOnFailover) {
275            LOG.warn("rollback processing error", canOcurrOnFailover);
276        }
277        if (transactionId != null) {
278            if (LOG.isDebugEnabled()) {
279                LOG.debug("Rollback: "  + transactionId
280                + " syncCount: "
281                + (synchronizations != null ? synchronizations.size() : 0));
282            }
283
284            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
285            this.transactionId = null;
286            //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
287            this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0);
288            // Notify the listener that the tx was rolled back
289            if (localTransactionEventListener != null) {
290                localTransactionEventListener.rollbackEvent();
291            }
292        }
293
294        afterRollback();
295    }
296
297    /**
298     * Commits all work done in this transaction and releases any locks
299     * currently held.
300     *
301     * @throws JMSException if the JMS provider fails to commit the transaction
302     *                 due to some internal error.
303     * @throws javax.jms.IllegalStateException if the method is not called by a
304     *                 transacted session.
305     */
306    public void commit() throws JMSException {
307        if (isInXATransaction()) {
308            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
309        }
310
311        try {
312            beforeEnd();
313        } catch (JMSException e) {
314            rollback();
315            throw e;
316        }
317
318        if (transactionId != null && rollbackOnly) {
319            final String message = "Commit of " + transactionId + "  failed due to rollback only request; typically due to failover with pending acks";
320            try {
321                rollback();
322            } finally {
323                LOG.warn(message);
324                throw new TransactionRolledBackException(message);
325            }
326        }
327
328        // Only send commit if the transaction was started.
329        if (transactionId != null) {
330            if (LOG.isDebugEnabled()) {
331                LOG.debug("Commit: "  + transactionId
332                        + " syncCount: "
333                        + (synchronizations != null ? synchronizations.size() : 0));
334            }
335
336            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
337            this.transactionId = null;
338            // Notify the listener that the tx was committed back
339            try {
340                this.connection.syncSendPacket(info);
341                if (localTransactionEventListener != null) {
342                    localTransactionEventListener.commitEvent();
343                }
344                afterCommit();
345            } catch (JMSException cause) {
346                LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
347                if (localTransactionEventListener != null) {
348                    localTransactionEventListener.rollbackEvent();
349                }
350                afterRollback();
351                throw cause;
352            }
353
354        }
355    }
356
357    // ///////////////////////////////////////////////////////////
358    //
359    // XAResource Implementation
360    //
361    // ///////////////////////////////////////////////////////////
362    /**
363     * Associates a transaction with the resource.
364     */
365    public void start(Xid xid, int flags) throws XAException {
366
367        if (LOG.isDebugEnabled()) {
368            LOG.debug("Start: " + xid + ", flags:" + flags);
369        }
370        if (isInLocalTransaction()) {
371            throw new XAException(XAException.XAER_PROTO);
372        }
373        // Are we already associated?
374        if (associatedXid != null) {
375            throw new XAException(XAException.XAER_PROTO);
376        }
377
378        // if ((flags & TMJOIN) == TMJOIN) {
379        // TODO: verify that the server has seen the xid
380        // // }
381        // if ((flags & TMJOIN) == TMRESUME) {
382        // // TODO: verify that the xid was suspended.
383        // }
384
385        // associate
386        synchronizations = null;
387        beforeEndIndex = 0;
388        setRollbackOnly(false);
389        setXid(xid);
390    }
391
392    /**
393     * @return connectionId for connection
394     */
395    private ConnectionId getConnectionId() {
396        return connection.getConnectionInfo().getConnectionId();
397    }
398
399    public void end(Xid xid, int flags) throws XAException {
400
401        if (LOG.isDebugEnabled()) {
402            LOG.debug("End: " + xid + ", flags:" + flags);
403        }
404
405        if (isInLocalTransaction()) {
406            throw new XAException(XAException.XAER_PROTO);
407        }
408
409        if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
410            // You can only suspend the associated xid.
411            if (!equals(associatedXid, xid)) {
412                throw new XAException(XAException.XAER_PROTO);
413            }
414            invokeBeforeEnd();
415        } else if ((flags & TMSUCCESS) == TMSUCCESS) {
416            // set to null if this is the current xid.
417            // otherwise this could be an asynchronous success call
418            if (equals(associatedXid, xid)) {
419                invokeBeforeEnd();
420            }
421        } else {
422            throw new XAException(XAException.XAER_INVAL);
423        }
424    }
425
426    private void invokeBeforeEnd() throws XAException {
427        boolean throwingException = false;
428        try {
429            beforeEnd();
430        } catch (JMSException e) {
431            throwingException = true;
432            throw toXAException(e);
433        } finally {
434            try {
435                setXid(null);
436            } catch (XAException ignoreIfWillMask){
437                if (!throwingException) {
438                    throw ignoreIfWillMask;
439                }
440            }
441        }
442    }
443
444    private boolean equals(Xid xid1, Xid xid2) {
445        if (xid1 == xid2) {
446            return true;
447        }
448        if (xid1 == null ^ xid2 == null) {
449            return false;
450        }
451        return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
452               && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
453    }
454
455    public int prepare(Xid xid) throws XAException {
456        if (LOG.isDebugEnabled()) {
457            LOG.debug("Prepare: " + xid);
458        }
459
460        // We allow interleaving multiple transactions, so
461        // we don't limit prepare to the associated xid.
462        XATransactionId x;
463        // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
464        // called first
465        if (xid == null || (equals(associatedXid, xid))) {
466            throw new XAException(XAException.XAER_PROTO);
467        } else {
468            // TODO: cache the known xids so we don't keep recreating this one??
469            x = new XATransactionId(xid);
470        }
471
472        if (rollbackOnly) {
473            LOG.warn("prepare of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
474            throw new XAException(XAException.XA_RBINTEGRITY);
475        }
476
477        try {
478            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
479
480            // Find out if the server wants to commit or rollback.
481            IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
482            if (XAResource.XA_RDONLY == response.getResult()) {
483                // transaction stops now, may be syncs that need a callback
484                        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
485                        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
486                        if (l != null && !l.isEmpty()) {
487                            if (LOG.isDebugEnabled()) {
488                                LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: " + xid);
489                            }
490                            for (TransactionContext ctx : l) {
491                                ctx.afterCommit();
492                            }
493                        }
494                        }
495            }
496            return response.getResult();
497
498        } catch (JMSException e) {
499            LOG.warn("prepare of: " + x + " failed with: " + e, e);
500                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
501                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
502                    if (l != null && !l.isEmpty()) {
503                        for (TransactionContext ctx : l) {
504                            try {
505                                ctx.afterRollback();
506                            } catch (Throwable ignored) {
507                                if (LOG.isDebugEnabled()) {
508                                    LOG.debug("failed to firing afterRollback callbacks on prepare failure, txid: " +
509                                                  x + ", context: " + ctx, ignored);
510                                }
511                            }
512                        }
513                    }
514                }
515            throw toXAException(e);
516        }
517    }
518
519    public void rollback(Xid xid) throws XAException {
520
521        if (LOG.isDebugEnabled()) {
522            LOG.debug("Rollback: " + xid);
523        }
524
525        // We allow interleaving multiple transactions, so
526        // we don't limit rollback to the associated xid.
527        XATransactionId x;
528        if (xid == null) {
529            throw new XAException(XAException.XAER_PROTO);
530        }
531        if (equals(associatedXid, xid)) {
532            // I think this can happen even without an end(xid) call. Need to
533            // check spec.
534            x = (XATransactionId)transactionId;
535        } else {
536            x = new XATransactionId(xid);
537        }
538
539        try {
540            this.connection.checkClosedOrFailed();
541            this.connection.ensureConnectionInfoSent();
542
543            // Let the server know that the tx is rollback.
544            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
545            this.connection.syncSendPacket(info);
546
547                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
548                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
549                    if (l != null && !l.isEmpty()) {
550                        for (TransactionContext ctx : l) {
551                            ctx.afterRollback();
552                        }
553                    }
554                }
555        } catch (JMSException e) {
556            throw toXAException(e);
557        }
558    }
559
560    // XAResource interface
561    public void commit(Xid xid, boolean onePhase) throws XAException {
562
563        if (LOG.isDebugEnabled()) {
564            LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
565        }
566
567        // We allow interleaving multiple transactions, so
568        // we don't limit commit to the associated xid.
569        XATransactionId x;
570        if (xid == null || (equals(associatedXid, xid))) {
571            // should never happen, end(xid,TMSUCCESS) must have been previously
572            // called
573            throw new XAException(XAException.XAER_PROTO);
574        } else {
575            x = new XATransactionId(xid);
576        }
577
578        if (rollbackOnly) {
579             LOG.warn("commit of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
580             throw new XAException(XAException.XA_RBINTEGRITY);
581         }
582
583        try {
584            this.connection.checkClosedOrFailed();
585            this.connection.ensureConnectionInfoSent();
586
587            // Notify the server that the tx was committed back
588            TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
589
590            this.connection.syncSendPacket(info);
591
592                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
593                    List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
594                    if (l != null && !l.isEmpty()) {
595                        for (TransactionContext ctx : l) {
596                            try {
597                                ctx.afterCommit();
598                            } catch (Exception ignored) {
599                                LOG.debug("ignoring exception from after completion on ended transaction: " + ignored, ignored);
600                            }
601                        }
602                    }
603                }
604
605        } catch (JMSException e) {
606            LOG.warn("commit of: " + x + " failed with: " + e, e);
607            if (onePhase) {
608                        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
609                        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
610                        if (l != null && !l.isEmpty()) {
611                            for (TransactionContext ctx : l) {
612                                try {
613                                    ctx.afterRollback();
614                                } catch (Throwable ignored) {
615                                    if (LOG.isDebugEnabled()) {
616                                        LOG.debug("failed to firing afterRollback callbacks commit failure, txid: " + x + ", context: " + ctx, ignored);
617                                    }
618                                }
619                            }
620                        }
621                        }
622            }
623            throw toXAException(e);
624        }
625
626    }
627
628    public void forget(Xid xid) throws XAException {
629        if (LOG.isDebugEnabled()) {
630            LOG.debug("Forget: " + xid);
631        }
632
633        // We allow interleaving multiple transactions, so
634        // we don't limit forget to the associated xid.
635        XATransactionId x;
636        if (xid == null) {
637            throw new XAException(XAException.XAER_PROTO);
638        }
639        if (equals(associatedXid, xid)) {
640            // TODO determine if this can happen... I think not.
641            x = (XATransactionId)transactionId;
642        } else {
643            x = new XATransactionId(xid);
644        }
645
646        TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
647
648        try {
649            // Tell the server to forget the transaction.
650            this.connection.syncSendPacket(info);
651        } catch (JMSException e) {
652            throw toXAException(e);
653        }
654        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
655                ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
656        }
657    }
658
659    public boolean isSameRM(XAResource xaResource) throws XAException {
660        if (xaResource == null) {
661            return false;
662        }
663        if (!(xaResource instanceof TransactionContext)) {
664            return false;
665        }
666        TransactionContext xar = (TransactionContext)xaResource;
667        try {
668            return getResourceManagerId().equals(xar.getResourceManagerId());
669        } catch (Throwable e) {
670            throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
671        }
672    }
673
674    public Xid[] recover(int flag) throws XAException {
675        LOG.debug("recover({})", flag);
676
677        TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
678        try {
679            this.connection.checkClosedOrFailed();
680            this.connection.ensureConnectionInfoSent();
681
682            DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
683            DataStructure[] data = receipt.getData();
684            XATransactionId[] answer;
685            if (data instanceof XATransactionId[]) {
686                answer = (XATransactionId[])data;
687            } else {
688                answer = new XATransactionId[data.length];
689                System.arraycopy(data, 0, answer, 0, data.length);
690            }
691            LOG.debug("recover({})={}", flag, answer);
692            return answer;
693        } catch (JMSException e) {
694            throw toXAException(e);
695        }
696    }
697
698    public int getTransactionTimeout() throws XAException {
699        return 0;
700    }
701
702    public boolean setTransactionTimeout(int seconds) throws XAException {
703        return false;
704    }
705
706    // ///////////////////////////////////////////////////////////
707    //
708    // Helper methods.
709    //
710    // ///////////////////////////////////////////////////////////
711    protected String getResourceManagerId() throws JMSException {
712        return this.connection.getResourceManagerId();
713    }
714
715    private void setXid(Xid xid) throws XAException {
716
717        try {
718            this.connection.checkClosedOrFailed();
719            this.connection.ensureConnectionInfoSent();
720        } catch (JMSException e) {
721            disassociate();
722            throw toXAException(e);
723        }
724
725        if (xid != null) {
726            // associate
727            associatedXid = xid;
728            transactionId = new XATransactionId(xid);
729
730            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
731            try {
732                this.connection.asyncSendPacket(info);
733                if (LOG.isDebugEnabled()) {
734                    LOG.debug("{} started XA transaction {} ", this, transactionId);
735                }
736            } catch (JMSException e) {
737                disassociate();
738                throw toXAException(e);
739            }
740
741        } else {
742
743            if (transactionId != null) {
744                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END);
745                try {
746                    this.connection.syncSendPacket(info);
747                    LOG.debug("{} ended XA transaction {}", this, transactionId);
748                } catch (JMSException e) {
749                    disassociate();
750                    throw toXAException(e);
751                }
752
753                // Add our self to the list of contexts that are interested in
754                // post commit/rollback events.
755                        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
756                        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
757                        if (l == null) {
758                            l = new ArrayList<TransactionContext>(3);
759                            ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
760                            l.add(this);
761                        } else if (!l.contains(this)) {
762                            l.add(this);
763                        }
764                        }
765            }
766
767            disassociate();
768        }
769    }
770
771    private void disassociate() {
772         // dis-associate
773         associatedXid = null;
774         transactionId = null;
775    }
776
777    /**
778     * Converts a JMSException from the server to an XAException. if the
779     * JMSException contained a linked XAException that is returned instead.
780     *
781     * @param e JMSException to convert
782     * @return XAException wrapping original exception or its message
783     */
784    private XAException toXAException(JMSException e) {
785        if (e.getCause() != null && e.getCause() instanceof XAException) {
786            XAException original = (XAException)e.getCause();
787            XAException xae = new XAException(original.getMessage());
788            xae.errorCode = original.errorCode;
789            if (xae.errorCode == XA_OK) {
790                // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable
791                xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR);
792            }
793            xae.initCause(original);
794            return xae;
795        }
796
797        XAException xae = new XAException(e.getMessage());
798        xae.errorCode = XAException.XAER_RMFAIL;
799        xae.initCause(e);
800        return xae;
801    }
802
803    private int parseFromMessageOr(String message, int fallbackCode) {
804        final String marker = "xaErrorCode:";
805        final int index = message.lastIndexOf(marker);
806        if (index > -1) {
807            try {
808                return Integer.parseInt(message.substring(index + marker.length()));
809            } catch (Exception ignored) {}
810        }
811        return fallbackCode;
812    }
813
814    public ActiveMQConnection getConnection() {
815        return connection;
816    }
817
818
819    // for RAR xa recovery where xaresource connection is per request
820    public ActiveMQConnection setConnection(ActiveMQConnection connection) {
821        ActiveMQConnection existing = this.connection;
822        this.connection = connection;
823        return existing;
824    }
825
826    public void cleanup() {
827        associatedXid = null;
828        transactionId = null;
829    }
830
831    @Override
832    public String toString() {
833        return "TransactionContext{" +
834                "transactionId=" + transactionId +
835                ",connection=" + connection +
836                '}';
837    }
838}