001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.jms.pool; 018 019import java.util.List; 020import java.util.concurrent.CopyOnWriteArrayList; 021 022import javax.jms.Connection; 023import javax.jms.ConnectionConsumer; 024import javax.jms.ConnectionMetaData; 025import javax.jms.Destination; 026import javax.jms.ExceptionListener; 027import javax.jms.IllegalStateException; 028import javax.jms.JMSException; 029import javax.jms.Queue; 030import javax.jms.QueueConnection; 031import javax.jms.QueueSession; 032import javax.jms.ServerSessionPool; 033import javax.jms.Session; 034import javax.jms.TemporaryQueue; 035import javax.jms.TemporaryTopic; 036import javax.jms.Topic; 037import javax.jms.TopicConnection; 038import javax.jms.TopicSession; 039 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * Represents a proxy {@link Connection} which is-a {@link TopicConnection} and 045 * {@link QueueConnection} which is pooled and on {@link #close()} will return 046 * its reference to the ConnectionPool backing it. 047 * 048 * <b>NOTE</b> this implementation is only intended for use when sending 049 * messages. It does not deal with pooling of consumers; for that look at a 050 * library like <a href="http://jencks.org/">Jencks</a> such as in <a 051 * href="http://jencks.org/Message+Driven+POJOs">this example</a> 052 * 053 */ 054public class PooledConnection implements TopicConnection, QueueConnection, PooledSessionEventListener { 055 private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class); 056 057 protected ConnectionPool pool; 058 private volatile boolean stopped; 059 private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>(); 060 private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>(); 061 private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>(); 062 063 /** 064 * Creates a new PooledConnection instance that uses the given ConnectionPool to create 065 * and manage its resources. The ConnectionPool instance can be shared amongst many 066 * PooledConnection instances. 067 * 068 * @param pool 069 * The connection and pool manager backing this proxy connection object. 070 */ 071 public PooledConnection(ConnectionPool pool) { 072 this.pool = pool; 073 } 074 075 /** 076 * Factory method to create a new instance. 077 */ 078 public PooledConnection newInstance() { 079 return new PooledConnection(pool); 080 } 081 082 @Override 083 public void close() throws JMSException { 084 this.cleanupConnectionTemporaryDestinations(); 085 this.cleanupAllLoanedSessions(); 086 if (this.pool != null) { 087 this.pool.decrementReferenceCount(); 088 this.pool = null; 089 } 090 } 091 092 @Override 093 public void start() throws JMSException { 094 assertNotClosed(); 095 pool.start(); 096 } 097 098 @Override 099 public void stop() throws JMSException { 100 stopped = true; 101 } 102 103 @Override 104 public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException { 105 return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages); 106 } 107 108 @Override 109 public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException { 110 return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages); 111 } 112 113 @Override 114 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException { 115 return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i); 116 } 117 118 @Override 119 public String getClientID() throws JMSException { 120 return getConnection().getClientID(); 121 } 122 123 @Override 124 public ExceptionListener getExceptionListener() throws JMSException { 125 return pool.getParentExceptionListener(); 126 } 127 128 @Override 129 public ConnectionMetaData getMetaData() throws JMSException { 130 return getConnection().getMetaData(); 131 } 132 133 @Override 134 public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException { 135 pool.setParentExceptionListener(exceptionListener); 136 } 137 138 @Override 139 public void setClientID(String clientID) throws JMSException { 140 // ignore repeated calls to setClientID() with the same client id 141 // this could happen when a JMS component such as Spring that uses a 142 // PooledConnectionFactory shuts down and reinitializes. 143 if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) { 144 getConnection().setClientID(clientID); 145 } 146 } 147 148 @Override 149 public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException { 150 return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages); 151 } 152 153 // Session factory methods 154 // ------------------------------------------------------------------------- 155 @Override 156 public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException { 157 return (QueueSession) createSession(transacted, ackMode); 158 } 159 160 @Override 161 public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException { 162 return (TopicSession) createSession(transacted, ackMode); 163 } 164 165 @Override 166 public Session createSession(boolean transacted, int ackMode) throws JMSException { 167 PooledSession result = (PooledSession) pool.createSession(transacted, ackMode); 168 169 // Store the session so we can close the sessions that this PooledConnection 170 // created in order to ensure that consumers etc are closed per the JMS contract. 171 loanedSessions.add(result); 172 173 // Add a event listener to the session that notifies us when the session 174 // creates / destroys temporary destinations and closes etc. 175 result.addSessionEventListener(this); 176 return result; 177 } 178 179 // Implementation methods 180 // ------------------------------------------------------------------------- 181 182 @Override 183 public void onTemporaryQueueCreate(TemporaryQueue tempQueue) { 184 connTempQueues.add(tempQueue); 185 } 186 187 @Override 188 public void onTemporaryTopicCreate(TemporaryTopic tempTopic) { 189 connTempTopics.add(tempTopic); 190 } 191 192 @Override 193 public void onSessionClosed(PooledSession session) { 194 if (session != null) { 195 this.loanedSessions.remove(session); 196 } 197 } 198 199 public Connection getConnection() throws JMSException { 200 assertNotClosed(); 201 return pool.getConnection(); 202 } 203 204 protected void assertNotClosed() throws javax.jms.IllegalStateException { 205 if (stopped || pool == null) { 206 throw new IllegalStateException("Connection closed"); 207 } 208 } 209 210 protected Session createSession(SessionKey key) throws JMSException { 211 return getConnection().createSession(key.isTransacted(), key.getAckMode()); 212 } 213 214 @Override 215 public String toString() { 216 return "PooledConnection { " + pool + " }"; 217 } 218 219 /** 220 * Remove all of the temporary destinations created for this connection. 221 * This is important since the underlying connection may be reused over a 222 * long period of time, accumulating all of the temporary destinations from 223 * each use. However, from the perspective of the lifecycle from the 224 * client's view, close() closes the connection and, therefore, deletes all 225 * of the temporary destinations created. 226 */ 227 protected void cleanupConnectionTemporaryDestinations() { 228 229 for (TemporaryQueue tempQueue : connTempQueues) { 230 try { 231 tempQueue.delete(); 232 } catch (JMSException ex) { 233 LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage()); 234 } 235 } 236 connTempQueues.clear(); 237 238 for (TemporaryTopic tempTopic : connTempTopics) { 239 try { 240 tempTopic.delete(); 241 } catch (JMSException ex) { 242 LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage()); 243 } 244 } 245 connTempTopics.clear(); 246 } 247 248 /** 249 * The PooledSession tracks all Sessions that it created and now we close them. Closing the 250 * PooledSession will return the internal Session to the Pool of Session after cleaning up 251 * all the resources that the Session had allocated for this PooledConnection. 252 */ 253 protected void cleanupAllLoanedSessions() { 254 255 for (PooledSession session : loanedSessions) { 256 try { 257 session.close(); 258 } catch (JMSException ex) { 259 LOG.info("failed to close laoned Session \"" + session + "\" on closing pooled connection: " + ex.getMessage()); 260 } 261 } 262 loanedSessions.clear(); 263 } 264 265 /** 266 * @return the total number of Pooled session including idle sessions that are not 267 * currently loaned out to any client. 268 */ 269 public int getNumSessions() { 270 return this.pool.getNumSessions(); 271 } 272 273 /** 274 * @return the number of Sessions that are currently checked out of this Connection's session pool. 275 */ 276 public int getNumActiveSessions() { 277 return this.pool.getNumActiveSessions(); 278 } 279 280 /** 281 * @return the number of Sessions that are idle in this Connection's sessions pool. 282 */ 283 public int getNumtIdleSessions() { 284 return this.pool.getNumIdleSessions(); 285 } 286}