001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.util.ArrayList;
020import java.util.Arrays;
021import java.util.HashMap;
022import java.util.List;
023
024import javax.jms.JMSException;
025import javax.jms.TransactionInProgressException;
026import javax.jms.TransactionRolledBackException;
027import javax.transaction.xa.XAException;
028import javax.transaction.xa.XAResource;
029import javax.transaction.xa.Xid;
030
031import org.apache.activemq.command.ConnectionId;
032import org.apache.activemq.command.DataArrayResponse;
033import org.apache.activemq.command.DataStructure;
034import org.apache.activemq.command.IntegerResponse;
035import org.apache.activemq.command.LocalTransactionId;
036import org.apache.activemq.command.TransactionId;
037import org.apache.activemq.command.TransactionInfo;
038import org.apache.activemq.command.XATransactionId;
039import org.apache.activemq.transaction.Synchronization;
040import org.apache.activemq.transport.failover.FailoverTransport;
041import org.apache.activemq.util.JMSExceptionSupport;
042import org.apache.activemq.util.LongSequenceGenerator;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
/**
 * A TransactionContext provides the means to control a JMS transaction. It
 * provides a local transaction interface and also an XAResource interface.
 * <p>
 * An application server controls the transactional assignment of an XASession
 * by obtaining its XAResource. It uses the XAResource to assign the session to
 * a transaction, prepare and commit work on the transaction, and so on.
 * <p>
 * An XAResource provides some fairly sophisticated facilities for interleaving
 * work on multiple transactions, recovering a list of transactions in progress,
 * and so on. A JTA aware JMS provider must fully implement this functionality.
 * This could be done by using the services of a database that supports XA, or a
 * JMS provider may choose to implement this functionality from scratch.
 *
 * @see javax.jms.Session
 * @see javax.jms.QueueSession
 * @see javax.jms.TopicSession
 * @see javax.jms.XASession
 */
