001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.jms.pool; 018 019import java.io.IOException; 020 021import javax.jms.Connection; 022import javax.jms.ConnectionFactory; 023import javax.jms.XAConnection; 024import javax.jms.XAConnectionFactory; 025import javax.jms.XASession; 026import javax.transaction.SystemException; 027import javax.transaction.TransactionManager; 028 029import javax.transaction.xa.XAResource; 030import org.apache.geronimo.transaction.manager.NamedXAResourceFactory; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033import org.apache.geronimo.transaction.manager.RecoverableTransactionManager; 034import org.apache.geronimo.transaction.manager.NamedXAResource; 035import org.apache.geronimo.transaction.manager.WrapperNamedXAResource; 036 037 038/** 039 * This class allows wiring the ActiveMQ broker and the Geronimo transaction manager 040 * in a way that will allow the transaction manager to correctly recover XA transactions. 041 * 042 * For example, it can be used the following way: 043 * <pre> 044 * <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 045 * <property name="brokerURL" value="tcp://localhost:61616" /> 046 * </bean> 047 * 048 * <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean"> 049 * <property name="maxConnections" value="8" /> 050 * <property name="transactionManager" ref="transactionManager" /> 051 * <property name="connectionFactory" ref="activemqConnectionFactory" /> 052 * <property name="resourceName" value="activemq.broker" /> 053 * </bean> 054 * 055 * <bean id="resourceManager" class="org.apache.activemq.jms.pool.GenericResourceManager" init-method="recoverResource"> 056 * <property name="transactionManager" ref="transactionManager" /> 057 * <property name="connectionFactory" ref="activemqConnectionFactory" /> 058 * <property name="resourceName" value="activemq.broker" /> 059 * </bean> 060 * </pre> 061 */ 062public class GenericResourceManager { 063 064 private static final Logger LOGGER = LoggerFactory.getLogger(GenericResourceManager.class); 065 066 private String resourceName; 067 068 private String userName; 069 private String password; 070 071 private TransactionManager transactionManager; 072 073 private ConnectionFactory connectionFactory; 074 075 public void recoverResource() { 076 try { 077 if (!Recovery.recover(this)) { 078 LOGGER.info("Resource manager is unrecoverable"); 079 } 080 } catch (NoClassDefFoundError e) { 081 LOGGER.info("Resource manager is unrecoverable due to missing classes: " + e); 082 } catch (Throwable e) { 083 LOGGER.warn("Error while recovering resource manager", e); 084 } 085 } 086 087 public String getPassword() { 088 return password; 089 } 090 091 public void setPassword(String password) { 092 this.password = password; 093 } 094 095 public String getUserName() { 096 return userName; 097 } 098 099 public void setUserName(String userName) { 100 this.userName = userName; 101 } 102 103 public String getResourceName() { 104 return resourceName; 105 } 106 107 public void setResourceName(String resourceName) { 108 this.resourceName = resourceName; 109 } 110 111 public TransactionManager getTransactionManager() { 112 return transactionManager; 113 } 114 115 public void setTransactionManager(TransactionManager transactionManager) { 116 this.transactionManager = transactionManager; 117 } 118 119 public ConnectionFactory getConnectionFactory() { 120 return connectionFactory; 121 } 122 123 public void setConnectionFactory(ConnectionFactory connectionFactory) { 124 this.connectionFactory = connectionFactory; 125 } 126 127 /** 128 * This class will ensure the broker is properly recovered when wired with 129 * the Geronimo transaction manager. 130 */ 131 public static class Recovery { 132 133 public static boolean isRecoverable(GenericResourceManager rm) { 134 return rm.getConnectionFactory() instanceof XAConnectionFactory && 135 rm.getTransactionManager() instanceof RecoverableTransactionManager && 136 rm.getResourceName() != null && !"".equals(rm.getResourceName()); 137 } 138 139 public static boolean recover(final GenericResourceManager rm) throws IOException { 140 if (isRecoverable(rm)) { 141 final XAConnectionFactory connFactory = (XAConnectionFactory) rm.getConnectionFactory(); 142 143 RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager(); 144 rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() { 145 146 @Override 147 public String getName() { 148 return rm.getResourceName(); 149 } 150 151 @Override 152 public NamedXAResource getNamedXAResource() throws SystemException { 153 try { 154 final XAConnection xaConnection; 155 if (rm.getUserName() != null && rm.getPassword() != null) { 156 xaConnection = connFactory.createXAConnection(rm.getUserName(), rm.getPassword()); 157 } else { 158 xaConnection = connFactory.createXAConnection(); 159 } 160 final XASession session = xaConnection.createXASession(); 161 xaConnection.start(); 162 LOGGER.debug("new namedXAResource's connection: " + xaConnection); 163 164 return new ConnectionAndWrapperNamedXAResource(session.getXAResource(), getName(), xaConnection); 165 } catch (Exception e) { 166 SystemException se = new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage()); 167 se.initCause(e); 168 LOGGER.error(se.getLocalizedMessage(), se); 169 throw se; 170 } 171 } 172 173 @Override 174 public void returnNamedXAResource(NamedXAResource namedXaResource) { 175 if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) { 176 try { 177 LOGGER.debug("closing returned namedXAResource's connection: " + ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection); 178 ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection.close(); 179 } catch (Exception ignored) { 180 LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored); 181 } 182 } 183 } 184 }); 185 return true; 186 } else { 187 return false; 188 } 189 } 190 } 191 192 public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource { 193 final Connection connection; 194 public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, Connection connection) { 195 super(xaResource, name); 196 this.connection = connection; 197 } 198 } 199}