001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.jms.pool; 018 019import java.util.Properties; 020import java.util.concurrent.atomic.AtomicBoolean; 021import java.util.concurrent.atomic.AtomicReference; 022 023import javax.jms.Connection; 024import javax.jms.ConnectionFactory; 025import javax.jms.JMSException; 026import javax.jms.QueueConnection; 027import javax.jms.QueueConnectionFactory; 028import javax.jms.TopicConnection; 029import javax.jms.TopicConnectionFactory; 030 031import org.apache.commons.pool.KeyedPoolableObjectFactory; 032import org.apache.commons.pool.impl.GenericKeyedObjectPool; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * A JMS provider which pools Connection, Session and MessageProducer instances 038 * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's 039 * <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>. 040 * Connections, sessions and producers are returned to a pool after use so that they can be reused later 041 * without having to undergo the cost of creating them again. 042 * 043 * b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers, 044 * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which 045 * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually 046 * just created at startup and left active, handling incoming messages as they come. When a consumer is 047 * complete, it is best to close it rather than return it to a pool for later reuse: this is because, 048 * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer, 049 * where they'll get held until the consumer is active again. 050 * 051 * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you 052 * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that 053 * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail: 054 * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html 055 * 056 * Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the 057 * pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should 058 * be used when configuring this optional feature. Eviction runs contend with client threads for access 059 * to objects in the pool, so if they run too frequently performance issues may result. The idle object 060 * eviction thread may be configured using the {@link org.apache.activemq.jms.pool.PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method. By 061 * default the value is -1 which means no eviction thread will be run. Set to a non-negative value to 062 * configure the idle eviction thread to run. 063 */ 064public class PooledConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory { 065 private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class); 066 067 protected final AtomicBoolean stopped = new AtomicBoolean(false); 068 private GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool; 069 070 protected Object connectionFactory; 071 072 private int maximumActiveSessionPerConnection = 500; 073 private int idleTimeout = 30 * 1000; 074 private boolean blockIfSessionPoolIsFull = true; 075 private long blockIfSessionPoolIsFullTimeout = -1L; 076 private long expiryTimeout = 0l; 077 private boolean createConnectionOnStartup = true; 078 private boolean useAnonymousProducers = true; 079 private boolean reconnectOnException = true; 080 081 // Temporary value used to always fetch the result of makeObject. 082 private final AtomicReference<ConnectionPool> mostRecentlyCreated = new AtomicReference<ConnectionPool>(null); 083 084 public void initConnectionsPool() { 085 if (this.connectionsPool == null) { 086 this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>( 087 new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() { 088 089 @Override 090 public void activateObject(ConnectionKey key, ConnectionPool connection) throws Exception { 091 } 092 093 @Override 094 public void destroyObject(ConnectionKey key, ConnectionPool connection) throws Exception { 095 try { 096 if (LOG.isTraceEnabled()) { 097 LOG.trace("Destroying connection: {}", connection); 098 } 099 connection.close(); 100 } catch (Exception e) { 101 LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e); 102 } 103 } 104 105 @Override 106 public ConnectionPool makeObject(ConnectionKey key) throws Exception { 107 Connection delegate = createConnection(key); 108 109 ConnectionPool connection = createConnectionPool(delegate); 110 connection.setIdleTimeout(getIdleTimeout()); 111 connection.setExpiryTimeout(getExpiryTimeout()); 112 connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection()); 113 connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull()); 114 if (isBlockIfSessionPoolIsFull() && getBlockIfSessionPoolIsFullTimeout() > 0) { 115 connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout()); 116 } 117 connection.setUseAnonymousProducers(isUseAnonymousProducers()); 118 connection.setReconnectOnException(isReconnectOnException()); 119 120 LOG.trace("Created new connection: {}", connection); 121 122 PooledConnectionFactory.this.mostRecentlyCreated.set(connection); 123 124 return connection; 125 } 126 127 @Override 128 public void passivateObject(ConnectionKey key, ConnectionPool connection) throws Exception { 129 } 130 131 @Override 132 public boolean validateObject(ConnectionKey key, ConnectionPool connection) { 133 if (connection != null && connection.expiredCheck()) { 134 LOG.trace("Connection has expired: {} and will be destroyed", connection); 135 return false; 136 } 137 138 return true; 139 } 140 }); 141 142 // Set max idle (not max active) since our connections always idle in the pool. 143 this.connectionsPool.setMaxIdle(1); 144 this.connectionsPool.setLifo(false); 145 146 // We always want our validate method to control when idle objects are evicted. 147 this.connectionsPool.setTestOnBorrow(true); 148 this.connectionsPool.setTestWhileIdle(true); 149 } 150 } 151 152 /** 153 * @return the currently configured ConnectionFactory used to create the pooled Connections. 154 */ 155 public Object getConnectionFactory() { 156 return connectionFactory; 157 } 158 159 /** 160 * Sets the ConnectionFactory used to create new pooled Connections. 161 * <p/> 162 * Updates to this value do not affect Connections that were previously created and placed 163 * into the pool. In order to allocate new Connections based off this new ConnectionFactory 164 * it is first necessary to {@link #clear} the pooled Connections. 165 * 166 * @param toUse 167 * The factory to use to create pooled Connections. 168 */ 169 public void setConnectionFactory(final Object toUse) { 170 if (toUse instanceof ConnectionFactory) { 171 this.connectionFactory = toUse; 172 } else { 173 throw new IllegalArgumentException("connectionFactory should implement javax.jmx.ConnectionFactory"); 174 } 175 } 176 177 @Override 178 public QueueConnection createQueueConnection() throws JMSException { 179 return (QueueConnection) createConnection(); 180 } 181 182 @Override 183 public QueueConnection createQueueConnection(String userName, String password) throws JMSException { 184 return (QueueConnection) createConnection(userName, password); 185 } 186 187 @Override 188 public TopicConnection createTopicConnection() throws JMSException { 189 return (TopicConnection) createConnection(); 190 } 191 192 @Override 193 public TopicConnection createTopicConnection(String userName, String password) throws JMSException { 194 return (TopicConnection) createConnection(userName, password); 195 } 196 197 @Override 198 public Connection createConnection() throws JMSException { 199 return createConnection(null, null); 200 } 201 202 @Override 203 public synchronized Connection createConnection(String userName, String password) throws JMSException { 204 if (stopped.get()) { 205 LOG.debug("PooledConnectionFactory is stopped, skip create new connection."); 206 return null; 207 } 208 209 ConnectionPool connection = null; 210 ConnectionKey key = new ConnectionKey(userName, password); 211 212 // This will either return an existing non-expired ConnectionPool or it 213 // will create a new one to meet the demand. 214 if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) { 215 try { 216 connectionsPool.addObject(key); 217 connection = mostRecentlyCreated.getAndSet(null); 218 connection.incrementReferenceCount(); 219 } catch (Exception e) { 220 throw createJmsException("Error while attempting to add new Connection to the pool", e); 221 } 222 } else { 223 try { 224 // We can race against other threads returning the connection when there is an 225 // expiration or idle timeout. We keep pulling out ConnectionPool instances until 226 // we win and get a non-closed instance and then increment the reference count 227 // under lock to prevent another thread from triggering an expiration check and 228 // pulling the rug out from under us. 229 while (connection == null) { 230 connection = connectionsPool.borrowObject(key); 231 synchronized (connection) { 232 if (connection.getConnection() != null) { 233 connection.incrementReferenceCount(); 234 break; 235 } 236 237 // Return the bad one to the pool and let if get destroyed as normal. 238 connectionsPool.returnObject(key, connection); 239 connection = null; 240 } 241 } 242 } catch (Exception e) { 243 throw createJmsException("Error while attempting to retrieve a connection from the pool", e); 244 } 245 246 try { 247 connectionsPool.returnObject(key, connection); 248 } catch (Exception e) { 249 throw createJmsException("Error when returning connection to the pool", e); 250 } 251 } 252 253 return newPooledConnection(connection); 254 } 255 256 protected Connection newPooledConnection(ConnectionPool connection) { 257 return new PooledConnection(connection); 258 } 259 260 private JMSException createJmsException(String msg, Exception cause) { 261 JMSException exception = new JMSException(msg); 262 exception.setLinkedException(cause); 263 exception.initCause(cause); 264 return exception; 265 } 266 267 protected Connection createConnection(ConnectionKey key) throws JMSException { 268 if (connectionFactory instanceof ConnectionFactory) { 269 if (key.getUserName() == null && key.getPassword() == null) { 270 return ((ConnectionFactory) connectionFactory).createConnection(); 271 } else { 272 return ((ConnectionFactory) connectionFactory).createConnection(key.getUserName(), key.getPassword()); 273 } 274 } else { 275 throw new IllegalStateException("connectionFactory should implement javax.jms.ConnectionFactory"); 276 } 277 } 278 279 public void start() { 280 LOG.debug("Staring the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup()); 281 stopped.set(false); 282 if (isCreateConnectionOnStartup()) { 283 try { 284 // warm the pool by creating a connection during startup 285 createConnection().close(); 286 } catch (JMSException e) { 287 LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e); 288 } 289 } 290 } 291 292 public void stop() { 293 if (stopped.compareAndSet(false, true)) { 294 LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}", 295 connectionsPool != null ? connectionsPool.getNumActive() : 0); 296 try { 297 if (connectionsPool != null) { 298 connectionsPool.close(); 299 connectionsPool = null; 300 } 301 } catch (Exception e) { 302 } 303 } 304 } 305 306 /** 307 * Clears all connections from the pool. Each connection that is currently in the pool is 308 * closed and removed from the pool. A new connection will be created on the next call to 309 * {@link #createConnection}. Care should be taken when using this method as Connections that 310 * are in use be client's will be closed. 311 */ 312 public void clear() { 313 if (stopped.get()) { 314 return; 315 } 316 317 getConnectionsPool().clear(); 318 } 319 320 /** 321 * Returns the currently configured maximum number of sessions a pooled Connection will 322 * create before it either blocks or throws an exception when a new session is requested, 323 * depending on configuration. 324 * 325 * @return the number of session instances that can be taken from a pooled connection. 326 */ 327 public int getMaximumActiveSessionPerConnection() { 328 return maximumActiveSessionPerConnection; 329 } 330 331 /** 332 * Sets the maximum number of active sessions per connection 333 * 334 * @param maximumActiveSessionPerConnection 335 * The maximum number of active session per connection in the pool. 336 */ 337 public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) { 338 this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection; 339 } 340 341 /** 342 * Controls the behavior of the internal session pool. By default the call to 343 * Connection.getSession() will block if the session pool is full. If the 344 * argument false is given, it will change the default behavior and instead the 345 * call to getSession() will throw a JMSException. 346 * 347 * The size of the session pool is controlled by the @see #maximumActive 348 * property. 349 * 350 * @param block - if true, the call to getSession() blocks if the pool is full 351 * until a session object is available. defaults to true. 352 */ 353 public void setBlockIfSessionPoolIsFull(boolean block) { 354 this.blockIfSessionPoolIsFull = block; 355 } 356 357 /** 358 * Returns whether a pooled Connection will enter a blocked state or will throw an Exception 359 * once the maximum number of sessions has been borrowed from the the Session Pool. 360 * 361 * @return true if the pooled Connection createSession method will block when the limit is hit. 362 * @see #setBlockIfSessionPoolIsFull(boolean) 363 */ 364 public boolean isBlockIfSessionPoolIsFull() { 365 return this.blockIfSessionPoolIsFull; 366 } 367 368 /** 369 * Returns the maximum number to pooled Connections that this factory will allow before it 370 * begins to return connections from the pool on calls to ({@link #createConnection}. 371 * 372 * @return the maxConnections that will be created for this pool. 373 */ 374 public int getMaxConnections() { 375 return getConnectionsPool().getMaxIdle(); 376 } 377 378 /** 379 * Sets the maximum number of pooled Connections (defaults to one). Each call to 380 * {@link #createConnection} will result in a new Connection being create up to the max 381 * connections value. 382 * 383 * @param maxConnections the maxConnections to set 384 */ 385 public void setMaxConnections(int maxConnections) { 386 getConnectionsPool().setMaxIdle(maxConnections); 387 } 388 389 /** 390 * Gets the Idle timeout value applied to new Connection's that are created by this pool. 391 * <p/> 392 * The idle timeout is used determine if a Connection instance has sat to long in the pool unused 393 * and if so is closed and removed from the pool. The default value is 30 seconds. 394 * 395 * @return idle timeout value (milliseconds) 396 */ 397 public int getIdleTimeout() { 398 return idleTimeout; 399 } 400 401 /** 402 * Sets the idle timeout value for Connection's that are created by this pool in Milliseconds, 403 * defaults to 30 seconds. 404 * <p/> 405 * For a Connection that is in the pool but has no current users the idle timeout determines how 406 * long the Connection can live before it is eligible for removal from the pool. Normally the 407 * connections are tested when an attempt to check one out occurs so a Connection instance can sit 408 * in the pool much longer than its idle timeout if connections are used infrequently. 409 * 410 * @param idleTimeout 411 * The maximum time a pooled Connection can sit unused before it is eligible for removal. 412 */ 413 public void setIdleTimeout(int idleTimeout) { 414 this.idleTimeout = idleTimeout; 415 } 416 417 /** 418 * allow connections to expire, irrespective of load or idle time. This is useful with failover 419 * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery 420 * 421 * @param expiryTimeout non zero in milliseconds 422 */ 423 public void setExpiryTimeout(long expiryTimeout) { 424 this.expiryTimeout = expiryTimeout; 425 } 426 427 /** 428 * @return the configured expiration timeout for connections in the pool. 429 */ 430 public long getExpiryTimeout() { 431 return expiryTimeout; 432 } 433 434 /** 435 * @return true if a Connection is created immediately on a call to {@link start}. 436 */ 437 public boolean isCreateConnectionOnStartup() { 438 return createConnectionOnStartup; 439 } 440 441 /** 442 * Whether to create a connection on starting this {@link PooledConnectionFactory}. 443 * <p/> 444 * This can be used to warm-up the pool on startup. Notice that any kind of exception 445 * happens during startup is logged at WARN level and ignored. 446 * 447 * @param createConnectionOnStartup <tt>true</tt> to create a connection on startup 448 */ 449 public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) { 450 this.createConnectionOnStartup = createConnectionOnStartup; 451 } 452 453 /** 454 * Should Sessions use one anonymous producer for all producer requests or should a new 455 * MessageProducer be created for each request to create a producer object, default is true. 456 * 457 * When enabled the session only needs to allocate one MessageProducer for all requests and 458 * the MessageProducer#send(destination, message) method can be used. Normally this is the 459 * right thing to do however it does result in the Broker not showing the producers per 460 * destination. 461 * 462 * @return true if a PooledSession will use only a single anonymous message producer instance. 463 */ 464 public boolean isUseAnonymousProducers() { 465 return this.useAnonymousProducers; 466 } 467 468 /** 469 * Sets whether a PooledSession uses only one anonymous MessageProducer instance or creates 470 * a new MessageProducer for each call the create a MessageProducer. 471 * 472 * @param value 473 * Boolean value that configures whether anonymous producers are used. 474 */ 475 public void setUseAnonymousProducers(boolean value) { 476 this.useAnonymousProducers = value; 477 } 478 479 /** 480 * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys. 481 * 482 * @return this factories pool of ConnectionPool instances. 483 */ 484 protected GenericKeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() { 485 initConnectionsPool(); 486 return this.connectionsPool; 487 } 488 489 /** 490 * Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread. 491 * When non-positive, no idle object eviction thread will be run, and Connections will only be 492 * checked on borrow to determine if they have sat idle for too long or have failed for some 493 * other reason. 494 * <p/> 495 * By default this value is set to -1 and no expiration thread ever runs. 496 * 497 * @param timeBetweenExpirationCheckMillis 498 * The time to wait between runs of the idle Connection eviction thread. 499 */ 500 public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) { 501 getConnectionsPool().setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis); 502 } 503 504 /** 505 * @return the number of milliseconds to sleep between runs of the idle connection eviction thread. 506 */ 507 public long getTimeBetweenExpirationCheckMillis() { 508 return getConnectionsPool().getTimeBetweenEvictionRunsMillis(); 509 } 510 511 /** 512 * @return the number of Connections currently in the Pool 513 */ 514 public int getNumConnections() { 515 return getConnectionsPool().getNumIdle(); 516 } 517 518 /** 519 * Delegate that creates each instance of an ConnectionPool object. Subclasses can override 520 * this method to customize the type of connection pool returned. 521 * 522 * @param connection 523 * 524 * @return instance of a new ConnectionPool. 525 */ 526 protected ConnectionPool createConnectionPool(Connection connection) { 527 return new ConnectionPool(connection); 528 } 529 530 /** 531 * Returns the timeout to use for blocking creating new sessions 532 * 533 * @return true if the pooled Connection createSession method will block when the limit is hit. 534 * @see #setBlockIfSessionPoolIsFull(boolean) 535 */ 536 public long getBlockIfSessionPoolIsFullTimeout() { 537 return blockIfSessionPoolIsFullTimeout; 538 } 539 540 /** 541 * Controls the behavior of the internal session pool. By default the call to 542 * Connection.getSession() will block if the session pool is full. This setting 543 * will affect how long it blocks and throws an exception after the timeout. 544 * 545 * The size of the session pool is controlled by the @see #maximumActive 546 * property. 547 * 548 * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull 549 * property 550 * 551 * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true, 552 * then use this setting to configure how long to block before retry 553 */ 554 public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) { 555 this.blockIfSessionPoolIsFullTimeout = blockIfSessionPoolIsFullTimeout; 556 } 557 558 /** 559 * @return true if the underlying connection will be renewed on JMSException, false otherwise 560 */ 561 public boolean isReconnectOnException() { 562 return reconnectOnException; 563 } 564 565 /** 566 * Controls weather the underlying connection should be reset (and renewed) on JMSException 567 * 568 * @param reconnectOnException 569 * Boolean value that configures whether reconnect on exception should happen 570 */ 571 public void setReconnectOnException(boolean reconnectOnException) { 572 this.reconnectOnException = reconnectOnException; 573 } 574 575 /** 576 * Called by any superclass that implements a JNDIReferencable or similar that needs to collect 577 * the properties of this class for storage etc. 578 * 579 * This method should be updated any time there is a new property added. 580 * 581 * @param props 582 * a properties object that should be filled in with this objects property values. 583 */ 584 protected void populateProperties(Properties props) { 585 props.setProperty("maximumActiveSessionPerConnection", Integer.toString(getMaximumActiveSessionPerConnection())); 586 props.setProperty("maxConnections", Integer.toString(getMaxConnections())); 587 props.setProperty("idleTimeout", Integer.toString(getIdleTimeout())); 588 props.setProperty("expiryTimeout", Long.toString(getExpiryTimeout())); 589 props.setProperty("timeBetweenExpirationCheckMillis", Long.toString(getTimeBetweenExpirationCheckMillis())); 590 props.setProperty("createConnectionOnStartup", Boolean.toString(isCreateConnectionOnStartup())); 591 props.setProperty("useAnonymousProducers", Boolean.toString(isUseAnonymousProducers())); 592 props.setProperty("blockIfSessionPoolIsFullTimeout", Long.toString(getBlockIfSessionPoolIsFullTimeout())); 593 props.setProperty("reconnectOnException", Boolean.toString(isReconnectOnException())); 594 } 595}