001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.util.ArrayList;
020import java.util.Arrays;
021import java.util.HashMap;
022import java.util.List;
023
024import javax.jms.JMSException;
025import javax.jms.TransactionInProgressException;
026import javax.jms.TransactionRolledBackException;
027import javax.transaction.xa.XAException;
028import javax.transaction.xa.XAResource;
029import javax.transaction.xa.Xid;
030
031import org.apache.activemq.command.ConnectionId;
032import org.apache.activemq.command.DataArrayResponse;
033import org.apache.activemq.command.DataStructure;
034import org.apache.activemq.command.IntegerResponse;
035import org.apache.activemq.command.LocalTransactionId;
036import org.apache.activemq.command.TransactionId;
037import org.apache.activemq.command.TransactionInfo;
038import org.apache.activemq.command.XATransactionId;
039import org.apache.activemq.transaction.Synchronization;
040import org.apache.activemq.transport.failover.FailoverTransport;
041import org.apache.activemq.util.JMSExceptionSupport;
042import org.apache.activemq.util.LongSequenceGenerator;
043import org.slf4j.Logger;
044import org.slf4j.LoggerFactory;
045
046/**
047 * A TransactionContext provides the means to control a JMS transaction. It
048 * provides a local transaction interface and also an XAResource interface. <p/>
049 * An application server controls the transactional assignment of an XASession
050 * by obtaining its XAResource. It uses the XAResource to assign the session to
051 * a transaction, prepare and commit work on the transaction, and so on. <p/> An
052 * XAResource provides some fairly sophisticated facilities for interleaving
053 * work on multiple transactions, recovering a list of transactions in progress,
054 * and so on. A JTA aware JMS provider must fully implement this functionality.
055 * This could be done by using the services of a database that supports XA, or a
056 * JMS provider may choose to implement this functionality from scratch. <p/>
057 *
058 *
059 * @see javax.jms.Session
060 * @see javax.jms.QueueSession
061 * @see javax.jms.TopicSession
062 * @see javax.jms.XASession
063 */
064public class TransactionContext implements XAResource {
065
066    public static final String xaErrorCodeMarker = "xaErrorCode:";
067    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
068
069    // XATransactionId -> ArrayList of TransactionContext objects
070    private final static HashMap<TransactionId, List<TransactionContext>> ENDED_XA_TRANSACTION_CONTEXTS =
071                new HashMap<TransactionId, List<TransactionContext>>();
072
073    private ActiveMQConnection connection;
074    private final LongSequenceGenerator localTransactionIdGenerator;
075    private List<Synchronization> synchronizations;
076
077    // To track XA transactions.
078    private Xid associatedXid;
079    private TransactionId transactionId;
080    private LocalTransactionEventListener localTransactionEventListener;
081    private int beforeEndIndex;
082    private volatile boolean rollbackOnly;
083
084    // for RAR recovery
085    public TransactionContext() {
086        localTransactionIdGenerator = null;
087    }
088
089    public TransactionContext(ActiveMQConnection connection) {
090        this.connection = connection;
091        this.localTransactionIdGenerator = connection.getLocalTransactionIdGenerator();
092    }
093
094    public boolean isInXATransaction() {
095        if (transactionId != null && transactionId.isXATransaction()) {
096                return true;
097        } else {
098            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
099                for(List<TransactionContext> transactions : ENDED_XA_TRANSACTION_CONTEXTS.values()) {
100                      if (transactions.contains(this)) {
101                          return true;
102                      }
103                }
104            }
105        }
106
107        return false;
108    }
109
110    public void setRollbackOnly(boolean val) {
111        rollbackOnly = val;
112    }
113
114    public boolean isInLocalTransaction() {
115        return transactionId != null && transactionId.isLocalTransaction();
116    }
117
118    public boolean isInTransaction() {
119        return transactionId != null;
120    }
121
122    /**
123     * @return Returns the localTransactionEventListener.
124     */
125    public LocalTransactionEventListener getLocalTransactionEventListener() {
126        return localTransactionEventListener;
127    }
128
129    /**
130     * Used by the resource adapter to listen to transaction events.
131     *
132     * @param localTransactionEventListener The localTransactionEventListener to
133     *                set.
134     */
135    public void setLocalTransactionEventListener(LocalTransactionEventListener localTransactionEventListener) {
136        this.localTransactionEventListener = localTransactionEventListener;
137    }
138
139    // ///////////////////////////////////////////////////////////
140    //
141    // Methods that work with the Synchronization objects registered with
142    // the transaction.
143    //
144    // ///////////////////////////////////////////////////////////
145
146    public void addSynchronization(Synchronization s) {
147        if (synchronizations == null) {
148            synchronizations = new ArrayList<Synchronization>(10);
149        }
150        synchronizations.add(s);
151    }
152
153    private void afterRollback() throws JMSException {
154        if (synchronizations == null) {
155            return;
156        }
157
158        Throwable firstException = null;
159        int size = synchronizations.size();
160        for (int i = 0; i < size; i++) {
161            try {
162                synchronizations.get(i).afterRollback();
163            } catch (Throwable t) {
164                LOG.debug("Exception from afterRollback on " + synchronizations.get(i), t);
165                if (firstException == null) {
166                    firstException = t;
167                }
168            }
169        }
170        synchronizations = null;
171        if (firstException != null) {
172            throw JMSExceptionSupport.create(firstException);
173        }
174    }
175
176    private void afterCommit() throws JMSException {
177        if (synchronizations == null) {
178            return;
179        }
180
181        Throwable firstException = null;
182        int size = synchronizations.size();
183        for (int i = 0; i < size; i++) {
184            try {
185                synchronizations.get(i).afterCommit();
186            } catch (Throwable t) {
187                LOG.debug("Exception from afterCommit on " + synchronizations.get(i), t);
188                if (firstException == null) {
189                    firstException = t;
190                }
191            }
192        }
193        synchronizations = null;
194        if (firstException != null) {
195            throw JMSExceptionSupport.create(firstException);
196        }
197    }
198
199    private void beforeEnd() throws JMSException {
200        if (synchronizations == null) {
201            return;
202        }
203
204        int size = synchronizations.size();
205        try {
206            for (;beforeEndIndex < size;) {
207                synchronizations.get(beforeEndIndex++).beforeEnd();
208            }
209        } catch (JMSException e) {
210            throw e;
211        } catch (Throwable e) {
212            throw JMSExceptionSupport.create(e);
213        }
214    }
215
216    public TransactionId getTransactionId() {
217        return transactionId;
218    }
219
220    // ///////////////////////////////////////////////////////////
221    //
222    // Local transaction interface.
223    //
224    // ///////////////////////////////////////////////////////////
225
226    /**
227     * Start a local transaction.
228     * @throws javax.jms.JMSException on internal error
229     */
230    public void begin() throws JMSException {
231
232        if (isInXATransaction()) {
233            throw new TransactionInProgressException("Cannot start local transaction.  XA transaction is already in progress.");
234        }
235
236        if (transactionId == null) {
237            synchronizations = null;
238            beforeEndIndex = 0;
239            setRollbackOnly(false);
240            this.transactionId = new LocalTransactionId(getConnectionId(), localTransactionIdGenerator.getNextSequenceId());
241            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
242            this.connection.ensureConnectionInfoSent();
243            this.connection.asyncSendPacket(info);
244
245            // Notify the listener that the tx was started.
246            if (localTransactionEventListener != null) {
247                localTransactionEventListener.beginEvent();
248            }
249            if (LOG.isDebugEnabled()) {
250                LOG.debug("Begin:" + transactionId);
251            }
252        }
253
254    }
255
256    /**
257     * Rolls back any work done in this transaction and releases any locks
258     * currently held.
259     *
260     * @throws JMSException if the JMS provider fails to roll back the
261     *                 transaction due to some internal error.
262     * @throws javax.jms.IllegalStateException if the method is not called by a
263     *                 transacted session.
264     */
265    public void rollback() throws JMSException {
266        if (isInXATransaction()) {
267            throw new TransactionInProgressException("Cannot rollback() if an XA transaction is already in progress ");
268        }
269
270        try {
271            beforeEnd();
272        } catch (TransactionRolledBackException canOcurrOnFailover) {
273            LOG.warn("rollback processing error", canOcurrOnFailover);
274        }
275        if (transactionId != null) {
276            if (LOG.isDebugEnabled()) {
277                LOG.debug("Rollback: "  + transactionId
278                + " syncCount: "
279                + (synchronizations != null ? synchronizations.size() : 0));
280            }
281
282            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.ROLLBACK);
283            this.transactionId = null;
284            //make this synchronous - see https://issues.apache.org/activemq/browse/AMQ-2364
285            this.connection.syncSendPacket(info, this.connection.isClosing() ? this.connection.getCloseTimeout() : 0);
286            // Notify the listener that the tx was rolled back
287            if (localTransactionEventListener != null) {
288                localTransactionEventListener.rollbackEvent();
289            }
290        }
291
292        afterRollback();
293    }
294
295    /**
296     * Commits all work done in this transaction and releases any locks
297     * currently held.
298     *
299     * @throws JMSException if the JMS provider fails to commit the transaction
300     *                 due to some internal error.
301     * @throws javax.jms.IllegalStateException if the method is not called by a
302     *                 transacted session.
303     */
304    public void commit() throws JMSException {
305        if (isInXATransaction()) {
306            throw new TransactionInProgressException("Cannot commit() if an XA transaction is already in progress ");
307        }
308
309        try {
310            beforeEnd();
311        } catch (JMSException e) {
312            rollback();
313            throw e;
314        }
315
316        if (transactionId != null && rollbackOnly) {
317            final String message = "Commit of " + transactionId + "  failed due to rollback only request; typically due to failover with pending acks";
318            try {
319                rollback();
320            } finally {
321                LOG.warn(message);
322                throw new TransactionRolledBackException(message);
323            }
324        }
325
326        // Only send commit if the transaction was started.
327        if (transactionId != null) {
328            if (LOG.isDebugEnabled()) {
329                LOG.debug("Commit: "  + transactionId
330                        + " syncCount: "
331                        + (synchronizations != null ? synchronizations.size() : 0));
332            }
333
334            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.COMMIT_ONE_PHASE);
335            this.transactionId = null;
336            // Notify the listener that the tx was committed back
337            try {
338                this.connection.syncSendPacket(info);
339                if (localTransactionEventListener != null) {
340                    localTransactionEventListener.commitEvent();
341                }
342                afterCommit();
343            } catch (JMSException cause) {
344                LOG.info("commit failed for transaction " + info.getTransactionId(), cause);
345                if (localTransactionEventListener != null) {
346                    localTransactionEventListener.rollbackEvent();
347                }
348                afterRollback();
349                throw cause;
350            }
351
352        }
353    }
354
355    // ///////////////////////////////////////////////////////////
356    //
357    // XAResource Implementation
358    //
359    // ///////////////////////////////////////////////////////////
360    /**
361     * Associates a transaction with the resource.
362     */
363    public void start(Xid xid, int flags) throws XAException {
364
365        if (LOG.isDebugEnabled()) {
366            LOG.debug("Start: " + xid + ", flags:" + flags);
367        }
368        if (isInLocalTransaction()) {
369            throw new XAException(XAException.XAER_PROTO);
370        }
371        // Are we already associated?
372        if (associatedXid != null) {
373            throw new XAException(XAException.XAER_PROTO);
374        }
375
376        // if ((flags & TMJOIN) == TMJOIN) {
377        // TODO: verify that the server has seen the xid
378        // // }
379        // if ((flags & TMJOIN) == TMRESUME) {
380        // // TODO: verify that the xid was suspended.
381        // }
382
383        // associate
384        synchronizations = null;
385        beforeEndIndex = 0;
386        setRollbackOnly(false);
387        setXid(xid);
388    }
389
390    /**
391     * @return connectionId for connection
392     */
393    private ConnectionId getConnectionId() {
394        return connection.getConnectionInfo().getConnectionId();
395    }
396
397    public void end(Xid xid, int flags) throws XAException {
398
399        if (LOG.isDebugEnabled()) {
400            LOG.debug("End: " + xid + ", flags:" + flags);
401        }
402
403        if (isInLocalTransaction()) {
404            throw new XAException(XAException.XAER_PROTO);
405        }
406
407        if ((flags & (TMSUSPEND | TMFAIL)) != 0) {
408            // You can only suspend the associated xid.
409            if (!equals(associatedXid, xid)) {
410                throw new XAException(XAException.XAER_PROTO);
411            }
412            invokeBeforeEnd();
413        } else if ((flags & TMSUCCESS) == TMSUCCESS) {
414            // set to null if this is the current xid.
415            // otherwise this could be an asynchronous success call
416            if (equals(associatedXid, xid)) {
417                invokeBeforeEnd();
418            }
419        } else {
420            throw new XAException(XAException.XAER_INVAL);
421        }
422    }
423
424    private void invokeBeforeEnd() throws XAException {
425        boolean throwingException = false;
426        try {
427            beforeEnd();
428        } catch (JMSException e) {
429            throwingException = true;
430            throw toXAException(e);
431        } finally {
432            try {
433                setXid(null);
434            } catch (XAException ignoreIfWillMask){
435                if (!throwingException) {
436                    throw ignoreIfWillMask;
437                }
438            }
439        }
440    }
441
442    private boolean equals(Xid xid1, Xid xid2) {
443        if (xid1 == xid2) {
444            return true;
445        }
446        if (xid1 == null ^ xid2 == null) {
447            return false;
448        }
449        return xid1.getFormatId() == xid2.getFormatId() && Arrays.equals(xid1.getBranchQualifier(), xid2.getBranchQualifier())
450               && Arrays.equals(xid1.getGlobalTransactionId(), xid2.getGlobalTransactionId());
451    }
452
453    public int prepare(Xid xid) throws XAException {
454        if (LOG.isDebugEnabled()) {
455            LOG.debug("Prepare: " + xid);
456        }
457
458        // We allow interleaving multiple transactions, so
459        // we don't limit prepare to the associated xid.
460        XATransactionId x;
461        // THIS SHOULD NEVER HAPPEN because end(xid, TMSUCCESS) should have been
462        // called first
463        if (xid == null || (equals(associatedXid, xid))) {
464            throw new XAException(XAException.XAER_PROTO);
465        } else {
466            // TODO: cache the known xids so we don't keep recreating this one??
467            x = new XATransactionId(xid);
468        }
469
470        if (rollbackOnly) {
471            LOG.warn("prepare of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
472            throw new XAException(XAException.XA_RBINTEGRITY);
473        }
474
475        try {
476            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.PREPARE);
477
478            // Find out if the server wants to commit or rollback.
479            IntegerResponse response = (IntegerResponse)this.connection.syncSendPacket(info);
480            if (XAResource.XA_RDONLY == response.getResult()) {
481                // transaction stops now, may be syncs that need a callback
482                List<TransactionContext> l;
483                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
484                    l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
485                }
486                // After commit may be expensive and can deadlock, do it outside global synch block
487                // No risk for concurrent updates as we own the list now
488                if (l != null) {
489                    if(! l.isEmpty()) {
490                        LOG.debug("firing afterCommit callbacks on XA_RDONLY from prepare: {}", xid);
491                        for (TransactionContext ctx : l) {
492                            ctx.afterCommit();
493                        }
494                    }
495                }
496            }
497            return response.getResult();
498
499        } catch (JMSException e) {
500            LOG.warn("prepare of: " + x + " failed with: " + e, e);
501            List<TransactionContext> l;
502            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
503                l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
504            }
505            // After rollback may be expensive and can deadlock, do it outside global synch block
506            // No risk for concurrent updates as we own the list now
507            if (l != null) {
508                for (TransactionContext ctx : l) {
509                    try {
510                        ctx.afterRollback();
511                    } catch (Throwable ignored) {
512                        LOG.debug("failed to firing afterRollback callbacks on prepare " +
513                                  "failure, txid: {}, context: {}", x, ctx, ignored);
514                    }
515                }
516            }
517            throw toXAException(e);
518        }
519    }
520
521    public void rollback(Xid xid) throws XAException {
522
523        if (LOG.isDebugEnabled()) {
524            LOG.debug("Rollback: " + xid);
525        }
526
527        // We allow interleaving multiple transactions, so
528        // we don't limit rollback to the associated xid.
529        XATransactionId x;
530        if (xid == null) {
531            throw new XAException(XAException.XAER_PROTO);
532        }
533        if (equals(associatedXid, xid)) {
534            // I think this can happen even without an end(xid) call. Need to
535            // check spec.
536            x = (XATransactionId)transactionId;
537        } else {
538            x = new XATransactionId(xid);
539        }
540
541        try {
542            this.connection.checkClosedOrFailed();
543            this.connection.ensureConnectionInfoSent();
544
545            // Let the server know that the tx is rollback.
546            TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.ROLLBACK);
547            this.connection.syncSendPacket(info);
548
549            List<TransactionContext> l;
550            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
551                l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
552            }
553            // After rollback may be expensive and can deadlock, do it outside global synch block
554            // No risk for concurrent updates as we own the list now
555            if (l != null) {
556                for (TransactionContext ctx : l) {
557                    try {
558                        ctx.afterRollback();
559                    } catch (Exception ignored) {
560                        LOG.debug("ignoring exception from after rollback on ended transaction: {}", ignored, ignored);
561                    }
562                }                  
563            }
564        } catch (JMSException e) {
565            throw toXAException(e);
566        }
567    }
568
569    // XAResource interface
570    public void commit(Xid xid, boolean onePhase) throws XAException {
571
572        if (LOG.isDebugEnabled()) {
573            LOG.debug("Commit: " + xid + ", onePhase=" + onePhase);
574        }
575
576        // We allow interleaving multiple transactions, so
577        // we don't limit commit to the associated xid.
578        XATransactionId x;
579        if (xid == null || (equals(associatedXid, xid))) {
580            // should never happen, end(xid,TMSUCCESS) must have been previously
581            // called
582            throw new XAException(XAException.XAER_PROTO);
583        } else {
584            x = new XATransactionId(xid);
585        }
586
587        if (rollbackOnly) {
588             LOG.warn("commit of: " + x + " failed because it was marked rollback only; typically due to failover with pending acks");
589             throw new XAException(XAException.XA_RBINTEGRITY);
590         }
591
592        try {
593            this.connection.checkClosedOrFailed();
594            this.connection.ensureConnectionInfoSent();
595
596            // Notify the server that the tx was committed back
597            TransactionInfo info = new TransactionInfo(getConnectionId(), x, onePhase ? TransactionInfo.COMMIT_ONE_PHASE : TransactionInfo.COMMIT_TWO_PHASE);
598
599            this.connection.syncSendPacket(info);
600
601            List<TransactionContext> l;
602            synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
603                l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
604            }
605            // After commit may be expensive and can deadlock, do it outside global synch block
606            // No risk for concurrent updates as we own the list now
607            if (l != null) {
608                for (TransactionContext ctx : l) {
609                    try {
610                        ctx.afterCommit();
611                    } catch (Exception ignored) {
612                        LOG.debug("ignoring exception from after completion on ended transaction: {}", ignored, ignored);
613                    }
614                }
615            }
616
617        } catch (JMSException e) {
618            LOG.warn("commit of: " + x + " failed with: " + e, e);
619            if (onePhase) {
620                List<TransactionContext> l;
621                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
622                    l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
623                }
624                // After rollback may be expensive and can deadlock, do it outside global synch block
625                // No risk for concurrent updates as we own the list now
626                if (l != null) {
627                    for (TransactionContext ctx : l) {
628                        try {
629                            ctx.afterRollback();
630                        } catch (Throwable ignored) {
631                            LOG.debug("failed to firing afterRollback callbacks commit failure, txid: {}, context: {}", x, ctx, ignored);
632                        }
633                    }
634                }
635            }
636            throw toXAException(e);
637        }
638
639    }
640
641    public void forget(Xid xid) throws XAException {
642        if (LOG.isDebugEnabled()) {
643            LOG.debug("Forget: " + xid);
644        }
645
646        // We allow interleaving multiple transactions, so
647        // we don't limit forget to the associated xid.
648        XATransactionId x;
649        if (xid == null) {
650            throw new XAException(XAException.XAER_PROTO);
651        }
652        if (equals(associatedXid, xid)) {
653            // TODO determine if this can happen... I think not.
654            x = (XATransactionId)transactionId;
655        } else {
656            x = new XATransactionId(xid);
657        }
658
659        TransactionInfo info = new TransactionInfo(getConnectionId(), x, TransactionInfo.FORGET);
660
661        try {
662            // Tell the server to forget the transaction.
663            this.connection.syncSendPacket(info);
664        } catch (JMSException e) {
665            throw toXAException(e);
666        }
667        synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
668                ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
669        }
670    }
671
672    public boolean isSameRM(XAResource xaResource) throws XAException {
673        if (xaResource == null) {
674            return false;
675        }
676        if (!(xaResource instanceof TransactionContext)) {
677            return false;
678        }
679        TransactionContext xar = (TransactionContext)xaResource;
680        try {
681            return getResourceManagerId().equals(xar.getResourceManagerId());
682        } catch (Throwable e) {
683            throw (XAException)new XAException("Could not get resource manager id.").initCause(e);
684        }
685    }
686
687    public Xid[] recover(int flag) throws XAException {
688        LOG.debug("recover({})", flag);
689        XATransactionId[] answer;
690
691        if (XAResource.TMNOFLAGS == flag) {
692            // signal next in cursor scan, which for us is always the end b/c we don't maintain any cursor state
693            // allows looping scan to complete
694            answer = new XATransactionId[0];
695        } else {
696            TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
697            try {
698                this.connection.checkClosedOrFailed();
699                this.connection.ensureConnectionInfoSent();
700
701                DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info);
702                DataStructure[] data = receipt.getData();
703                if (data instanceof XATransactionId[]) {
704                    answer = (XATransactionId[]) data;
705                } else {
706                    answer = new XATransactionId[data.length];
707                    System.arraycopy(data, 0, answer, 0, data.length);
708                }
709            } catch (JMSException e) {
710                throw toXAException(e);
711            }
712        }
713        LOG.debug("recover({})={}", flag, answer);
714        return answer;
715    }
716
717    public int getTransactionTimeout() throws XAException {
718        return 0;
719    }
720
721    public boolean setTransactionTimeout(int seconds) throws XAException {
722        return false;
723    }
724
725    // ///////////////////////////////////////////////////////////
726    //
727    // Helper methods.
728    //
729    // ///////////////////////////////////////////////////////////
730    protected String getResourceManagerId() throws JMSException {
731        return this.connection.getResourceManagerId();
732    }
733
734    private void setXid(Xid xid) throws XAException {
735
736        try {
737            this.connection.checkClosedOrFailed();
738            this.connection.ensureConnectionInfoSent();
739        } catch (JMSException e) {
740            disassociate();
741            throw toXAException(e);
742        }
743
744        if (xid != null) {
745            // associate
746            associatedXid = xid;
747            transactionId = new XATransactionId(xid);
748
749            TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.BEGIN);
750            try {
751                this.connection.asyncSendPacket(info);
752                if (LOG.isDebugEnabled()) {
753                    LOG.debug("{} started XA transaction {} ", this, transactionId);
754                }
755            } catch (JMSException e) {
756                disassociate();
757                throw toXAException(e);
758            }
759
760        } else {
761
762            if (transactionId != null) {
763                TransactionInfo info = new TransactionInfo(getConnectionId(), transactionId, TransactionInfo.END);
764                try {
765                    this.connection.syncSendPacket(info);
766                    LOG.debug("{} ended XA transaction {}", this, transactionId);
767                } catch (JMSException e) {
768                    disassociate();
769                    throw toXAException(e);
770                }
771
772                // Add our self to the list of contexts that are interested in
773                // post commit/rollback events.
774                List<TransactionContext> l;
775                synchronized(ENDED_XA_TRANSACTION_CONTEXTS) {
776                    l = ENDED_XA_TRANSACTION_CONTEXTS.get(transactionId);
777                    if (l == null) {
778                        l = new ArrayList<TransactionContext>(3);
779                        ENDED_XA_TRANSACTION_CONTEXTS.put(transactionId, l);
780                    }
781                    if (!l.contains(this)) {
782                        l.add(this);
783                    }
784                }
785            }
786
787            disassociate();
788        }
789    }
790
791    private void disassociate() {
792         // dis-associate
793         associatedXid = null;
794         transactionId = null;
795    }
796
797    /**
798     * Converts a JMSException from the server to an XAException. if the
799     * JMSException contained a linked XAException that is returned instead.
800     *
801     * @param e JMSException to convert
802     * @return XAException wrapping original exception or its message
803     */
804    private XAException toXAException(JMSException e) {
805        if (e.getCause() != null && e.getCause() instanceof XAException) {
806            XAException original = (XAException)e.getCause();
807            XAException xae = new XAException(original.getMessage());
808            xae.errorCode = original.errorCode;
809            if (xae.errorCode == XA_OK) {
810                // detail not unmarshalled see: org.apache.activemq.openwire.v1.BaseDataStreamMarshaller.createThrowable
811                xae.errorCode = parseFromMessageOr(original.getMessage(), XAException.XAER_RMERR);
812            }
813            xae.initCause(original);
814            return xae;
815        }
816
817        XAException xae = new XAException(e.getMessage());
818        xae.errorCode = XAException.XAER_RMFAIL;
819        xae.initCause(e);
820        return xae;
821    }
822
823    private int parseFromMessageOr(String message, int fallbackCode) {
824        final String marker = "xaErrorCode:";
825        final int index = message.lastIndexOf(marker);
826        if (index > -1) {
827            try {
828                return Integer.parseInt(message.substring(index + marker.length()));
829            } catch (Exception ignored) {}
830        }
831        return fallbackCode;
832    }
833
834    public ActiveMQConnection getConnection() {
835        return connection;
836    }
837
838
839    // for RAR xa recovery where xaresource connection is per request
840    public ActiveMQConnection setConnection(ActiveMQConnection connection) {
841        ActiveMQConnection existing = this.connection;
842        this.connection = connection;
843        return existing;
844    }
845
846    public void cleanup() {
847        associatedXid = null;
848        transactionId = null;
849    }
850
851    @Override
852    public String toString() {
853        return "TransactionContext{" +
854                "transactionId=" + transactionId +
855                ",connection=" + connection +
856                '}';
857    }
858}