001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.jms.pool; 019 020import java.util.List; 021import java.util.concurrent.CopyOnWriteArrayList; 022import java.util.concurrent.atomic.AtomicBoolean; 023 024import javax.jms.Connection; 025import javax.jms.ExceptionListener; 026import javax.jms.IllegalStateException; 027import javax.jms.JMSException; 028import javax.jms.Session; 029import javax.jms.TemporaryQueue; 030import javax.jms.TemporaryTopic; 031 032import org.apache.commons.pool.KeyedPoolableObjectFactory; 033import org.apache.commons.pool.impl.GenericKeyedObjectPool; 034import org.apache.commons.pool.impl.GenericObjectPool; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * Holds a real JMS connection along with the session pools associated with it. 040 * <p/> 041 * Instances of this class are shared amongst one or more PooledConnection object and must 042 * track the session objects that are loaned out for cleanup on close as well as ensuring 043 * that the temporary destinations of the managed Connection are purged when all references 044 * to this ConnectionPool are released. 045 */ 046public class ConnectionPool implements ExceptionListener { 047 private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class); 048 049 protected Connection connection; 050 private int referenceCount; 051 private long lastUsed = System.currentTimeMillis(); 052 private final long firstUsed = lastUsed; 053 private boolean hasExpired; 054 private int idleTimeout = 30 * 1000; 055 private long expiryTimeout = 0l; 056 private boolean useAnonymousProducers = true; 057 058 private final AtomicBoolean started = new AtomicBoolean(false); 059 private final GenericKeyedObjectPool<SessionKey, SessionHolder> sessionPool; 060 private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>(); 061 private boolean reconnectOnException; 062 private ExceptionListener parentExceptionListener; 063 064 public ConnectionPool(Connection connection) { 065 066 this.connection = wrap(connection); 067 try { 068 this.connection.setExceptionListener(this); 069 } catch (JMSException ex) { 070 LOG.warn("Could not set exception listener on create of ConnectionPool"); 071 } 072 073 // Create our internal Pool of session instances. 074 this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>( 075 new KeyedPoolableObjectFactory<SessionKey, SessionHolder>() { 076 077 @Override 078 public void activateObject(SessionKey key, SessionHolder session) throws Exception { 079 } 080 081 @Override 082 public void destroyObject(SessionKey key, SessionHolder session) throws Exception { 083 session.close(); 084 } 085 086 @Override 087 public SessionHolder makeObject(SessionKey key) throws Exception { 088 return new SessionHolder(makeSession(key)); 089 } 090 091 @Override 092 public void passivateObject(SessionKey key, SessionHolder session) throws Exception { 093 } 094 095 @Override 096 public boolean validateObject(SessionKey key, SessionHolder session) { 097 return true; 098 } 099 } 100 ); 101 } 102 103 // useful when external failure needs to force expiry 104 public void setHasExpired(boolean val) { 105 hasExpired = val; 106 } 107 108 protected Session makeSession(SessionKey key) throws JMSException { 109 return connection.createSession(key.isTransacted(), key.getAckMode()); 110 } 111 112 protected Connection wrap(Connection connection) { 113 return connection; 114 } 115 116 protected void unWrap(Connection connection) { 117 } 118 119 public void start() throws JMSException { 120 if (started.compareAndSet(false, true)) { 121 try { 122 connection.start(); 123 } catch (JMSException e) { 124 started.set(false); 125 if (isReconnectOnException()) { 126 close(); 127 } 128 throw(e); 129 } 130 } 131 } 132 133 public synchronized Connection getConnection() { 134 return connection; 135 } 136 137 public Session createSession(boolean transacted, int ackMode) throws JMSException { 138 SessionKey key = new SessionKey(transacted, ackMode); 139 PooledSession session; 140 try { 141 session = new PooledSession(key, sessionPool.borrowObject(key), sessionPool, key.isTransacted(), useAnonymousProducers); 142 session.addSessionEventListener(new PooledSessionEventListener() { 143 144 @Override 145 public void onTemporaryTopicCreate(TemporaryTopic tempTopic) { 146 } 147 148 @Override 149 public void onTemporaryQueueCreate(TemporaryQueue tempQueue) { 150 } 151 152 @Override 153 public void onSessionClosed(PooledSession session) { 154 ConnectionPool.this.loanedSessions.remove(session); 155 } 156 }); 157 this.loanedSessions.add(session); 158 } catch (Exception e) { 159 IllegalStateException illegalStateException = new IllegalStateException(e.toString()); 160 illegalStateException.initCause(e); 161 throw illegalStateException; 162 } 163 return session; 164 } 165 166 public synchronized void close() { 167 if (connection != null) { 168 try { 169 sessionPool.close(); 170 } catch (Exception e) { 171 } finally { 172 try { 173 connection.close(); 174 } catch (Exception e) { 175 } finally { 176 connection = null; 177 } 178 } 179 } 180 } 181 182 public synchronized void incrementReferenceCount() { 183 referenceCount++; 184 lastUsed = System.currentTimeMillis(); 185 } 186 187 public synchronized void decrementReferenceCount() { 188 referenceCount--; 189 lastUsed = System.currentTimeMillis(); 190 if (referenceCount == 0) { 191 // Loaned sessions are those that are active in the sessionPool and 192 // have not been closed by the client before closing the connection. 193 // These need to be closed so that all session's reflect the fact 194 // that the parent Connection is closed. 195 for (PooledSession session : this.loanedSessions) { 196 try { 197 session.close(); 198 } catch (Exception e) { 199 } 200 } 201 this.loanedSessions.clear(); 202 203 unWrap(getConnection()); 204 205 expiredCheck(); 206 } 207 } 208 209 /** 210 * Determines if this Connection has expired. 211 * <p/> 212 * A ConnectionPool is considered expired when all references to it are released AND either 213 * the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed. 214 * Once a ConnectionPool is determined to have expired its underlying Connection is closed. 215 * 216 * @return true if this connection has expired. 217 */ 218 public synchronized boolean expiredCheck() { 219 220 boolean expired = false; 221 222 if (connection == null) { 223 return true; 224 } 225 226 if (hasExpired) { 227 if (referenceCount == 0) { 228 close(); 229 expired = true; 230 } 231 } 232 233 if (expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) { 234 hasExpired = true; 235 if (referenceCount == 0) { 236 close(); 237 expired = true; 238 } 239 } 240 241 // Only set hasExpired here is no references, as a Connection with references is by 242 // definition not idle at this time. 243 if (referenceCount == 0 && idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout) { 244 hasExpired = true; 245 close(); 246 expired = true; 247 } 248 249 return expired; 250 } 251 252 public int getIdleTimeout() { 253 return idleTimeout; 254 } 255 256 public void setIdleTimeout(int idleTimeout) { 257 this.idleTimeout = idleTimeout; 258 } 259 260 public void setExpiryTimeout(long expiryTimeout) { 261 this.expiryTimeout = expiryTimeout; 262 } 263 264 public long getExpiryTimeout() { 265 return expiryTimeout; 266 } 267 268 public int getMaximumActiveSessionPerConnection() { 269 return this.sessionPool.getMaxActive(); 270 } 271 272 public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) { 273 this.sessionPool.setMaxActive(maximumActiveSessionPerConnection); 274 } 275 276 public boolean isUseAnonymousProducers() { 277 return this.useAnonymousProducers; 278 } 279 280 public void setUseAnonymousProducers(boolean value) { 281 this.useAnonymousProducers = value; 282 } 283 284 /** 285 * @return the total number of Pooled session including idle sessions that are not 286 * currently loaned out to any client. 287 */ 288 public int getNumSessions() { 289 return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive(); 290 } 291 292 /** 293 * @return the total number of Sessions that are in the Session pool but not loaned out. 294 */ 295 public int getNumIdleSessions() { 296 return this.sessionPool.getNumIdle(); 297 } 298 299 /** 300 * @return the total number of Session's that have been loaned to PooledConnection instances. 301 */ 302 public int getNumActiveSessions() { 303 return this.sessionPool.getNumActive(); 304 } 305 306 /** 307 * Configure whether the createSession method should block when there are no more idle sessions and the 308 * pool already contains the maximum number of active sessions. If false the create method will fail 309 * and throw an exception. 310 * 311 * @param block 312 * Indicates whether blocking should be used to wait for more space to create a session. 313 */ 314 public void setBlockIfSessionPoolIsFull(boolean block) { 315 this.sessionPool.setWhenExhaustedAction( 316 (block ? GenericObjectPool.WHEN_EXHAUSTED_BLOCK : GenericObjectPool.WHEN_EXHAUSTED_FAIL)); 317 } 318 319 public boolean isBlockIfSessionPoolIsFull() { 320 return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK; 321 } 322 323 /** 324 * Returns the timeout to use for blocking creating new sessions 325 * 326 * @return true if the pooled Connection createSession method will block when the limit is hit. 327 * @see #setBlockIfSessionPoolIsFull(boolean) 328 */ 329 public long getBlockIfSessionPoolIsFullTimeout() { 330 return this.sessionPool.getMaxWait(); 331 } 332 333 /** 334 * Controls the behavior of the internal session pool. By default the call to 335 * Connection.getSession() will block if the session pool is full. This setting 336 * will affect how long it blocks and throws an exception after the timeout. 337 * 338 * The size of the session pool is controlled by the @see #maximumActive 339 * property. 340 * 341 * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull 342 * property 343 * 344 * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true, 345 * then use this setting to configure how long to block before retry 346 */ 347 public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) { 348 this.sessionPool.setMaxWait(blockIfSessionPoolIsFullTimeout); 349 } 350 351 /** 352 * @return true if the underlying connection will be renewed on JMSException, false otherwise 353 */ 354 public boolean isReconnectOnException() { 355 return reconnectOnException; 356 } 357 358 /** 359 * Controls weather the underlying connection should be reset (and renewed) on JMSException 360 * 361 * @param reconnectOnException 362 * Boolean value that configures whether reconnect on exception should happen 363 */ 364 public void setReconnectOnException(boolean reconnectOnException) { 365 this.reconnectOnException = reconnectOnException; 366 } 367 368 ExceptionListener getParentExceptionListener() { 369 return parentExceptionListener; 370 } 371 372 void setParentExceptionListener(ExceptionListener parentExceptionListener) { 373 this.parentExceptionListener = parentExceptionListener; 374 } 375 376 @Override 377 public void onException(JMSException exception) { 378 if (isReconnectOnException()) { 379 close(); 380 } 381 if (parentExceptionListener != null) { 382 parentExceptionListener.onException(exception); 383 } 384 } 385 386 @Override 387 public String toString() { 388 return "ConnectionPool[" + connection + "]"; 389 } 390}