001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.jms.pool;
018
019import java.util.Properties;
020import java.util.concurrent.atomic.AtomicBoolean;
021import java.util.concurrent.atomic.AtomicReference;
022
023import javax.jms.Connection;
024import javax.jms.ConnectionFactory;
025import javax.jms.JMSException;
026import javax.jms.QueueConnection;
027import javax.jms.QueueConnectionFactory;
028import javax.jms.TopicConnection;
029import javax.jms.TopicConnectionFactory;
030
031import org.apache.commons.pool.KeyedPoolableObjectFactory;
032import org.apache.commons.pool.impl.GenericKeyedObjectPool;
033import org.slf4j.Logger;
034import org.slf4j.LoggerFactory;
035
036/**
037 * A JMS provider which pools Connection, Session and MessageProducer instances
038 * so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's
039 * <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
040 * Connections, sessions and producers are returned to a pool after use so that they can be reused later
041 * without having to undergo the cost of creating them again.
042 *
043 * b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
044 * it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
045 * are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
046 * just created at startup and left active, handling incoming messages as they come. When a consumer is
047 * complete, it is best to close it rather than return it to a pool for later reuse: this is because,
048 * even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
049 * where they'll get held until the consumer is active again.
050 *
051 * If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
052 * might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
053 * all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
054 * http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
055 *
056 * Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the
057 * pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should
058 * be used when configuring this optional feature. Eviction runs contend with client threads for access
059 * to objects in the pool, so if they run too frequently performance issues may result. The idle object
060 * eviction thread may be configured using the {@link org.apache.activemq.jms.pool.PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method.  By
061 * default the value is -1 which means no eviction thread will be run.  Set to a non-negative value to
062 * configure the idle eviction thread to run.
063 */
064public class PooledConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {
065    private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
066
067    protected final AtomicBoolean stopped = new AtomicBoolean(false);
068    private GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
069
070    protected Object connectionFactory;
071
072    private int maximumActiveSessionPerConnection = 500;
073    private int idleTimeout = 30 * 1000;
074    private boolean blockIfSessionPoolIsFull = true;
075    private long blockIfSessionPoolIsFullTimeout = -1L;
076    private long expiryTimeout = 0l;
077    private boolean createConnectionOnStartup = true;
078    private boolean useAnonymousProducers = true;
079    private boolean reconnectOnException = true;
080
081    // Temporary value used to always fetch the result of makeObject.
082    private final AtomicReference<ConnectionPool> mostRecentlyCreated = new AtomicReference<ConnectionPool>(null);
083
084    public void initConnectionsPool() {
085        if (this.connectionsPool == null) {
086            this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
087                new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() {
088
089                    @Override
090                    public void activateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
091                    }
092
093                    @Override
094                    public void destroyObject(ConnectionKey key, ConnectionPool connection) throws Exception {
095                        try {
096                            if (LOG.isTraceEnabled()) {
097                                LOG.trace("Destroying connection: {}", connection);
098                            }
099                            connection.close();
100                        } catch (Exception e) {
101                            LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
102                        }
103                    }
104
105                    @Override
106                    public ConnectionPool makeObject(ConnectionKey key) throws Exception {
107                        Connection delegate = createConnection(key);
108
109                        ConnectionPool connection = createConnectionPool(delegate);
110                        connection.setIdleTimeout(getIdleTimeout());
111                        connection.setExpiryTimeout(getExpiryTimeout());
112                        connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
113                        connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
114                        if (isBlockIfSessionPoolIsFull() && getBlockIfSessionPoolIsFullTimeout() > 0) {
115                            connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout());
116                        }
117                        connection.setUseAnonymousProducers(isUseAnonymousProducers());
118                        connection.setReconnectOnException(isReconnectOnException());
119
120                        LOG.trace("Created new connection: {}", connection);
121
122                        PooledConnectionFactory.this.mostRecentlyCreated.set(connection);
123
124                        return connection;
125                    }
126
127                    @Override
128                    public void passivateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
129                    }
130
131                    @Override
132                    public boolean validateObject(ConnectionKey key, ConnectionPool connection) {
133                        if (connection != null && connection.expiredCheck()) {
134                            LOG.trace("Connection has expired: {} and will be destroyed", connection);
135                            return false;
136                        }
137
138                        return true;
139                    }
140                });
141
142            // Set max idle (not max active) since our connections always idle in the pool.
143            this.connectionsPool.setMaxIdle(1);
144            this.connectionsPool.setLifo(false);
145
146            // We always want our validate method to control when idle objects are evicted.
147            this.connectionsPool.setTestOnBorrow(true);
148            this.connectionsPool.setTestWhileIdle(true);
149        }
150    }
151
152    /**
153     * @return the currently configured ConnectionFactory used to create the pooled Connections.
154     */
155    public Object getConnectionFactory() {
156        return connectionFactory;
157    }
158
159    /**
160     * Sets the ConnectionFactory used to create new pooled Connections.
161     * <p/>
162     * Updates to this value do not affect Connections that were previously created and placed
163     * into the pool.  In order to allocate new Connections based off this new ConnectionFactory
164     * it is first necessary to {@link #clear} the pooled Connections.
165     *
166     * @param toUse
167     *      The factory to use to create pooled Connections.
168     */
169    public void setConnectionFactory(final Object toUse) {
170        if (toUse instanceof ConnectionFactory) {
171            this.connectionFactory = toUse;
172        } else {
173            throw new IllegalArgumentException("connectionFactory should implement javax.jmx.ConnectionFactory");
174        }
175    }
176
177    @Override
178    public QueueConnection createQueueConnection() throws JMSException {
179        return (QueueConnection) createConnection();
180    }
181
182    @Override
183    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
184        return (QueueConnection) createConnection(userName, password);
185    }
186
187    @Override
188    public TopicConnection createTopicConnection() throws JMSException {
189        return (TopicConnection) createConnection();
190    }
191
192    @Override
193    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
194        return (TopicConnection) createConnection(userName, password);
195    }
196
197    @Override
198    public Connection createConnection() throws JMSException {
199        return createConnection(null, null);
200    }
201
202    @Override
203    public synchronized Connection createConnection(String userName, String password) throws JMSException {
204        if (stopped.get()) {
205            LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
206            return null;
207        }
208
209        ConnectionPool connection = null;
210        ConnectionKey key = new ConnectionKey(userName, password);
211
212        // This will either return an existing non-expired ConnectionPool or it
213        // will create a new one to meet the demand.
214        if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) {
215            try {
216                connectionsPool.addObject(key);
217                connection = mostRecentlyCreated.getAndSet(null);
218                connection.incrementReferenceCount();
219            } catch (Exception e) {
220                throw createJmsException("Error while attempting to add new Connection to the pool", e);
221            }
222        } else {
223            try {
224                // We can race against other threads returning the connection when there is an
225                // expiration or idle timeout.  We keep pulling out ConnectionPool instances until
226                // we win and get a non-closed instance and then increment the reference count
227                // under lock to prevent another thread from triggering an expiration check and
228                // pulling the rug out from under us.
229                while (connection == null) {
230                    connection = connectionsPool.borrowObject(key);
231                    synchronized (connection) {
232                        if (connection.getConnection() != null) {
233                            connection.incrementReferenceCount();
234                            break;
235                        }
236
237                        // Return the bad one to the pool and let if get destroyed as normal.
238                        connectionsPool.returnObject(key, connection);
239                        connection = null;
240                    }
241                }
242            } catch (Exception e) {
243                throw createJmsException("Error while attempting to retrieve a connection from the pool", e);
244            }
245
246            try {
247                connectionsPool.returnObject(key, connection);
248            } catch (Exception e) {
249                throw createJmsException("Error when returning connection to the pool", e);
250            }
251        }
252
253        return newPooledConnection(connection);
254    }
255
256    protected Connection newPooledConnection(ConnectionPool connection) {
257        return new PooledConnection(connection);
258    }
259
260    private JMSException createJmsException(String msg, Exception cause) {
261        JMSException exception = new JMSException(msg);
262        exception.setLinkedException(cause);
263        exception.initCause(cause);
264        return exception;
265    }
266
267    protected Connection createConnection(ConnectionKey key) throws JMSException {
268        if (connectionFactory instanceof ConnectionFactory) {
269            if (key.getUserName() == null && key.getPassword() == null) {
270                return ((ConnectionFactory) connectionFactory).createConnection();
271            } else {
272                return ((ConnectionFactory) connectionFactory).createConnection(key.getUserName(), key.getPassword());
273            }
274        } else {
275            throw new IllegalStateException("connectionFactory should implement javax.jms.ConnectionFactory");
276        }
277    }
278
279    public void start() {
280        LOG.debug("Staring the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
281        stopped.set(false);
282        if (isCreateConnectionOnStartup()) {
283            try {
284                // warm the pool by creating a connection during startup
285                createConnection().close();
286            } catch (JMSException e) {
287                LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e);
288            }
289        }
290    }
291
292    public void stop() {
293        if (stopped.compareAndSet(false, true)) {
294            LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
295                      connectionsPool != null ? connectionsPool.getNumActive() : 0);
296            try {
297                if (connectionsPool != null) {
298                    connectionsPool.close();
299                    connectionsPool = null;
300                }
301            } catch (Exception e) {
302            }
303        }
304    }
305
306    /**
307     * Clears all connections from the pool.  Each connection that is currently in the pool is
308     * closed and removed from the pool.  A new connection will be created on the next call to
309     * {@link #createConnection}.  Care should be taken when using this method as Connections that
310     * are in use be client's will be closed.
311     */
312    public void clear() {
313        if (stopped.get()) {
314            return;
315        }
316
317        getConnectionsPool().clear();
318    }
319
320    /**
321     * Returns the currently configured maximum number of sessions a pooled Connection will
322     * create before it either blocks or throws an exception when a new session is requested,
323     * depending on configuration.
324     *
325     * @return the number of session instances that can be taken from a pooled connection.
326     */
327    public int getMaximumActiveSessionPerConnection() {
328        return maximumActiveSessionPerConnection;
329    }
330
331    /**
332     * Sets the maximum number of active sessions per connection
333     *
334     * @param maximumActiveSessionPerConnection
335     *      The maximum number of active session per connection in the pool.
336     */
337    public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
338        this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
339    }
340
341    /**
342     * Controls the behavior of the internal session pool. By default the call to
343     * Connection.getSession() will block if the session pool is full.  If the
344     * argument false is given, it will change the default behavior and instead the
345     * call to getSession() will throw a JMSException.
346     *
347     * The size of the session pool is controlled by the @see #maximumActive
348     * property.
349     *
350     * @param block - if true, the call to getSession() blocks if the pool is full
351     * until a session object is available.  defaults to true.
352     */
353    public void setBlockIfSessionPoolIsFull(boolean block) {
354        this.blockIfSessionPoolIsFull = block;
355    }
356
357    /**
358     * Returns whether a pooled Connection will enter a blocked state or will throw an Exception
359     * once the maximum number of sessions has been borrowed from the the Session Pool.
360     *
361     * @return true if the pooled Connection createSession method will block when the limit is hit.
362     * @see #setBlockIfSessionPoolIsFull(boolean)
363     */
364    public boolean isBlockIfSessionPoolIsFull() {
365        return this.blockIfSessionPoolIsFull;
366    }
367
368    /**
369     * Returns the maximum number to pooled Connections that this factory will allow before it
370     * begins to return connections from the pool on calls to ({@link #createConnection}.
371     *
372     * @return the maxConnections that will be created for this pool.
373     */
374    public int getMaxConnections() {
375        return getConnectionsPool().getMaxIdle();
376    }
377
378    /**
379     * Sets the maximum number of pooled Connections (defaults to one).  Each call to
380     * {@link #createConnection} will result in a new Connection being create up to the max
381     * connections value.
382     *
383     * @param maxConnections the maxConnections to set
384     */
385    public void setMaxConnections(int maxConnections) {
386        getConnectionsPool().setMaxIdle(maxConnections);
387    }
388
389    /**
390     * Gets the Idle timeout value applied to new Connection's that are created by this pool.
391     * <p/>
392     * The idle timeout is used determine if a Connection instance has sat to long in the pool unused
393     * and if so is closed and removed from the pool.  The default value is 30 seconds.
394     *
395     * @return idle timeout value (milliseconds)
396     */
397    public int getIdleTimeout() {
398        return idleTimeout;
399    }
400
401    /**
402     * Sets the idle timeout  value for Connection's that are created by this pool in Milliseconds,
403     * defaults to 30 seconds.
404     * <p/>
405     * For a Connection that is in the pool but has no current users the idle timeout determines how
406     * long the Connection can live before it is eligible for removal from the pool.  Normally the
407     * connections are tested when an attempt to check one out occurs so a Connection instance can sit
408     * in the pool much longer than its idle timeout if connections are used infrequently.
409     *
410     * @param idleTimeout
411     *      The maximum time a pooled Connection can sit unused before it is eligible for removal.
412     */
413    public void setIdleTimeout(int idleTimeout) {
414        this.idleTimeout = idleTimeout;
415    }
416
417    /**
418     * allow connections to expire, irrespective of load or idle time. This is useful with failover
419     * to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
420     *
421     * @param expiryTimeout non zero in milliseconds
422     */
423    public void setExpiryTimeout(long expiryTimeout) {
424        this.expiryTimeout = expiryTimeout;
425    }
426
427    /**
428     * @return the configured expiration timeout for connections in the pool.
429     */
430    public long getExpiryTimeout() {
431        return expiryTimeout;
432    }
433
434    /**
435     * @return true if a Connection is created immediately on a call to {@link start}.
436     */
437    public boolean isCreateConnectionOnStartup() {
438        return createConnectionOnStartup;
439    }
440
441    /**
442     * Whether to create a connection on starting this {@link PooledConnectionFactory}.
443     * <p/>
444     * This can be used to warm-up the pool on startup. Notice that any kind of exception
445     * happens during startup is logged at WARN level and ignored.
446     *
447     * @param createConnectionOnStartup <tt>true</tt> to create a connection on startup
448     */
449    public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
450        this.createConnectionOnStartup = createConnectionOnStartup;
451    }
452
453    /**
454     * Should Sessions use one anonymous producer for all producer requests or should a new
455     * MessageProducer be created for each request to create a producer object, default is true.
456     *
457     * When enabled the session only needs to allocate one MessageProducer for all requests and
458     * the MessageProducer#send(destination, message) method can be used.  Normally this is the
459     * right thing to do however it does result in the Broker not showing the producers per
460     * destination.
461     *
462     * @return true if a PooledSession will use only a single anonymous message producer instance.
463     */
464    public boolean isUseAnonymousProducers() {
465        return this.useAnonymousProducers;
466    }
467
468    /**
469     * Sets whether a PooledSession uses only one anonymous MessageProducer instance or creates
470     * a new MessageProducer for each call the create a MessageProducer.
471     *
472     * @param value
473     *      Boolean value that configures whether anonymous producers are used.
474     */
475    public void setUseAnonymousProducers(boolean value) {
476        this.useAnonymousProducers = value;
477    }
478
479    /**
480     * Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys.
481     *
482     * @return this factories pool of ConnectionPool instances.
483     */
484    protected GenericKeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
485        initConnectionsPool();
486        return this.connectionsPool;
487    }
488
489    /**
490     * Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread.
491     * When non-positive, no idle object eviction thread will be run, and Connections will only be
492     * checked on borrow to determine if they have sat idle for too long or have failed for some
493     * other reason.
494     * <p/>
495     * By default this value is set to -1 and no expiration thread ever runs.
496     *
497     * @param timeBetweenExpirationCheckMillis
498     *      The time to wait between runs of the idle Connection eviction thread.
499     */
500    public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
501        getConnectionsPool().setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
502    }
503
504    /**
505     * @return the number of milliseconds to sleep between runs of the idle connection eviction thread.
506     */
507    public long getTimeBetweenExpirationCheckMillis() {
508        return getConnectionsPool().getTimeBetweenEvictionRunsMillis();
509    }
510
511    /**
512     * @return the number of Connections currently in the Pool
513     */
514    public int getNumConnections() {
515        return getConnectionsPool().getNumIdle();
516    }
517
518    /**
519     * Delegate that creates each instance of an ConnectionPool object.  Subclasses can override
520     * this method to customize the type of connection pool returned.
521     *
522     * @param connection
523     *
524     * @return instance of a new ConnectionPool.
525     */
526    protected ConnectionPool createConnectionPool(Connection connection) {
527        return new ConnectionPool(connection);
528    }
529
530    /**
531     * Returns the timeout to use for blocking creating new sessions
532     *
533     * @return true if the pooled Connection createSession method will block when the limit is hit.
534     * @see #setBlockIfSessionPoolIsFull(boolean)
535     */
536    public long getBlockIfSessionPoolIsFullTimeout() {
537        return blockIfSessionPoolIsFullTimeout;
538    }
539
540    /**
541     * Controls the behavior of the internal session pool. By default the call to
542     * Connection.getSession() will block if the session pool is full.  This setting
543     * will affect how long it blocks and throws an exception after the timeout.
544     *
545     * The size of the session pool is controlled by the @see #maximumActive
546     * property.
547     *
548     * Whether or not the call to create session blocks is controlled by the @see #blockIfSessionPoolIsFull
549     * property
550     *
551     * @param blockIfSessionPoolIsFullTimeout - if blockIfSessionPoolIsFullTimeout is true,
552     *                                        then use this setting to configure how long to block before retry
553     */
554    public void setBlockIfSessionPoolIsFullTimeout(long blockIfSessionPoolIsFullTimeout) {
555        this.blockIfSessionPoolIsFullTimeout = blockIfSessionPoolIsFullTimeout;
556    }
557
558    /**
559     * @return true if the underlying connection will be renewed on JMSException, false otherwise
560     */
561    public boolean isReconnectOnException() {
562        return reconnectOnException;
563    }
564
565    /**
566     * Controls weather the underlying connection should be reset (and renewed) on JMSException
567     *
568     * @param reconnectOnException
569     *          Boolean value that configures whether reconnect on exception should happen
570     */
571    public void setReconnectOnException(boolean reconnectOnException) {
572        this.reconnectOnException = reconnectOnException;
573    }
574
575    /**
576     * Called by any superclass that implements a JNDIReferencable or similar that needs to collect
577     * the properties of this class for storage etc.
578     *
579     * This method should be updated any time there is a new property added.
580     *
581     * @param props
582     *        a properties object that should be filled in with this objects property values.
583     */
584    protected void populateProperties(Properties props) {
585        props.setProperty("maximumActiveSessionPerConnection", Integer.toString(getMaximumActiveSessionPerConnection()));
586        props.setProperty("maxConnections", Integer.toString(getMaxConnections()));
587        props.setProperty("idleTimeout", Integer.toString(getIdleTimeout()));
588        props.setProperty("expiryTimeout", Long.toString(getExpiryTimeout()));
589        props.setProperty("timeBetweenExpirationCheckMillis", Long.toString(getTimeBetweenExpirationCheckMillis()));
590        props.setProperty("createConnectionOnStartup", Boolean.toString(isCreateConnectionOnStartup()));
591        props.setProperty("useAnonymousProducers", Boolean.toString(isUseAnonymousProducers()));
592        props.setProperty("blockIfSessionPoolIsFullTimeout", Long.toString(getBlockIfSessionPoolIsFullTimeout()));
593        props.setProperty("reconnectOnException", Boolean.toString(isReconnectOnException()));
594    }
595}