/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.jms.pool;

import java.io.Serializable;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;

import org.apache.commons.pool.KeyedObjectPool;
import org.slf4j.Logger;
052import org.slf4j.LoggerFactory; 053 054public class PooledSession implements Session, TopicSession, QueueSession, XASession { 055 private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class); 056 057 private final SessionKey key; 058 private final KeyedObjectPool<SessionKey, SessionHolder> sessionPool; 059 private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>(); 060 private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>(); 061 private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>(); 062 private final AtomicBoolean closed = new AtomicBoolean(); 063 064 private SessionHolder sessionHolder; 065 private boolean transactional = true; 066 private boolean ignoreClose; 067 private boolean isXa; 068 private boolean useAnonymousProducers = true; 069 070 public PooledSession(SessionKey key, SessionHolder sessionHolder, KeyedObjectPool<SessionKey, SessionHolder> sessionPool, boolean transactional, boolean anonymous) { 071 this.key = key; 072 this.sessionHolder = sessionHolder; 073 this.sessionPool = sessionPool; 074 this.transactional = transactional; 075 this.useAnonymousProducers = anonymous; 076 } 077 078 public void addSessionEventListener(PooledSessionEventListener listener) { 079 // only add if really needed 080 if (!sessionEventListeners.contains(listener)) { 081 this.sessionEventListeners.add(listener); 082 } 083 } 084 085 protected boolean isIgnoreClose() { 086 return ignoreClose; 087 } 088 089 protected void setIgnoreClose(boolean ignoreClose) { 090 this.ignoreClose = ignoreClose; 091 } 092 093 @Override 094 public void close() throws JMSException { 095 if (ignoreClose) { 096 return; 097 } 098 099 if (closed.compareAndSet(false, true)) { 100 boolean invalidate = false; 101 try { 102 // lets reset the session 103 getInternalSession().setMessageListener(null); 
104 105 // Close any consumers and browsers that may have been created. 106 for (Iterator<MessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 107 MessageConsumer consumer = iter.next(); 108 consumer.close(); 109 } 110 111 for (Iterator<QueueBrowser> iter = browsers.iterator(); iter.hasNext();) { 112 QueueBrowser browser = iter.next(); 113 browser.close(); 114 } 115 116 if (transactional && !isXa) { 117 try { 118 getInternalSession().rollback(); 119 } catch (JMSException e) { 120 invalidate = true; 121 LOG.warn("Caught exception trying rollback() when putting session back into the pool, will invalidate. " + e, e); 122 } 123 } 124 } catch (JMSException ex) { 125 invalidate = true; 126 LOG.warn("Caught exception trying close() when putting session back into the pool, will invalidate. " + ex, ex); 127 } finally { 128 consumers.clear(); 129 browsers.clear(); 130 for (PooledSessionEventListener listener : this.sessionEventListeners) { 131 listener.onSessionClosed(this); 132 } 133 sessionEventListeners.clear(); 134 } 135 136 if (invalidate) { 137 // lets close the session and not put the session back into the pool 138 // instead invalidate it so the pool can create a new one on demand. 
139 if (sessionHolder != null) { 140 try { 141 sessionHolder.close(); 142 } catch (JMSException e1) { 143 LOG.trace("Ignoring exception on close as discarding session: " + e1, e1); 144 } 145 } 146 try { 147 sessionPool.invalidateObject(key, sessionHolder); 148 } catch (Exception e) { 149 LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e); 150 } 151 } else { 152 try { 153 sessionPool.returnObject(key, sessionHolder); 154 } catch (Exception e) { 155 javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString()); 156 illegalStateException.initCause(e); 157 throw illegalStateException; 158 } 159 } 160 161 sessionHolder = null; 162 } 163 } 164 165 @Override 166 public void commit() throws JMSException { 167 getInternalSession().commit(); 168 } 169 170 @Override 171 public BytesMessage createBytesMessage() throws JMSException { 172 return getInternalSession().createBytesMessage(); 173 } 174 175 @Override 176 public MapMessage createMapMessage() throws JMSException { 177 return getInternalSession().createMapMessage(); 178 } 179 180 @Override 181 public Message createMessage() throws JMSException { 182 return getInternalSession().createMessage(); 183 } 184 185 @Override 186 public ObjectMessage createObjectMessage() throws JMSException { 187 return getInternalSession().createObjectMessage(); 188 } 189 190 @Override 191 public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException { 192 return getInternalSession().createObjectMessage(serializable); 193 } 194 195 @Override 196 public Queue createQueue(String s) throws JMSException { 197 return getInternalSession().createQueue(s); 198 } 199 200 @Override 201 public StreamMessage createStreamMessage() throws JMSException { 202 return getInternalSession().createStreamMessage(); 203 } 204 205 @Override 206 public TemporaryQueue createTemporaryQueue() throws JMSException { 207 TemporaryQueue result; 208 209 result = 
getInternalSession().createTemporaryQueue(); 210 211 // Notify all of the listeners of the created temporary Queue. 212 for (PooledSessionEventListener listener : this.sessionEventListeners) { 213 listener.onTemporaryQueueCreate(result); 214 } 215 216 return result; 217 } 218 219 @Override 220 public TemporaryTopic createTemporaryTopic() throws JMSException { 221 TemporaryTopic result; 222 223 result = getInternalSession().createTemporaryTopic(); 224 225 // Notify all of the listeners of the created temporary Topic. 226 for (PooledSessionEventListener listener : this.sessionEventListeners) { 227 listener.onTemporaryTopicCreate(result); 228 } 229 230 return result; 231 } 232 233 @Override 234 public void unsubscribe(String s) throws JMSException { 235 getInternalSession().unsubscribe(s); 236 } 237 238 @Override 239 public TextMessage createTextMessage() throws JMSException { 240 return getInternalSession().createTextMessage(); 241 } 242 243 @Override 244 public TextMessage createTextMessage(String s) throws JMSException { 245 return getInternalSession().createTextMessage(s); 246 } 247 248 @Override 249 public Topic createTopic(String s) throws JMSException { 250 return getInternalSession().createTopic(s); 251 } 252 253 @Override 254 public int getAcknowledgeMode() throws JMSException { 255 return getInternalSession().getAcknowledgeMode(); 256 } 257 258 @Override 259 public boolean getTransacted() throws JMSException { 260 return getInternalSession().getTransacted(); 261 } 262 263 @Override 264 public void recover() throws JMSException { 265 getInternalSession().recover(); 266 } 267 268 @Override 269 public void rollback() throws JMSException { 270 getInternalSession().rollback(); 271 } 272 273 @Override 274 public XAResource getXAResource() { 275 SessionHolder session = safeGetSessionHolder(); 276 277 if (session.getSession() instanceof XASession) { 278 return ((XASession) session.getSession()).getXAResource(); 279 } 280 281 return null; 282 } 283 284 @Override 285 
public Session getSession() { 286 return this; 287 } 288 289 @Override 290 public void run() { 291 SessionHolder session = safeGetSessionHolder(); 292 if (session != null) { 293 session.getSession().run(); 294 } 295 } 296 297 // Consumer related methods 298 // ------------------------------------------------------------------------- 299 @Override 300 public QueueBrowser createBrowser(Queue queue) throws JMSException { 301 return addQueueBrowser(getInternalSession().createBrowser(queue)); 302 } 303 304 @Override 305 public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException { 306 return addQueueBrowser(getInternalSession().createBrowser(queue, selector)); 307 } 308 309 @Override 310 public MessageConsumer createConsumer(Destination destination) throws JMSException { 311 return addConsumer(getInternalSession().createConsumer(destination)); 312 } 313 314 @Override 315 public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException { 316 return addConsumer(getInternalSession().createConsumer(destination, selector)); 317 } 318 319 @Override 320 public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException { 321 return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal)); 322 } 323 324 @Override 325 public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException { 326 return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector)); 327 } 328 329 @Override 330 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException { 331 return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal)); 332 } 333 334 @Override 335 public MessageListener getMessageListener() throws JMSException { 336 return getInternalSession().getMessageListener(); 337 } 338 339 @Override 340 public 
void setMessageListener(MessageListener messageListener) throws JMSException { 341 getInternalSession().setMessageListener(messageListener); 342 } 343 344 @Override 345 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 346 return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic)); 347 } 348 349 @Override 350 public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException { 351 return addTopicSubscriber(((TopicSession) getInternalSession()).createSubscriber(topic, selector, local)); 352 } 353 354 @Override 355 public QueueReceiver createReceiver(Queue queue) throws JMSException { 356 return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue)); 357 } 358 359 @Override 360 public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException { 361 return addQueueReceiver(((QueueSession) getInternalSession()).createReceiver(queue, selector)); 362 } 363 364 // Producer related methods 365 // ------------------------------------------------------------------------- 366 @Override 367 public MessageProducer createProducer(Destination destination) throws JMSException { 368 return new PooledProducer(getMessageProducer(destination), destination); 369 } 370 371 @Override 372 public QueueSender createSender(Queue queue) throws JMSException { 373 return new PooledQueueSender(getQueueSender(queue), queue); 374 } 375 376 @Override 377 public TopicPublisher createPublisher(Topic topic) throws JMSException { 378 return new PooledTopicPublisher(getTopicPublisher(topic), topic); 379 } 380 381 public Session getInternalSession() throws IllegalStateException { 382 return safeGetSessionHolder().getSession(); 383 } 384 385 public MessageProducer getMessageProducer() throws JMSException { 386 return getMessageProducer(null); 387 } 388 389 public MessageProducer getMessageProducer(Destination destination) throws JMSException { 390 MessageProducer result = 
null; 391 392 if (useAnonymousProducers) { 393 result = safeGetSessionHolder().getOrCreateProducer(); 394 } else { 395 result = getInternalSession().createProducer(destination); 396 } 397 398 return result; 399 } 400 401 public QueueSender getQueueSender() throws JMSException { 402 return getQueueSender(null); 403 } 404 405 public QueueSender getQueueSender(Queue destination) throws JMSException { 406 QueueSender result = null; 407 408 if (useAnonymousProducers) { 409 result = safeGetSessionHolder().getOrCreateSender(); 410 } else { 411 result = ((QueueSession) getInternalSession()).createSender(destination); 412 } 413 414 return result; 415 } 416 417 public TopicPublisher getTopicPublisher() throws JMSException { 418 return getTopicPublisher(null); 419 } 420 421 public TopicPublisher getTopicPublisher(Topic destination) throws JMSException { 422 TopicPublisher result = null; 423 424 if (useAnonymousProducers) { 425 result = safeGetSessionHolder().getOrCreatePublisher(); 426 } else { 427 result = ((TopicSession) getInternalSession()).createPublisher(destination); 428 } 429 430 return result; 431 } 432 433 private QueueBrowser addQueueBrowser(QueueBrowser browser) { 434 browsers.add(browser); 435 return browser; 436 } 437 438 private MessageConsumer addConsumer(MessageConsumer consumer) { 439 consumers.add(consumer); 440 // must wrap in PooledMessageConsumer to ensure the onConsumerClose 441 // method is invoked when the returned consumer is closed, to avoid memory 442 // leak in this session class in case many consumers is created 443 return new PooledMessageConsumer(this, consumer); 444 } 445 446 private TopicSubscriber addTopicSubscriber(TopicSubscriber subscriber) { 447 consumers.add(subscriber); 448 return subscriber; 449 } 450 451 private QueueReceiver addQueueReceiver(QueueReceiver receiver) { 452 consumers.add(receiver); 453 return receiver; 454 } 455 456 public void setIsXa(boolean isXa) { 457 this.isXa = isXa; 458 } 459 460 @Override 461 public String 
toString() { 462 return "PooledSession { " + safeGetSessionHolder() + " }"; 463 } 464 465 /** 466 * Callback invoked when the consumer is closed. 467 * <p/> 468 * This is used to keep track of an explicit closed consumer created by this 469 * session, by which we know do not need to keep track of the consumer, as 470 * its already closed. 471 * 472 * @param consumer 473 * the consumer which is being closed 474 */ 475 protected void onConsumerClose(MessageConsumer consumer) { 476 consumers.remove(consumer); 477 } 478 479 private SessionHolder safeGetSessionHolder() { 480 SessionHolder sessionHolder = this.sessionHolder; 481 if (sessionHolder == null) { 482 throw new IllegalStateException("The session has already been closed"); 483 } 484 485 return sessionHolder; 486 } 487}