001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.jms.pool;
018
019import java.io.IOException;
020
021import javax.jms.Connection;
022import javax.jms.ConnectionFactory;
023import javax.jms.XAConnection;
024import javax.jms.XAConnectionFactory;
025import javax.jms.XASession;
026import javax.transaction.SystemException;
027import javax.transaction.TransactionManager;
028
029import javax.transaction.xa.XAResource;
030import org.apache.geronimo.transaction.manager.NamedXAResourceFactory;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
034import org.apache.geronimo.transaction.manager.NamedXAResource;
035import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
036
037
/**
 * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
 * in a way that will allow the transaction manager to correctly recover XA transactions.
 *
 * <p>For example, it can be used as follows:
 * <pre>
 *   &lt;bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"&gt;
 *      &lt;property name="brokerURL" value="tcp://localhost:61616" /&gt;
 *   &lt;/bean&gt;
 *
 *   &lt;bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean"&gt;
 *       &lt;property name="maxConnections" value="8" /&gt;
 *       &lt;property name="transactionManager" ref="transactionManager" /&gt;
 *       &lt;property name="connectionFactory" ref="activemqConnectionFactory" /&gt;
 *       &lt;property name="resourceName" value="activemq.broker" /&gt;
 *   &lt;/bean&gt;
 *
 *   &lt;bean id="resourceManager" class="org.apache.activemq.jms.pool.GenericResourceManager" init-method="recoverResource"&gt;
 *         &lt;property name="transactionManager" ref="transactionManager" /&gt;
 *         &lt;property name="connectionFactory" ref="activemqConnectionFactory" /&gt;
 *         &lt;property name="resourceName" value="activemq.broker" /&gt;
 *   &lt;/bean&gt;
 * </pre>
 */
062public class GenericResourceManager {
063
064    private static final Logger LOGGER = LoggerFactory.getLogger(GenericResourceManager.class);
065
066    private String resourceName;
067
068    private String userName;
069    private String password;
070
071    private TransactionManager transactionManager;
072
073    private ConnectionFactory connectionFactory;
074
075    public void recoverResource() {
076        try {
077            if (!Recovery.recover(this)) {
078                LOGGER.info("Resource manager is unrecoverable");
079            }
080        } catch (NoClassDefFoundError e) {
081            LOGGER.info("Resource manager is unrecoverable due to missing classes: " + e);
082        } catch (Throwable e) {
083            LOGGER.warn("Error while recovering resource manager", e);
084        }
085    }
086
087    public String getPassword() {
088        return password;
089    }
090
091    public void setPassword(String password) {
092        this.password = password;
093    }
094
095    public String getUserName() {
096        return userName;
097    }
098
099    public void setUserName(String userName) {
100        this.userName = userName;
101    }
102
103    public String getResourceName() {
104        return resourceName;
105    }
106
107    public void setResourceName(String resourceName) {
108        this.resourceName = resourceName;
109    }
110
111    public TransactionManager getTransactionManager() {
112        return transactionManager;
113    }
114
115    public void setTransactionManager(TransactionManager transactionManager) {
116        this.transactionManager = transactionManager;
117    }
118
119    public ConnectionFactory getConnectionFactory() {
120        return connectionFactory;
121    }
122
123    public void setConnectionFactory(ConnectionFactory connectionFactory) {
124        this.connectionFactory = connectionFactory;
125    }
126
127    /**
128     * This class will ensure the broker is properly recovered when wired with
129     * the Geronimo transaction manager.
130     */
131    public static class Recovery {
132
133        public static boolean isRecoverable(GenericResourceManager rm) {
134            return  rm.getConnectionFactory() instanceof XAConnectionFactory &&
135                    rm.getTransactionManager() instanceof RecoverableTransactionManager &&
136                    rm.getResourceName() != null && !"".equals(rm.getResourceName());
137        }
138
139        public static boolean recover(final GenericResourceManager rm) throws IOException {
140            if (isRecoverable(rm)) {
141                final XAConnectionFactory connFactory = (XAConnectionFactory) rm.getConnectionFactory();
142
143                RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager();
144                rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() {
145
146                    @Override
147                    public String getName() {
148                        return rm.getResourceName();
149                    }
150
151                    @Override
152                    public NamedXAResource getNamedXAResource() throws SystemException {
153                        try {
154                            final XAConnection xaConnection;
155                            if (rm.getUserName() != null && rm.getPassword() != null) {
156                                xaConnection = connFactory.createXAConnection(rm.getUserName(), rm.getPassword());
157                            } else {
158                                xaConnection = connFactory.createXAConnection();
159                            }
160                            final XASession session = xaConnection.createXASession();
161                            xaConnection.start();
162                            LOGGER.debug("new namedXAResource's connection: " + xaConnection);
163
164                            return new ConnectionAndWrapperNamedXAResource(session.getXAResource(), getName(), xaConnection);
165                        } catch (Exception e) {
166                            SystemException se =  new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
167                            se.initCause(e);
168                            LOGGER.error(se.getLocalizedMessage(), se);
169                            throw se;
170                        }
171                    }
172
173                    @Override
174                    public void returnNamedXAResource(NamedXAResource namedXaResource) {
175                        if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) {
176                            try {
177                                LOGGER.debug("closing returned namedXAResource's connection: " + ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection);
178                                ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection.close();
179                            } catch (Exception ignored) {
180                                LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored);
181                            }
182                        }
183                    }
184                });
185                return true;
186            } else {
187                return false;
188            }
189        }
190    }
191
192    public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource {
193        final Connection connection;
194        public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, Connection connection) {
195            super(xaResource, name);
196            this.connection = connection;
197        }
198    }
199}