001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.jms.pool;
018
019import javax.jms.Connection;
020import javax.jms.JMSException;
021import javax.jms.Session;
022import javax.jms.XAConnection;
023import javax.transaction.RollbackException;
024import javax.transaction.Status;
025import javax.transaction.SystemException;
026import javax.transaction.TransactionManager;
027import javax.transaction.xa.XAResource;
028
029/**
030 * An XA-aware connection pool. When a session is created and an xa transaction
031 * is active, the session will automatically be enlisted in the current
032 * transaction.
033 */
034public class XaConnectionPool extends ConnectionPool {
035
036    private final TransactionManager transactionManager;
037
038    public XaConnectionPool(Connection connection, TransactionManager transactionManager) {
039        super(connection);
040        this.transactionManager = transactionManager;
041    }
042
043    @Override
044    protected Session makeSession(SessionKey key) throws JMSException {
045        return ((XAConnection) connection).createXASession();
046    }
047
048    @Override
049    public Session createSession(boolean transacted, int ackMode) throws JMSException {
050        try {
051            boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
052            if (isXa) {
053                // if the xa tx aborts inflight we don't want to auto create a
054                // local transaction or auto ack
055                transacted = false;
056                ackMode = Session.CLIENT_ACKNOWLEDGE;
057            } else if (transactionManager != null) {
058                // cmt or transactionManager managed
059                transacted = false;
060                if (ackMode == Session.SESSION_TRANSACTED) {
061                    ackMode = Session.AUTO_ACKNOWLEDGE;
062                }
063            }
064            PooledSession session = (PooledSession) super.createSession(transacted, ackMode);
065            if (isXa) {
066                session.setIgnoreClose(true);
067                session.setIsXa(true);
068                transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
069                incrementReferenceCount();
070                transactionManager.getTransaction().enlistResource(createXaResource(session));
071            } else {
072                session.setIgnoreClose(false);
073            }
074            return session;
075        } catch (RollbackException e) {
076            final JMSException jmsException = new JMSException("Rollback Exception");
077            jmsException.initCause(e);
078            throw jmsException;
079        } catch (SystemException e) {
080            final JMSException jmsException = new JMSException("System Exception");
081            jmsException.initCause(e);
082            throw jmsException;
083        }
084    }
085
086    protected XAResource createXaResource(PooledSession session) throws JMSException {
087        return session.getXAResource();
088    }
089
090    protected class Synchronization implements javax.transaction.Synchronization {
091        private final PooledSession session;
092
093        private Synchronization(PooledSession session) {
094            this.session = session;
095        }
096
097        @Override
098        public void beforeCompletion() {
099        }
100
101        @Override
102        public void afterCompletion(int status) {
103            try {
104                // This will return session to the pool.
105                session.setIgnoreClose(false);
106                session.close();
107                decrementReferenceCount();
108            } catch (JMSException e) {
109                throw new RuntimeException(e);
110            }
111        }
112    }
113}