001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.jms.pool;
018
019import java.io.Serializable;
020import java.util.Iterator;
021import java.util.concurrent.CopyOnWriteArrayList;
022import java.util.concurrent.atomic.AtomicBoolean;
023
024import javax.jms.BytesMessage;
025import javax.jms.Destination;
026import javax.jms.JMSException;
027import javax.jms.MapMessage;
028import javax.jms.Message;
029import javax.jms.MessageConsumer;
030import javax.jms.MessageListener;
031import javax.jms.MessageProducer;
032import javax.jms.ObjectMessage;
033import javax.jms.Queue;
034import javax.jms.QueueBrowser;
035import javax.jms.QueueReceiver;
036import javax.jms.QueueSender;
037import javax.jms.QueueSession;
038import javax.jms.Session;
039import javax.jms.StreamMessage;
040import javax.jms.TemporaryQueue;
041import javax.jms.TemporaryTopic;
042import javax.jms.TextMessage;
043import javax.jms.Topic;
044import javax.jms.TopicPublisher;
045import javax.jms.TopicSession;
046import javax.jms.TopicSubscriber;
047import javax.jms.XASession;
048import javax.transaction.xa.XAResource;
049
050import org.apache.commons.pool.KeyedObjectPool;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054public class PooledSession implements Session, TopicSession, QueueSession, XASession {
055    private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
056
057    private final SessionKey key;
058    private final KeyedObjectPool<SessionKey, SessionHolder> sessionPool;
059    private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
060    private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
061    private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
062    private final AtomicBoolean closed = new AtomicBoolean();
063
064    private SessionHolder sessionHolder;
065    private boolean transactional = true;
066    private boolean ignoreClose;
067    private boolean isXa;
068    private boolean useAnonymousProducers = true;
069
070    public PooledSession(SessionKey key, SessionHolder sessionHolder, KeyedObjectPool<SessionKey, SessionHolder> sessionPool, boolean transactional, boolean anonymous) {
071        this.key = key;
072        this.sessionHolder = sessionHolder;
073        this.sessionPool = sessionPool;
074        this.transactional = transactional;
075        this.useAnonymousProducers = anonymous;
076    }
077
078    public void addSessionEventListener(PooledSessionEventListener listener) {
079        // only add if really needed
080        if (!sessionEventListeners.contains(listener)) {
081            this.sessionEventListeners.add(listener);
082        }
083    }
084
085    protected boolean isIgnoreClose() {
086        return ignoreClose;
087    }
088
089    protected void setIgnoreClose(boolean ignoreClose) {
090        this.ignoreClose = ignoreClose;
091    }
092
093    @Override
094    public void close() throws JMSException {
095        if (ignoreClose) {
096            return;
097        }
098
099        if (closed.compareAndSet(false, true)) {
100            boolean invalidate = false;
101            try {
102                // lets reset the session
103                getInternalSession().setMessageListener(null);
104
105                // Close any consumers and browsers that may have been created.
106                for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
107                    MessageConsumer consumer = iter.next();
108                    consumer.close();
109                }
110
111                for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) {
112                    QueueBrowser browser = iter.next();
113                    browser.close();
114                }
115
116                if (transactional && !isXa) {
117                    try {
118                        getInternalSession().rollback();
119                    } catch (JMSException e) {
120                        invalidate = true;
121                        LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e);
122                    }
123                }
124            } catch (JMSException ex) {
125                invalidate = true;
126                LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex);
127            } finally {
128                consumers.clear();
129                browsers.clear();
130                for (PooledSessionEventListener listener : this.sessionEventListeners) {
131                    listener.onSessionClosed(this);
132                }
133                sessionEventListeners.clear();
134            }
135
136            if (invalidate) {
137                // lets close the session and not put the session back into the pool
138                // instead invalidate it so the pool can create a new one on demand.
139                if (sessionHolder != null) {
140                    try {
141                        sessionHolder.close();
142                    } catch (JMSException e1) {
143                        LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
144                    }
145                }
146                try {
147                    sessionPool.invalidateObject(key, sessionHolder);
148                } catch (Exception e) {
149                    LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
150                }
151            } else {
152                try {
153                    sessionPool.returnObject(key, sessionHolder);
154                } catch (Exception e) {
155                    javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
156                    illegalStateException.initCause(e);
157                    throw illegalStateException;
158                }
159            }
160
161            sessionHolder = null;
162        }
163    }
164
165    @Override
166    public void commit() throws JMSException {
167        getInternalSession().commit();
168    }
169
170    @Override
171    public BytesMessage createBytesMessage() throws JMSException {
172        return getInternalSession().createBytesMessage();
173    }
174
175    @Override
176    public MapMessage createMapMessage() throws JMSException {
177        return getInternalSession().createMapMessage();
178    }
179
180    @Override
181    public Message createMessage() throws JMSException {
182        return getInternalSession().createMessage();
183    }
184
185    @Override
186    public ObjectMessage createObjectMessage() throws JMSException {
187        return getInternalSession().createObjectMessage();
188    }
189
190    @Override
191    public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
192        return getInternalSession().createObjectMessage(serializable);
193    }
194
195    @Override
196    public Queue createQueue(String s) throws JMSException {
197        return getInternalSession().createQueue(s);
198    }
199
200    @Override
201    public StreamMessage createStreamMessage() throws JMSException {
202        return getInternalSession().createStreamMessage();
203    }
204
205    @Override
206    public TemporaryQueue createTemporaryQueue() throws JMSException {
207        TemporaryQueue result;
208
209        result = getInternalSession().createTemporaryQueue();
210
211        // Notify all of the listeners of the created temporary Queue.
212        for (PooledSessionEventListener listener : this.sessionEventListeners) {
213            listener.onTemporaryQueueCreate(result);
214        }
215
216        return result;
217    }
218
219    @Override
220    public TemporaryTopic createTemporaryTopic() throws JMSException {
221        TemporaryTopic result;
222
223        result = getInternalSession().createTemporaryTopic();
224
225        // Notify all of the listeners of the created temporary Topic.
226        for (PooledSessionEventListener listener : this.sessionEventListeners) {
227            listener.onTemporaryTopicCreate(result);
228        }
229
230        return result;
231    }
232
233    @Override
234    public void unsubscribe(String s) throws JMSException {
235        getInternalSession().unsubscribe(s);
236    }
237
238    @Override
239    public TextMessage createTextMessage() throws JMSException {
240        return getInternalSession().createTextMessage();
241    }
242
243    @Override
244    public TextMessage createTextMessage(String s) throws JMSException {
245        return getInternalSession().createTextMessage(s);
246    }
247
248    @Override
249    public Topic createTopic(String s) throws JMSException {
250        return getInternalSession().createTopic(s);
251    }
252
253    @Override
254    public int getAcknowledgeMode() throws JMSException {
255        return getInternalSession().getAcknowledgeMode();
256    }
257
258    @Override
259    public boolean getTransacted() throws JMSException {
260        return getInternalSession().getTransacted();
261    }
262
263    @Override
264    public void recover() throws JMSException {
265        getInternalSession().recover();
266    }
267
268    @Override
269    public void rollback() throws JMSException {
270        getInternalSession().rollback();
271    }
272
273    @Override
274    public XAResource getXAResource() {
275        SessionHolder session = safeGetSessionHolder();
276
277        if (session.getSession() instanceof XASession) {
278            return ((XASession) session.getSession()).getXAResource();
279        }
280
281        return null;
282    }
283
284    @Override
285    public Session getSession() {
286        return this;
287    }
288
289    @Override
290    public void run() {
291        SessionHolder session = safeGetSessionHolder();
292        if (session != null) {
293            session.getSession().run();
294        }
295    }
296
297    // Consumer related methods
298    // -------------------------------------------------------------------------
299    @Override
300    public QueueBrowser createBrowser(Queue queue) throws JMSException {
301        return addQueueBrowser(getInternalSession().createBrowser(queue));
302    }
303
304    @Override
305    public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
306        return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
307    }
308
309    @Override
310    public MessageConsumer createConsumer(Destination destination) throws JMSException {
311        return addConsumer(getInternalSession().createConsumer(destination));
312    }
313
314    @Override
315    public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
316        return addConsumer(getInternalSession().createConsumer(destination, selector));
317    }
318
319    @Override
320    public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
321        return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
322    }
323
324    @Override
325    public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
326        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
327    }
328
329    @Override
330    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
331        return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
332    }
333
334    @Override
335    public MessageListener getMessageListener() throws JMSException {
336        return getInternalSession().getMessageListener();
337    }
338
339    @Override
340    public void setMessageListener(MessageListener messageListener) throws JMSException {
341        getInternalSession().setMessageListener(messageListener);
342    }
343
344    @Override
345    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
346        return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic));
347    }
348
349    @Override
350    public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
351        return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic, selector, local));
352    }
353
354    @Override
355    public QueueReceiver createReceiver(Queue queue) throws JMSException {
356        return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue));
357    }
358
359    @Override
360    public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
361        return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector));
362    }
363
364    // Producer related methods
365    // -------------------------------------------------------------------------
366    @Override
367    public MessageProducer createProducer(Destination destination) throws JMSException {
368        return new PooledProducer(getMessageProducer(destination), destination);
369    }
370
371    @Override
372    public QueueSender createSender(Queue queue) throws JMSException {
373        return new PooledQueueSender(getQueueSender(queue), queue);
374    }
375
376    @Override
377    public TopicPublisher createPublisher(Topic topic) throws JMSException {
378        return new PooledTopicPublisher(getTopicPublisher(topic), topic);
379    }
380
381    public Session getInternalSession() throws IllegalStateException {
382        return safeGetSessionHolder().getSession();
383    }
384
385    public MessageProducer getMessageProducer() throws JMSException {
386        return getMessageProducer(null);
387    }
388
389    public MessageProducer getMessageProducer(Destination destination) throws JMSException {
390        MessageProducer result = null;
391
392        if (useAnonymousProducers) {
393            result = safeGetSessionHolder().getOrCreateProducer();
394        } else {
395            result = getInternalSession().createProducer(destination);
396        }
397
398        return result;
399    }
400
401    public QueueSender getQueueSender() throws JMSException {
402        return getQueueSender(null);
403    }
404
405    public QueueSender getQueueSender(Queue destination) throws JMSException {
406        QueueSender result = null;
407
408        if (useAnonymousProducers) {
409            result = safeGetSessionHolder().getOrCreateSender();
410        } else {
411            result = ((QueueSession) getInternalSession()).createSender(destination);
412        }
413
414        return result;
415    }
416
417    public TopicPublisher getTopicPublisher() throws JMSException {
418        return getTopicPublisher(null);
419    }
420
421    public TopicPublisher getTopicPublisher(Topic destination) throws JMSException {
422        TopicPublisher result = null;
423
424        if (useAnonymousProducers) {
425            result = safeGetSessionHolder().getOrCreatePublisher();
426        } else {
427            result = ((TopicSession) getInternalSession()).createPublisher(destination);
428        }
429
430        return result;
431    }
432
433    private QueueBrowser addQueueBrowser(QueueBrowser browser) {
434        browsers.add(browser);
435        return browser;
436    }
437
438    private MessageConsumer addConsumer(MessageConsumer consumer) {
439        consumers.add(consumer);
440        // must wrap in PooledMessageConsumer to ensure the onConsumerClose
441        // method is invoked when the returned consumer is closed, to avoid memory
442        // leak in this session class in case many consumers is created
443        return new PooledMessageConsumer(this, consumer);
444    }
445
446    private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) {
447        consumers.add(subscriber);
448        return subscriber;
449    }
450
451    private QueueReceiver addQueueReceiver(QueueReceiver receiver) {
452        consumers.add(receiver);
453        return receiver;
454    }
455
456    public void setIsXa(boolean isXa) {
457        this.isXa = isXa;
458    }
459
460    @Override
461    public String toString() {
462        return "PooledSession { " + safeGetSessionHolder() + " }";
463    }
464
465    /**
466     * Callback invoked when the consumer is closed.
467     * <p/>
468     * This is used to keep track of an explicit closed consumer created by this
469     * session, by which we know do not need to keep track of the consumer, as
470     * its already closed.
471     *
472     * @param consumer
473     *            the consumer which is being closed
474     */
475    protected void onConsumerClose(MessageConsumer consumer) {
476        consumers.remove(consumer);
477    }
478
479    private SessionHolder safeGetSessionHolder() {
480        SessionHolder sessionHolder = this.sessionHolder;
481        if (sessionHolder == null) {
482            throw new IllegalStateException("The session has already been closed");
483        }
484
485        return sessionHolder;
486    }
487}