064public class TransactionContext implements XAResource {
065
066    public static final String xaErrorCodeMarker = "xaErrorCode:";
067    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
068
069    // XATransactionId -> ArrayList of TransactionContext objects
070    private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
071                new HashMap<TransactionId, List<TransactionContext>>();
072
073    private ActiveMQConnection connection;
074    private final LongSequenceGenerator localTransactionIdGenerator;
075    private List<Synchronization> synchronizations;
076
077    // To track XA transactions.
078    private Xid associatedXid;
079    private TransactionId transactionId;
080    private LocalTransactionEventListener localTransactionEventListener;
081    private int beforeEndIndex;
082    private volatile boolean rollbackOnly;
083
084    // for RAR recovery
085    public TransactionContext() {
086        localTransactionIdGenerator = null;
087    }
088
089    public TransactionContext(ActiveMQConnection connection) {
090        this.connection = connection;
091        this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
092    }
093
094    public boolean isInXATransaction() {
095        if (transactionId != null && transactionId.isXATransaction()) {
096                return true;
097        } else {
098            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
099                for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
100                      if (transactions.contains(this)) {
101                          return true;
102                      }
103                }
104            }
105        }
106
107        return false;
108    }
109
110    public void setRollbackOnly(boolean val) {
111        rollbackOnly = val;
112    }
113
114    public boolean isInLocalTransaction() {
115        return transactionId != null && transactionId.isLocalTransaction();
116    }
117
118    public boolean isInTransaction() {
119        return transactionId != null;
120    }
121
122    /**
123     * @return Returns the localTransactionEventListener.
124     */
125    public LocalTransactionEventListener getLocalTransactionEventListener() {
126        return localTransactionEventListener;
127    }
128
129    /**
130     * Used by the resource adapter to listen to transaction events.
131     *
132     * @param localTransactionEventListener The localTransactionEventListener to
133     *                set.
134     */
135    public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
136        this.localTransactionEventListener = localTransactionEventListener;
137    }
138
139    // ///////////////////////////////////////////////////////////
140    //
141    // Methods that work with the Synchronization objects registered with
142    // the transaction.
143    //
144    // ///////////////////////////////////////////////////////////
145
146    public void addSynchronization(Synchronization s) {
147        if (synchronizations == null) {
148            synchronizations = new ArrayList<Synchronization>(10);
149        }
150        synchronizations.add(s);
151    }
152
153    private void afterRollback() throws JMSException {
154        if (synchronizations == null) {
155            return;
156        }
157
158        Throwable firstException = null;
159        int size = synchronizations.size();
160        for (int i = 0; i < size; i++) {
161            try {
162                synchronizations.get(i).afterRollback();
163            } catch (Throwable t) {
164                LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t);
165                if (firstException == null) {
166                    firstException = t;
167                }
168            }
169        }
170        synchronizations = null;
171        if (firstException != null) {
172            throw JMSExceptionSupport.create(firstException);
173        }
174    }
175
176    private void afterCommit() throws JMSException {
177        if (synchronizations == null) {
178            return;
179        }
180
181        Throwable firstException = null;
182        int size = synchronizations.size();
183        for (int i = 0; i < size; i++) {
184            try {
185                synchronizations.get(i).afterCommit();
186            } catch (Throwable t) {
187                LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t);
188                if (firstException == null) {
189                    firstException = t;
190                }
191            }
192        }
193        synchronizations = null;
194        if (firstException != null) {
195            throw JMSExceptionSupport.create(firstException);
196        }
197    }
198
199    private void beforeEnd() throws JMSException {
200        if (synchronizations == null) {
201            return;
202        }
203
204        int size = synchronizations.size();
205        try {
206            for (;beforeEndIndex < size;) {
207                synchronizations.get(beforeEndIndex++).beforeEnd();
208            }
209        } catch (JMSException e) {
210            throw e;
211        } catch (Throwable e) {
212            throw JMSExceptionSupport.create(e);
213        }
214    }
215
216    public TransactionId getTransactionId() {
217        return transactionId;
218    }
219
220    // ///////////////////////////////////////////////////////////
221    //
222    // Local transaction interface.
223    //
224    // ///////////////////////////////////////////////////////////
225
226    /**
227     * Start a local transaction.
228     * @throws javax.jms.JMSException on internal error
229     */
230    public void begin() throws JMSException {
231
232        if (isInXATransaction()) {
233            throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
234        }
235
236        if (transactionId == null) {
237            synchronizations = null;
238            beforeEndIndex = 0;
239            setRollbackOnly(false);
240            this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId());
241            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
242            this.connection.ensureConnectionInfoSent();
243            this.connection.asyncSendPacket(info);
244
245            // Notify the listener that the tx was started.
246            if (localTransactionEventListener != null) {
247                localTransactionEventListener.beginEvent();
248            }
249            if (LOG.isDebugEnabled()) {
250                LOG.debug("Begin:" + transactionId);
251            }
252        }
253
254    }
255
256    /**
257     * Rolls back any work done in this transaction and releases any locks
258     * currently held.
259     *
260     * @throws JMSException if the JMS provider fails to roll back the
261     *                 transaction due to some internal error.
262     * @throws javax.jms.IllegalStateException if the method is not called by a
263     *                 transacted session.
264     */
265    public void rollback() throws JMSException {
266        if (isInXATransaction()) {
267            throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
268        }
269
270        try {
271            beforeEnd();
272        } catch (TransactionRolledBackException canOcurrOnFailover) {
273            LOG.warn("rollback processing error", canOcurrOnFailover);
274        }
275        if (transactionId != null) {
276            if (LOG.isDebugEnabled()) {
277                LOG.debug("Rollback: "  + transactionId
278                + " syncCount: "
279                + (synchronizations != null ? synchronizations.size() : 0));
280            }
281
282            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
283            this.transactionId = null;
284            //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
285            this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0);
286            // Notify the listener that the tx was rolled back
287            if (localTransactionEventListener != null) {
288                localTransactionEventListener.rollbackEvent();
289            }
290        }
291
292        afterRollback();
293    }
294
295    /**
296     * Commits all work done in this transaction and releases any locks
297     * currently held.
298     *
299     * @throws JMSException if the JMS provider fails to commit the transaction
300     *                 due to some internal error.
301     * @throws javax.jms.IllegalStateException if the method is not called by a
302     *                 transacted session.
303     */
304    public void commit() throws JMSException {
305        if (isInXATransaction()) {
306            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
307        }
308
309        try {
310            beforeEnd();
311        } catch (JMSException e) {
312            rollback();
313            throw e;
314        }
315
316        if (transactionId != null && rollbackOnly) {
317            final String message = "Commit of " + transactionId + "  failed due to rollback only request; typically due to failover with pending acks";
318            try {
319                rollback();
320            } finally {
321                LOG.warn(message);
322                throw new TransactionRolledBackException(message);
323            }
324        }
325
326        // Only send commit if the transaction was started.
327        if (transactionId != null) {
328            if (LOG.isDebugEnabled()) {
329                LOG.debug("Commit: "  + transactionId
330                        + " syncCount: "
331                        + (synchronizations != null ? synchronizations.size() : 0));
332            }
333
334            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
335            this.transactionId = null;
336            // Notify the listener that the tx was committed back
337            try {
338                this.connection.syncSendPacket(info);
339                if (localTransactionEventListener != null) {
340                    localTransactionEventListener.commitEvent();
341                }
342                afterCommit();
343            } catch (JMSException cause) {
344                LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
345                if (localTransactionEventListener != null) {
346                    localTransactionEventListener.rollbackEvent();
347                }
348                afterRollback();
349                throw cause;
350            }
351
352        }
353    }
354
355    // ///////////////////////////////////////////////////////////
356    //
357    // XAResource Implementation
358    //
359    // ///////////////////////////////////////////////////////////
360    /**
361     * Associates a transaction with the resource.
362     */
363    public void start(Xid xid, int flags) throws XAException {
364
365        if (LOG.isDebugEnabled()) {
366            LOG.debug("Start: " + xid + ", flags:" + flags);
367        }
368        if (isInLocalTransaction()) {
369            throw new XAException(XAException.XAER_PROTO);
370        }
371        // Are we already associated?
372        if (associatedXid != null) {
373            throw new XAException(XAException.XAER_PROTO);
374        }
375
376        // if ((flags & TMJOIN) == TMJOIN) {
377        // TODO: verify that the server has seen the xid
378        // // }
379        // if ((flags & TMJOIN) == TMRESUME) {
380        // // TODO: verify that the xid was suspended.
381        // }
382
383        // associate
384        synchronizations = null;
385        beforeEndIndex = 0;
386        setRollbackOnly(false);
387        setXid(xid);
388    }
389
390    /**
391     * @return connectionId for connection
392     */
393    private ConnectionId getConnectionId() {
394        return connection.getConnectionInfo().getConnectionId();
395    }
396
397    public void end(Xid xid, int flags) throws XAException {
398
399        if (LOG.isDebugEnabled()) {
400            LOG.debug("End: " + xid + ", flags:" + flags);
401        }
402
403        if (isInLocalTransaction()) {
404            throw new XAException(XAException.XAER_PROTO);
405        }
406
407        if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
408            // You can only suspend the associated xid.
409            if (!equals(associatedXid, xid)) {
410                throw new XAException(XAException.XAER_PROTO);
411            }
412            invokeBeforeEnd();
413        } else if ((flags & TMSUCCESS) == TMSUCCESS) {
414            // set to null if this is the current xid.
415            // otherwise this could be an asynchronous success call
416            if (equals(associatedXid, xid)) {
417                invokeBeforeEnd();
418            }
419        } else {
420            throw new XAException(XAException.XAER_INVAL);
421        }
422    }
423
424    private void invokeBeforeEnd() throws XAException {
425        boolean throwingException = false;
426        try {
427            beforeEnd();
428        } catch (JMSException e) {
429            throwingException = true;
430            throw toXAException(e);
431        } finally {
432            try {
433                setXid(null);
434            } catch (XAException ignoreIfWillMask){
435                if (!throwingException) {
436                    throw ignoreIfWillMask;
437                }
438            }
439        }
440    }
441
442    private boolean equals(Xid xid1, Xid xid2) {
443        if (xid1 == xid2) {
444            return true;
445        }
446        if (xid1 == null ^ xid2 == null) {
447            return false;
448        }
449        return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
450               && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
451    }
452
453    public int prepare(Xid xid) throws XAException {
454        if (LOG.isDebugEnabled()) {
455            LOG.debug("Prepare: " + xid);
456        }
457
458        // We allow interleaving multiple transactions, so
459        // we don't limit prepare to the associated xid.
460        XATransactionId x;
461        // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
462        // called first
463        if (xid == null || (equals(associatedXid, xid))) {
464            throw new XAException(XAException.XAER_PROTO);
465        } else {
466            // TODO: cache the known xids so we don't keep recreating this one??
467            x = new XATransactionId(xid);
468        }
469
470        if (rollbackOnly) {
471            LOG.warn("prepare of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
472            throw new XAException(XAException.XA_RBINTEGRITY);
473        }
474
475        try {
476            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
477
478            // Find out if the server wants to commit or rollback.
479            IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
480            if (XAResource.XA_RDONLY == response.getResult()) {
481                // transaction stops now, may be syncs that need a callback
482                List<TransactionContext> l;
483                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
484                    l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
485                }
486                // After commit may be expensive and can deadlock, do it outside global synch block
487                // No risk for concurrent updates as we own the list now
488                if (l != null) {
489                    if(! l.isEmpty()) {
490                        LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid);
491                        for (TransactionContext ctx : l) {
492                            ctx.afterCommit();
493                        }
494                    }
495                }
496            }
497            return response.getResult();
498
499        } catch (JMSException e) {
500            LOG.warn("prepare of: " + x + " failed with: " + e, e);
501            List<TransactionContext> l;
502            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
503                l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
504            }
505            // After rollback may be expensive and can deadlock, do it outside global synch block
506            // No risk for concurrent updates as we own the list now
507            if (l != null) {
508                for (TransactionContext ctx : l) {
509                    try {
510                        ctx.afterRollback();
511                    } catch (Throwable ignored) {
512                        LOG.debug("failed to firing afterRollback callbacks on prepare " +
513                                  "failure, txid: {}, context: {}", x, ctx, ignored);
514                    }
515                }
516            }
517            throw toXAException(e);
518        }
519    }
520
521    public void rollback(Xid xid) throws XAException {
522
523        if (LOG.isDebugEnabled()) {
524            LOG.debug("Rollback: " + xid);
525        }
526
527        // We allow interleaving multiple transactions, so
528        // we don't limit rollback to the associated xid.
529        XATransactionId x;
530        if (xid == null) {
531            throw new XAException(XAException.XAER_PROTO);
532        }
533        if (equals(associatedXid, xid)) {
534            // I think this can happen even without an end(xid) call. Need to
535            // check spec.
536            x = (XATransactionId)transactionId;
537        } else {
538            x = new XATransactionId(xid);
539        }
540
541        try {
542            this.connection.checkClosedOrFailed();
543            this.connection.ensureConnectionInfoSent();
544
545            // Let the server know that the tx is rollback.
546            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
547            this.connection.syncSendPacket(info);
548
549            List<TransactionContext> l;
550            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
551                l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
552            }
553            // After rollback may be expensive and can deadlock, do it outside global synch block
554            // No risk for concurrent updates as we own the list now
555            if (l != null) {
556                for (TransactionContext ctx : l) {
557                    ctx.afterRollback();
558                }
559            }
560        } catch (JMSException e) {
561            throw toXAException(e);
562        }
563    }
564
565    // XAResource interface
566    public void commit(Xid xid, boolean onePhase) throws XAException {
567
568        if (LOG.isDebugEnabled()) {
569            LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
570        }
571
572        // We allow interleaving multiple transactions, so
573        // we don't limit commit to the associated xid.
574        XATransactionId x;
575        if (xid == null || (equals(associatedXid, xid))) {
576            // should never happen, end(xid,TMSUCCESS) must have been previously
577            // called
578            throw new XAException(XAException.XAER_PROTO);
579        } else {
580            x = new XATransactionId(xid);
581        }
582
583        if (rollbackOnly) {
584             LOG.warn("commit of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
585             throw new XAException(XAException.XA_RBINTEGRITY);
586         }
587
588        try {
589            this.connection.checkClosedOrFailed();
590            this.connection.ensureConnectionInfoSent();
591
592            // Notify the server that the tx was committed back
593            TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
594
595            this.connection.syncSendPacket(info);
596
597            List<TransactionContext> l;
598            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
599                l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
600            }
601            // After commit may be expensive and can deadlock, do it outside global synch block
602            // No risk for concurrent updates as we own the list now
603            if (l != null) {
604                for (TransactionContext ctx : l) {
605                    try {
606                        ctx.afterCommit();
607                    } catch (Exception ignored) {
608                        LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored);
609                    }
610                }
611            }
612
613        } catch (JMSException e) {
614            LOG.warn("commit of: " + x + " failed with: " + e, e);
615            if (onePhase) {
616                List<TransactionContext> l;
617                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
618                    l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
619                }
620                // After rollback may be expensive and can deadlock, do it outside global synch block
621                // No risk for concurrent updates as we own the list now
622                if (l != null) {
623                    for (TransactionContext ctx : l) {
624                        try {
625                            ctx.afterRollback();
626                        } catch (Throwable ignored) {
627                            LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored);
628                        }
629                    }
630                }
631            }
632            throw toXAException(e);
633        }
634
635    }
636
637    public void forget(Xid xid) throws XAException {
638        if (LOG.isDebugEnabled()) {
639            LOG.debug("Forget: " + xid);
640        }
641
642        // We allow interleaving multiple transactions, so
643        // we don't limit forget to the associated xid.
644        XATransactionId x;
645        if (xid == null) {
646            throw new XAException(XAException.XAER_PROTO);
647        }
648        if (equals(associatedXid, xid)) {
649            // TODO determine if this can happen... I think not.
650            x = (XATransactionId)transactionId;
651        } else {
652            x = new XATransactionId(xid);
653        }
654
655        TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
656
657        try {
658            // Tell the server to forget the transaction.
659            this.connection.syncSendPacket(info);
660        } catch (JMSException e) {
661            throw toXAException(e);
662        }
663        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
664                ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
665        }
666    }
667
668    public boolean isSameRM(XAResource xaResource) throws XAException {
669        if (xaResource == null) {
670            return false;
671        }
672        if (!(xaResource instanceof TransactionContext)) {
673            return false;
674        }
675        TransactionContext xar = (TransactionContext)xaResource;
676        try {
677            return getResourceManagerId().equals(xar.getResourceManagerId());
678        } catch (Throwable e) {
679            throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
680        }
681    }
682
683    public Xid[] recover(int flag) throws XAException {
684        LOG.debug("recover({})", flag);
685        XATransactionId[] answer;
686
687        if (XAResource.TMNOFLAGS == flag) {
688            // signal next in cursor scan, which for us is always the end b/c we don't maintain any cursor state
689            // allows looping scan to complete
690            answer = new XATransactionId[0];
691        } else {
692            TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
693            try {
694                this.connection.checkClosedOrFailed();
695                this.connection.ensureConnectionInfoSent();
696
697                DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info);
698                DataStructure[] data = receipt.getData();
699                if (data instanceof XATransactionId[]) {
700                    answer = (XATransactionId[]) data;
701                } else {
702                    answer = new XATransactionId[data.length];
703                    System.arraycopy(data, 0, answer, 0, data.length);
704                }
705            } catch (JMSException e) {
706                throw toXAException(e);
707            }
708        }
709        LOG.debug("recover({})={}", flag, answer);
710        return answer;
711    }
712
713    public int getTransactionTimeout() throws XAException {
714        return 0;
715    }
716
717    public boolean setTransactionTimeout(int seconds) throws XAException {
718        return false;
719    }
720
721    // ///////////////////////////////////////////////////////////
722    //
723    // Helper methods.
724    //
725    // ///////////////////////////////////////////////////////////
726    protected String getResourceManagerId() throws JMSException {
727        return this.connection.getResourceManagerId();
728    }
729
730    private void setXid(Xid xid) throws XAException {
731
732        try {
733            this.connection.checkClosedOrFailed();
734            this.connection.ensureConnectionInfoSent();
735        } catch (JMSException e) {
736            disassociate();
737            throw toXAException(e);
738        }
739
740        if (xid != null) {
741            // associate
742            associatedXid = xid;
743            transactionId = new XATransactionId(xid);
744
745            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
746            try {
747                this.connection.asyncSendPacket(info);
748                if (LOG.isDebugEnabled()) {
749                    LOG.debug("{} started XA transaction {} ", this, transactionId);
750                }
751            } catch (JMSException e) {
752                disassociate();
753                throw toXAException(e);
754            }
755
756        } else {
757
758            if (transactionId != null) {
759                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END);
760                try {
761                    this.connection.syncSendPacket(info);
762                    LOG.debug("{} ended XA transaction {}", this, transactionId);
763                } catch (JMSException e) {
764                    disassociate();
765                    throw toXAException(e);
766                }
767
768                // Add our self to the list of contexts that are interested in
769                // post commit/rollback events.
770                List<TransactionContext> l;
771                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
772                    l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
773                    if (l == null) {
774                        l = new ArrayList<TransactionContext>(3);
775                        ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
776                    }
777                    if (!l.contains(this)) {
778                        l.add(this);
779                    }
780                }
781            }
782
783            disassociate();
784        }
785    }
786
787    private void disassociate() {
788         // dis-associate
789         associatedXid = null;
790         transactionId = null;
791    }
792
793    /**
794     * Converts a JMSException from the server to an XAException. if the
795     * JMSException contained a linked XAException that is returned instead.
796     *
797     * @param e JMSException to convert
798     * @return XAException wrapping original exception or its message
799     */
800    private XAException toXAException(JMSException e) {
801        if (e.getCause() != null && e.getCause() instanceof XAException) {
802            XAException original = (XAException)e.getCause();
803            XAException xae = new XAException(original.getMessage());
804            xae.errorCode = original.errorCode;
805            if (xae.errorCode == XA_OK) {
806                // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable
807                xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR);
808            }
809            xae.initCause(original);
810            return xae;
811        }
812
813        XAException xae = new XAException(e.getMessage());
814        xae.errorCode = XAException.XAER_RMFAIL;
815        xae.initCause(e);
816        return xae;
817    }
818
819    private int parseFromMessageOr(String message, int fallbackCode) {
820        final String marker = "xaErrorCode:";
821        final int index = message.lastIndexOf(marker);
822        if (index > -1) {
823            try {
824                return Integer.parseInt(message.substring(index + marker.length()));
825            } catch (Exception ignored) {}
826        }
827        return fallbackCode;
828    }
829
830    public ActiveMQConnection getConnection() {
831        return connection;
832    }
833
834
835    // for RAR xa recovery where xaresource connection is per request
836    public ActiveMQConnection setConnection(ActiveMQConnection connection) {
837        ActiveMQConnection existing = this.connection;
838        this.connection = connection;
839        return existing;
840    }
841
842    public void cleanup() {
843        associatedXid = null;
844        transactionId = null;
845    }
846
847    @Override
848    public String toString() {
849        return "TransactionContext{" +
850                "transactionId=" + transactionId +
851                ",connection=" + connection +
852                '}';
853    }
854}