001/* 002 * Copyright 2006 the original author or authors. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.apache.activemq.pool; 017 018import java.io.IOException; 019import javax.jms.Connection; 020import org.apache.activemq.ActiveMQConnection; 021import org.apache.activemq.jms.pool.ConnectionPool; 022import org.apache.activemq.jms.pool.JcaConnectionPool; 023import org.apache.activemq.transport.TransportListener; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027public class JcaPooledConnectionFactory extends XaPooledConnectionFactory { 028 private static final transient Logger LOG = LoggerFactory.getLogger(JcaPooledConnectionFactory.class); 029 030 private String name; 031 032 public String getName() { 033 return name; 034 } 035 036 public void setName(String name) { 037 this.name = name; 038 } 039 040 protected ConnectionPool createConnectionPool(Connection connection) { 041 return new JcaConnectionPool(connection, getTransactionManager(), getName()) { 042 043 @Override 044 protected Connection wrap(final Connection connection) { 045 // Add a transport Listener so that we can notice if this connection 046 // should be expired due to a connection failure. 047 ((ActiveMQConnection)connection).addTransportListener(new TransportListener() { 048 @Override 049 public void onCommand(Object command) { 050 } 051 052 @Override 053 public void onException(IOException error) { 054 synchronized (this) { 055 setHasExpired(true); 056 // only log if not stopped 057 if (!stopped.get()) { 058 LOG.info("Expiring connection " + connection + " on IOException: " + error.getMessage()); 059 // log stacktrace at debug level 060 LOG.debug("Expiring connection " + connection + " on IOException: ", error); 061 } 062 } 063 } 064 065 @Override 066 public void transportInterupted() { 067 } 068 069 @Override 070 public void transportResumed() { 071 } 072 }); 073 074 // make sure that we set the hasFailed flag, in case the transport already failed 075 // prior to the addition of our new TransportListener 076 setHasExpired(((ActiveMQConnection) connection).isTransportFailed()); 077 078 // may want to return an amq EnhancedConnection 079 return connection; 080 } 081 082 @Override 083 protected void unWrap(Connection connection) { 084 if (connection != null) { 085 ((ActiveMQConnection)connection).cleanUpTempDestinations(); 086 } 087 } 088 }; 089 } 090}