001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.File; 020import java.io.InputStream; 021import java.io.Serializable; 022import java.net.URL; 023import java.util.Collections; 024import java.util.Iterator; 025import java.util.List; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030 031import javax.jms.BytesMessage; 032import javax.jms.Destination; 033import javax.jms.IllegalStateException; 034import javax.jms.InvalidDestinationException; 035import javax.jms.InvalidSelectorException; 036import javax.jms.JMSException; 037import javax.jms.MapMessage; 038import javax.jms.Message; 039import javax.jms.MessageConsumer; 040import javax.jms.MessageListener; 041import javax.jms.MessageProducer; 042import javax.jms.ObjectMessage; 043import javax.jms.Queue; 044import javax.jms.QueueBrowser; 045import javax.jms.QueueReceiver; 046import javax.jms.QueueSender; 047import javax.jms.QueueSession; 048import javax.jms.Session; 049import javax.jms.StreamMessage; 050import javax.jms.TemporaryQueue; 051import javax.jms.TemporaryTopic; 052import javax.jms.TextMessage; 053import javax.jms.Topic; 054import javax.jms.TopicPublisher; 055import javax.jms.TopicSession; 056import javax.jms.TopicSubscriber; 057import javax.jms.TransactionRolledBackException; 058 059import org.apache.activemq.blob.BlobDownloader; 060import org.apache.activemq.blob.BlobTransferPolicy; 061import org.apache.activemq.blob.BlobUploader; 062import org.apache.activemq.command.ActiveMQBlobMessage; 063import org.apache.activemq.command.ActiveMQBytesMessage; 064import org.apache.activemq.command.ActiveMQDestination; 065import org.apache.activemq.command.ActiveMQMapMessage; 066import org.apache.activemq.command.ActiveMQMessage; 067import org.apache.activemq.command.ActiveMQObjectMessage; 068import org.apache.activemq.command.ActiveMQQueue; 069import org.apache.activemq.command.ActiveMQStreamMessage; 070import org.apache.activemq.command.ActiveMQTempDestination; 071import org.apache.activemq.command.ActiveMQTempQueue; 072import org.apache.activemq.command.ActiveMQTempTopic; 073import org.apache.activemq.command.ActiveMQTextMessage; 074import org.apache.activemq.command.ActiveMQTopic; 075import org.apache.activemq.command.Command; 076import org.apache.activemq.command.CommandTypes; 077import org.apache.activemq.command.ConsumerId; 078import org.apache.activemq.command.MessageAck; 079import org.apache.activemq.command.MessageDispatch; 080import org.apache.activemq.command.MessageId; 081import org.apache.activemq.command.ProducerId; 082import org.apache.activemq.command.RemoveInfo; 083import org.apache.activemq.command.Response; 084import org.apache.activemq.command.SessionId; 085import org.apache.activemq.command.SessionInfo; 086import org.apache.activemq.command.TransactionId; 087import org.apache.activemq.management.JMSSessionStatsImpl; 088import org.apache.activemq.management.StatsCapable; 089import org.apache.activemq.management.StatsImpl; 090import org.apache.activemq.thread.Scheduler; 091import org.apache.activemq.transaction.Synchronization; 092import org.apache.activemq.usage.MemoryUsage; 093import org.apache.activemq.util.Callback; 094import org.apache.activemq.util.LongSequenceGenerator; 095import org.slf4j.Logger; 096import org.slf4j.LoggerFactory; 097 098/** 099 * <P> 100 * A <CODE>Session</CODE> object is a single-threaded context for producing 101 * and consuming messages. Although it may allocate provider resources outside 102 * the Java virtual machine (JVM), it is considered a lightweight JMS object. 103 * <P> 104 * A session serves several purposes: 105 * <UL> 106 * <LI>It is a factory for its message producers and consumers. 107 * <LI>It supplies provider-optimized message factories. 108 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and 109 * <CODE>TemporaryQueues</CODE>. 110 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> 111 * objects for those clients that need to dynamically manipulate 112 * provider-specific destination names. 113 * <LI>It supports a single series of transactions that combine work spanning 114 * its producers and consumers into atomic units. 115 * <LI>It defines a serial order for the messages it consumes and the messages 116 * it produces. 117 * <LI>It retains messages it consumes until they have been acknowledged. 118 * <LI>It serializes execution of message listeners registered with its message 119 * consumers. 120 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>. 121 * </UL> 122 * <P> 123 * A session can create and service multiple message producers and consumers. 124 * <P> 125 * One typical use is to have a thread block on a synchronous 126 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then 127 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s. 128 * <P> 129 * If a client desires to have one thread produce messages while others consume 130 * them, the client should use a separate session for its producing thread. 131 * <P> 132 * Once a connection has been started, any session with one or more registered 133 * message listeners is dedicated to the thread of control that delivers 134 * messages to it. It is erroneous for client code to use this session or any of 135 * its constituent objects from another thread of control. The only exception to 136 * this rule is the use of the session or connection <CODE>close</CODE> 137 * method. 138 * <P> 139 * It should be easy for most clients to partition their work naturally into 140 * sessions. This model allows clients to start simply and incrementally add 141 * message processing complexity as their need for concurrency grows. 142 * <P> 143 * The <CODE>close</CODE> method is the only session method that can be called 144 * while some other session method is being executed in another thread. 145 * <P> 146 * A session may be specified as transacted. Each transacted session supports a 147 * single series of transactions. Each transaction groups a set of message sends 148 * and a set of message receives into an atomic unit of work. In effect, 149 * transactions organize a session's input message stream and output message 150 * stream into series of atomic units. When a transaction commits, its atomic 151 * unit of input is acknowledged and its associated atomic unit of output is 152 * sent. If a transaction rollback is done, the transaction's sent messages are 153 * destroyed and the session's input is automatically recovered. 154 * <P> 155 * The content of a transaction's input and output units is simply those 156 * messages that have been produced and consumed within the session's current 157 * transaction. 158 * <P> 159 * A transaction is completed using either its session's <CODE>commit</CODE> 160 * method or its session's <CODE>rollback </CODE> method. The completion of a 161 * session's current transaction automatically begins the next. The result is 162 * that a transacted session always has a current transaction within which its 163 * work is done. 164 * <P> 165 * The Java Transaction Service (JTS) or some other transaction monitor may be 166 * used to combine a session's transaction with transactions on other resources 167 * (databases, other JMS sessions, etc.). Since Java distributed transactions 168 * are controlled via the Java Transaction API (JTA), use of the session's 169 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is 170 * prohibited. 171 * <P> 172 * The JMS API does not require support for JTA; however, it does define how a 173 * provider supplies this support. 174 * <P> 175 * Although it is also possible for a JMS client to handle distributed 176 * transactions directly, it is unlikely that many JMS clients will do this. 177 * Support for JTA in the JMS API is targeted at systems vendors who will be 178 * integrating the JMS API into their application server products. 179 * 180 * 181 * @see javax.jms.Session 182 * @see javax.jms.QueueSession 183 * @see javax.jms.TopicSession 184 * @see javax.jms.XASession 185 */ 186public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { 187 188 /** 189 * Only acknowledge an individual message - using message.acknowledge() 190 * as opposed to CLIENT_ACKNOWLEDGE which 191 * acknowledges all messages consumed by a session at when acknowledge() 192 * is called 193 */ 194 public static final int INDIVIDUAL_ACKNOWLEDGE = 4; 195 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; 196 197 public static interface DeliveryListener { 198 void beforeDelivery(ActiveMQSession session, Message msg); 199 200 void afterDelivery(ActiveMQSession session, Message msg); 201 } 202 203 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); 204 private final ThreadPoolExecutor connectionExecutor; 205 206 protected int acknowledgementMode; 207 protected final ActiveMQConnection connection; 208 protected final SessionInfo info; 209 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 210 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 211 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); 212 protected final ActiveMQSessionExecutor executor; 213 protected final AtomicBoolean started = new AtomicBoolean(false); 214 215 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); 216 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); 217 218 protected boolean closed; 219 private volatile boolean synchronizationRegistered; 220 protected boolean asyncDispatch; 221 protected boolean sessionAsyncDispatch; 222 protected final boolean debug; 223 protected final Object sendMutex = new Object(); 224 protected final Object redeliveryGuard = new Object(); 225 226 private final AtomicBoolean clearInProgress = new AtomicBoolean(); 227 228 private MessageListener messageListener; 229 private final JMSSessionStatsImpl stats; 230 private TransactionContext transactionContext; 231 private DeliveryListener deliveryListener; 232 private MessageTransformer transformer; 233 private BlobTransferPolicy blobTransferPolicy; 234 private long lastDeliveredSequenceId = -2; 235 236 /** 237 * Construct the Session 238 * 239 * @param connection 240 * @param sessionId 241 * @param acknowledgeMode n.b if transacted - the acknowledgeMode == 242 * Session.SESSION_TRANSACTED 243 * @param asyncDispatch 244 * @param sessionAsyncDispatch 245 * @throws JMSException on internal error 246 */ 247 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { 248 this.debug = LOG.isDebugEnabled(); 249 this.connection = connection; 250 this.acknowledgementMode = acknowledgeMode; 251 this.asyncDispatch = asyncDispatch; 252 this.sessionAsyncDispatch = sessionAsyncDispatch; 253 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); 254 setTransactionContext(new TransactionContext(connection)); 255 stats = new JMSSessionStatsImpl(producers, consumers); 256 this.connection.asyncSendPacket(info); 257 setTransformer(connection.getTransformer()); 258 setBlobTransferPolicy(connection.getBlobTransferPolicy()); 259 this.connectionExecutor=connection.getExecutor(); 260 this.executor = new ActiveMQSessionExecutor(this); 261 connection.addSession(this); 262 if (connection.isStarted()) { 263 start(); 264 } 265 266 } 267 268 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { 269 this(connection, sessionId, acknowledgeMode, asyncDispatch, true); 270 } 271 272 /** 273 * Sets the transaction context of the session. 274 * 275 * @param transactionContext - provides the means to control a JMS 276 * transaction. 277 */ 278 public void setTransactionContext(TransactionContext transactionContext) { 279 this.transactionContext = transactionContext; 280 } 281 282 /** 283 * Returns the transaction context of the session. 284 * 285 * @return transactionContext - session's transaction context. 286 */ 287 public TransactionContext getTransactionContext() { 288 return transactionContext; 289 } 290 291 /* 292 * (non-Javadoc) 293 * 294 * @see org.apache.activemq.management.StatsCapable#getStats() 295 */ 296 @Override 297 public StatsImpl getStats() { 298 return stats; 299 } 300 301 /** 302 * Returns the session's statistics. 303 * 304 * @return stats - session's statistics. 305 */ 306 public JMSSessionStatsImpl getSessionStats() { 307 return stats; 308 } 309 310 /** 311 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> 312 * object is used to send a message containing a stream of uninterpreted 313 * bytes. 314 * 315 * @return the an ActiveMQBytesMessage 316 * @throws JMSException if the JMS provider fails to create this message due 317 * to some internal error. 318 */ 319 @Override 320 public BytesMessage createBytesMessage() throws JMSException { 321 ActiveMQBytesMessage message = new ActiveMQBytesMessage(); 322 configureMessage(message); 323 return message; 324 } 325 326 /** 327 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> 328 * object is used to send a self-defining set of name-value pairs, where 329 * names are <CODE>String</CODE> objects and values are primitive values 330 * in the Java programming language. 331 * 332 * @return an ActiveMQMapMessage 333 * @throws JMSException if the JMS provider fails to create this message due 334 * to some internal error. 335 */ 336 @Override 337 public MapMessage createMapMessage() throws JMSException { 338 ActiveMQMapMessage message = new ActiveMQMapMessage(); 339 configureMessage(message); 340 return message; 341 } 342 343 /** 344 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> 345 * interface is the root interface of all JMS messages. A 346 * <CODE>Message</CODE> object holds all the standard message header 347 * information. It can be sent when a message containing only header 348 * information is sufficient. 349 * 350 * @return an ActiveMQMessage 351 * @throws JMSException if the JMS provider fails to create this message due 352 * to some internal error. 353 */ 354 @Override 355 public Message createMessage() throws JMSException { 356 ActiveMQMessage message = new ActiveMQMessage(); 357 configureMessage(message); 358 return message; 359 } 360 361 /** 362 * Creates an <CODE>ObjectMessage</CODE> object. An 363 * <CODE>ObjectMessage</CODE> object is used to send a message that 364 * contains a serializable Java object. 365 * 366 * @return an ActiveMQObjectMessage 367 * @throws JMSException if the JMS provider fails to create this message due 368 * to some internal error. 369 */ 370 @Override 371 public ObjectMessage createObjectMessage() throws JMSException { 372 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 373 configureMessage(message); 374 return message; 375 } 376 377 /** 378 * Creates an initialized <CODE>ObjectMessage</CODE> object. An 379 * <CODE>ObjectMessage</CODE> object is used to send a message that 380 * contains a serializable Java object. 381 * 382 * @param object the object to use to initialize this message 383 * @return an ActiveMQObjectMessage 384 * @throws JMSException if the JMS provider fails to create this message due 385 * to some internal error. 386 */ 387 @Override 388 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 389 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 390 configureMessage(message); 391 message.setObject(object); 392 return message; 393 } 394 395 /** 396 * Creates a <CODE>StreamMessage</CODE> object. A 397 * <CODE>StreamMessage</CODE> object is used to send a self-defining 398 * stream of primitive values in the Java programming language. 399 * 400 * @return an ActiveMQStreamMessage 401 * @throws JMSException if the JMS provider fails to create this message due 402 * to some internal error. 403 */ 404 @Override 405 public StreamMessage createStreamMessage() throws JMSException { 406 ActiveMQStreamMessage message = new ActiveMQStreamMessage(); 407 configureMessage(message); 408 return message; 409 } 410 411 /** 412 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> 413 * object is used to send a message containing a <CODE>String</CODE> 414 * object. 415 * 416 * @return an ActiveMQTextMessage 417 * @throws JMSException if the JMS provider fails to create this message due 418 * to some internal error. 419 */ 420 @Override 421 public TextMessage createTextMessage() throws JMSException { 422 ActiveMQTextMessage message = new ActiveMQTextMessage(); 423 configureMessage(message); 424 return message; 425 } 426 427 /** 428 * Creates an initialized <CODE>TextMessage</CODE> object. A 429 * <CODE>TextMessage</CODE> object is used to send a message containing a 430 * <CODE>String</CODE>. 431 * 432 * @param text the string used to initialize this message 433 * @return an ActiveMQTextMessage 434 * @throws JMSException if the JMS provider fails to create this message due 435 * to some internal error. 436 */ 437 @Override 438 public TextMessage createTextMessage(String text) throws JMSException { 439 ActiveMQTextMessage message = new ActiveMQTextMessage(); 440 message.setText(text); 441 configureMessage(message); 442 return message; 443 } 444 445 /** 446 * Creates an initialized <CODE>BlobMessage</CODE> object. A 447 * <CODE>BlobMessage</CODE> object is used to send a message containing a 448 * <CODE>URL</CODE> which points to some network addressible BLOB. 449 * 450 * @param url the network addressable URL used to pass directly to the 451 * consumer 452 * @return a BlobMessage 453 * @throws JMSException if the JMS provider fails to create this message due 454 * to some internal error. 455 */ 456 public BlobMessage createBlobMessage(URL url) throws JMSException { 457 return createBlobMessage(url, false); 458 } 459 460 /** 461 * Creates an initialized <CODE>BlobMessage</CODE> object. A 462 * <CODE>BlobMessage</CODE> object is used to send a message containing a 463 * <CODE>URL</CODE> which points to some network addressible BLOB. 464 * 465 * @param url the network addressable URL used to pass directly to the 466 * consumer 467 * @param deletedByBroker indicates whether or not the resource is deleted 468 * by the broker when the message is acknowledged 469 * @return a BlobMessage 470 * @throws JMSException if the JMS provider fails to create this message due 471 * to some internal error. 472 */ 473 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { 474 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 475 configureMessage(message); 476 message.setURL(url); 477 message.setDeletedByBroker(deletedByBroker); 478 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 479 return message; 480 } 481 482 /** 483 * Creates an initialized <CODE>BlobMessage</CODE> object. A 484 * <CODE>BlobMessage</CODE> object is used to send a message containing 485 * the <CODE>File</CODE> content. Before the message is sent the file 486 * conent will be uploaded to the broker or some other remote repository 487 * depending on the {@link #getBlobTransferPolicy()}. 488 * 489 * @param file the file to be uploaded to some remote repo (or the broker) 490 * depending on the strategy 491 * @return a BlobMessage 492 * @throws JMSException if the JMS provider fails to create this message due 493 * to some internal error. 494 */ 495 public BlobMessage createBlobMessage(File file) throws JMSException { 496 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 497 configureMessage(message); 498 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); 499 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy()))); 500 message.setDeletedByBroker(true); 501 message.setName(file.getName()); 502 return message; 503 } 504 505 /** 506 * Creates an initialized <CODE>BlobMessage</CODE> object. A 507 * <CODE>BlobMessage</CODE> object is used to send a message containing 508 * the <CODE>File</CODE> content. Before the message is sent the file 509 * conent will be uploaded to the broker or some other remote repository 510 * depending on the {@link #getBlobTransferPolicy()}. <br/> 511 * <p> 512 * The caller of this method is responsible for closing the 513 * input stream that is used, however the stream can not be closed 514 * until <b>after</b> the message has been sent. To have this class 515 * manage the stream and close it automatically, use the method 516 * {@link ActiveMQSession#createBlobMessage(File)} 517 * 518 * @param in the stream to be uploaded to some remote repo (or the broker) 519 * depending on the strategy 520 * @return a BlobMessage 521 * @throws JMSException if the JMS provider fails to create this message due 522 * to some internal error. 523 */ 524 public BlobMessage createBlobMessage(InputStream in) throws JMSException { 525 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 526 configureMessage(message); 527 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); 528 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 529 message.setDeletedByBroker(true); 530 return message; 531 } 532 533 /** 534 * Indicates whether the session is in transacted mode. 535 * 536 * @return true if the session is in transacted mode 537 * @throws JMSException if there is some internal error. 538 */ 539 @Override 540 public boolean getTransacted() throws JMSException { 541 checkClosed(); 542 return isTransacted(); 543 } 544 545 /** 546 * Returns the acknowledgement mode of the session. The acknowledgement mode 547 * is set at the time that the session is created. If the session is 548 * transacted, the acknowledgement mode is ignored. 549 * 550 * @return If the session is not transacted, returns the current 551 * acknowledgement mode for the session. If the session is 552 * transacted, returns SESSION_TRANSACTED. 553 * @throws JMSException 554 * @see javax.jms.Connection#createSession(boolean,int) 555 * @since 1.1 exception JMSException if there is some internal error. 556 */ 557 @Override 558 public int getAcknowledgeMode() throws JMSException { 559 checkClosed(); 560 return this.acknowledgementMode; 561 } 562 563 /** 564 * Commits all messages done in this transaction and releases any locks 565 * currently held. 566 * 567 * @throws JMSException if the JMS provider fails to commit the transaction 568 * due to some internal error. 569 * @throws TransactionRolledBackException if the transaction is rolled back 570 * due to some internal error during commit. 571 * @throws javax.jms.IllegalStateException if the method is not called by a 572 * transacted session. 573 */ 574 @Override 575 public void commit() throws JMSException { 576 checkClosed(); 577 if (!getTransacted()) { 578 throw new javax.jms.IllegalStateException("Not a transacted session"); 579 } 580 if (LOG.isDebugEnabled()) { 581 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); 582 } 583 transactionContext.commit(); 584 } 585 586 /** 587 * Rolls back any messages done in this transaction and releases any locks 588 * currently held. 589 * 590 * @throws JMSException if the JMS provider fails to roll back the 591 * transaction due to some internal error. 592 * @throws javax.jms.IllegalStateException if the method is not called by a 593 * transacted session. 594 */ 595 @Override 596 public void rollback() throws JMSException { 597 checkClosed(); 598 if (!getTransacted()) { 599 throw new javax.jms.IllegalStateException("Not a transacted session"); 600 } 601 if (LOG.isDebugEnabled()) { 602 LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId()); 603 } 604 transactionContext.rollback(); 605 } 606 607 /** 608 * Closes the session. 609 * <P> 610 * Since a provider may allocate some resources on behalf of a session 611 * outside the JVM, clients should close the resources when they are not 612 * needed. Relying on garbage collection to eventually reclaim these 613 * resources may not be timely enough. 614 * <P> 615 * There is no need to close the producers and consumers of a closed 616 * session. 617 * <P> 618 * This call will block until a <CODE>receive</CODE> call or message 619 * listener in progress has completed. A blocked message consumer 620 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session 621 * is closed. 622 * <P> 623 * Closing a transacted session must roll back the transaction in progress. 624 * <P> 625 * This method is the only <CODE>Session</CODE> method that can be called 626 * concurrently. 627 * <P> 628 * Invoking any other <CODE>Session</CODE> method on a closed session must 629 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a 630 * closed session must <I>not </I> throw an exception. 631 * 632 * @throws JMSException if the JMS provider fails to close the session due 633 * to some internal error. 634 */ 635 @Override 636 public void close() throws JMSException { 637 if (!closed) { 638 if (getTransactionContext().isInXATransaction()) { 639 if (!synchronizationRegistered) { 640 synchronizationRegistered = true; 641 getTransactionContext().addSynchronization(new Synchronization() { 642 643 @Override 644 public void afterCommit() throws Exception { 645 doClose(); 646 synchronizationRegistered = false; 647 } 648 649 @Override 650 public void afterRollback() throws Exception { 651 doClose(); 652 synchronizationRegistered = false; 653 } 654 }); 655 } 656 657 } else { 658 doClose(); 659 } 660 } 661 } 662 663 private void doClose() throws JMSException { 664 dispose(); 665 RemoveInfo removeCommand = info.createRemoveCommand(); 666 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 667 connection.asyncSendPacket(removeCommand); 668 } 669 670 final AtomicInteger clearRequestsCounter = new AtomicInteger(0); 671 void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { 672 clearRequestsCounter.incrementAndGet(); 673 executor.clearMessagesInProgress(); 674 // we are called from inside the transport reconnection logic which involves us 675 // clearing all the connections' consumers dispatch and delivered lists. So rather 676 // than trying to grab a mutex (which could be already owned by the message listener 677 // calling the send or an ack) we allow it to complete in a separate thread via the 678 // scheduler and notify us via connection.transportInterruptionProcessingComplete() 679 // 680 // We must be careful though not to allow multiple calls to this method from a 681 // connection that is having issue becoming fully established from causing a large 682 // build up of scheduled tasks to clear the same consumers over and over. 683 if (consumers.isEmpty()) { 684 return; 685 } 686 687 if (clearInProgress.compareAndSet(false, true)) { 688 for (final ActiveMQMessageConsumer consumer : consumers) { 689 consumer.inProgressClearRequired(); 690 transportInterruptionProcessingComplete.incrementAndGet(); 691 try { 692 connection.getScheduler().executeAfterDelay(new Runnable() { 693 @Override 694 public void run() { 695 consumer.clearMessagesInProgress(); 696 }}, 0l); 697 } catch (JMSException e) { 698 connection.onClientInternalException(e); 699 } 700 } 701 702 try { 703 connection.getScheduler().executeAfterDelay(new Runnable() { 704 @Override 705 public void run() { 706 clearInProgress.set(false); 707 }}, 0l); 708 } catch (JMSException e) { 709 connection.onClientInternalException(e); 710 } 711 } 712 } 713 714 void deliverAcks() { 715 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 716 ActiveMQMessageConsumer consumer = iter.next(); 717 consumer.deliverAcks(); 718 } 719 } 720 721 public synchronized void dispose() throws JMSException { 722 if (!closed) { 723 724 try { 725 executor.close(); 726 727 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 728 ActiveMQMessageConsumer consumer = iter.next(); 729 consumer.setFailureError(connection.getFirstFailureError()); 730 consumer.dispose(); 731 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); 732 } 733 consumers.clear(); 734 735 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) { 736 ActiveMQMessageProducer producer = iter.next(); 737 producer.dispose(); 738 } 739 producers.clear(); 740 741 try { 742 if (getTransactionContext().isInLocalTransaction()) { 743 rollback(); 744 } 745 } catch (JMSException e) { 746 } 747 748 } finally { 749 connection.removeSession(this); 750 closed = true; 751 this.transactionContext = null; 752 } 753 } 754 } 755 756 /** 757 * Checks that the session is not closed then configures the message 758 */ 759 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { 760 checkClosed(); 761 message.setConnection(connection); 762 } 763 764 /** 765 * Check if the session is closed. It is used for ensuring that the session 766 * is open before performing various operations. 767 * 768 * @throws IllegalStateException if the Session is closed 769 */ 770 protected void checkClosed() throws IllegalStateException { 771 if (closed) { 772 throw new IllegalStateException("The Session is closed"); 773 } 774 } 775 776 /** 777 * Checks if the session is closed. 778 * 779 * @return true if the session is closed, false otherwise. 780 */ 781 public boolean isClosed() { 782 return closed; 783 } 784 785 /** 786 * Stops message delivery in this session, and restarts message delivery 787 * with the oldest unacknowledged message. 788 * <P> 789 * All consumers deliver messages in a serial order. Acknowledging a 790 * received message automatically acknowledges all messages that have been 791 * delivered to the client. 792 * <P> 793 * Restarting a session causes it to take the following actions: 794 * <UL> 795 * <LI>Stop message delivery 796 * <LI>Mark all messages that might have been delivered but not 797 * acknowledged as "redelivered" 798 * <LI>Restart the delivery sequence including all unacknowledged messages 799 * that had been previously delivered. Redelivered messages do not have to 800 * be delivered in exactly their original delivery order. 801 * </UL> 802 * 803 * @throws JMSException if the JMS provider fails to stop and restart 804 * message delivery due to some internal error. 805 * @throws IllegalStateException if the method is called by a transacted 806 * session. 807 */ 808 @Override 809 public void recover() throws JMSException { 810 811 checkClosed(); 812 if (getTransacted()) { 813 throw new IllegalStateException("This session is transacted"); 814 } 815 816 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 817 ActiveMQMessageConsumer c = iter.next(); 818 c.rollback(); 819 } 820 821 } 822 823 /** 824 * Returns the session's distinguished message listener (optional). 825 * 826 * @return the message listener associated with this session 827 * @throws JMSException if the JMS provider fails to get the message 828 * listener due to an internal error. 829 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) 830 * @see javax.jms.ServerSessionPool 831 * @see javax.jms.ServerSession 832 */ 833 @Override 834 public MessageListener getMessageListener() throws JMSException { 835 checkClosed(); 836 return this.messageListener; 837 } 838 839 /** 840 * Sets the session's distinguished message listener (optional). 841 * <P> 842 * When the distinguished message listener is set, no other form of message 843 * receipt in the session can be used; however, all forms of sending 844 * messages are still supported. 845 * <P> 846 * If this session has been closed, then an {@link IllegalStateException} is 847 * thrown, if trying to set a new listener. However setting the listener 848 * to <tt>null</tt> is allowed, to clear the listener, even if this session 849 * has been closed prior. 850 * <P> 851 * This is an expert facility not used by regular JMS clients. 852 * 853 * @param listener the message listener to associate with this session 854 * @throws JMSException if the JMS provider fails to set the message 855 * listener due to an internal error. 856 * @see javax.jms.Session#getMessageListener() 857 * @see javax.jms.ServerSessionPool 858 * @see javax.jms.ServerSession 859 */ 860 @Override 861 public void setMessageListener(MessageListener listener) throws JMSException { 862 // only check for closed if we set a new listener, as we allow to clear 863 // the listener, such as when an application is shutting down, and is 864 // no longer using a message listener on this session 865 if (listener != null) { 866 checkClosed(); 867 } 868 this.messageListener = listener; 869 870 if (listener != null) { 871 executor.setDispatchedBySessionPool(true); 872 } 873 } 874 875 /** 876 * Optional operation, intended to be used only by Application Servers, not 877 * by ordinary JMS clients. 878 * 879 * @see javax.jms.ServerSession 880 */ 881 @Override 882 public void run() { 883 MessageDispatch messageDispatch; 884 while ((messageDispatch = executor.dequeueNoWait()) != null) { 885 final MessageDispatch md = messageDispatch; 886 887 // subset of org.apache.activemq.ActiveMQMessageConsumer.createActiveMQMessage 888 final ActiveMQMessage message = (ActiveMQMessage)md.getMessage().copy(); 889 if (message.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) { 890 ((ActiveMQBlobMessage)message).setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 891 } 892 if (message.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) { 893 ((ActiveMQObjectMessage)message).setTrustAllPackages(getConnection().isTrustAllPackages()); 894 ((ActiveMQObjectMessage)message).setTrustedPackages(getConnection().getTrustedPackages()); 895 } 896 897 MessageAck earlyAck = null; 898 if (message.isExpired()) { 899 earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1); 900 earlyAck.setFirstMessageId(message.getMessageId()); 901 } else if (connection.isDuplicate(ActiveMQSession.this, message)) { 902 LOG.debug("{} got duplicate: {}", this, message.getMessageId()); 903 earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 904 earlyAck.setFirstMessageId(md.getMessage().getMessageId()); 905 earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this)); 906 } 907 if (earlyAck != null) { 908 try { 909 asyncSendPacket(earlyAck); 910 } catch (Throwable t) { 911 LOG.error("error dispatching ack: {} ", earlyAck, t); 912 connection.onClientInternalException(t); 913 } finally { 914 continue; 915 } 916 } 917 918 if (isClientAcknowledge()||isIndividualAcknowledge()) { 919 message.setAcknowledgeCallback(new Callback() { 920 @Override 921 public void execute() throws Exception { 922 } 923 }); 924 } 925 926 if (deliveryListener != null) { 927 deliveryListener.beforeDelivery(this, message); 928 } 929 930 md.setDeliverySequenceId(getNextDeliveryId()); 931 lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId(); 932 933 final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); 934 935 final AtomicBoolean afterDeliveryError = new AtomicBoolean(false); 936 /* 937 * The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched. 938 * We dont want the after deliver being called after the redeliver as it may cause some weird stuff. 939 * */ 940 synchronized (redeliveryGuard) { 941 try { 942 ack.setFirstMessageId(md.getMessage().getMessageId()); 943 doStartTransaction(); 944 ack.setTransactionId(getTransactionContext().getTransactionId()); 945 if (ack.getTransactionId() != null) { 946 getTransactionContext().addSynchronization(new Synchronization() { 947 948 final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get()); 949 950 @Override 951 public void beforeEnd() throws Exception { 952 // validate our consumer so we don't push stale acks that get ignored 953 if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { 954 LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection); 955 throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection); 956 } 957 LOG.trace("beforeEnd ack {}", ack); 958 sendAck(ack); 959 } 960 961 @Override 962 public void afterRollback() throws Exception { 963 if (LOG.isTraceEnabled()) { 964 LOG.trace("afterRollback {}", ack, new Throwable("here")); 965 } 966 // ensure we don't filter this as a duplicate 967 connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage()); 968 969 // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect 970 if (clearRequestsCounter.get() > clearRequestCount) { 971 LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport()); 972 return; 973 } 974 975 // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched 976 if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { 977 LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport()); 978 return; 979 } 980 981 RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); 982 int redeliveryCounter = md.getMessage().getRedeliveryCounter(); 983 if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES 984 && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) { 985 // We need to NACK the messages so that they get 986 // sent to the 987 // DLQ. 988 // Acknowledge the last message. 989 MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); 990 ack.setFirstMessageId(md.getMessage().getMessageId()); 991 ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy)); 992 LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack); 993 asyncSendPacket(ack); 994 995 } else { 996 997 MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1); 998 ack.setFirstMessageId(md.getMessage().getMessageId()); 999 asyncSendPacket(ack); 1000 1001 // Figure out how long we should wait to resend 1002 // this message. 1003 long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay(); 1004 for (int i = 0; i < redeliveryCounter; i++) { 1005 redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay); 1006 } 1007 1008 /* 1009 * If we are a non blocking delivery then we need to stop the executor to avoid more 1010 * messages being delivered, once the message is redelivered we can restart it. 1011 * */ 1012 if (!connection.isNonBlockingRedelivery()) { 1013 LOG.debug("Blocking session until re-delivery..."); 1014 executor.stop(); 1015 } 1016 1017 connection.getScheduler().executeAfterDelay(new Runnable() { 1018 1019 @Override 1020 public void run() { 1021 /* 1022 * wait for the first delivery to be complete, i.e. after delivery has been called. 1023 * */ 1024 synchronized (redeliveryGuard) { 1025 /* 1026 * If its non blocking then we can just dispatch in a new session. 1027 * */ 1028 if (connection.isNonBlockingRedelivery()) { 1029 ((ActiveMQDispatcher) md.getConsumer()).dispatch(md); 1030 } else { 1031 /* 1032 * If there has been an error thrown during afterDelivery then the 1033 * endpoint will be marked as dead so redelivery will fail (and eventually 1034 * the session marked as stale), in this case we can only call dispatch 1035 * which will create a new session with a new endpoint. 1036 * */ 1037 if (afterDeliveryError.get()) { 1038 ((ActiveMQDispatcher) md.getConsumer()).dispatch(md); 1039 } else { 1040 executor.executeFirst(md); 1041 executor.start(); 1042 } 1043 } 1044 } 1045 } 1046 }, redeliveryDelay); 1047 } 1048 md.getMessage().onMessageRolledBack(); 1049 } 1050 }); 1051 } 1052 1053 LOG.trace("{} onMessage({})", this, message.getMessageId()); 1054 messageListener.onMessage(message); 1055 1056 } catch (Throwable e) { 1057 if (!isClosed()) { 1058 LOG.error("{} error dispatching message: {} ", this, message.getMessageId(), e); 1059 } 1060 1061 if (getTransactionContext() != null && getTransactionContext().isInXATransaction()) { 1062 LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext()); 1063 getTransactionContext().setRollbackOnly(true); 1064 } 1065 1066 // A problem while invoking the MessageListener does not 1067 // in general indicate a problem with the connection to the broker, i.e. 1068 // it will usually be sufficient to let the afterDelivery() method either 1069 // commit or roll back in order to deal with the exception. 1070 // However, we notify any registered client internal exception listener 1071 // of the problem. 1072 connection.onClientInternalException(e); 1073 } finally { 1074 if (ack.getTransactionId() == null) { 1075 try { 1076 asyncSendPacket(ack); 1077 } catch (Throwable e) { 1078 connection.onClientInternalException(e); 1079 } 1080 } 1081 } 1082 1083 if (deliveryListener != null) { 1084 try { 1085 deliveryListener.afterDelivery(this, message); 1086 } catch (Throwable t) { 1087 LOG.debug("Unable to call after delivery", t); 1088 afterDeliveryError.set(true); 1089 throw new RuntimeException(t); 1090 } 1091 } 1092 } 1093 /* 1094 * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway. 1095 * It also needs to be outside the redelivery guard. 1096 * */ 1097 try { 1098 executor.waitForQueueRestart(); 1099 } catch (InterruptedException ex) { 1100 connection.onClientInternalException(ex); 1101 } 1102 } 1103 } 1104 1105 /** 1106 * Creates a <CODE>MessageProducer</CODE> to send messages to the 1107 * specified destination. 1108 * <P> 1109 * A client uses a <CODE>MessageProducer</CODE> object to send messages to 1110 * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both 1111 * inherit from <CODE>Destination</CODE>, they can be used in the 1112 * destination parameter to create a <CODE>MessageProducer</CODE> object. 1113 * 1114 * @param destination the <CODE>Destination</CODE> to send to, or null if 1115 * this is a producer which does not have a specified 1116 * destination. 1117 * @return the MessageProducer 1118 * @throws JMSException if the session fails to create a MessageProducer due 1119 * to some internal error. 1120 * @throws InvalidDestinationException if an invalid destination is 1121 * specified. 1122 * @since 1.1 1123 */ 1124 @Override 1125 public MessageProducer createProducer(Destination destination) throws JMSException { 1126 checkClosed(); 1127 if (destination instanceof CustomDestination) { 1128 CustomDestination customDestination = (CustomDestination)destination; 1129 return customDestination.createProducer(this); 1130 } 1131 int timeSendOut = connection.getSendTimeout(); 1132 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut); 1133 } 1134 1135 /** 1136 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 1137 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1138 * <CODE>Destination</CODE>, they can be used in the destination 1139 * parameter to create a <CODE>MessageConsumer</CODE>. 1140 * 1141 * @param destination the <CODE>Destination</CODE> to access. 1142 * @return the MessageConsumer 1143 * @throws JMSException if the session fails to create a consumer due to 1144 * some internal error. 1145 * @throws InvalidDestinationException if an invalid destination is 1146 * specified. 1147 * @since 1.1 1148 */ 1149 @Override 1150 public MessageConsumer createConsumer(Destination destination) throws JMSException { 1151 return createConsumer(destination, (String) null); 1152 } 1153 1154 /** 1155 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1156 * using a message selector. Since <CODE> Queue</CODE> and 1157 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1158 * can be used in the destination parameter to create a 1159 * <CODE>MessageConsumer</CODE>. 1160 * <P> 1161 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1162 * that have been sent to a destination. 1163 * 1164 * @param destination the <CODE>Destination</CODE> to access 1165 * @param messageSelector only messages with properties matching the message 1166 * selector expression are delivered. A value of null or an 1167 * empty string indicates that there is no message selector 1168 * for the message consumer. 1169 * @return the MessageConsumer 1170 * @throws JMSException if the session fails to create a MessageConsumer due 1171 * to some internal error. 1172 * @throws InvalidDestinationException if an invalid destination is 1173 * specified. 1174 * @throws InvalidSelectorException if the message selector is invalid. 1175 * @since 1.1 1176 */ 1177 @Override 1178 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 1179 return createConsumer(destination, messageSelector, false); 1180 } 1181 1182 /** 1183 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 1184 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1185 * <CODE>Destination</CODE>, they can be used in the destination 1186 * parameter to create a <CODE>MessageConsumer</CODE>. 1187 * 1188 * @param destination the <CODE>Destination</CODE> to access. 1189 * @param messageListener the listener to use for async consumption of messages 1190 * @return the MessageConsumer 1191 * @throws JMSException if the session fails to create a consumer due to 1192 * some internal error. 1193 * @throws InvalidDestinationException if an invalid destination is 1194 * specified. 1195 * @since 1.1 1196 */ 1197 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { 1198 return createConsumer(destination, null, messageListener); 1199 } 1200 1201 /** 1202 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1203 * using a message selector. Since <CODE> Queue</CODE> and 1204 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1205 * can be used in the destination parameter to create a 1206 * <CODE>MessageConsumer</CODE>. 1207 * <P> 1208 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1209 * that have been sent to a destination. 1210 * 1211 * @param destination the <CODE>Destination</CODE> to access 1212 * @param messageSelector only messages with properties matching the message 1213 * selector expression are delivered. A value of null or an 1214 * empty string indicates that there is no message selector 1215 * for the message consumer. 1216 * @param messageListener the listener to use for async consumption of messages 1217 * @return the MessageConsumer 1218 * @throws JMSException if the session fails to create a MessageConsumer due 1219 * to some internal error. 1220 * @throws InvalidDestinationException if an invalid destination is 1221 * specified. 1222 * @throws InvalidSelectorException if the message selector is invalid. 1223 * @since 1.1 1224 */ 1225 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { 1226 return createConsumer(destination, messageSelector, false, messageListener); 1227 } 1228 1229 /** 1230 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1231 * using a message selector. This method can specify whether messages 1232 * published by its own connection should be delivered to it, if the 1233 * destination is a topic. 1234 * <P> 1235 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1236 * <CODE>Destination</CODE>, they can be used in the destination 1237 * parameter to create a <CODE>MessageConsumer</CODE>. 1238 * <P> 1239 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1240 * that have been published to a destination. 1241 * <P> 1242 * In some cases, a connection may both publish and subscribe to a topic. 1243 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1244 * inhibit the delivery of messages published by its own connection. The 1245 * default value for this attribute is False. The <CODE>noLocal</CODE> 1246 * value must be supported by destinations that are topics. 1247 * 1248 * @param destination the <CODE>Destination</CODE> to access 1249 * @param messageSelector only messages with properties matching the message 1250 * selector expression are delivered. A value of null or an 1251 * empty string indicates that there is no message selector 1252 * for the message consumer. 1253 * @param noLocal - if true, and the destination is a topic, inhibits the 1254 * delivery of messages published by its own connection. The 1255 * behavior for <CODE>NoLocal</CODE> is not specified if 1256 * the destination is a queue. 1257 * @return the MessageConsumer 1258 * @throws JMSException if the session fails to create a MessageConsumer due 1259 * to some internal error. 1260 * @throws InvalidDestinationException if an invalid destination is 1261 * specified. 1262 * @throws InvalidSelectorException if the message selector is invalid. 1263 * @since 1.1 1264 */ 1265 @Override 1266 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { 1267 return createConsumer(destination, messageSelector, noLocal, null); 1268 } 1269 1270 /** 1271 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1272 * using a message selector. This method can specify whether messages 1273 * published by its own connection should be delivered to it, if the 1274 * destination is a topic. 1275 * <P> 1276 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1277 * <CODE>Destination</CODE>, they can be used in the destination 1278 * parameter to create a <CODE>MessageConsumer</CODE>. 1279 * <P> 1280 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1281 * that have been published to a destination. 1282 * <P> 1283 * In some cases, a connection may both publish and subscribe to a topic. 1284 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1285 * inhibit the delivery of messages published by its own connection. The 1286 * default value for this attribute is False. The <CODE>noLocal</CODE> 1287 * value must be supported by destinations that are topics. 1288 * 1289 * @param destination the <CODE>Destination</CODE> to access 1290 * @param messageSelector only messages with properties matching the message 1291 * selector expression are delivered. A value of null or an 1292 * empty string indicates that there is no message selector 1293 * for the message consumer. 1294 * @param noLocal - if true, and the destination is a topic, inhibits the 1295 * delivery of messages published by its own connection. The 1296 * behavior for <CODE>NoLocal</CODE> is not specified if 1297 * the destination is a queue. 1298 * @param messageListener the listener to use for async consumption of messages 1299 * @return the MessageConsumer 1300 * @throws JMSException if the session fails to create a MessageConsumer due 1301 * to some internal error. 1302 * @throws InvalidDestinationException if an invalid destination is 1303 * specified. 1304 * @throws InvalidSelectorException if the message selector is invalid. 1305 * @since 1.1 1306 */ 1307 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { 1308 checkClosed(); 1309 1310 if (destination instanceof CustomDestination) { 1311 CustomDestination customDestination = (CustomDestination)destination; 1312 return customDestination.createConsumer(this, messageSelector, noLocal); 1313 } 1314 1315 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 1316 int prefetch = 0; 1317 if (destination instanceof Topic) { 1318 prefetch = prefetchPolicy.getTopicPrefetch(); 1319 } else { 1320 prefetch = prefetchPolicy.getQueuePrefetch(); 1321 } 1322 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); 1323 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, 1324 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); 1325 } 1326 1327 /** 1328 * Creates a queue identity given a <CODE>Queue</CODE> name. 1329 * <P> 1330 * This facility is provided for the rare cases where clients need to 1331 * dynamically manipulate queue identity. It allows the creation of a queue 1332 * identity with a provider-specific name. Clients that depend on this 1333 * ability are not portable. 1334 * <P> 1335 * Note that this method is not for creating the physical queue. The 1336 * physical creation of queues is an administrative task and is not to be 1337 * initiated by the JMS API. The one exception is the creation of temporary 1338 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> 1339 * method. 1340 * 1341 * @param queueName the name of this <CODE>Queue</CODE> 1342 * @return a <CODE>Queue</CODE> with the given name 1343 * @throws JMSException if the session fails to create a queue due to some 1344 * internal error. 1345 * @since 1.1 1346 */ 1347 @Override 1348 public Queue createQueue(String queueName) throws JMSException { 1349 checkClosed(); 1350 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1351 return new ActiveMQTempQueue(queueName); 1352 } 1353 return new ActiveMQQueue(queueName); 1354 } 1355 1356 /** 1357 * Creates a topic identity given a <CODE>Topic</CODE> name. 1358 * <P> 1359 * This facility is provided for the rare cases where clients need to 1360 * dynamically manipulate topic identity. This allows the creation of a 1361 * topic identity with a provider-specific name. Clients that depend on this 1362 * ability are not portable. 1363 * <P> 1364 * Note that this method is not for creating the physical topic. The 1365 * physical creation of topics is an administrative task and is not to be 1366 * initiated by the JMS API. The one exception is the creation of temporary 1367 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> 1368 * method. 1369 * 1370 * @param topicName the name of this <CODE>Topic</CODE> 1371 * @return a <CODE>Topic</CODE> with the given name 1372 * @throws JMSException if the session fails to create a topic due to some 1373 * internal error. 1374 * @since 1.1 1375 */ 1376 @Override 1377 public Topic createTopic(String topicName) throws JMSException { 1378 checkClosed(); 1379 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1380 return new ActiveMQTempTopic(topicName); 1381 } 1382 return new ActiveMQTopic(topicName); 1383 } 1384 1385 /** 1386 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1387 * the specified queue. 1388 * 1389 * @param queue the <CODE>queue</CODE> to access 1390 * @exception InvalidDestinationException if an invalid destination is 1391 * specified 1392 * @since 1.1 1393 */ 1394 /** 1395 * Creates a durable subscriber to the specified topic. 1396 * <P> 1397 * If a client needs to receive all the messages published on a topic, 1398 * including the ones published while the subscriber is inactive, it uses a 1399 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1400 * record of this durable subscription and insures that all messages from 1401 * the topic's publishers are retained until they are acknowledged by this 1402 * durable subscriber or they have expired. 1403 * <P> 1404 * Sessions with durable subscribers must always provide the same client 1405 * identifier. In addition, each client must specify a name that uniquely 1406 * identifies (within client identifier) each durable subscription it 1407 * creates. Only one session at a time can have a 1408 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. 1409 * <P> 1410 * A client can change an existing durable subscription by creating a 1411 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1412 * and/or message selector. Changing a durable subscriber is equivalent to 1413 * unsubscribing (deleting) the old one and creating a new one. 1414 * <P> 1415 * In some cases, a connection may both publish and subscribe to a topic. 1416 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1417 * inhibit the delivery of messages published by its own connection. The 1418 * default value for this attribute is false. 1419 * 1420 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1421 * @param name the name used to identify this subscription 1422 * @return the TopicSubscriber 1423 * @throws JMSException if the session fails to create a subscriber due to 1424 * some internal error. 1425 * @throws InvalidDestinationException if an invalid topic is specified. 1426 * @since 1.1 1427 */ 1428 @Override 1429 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1430 checkClosed(); 1431 return createDurableSubscriber(topic, name, null, false); 1432 } 1433 1434 /** 1435 * Creates a durable subscriber to the specified topic, using a message 1436 * selector and specifying whether messages published by its own connection 1437 * should be delivered to it. 1438 * <P> 1439 * If a client needs to receive all the messages published on a topic, 1440 * including the ones published while the subscriber is inactive, it uses a 1441 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1442 * record of this durable subscription and insures that all messages from 1443 * the topic's publishers are retained until they are acknowledged by this 1444 * durable subscriber or they have expired. 1445 * <P> 1446 * Sessions with durable subscribers must always provide the same client 1447 * identifier. In addition, each client must specify a name which uniquely 1448 * identifies (within client identifier) each durable subscription it 1449 * creates. Only one session at a time can have a 1450 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An 1451 * inactive durable subscriber is one that exists but does not currently 1452 * have a message consumer associated with it. 1453 * <P> 1454 * A client can change an existing durable subscription by creating a 1455 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1456 * and/or message selector. Changing a durable subscriber is equivalent to 1457 * unsubscribing (deleting) the old one and creating a new one. 1458 * 1459 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1460 * @param name the name used to identify this subscription 1461 * @param messageSelector only messages with properties matching the message 1462 * selector expression are delivered. A value of null or an 1463 * empty string indicates that there is no message selector 1464 * for the message consumer. 1465 * @param noLocal if set, inhibits the delivery of messages published by its 1466 * own connection 1467 * @return the Queue Browser 1468 * @throws JMSException if the session fails to create a subscriber due to 1469 * some internal error. 1470 * @throws InvalidDestinationException if an invalid topic is specified. 1471 * @throws InvalidSelectorException if the message selector is invalid. 1472 * @since 1.1 1473 */ 1474 @Override 1475 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1476 checkClosed(); 1477 1478 if (topic == null) { 1479 throw new InvalidDestinationException("Topic cannot be null"); 1480 } 1481 1482 if (topic instanceof CustomDestination) { 1483 CustomDestination customDestination = (CustomDestination)topic; 1484 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); 1485 } 1486 1487 connection.checkClientIDWasManuallySpecified(); 1488 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1489 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch(); 1490 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit(); 1491 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit, 1492 noLocal, false, asyncDispatch); 1493 } 1494 1495 /** 1496 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1497 * the specified queue. 1498 * 1499 * @param queue the <CODE>queue</CODE> to access 1500 * @return the Queue Browser 1501 * @throws JMSException if the session fails to create a browser due to some 1502 * internal error. 1503 * @throws InvalidDestinationException if an invalid destination is 1504 * specified 1505 * @since 1.1 1506 */ 1507 @Override 1508 public QueueBrowser createBrowser(Queue queue) throws JMSException { 1509 checkClosed(); 1510 return createBrowser(queue, null); 1511 } 1512 1513 /** 1514 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1515 * the specified queue using a message selector. 1516 * 1517 * @param queue the <CODE>queue</CODE> to access 1518 * @param messageSelector only messages with properties matching the message 1519 * selector expression are delivered. A value of null or an 1520 * empty string indicates that there is no message selector 1521 * for the message consumer. 1522 * @return the Queue Browser 1523 * @throws JMSException if the session fails to create a browser due to some 1524 * internal error. 1525 * @throws InvalidDestinationException if an invalid destination is 1526 * specified 1527 * @throws InvalidSelectorException if the message selector is invalid. 1528 * @since 1.1 1529 */ 1530 @Override 1531 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 1532 checkClosed(); 1533 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); 1534 } 1535 1536 /** 1537 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that 1538 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1539 * 1540 * @return a temporary queue identity 1541 * @throws JMSException if the session fails to create a temporary queue due 1542 * to some internal error. 1543 * @since 1.1 1544 */ 1545 @Override 1546 public TemporaryQueue createTemporaryQueue() throws JMSException { 1547 checkClosed(); 1548 return (TemporaryQueue)connection.createTempDestination(false); 1549 } 1550 1551 /** 1552 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that 1553 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1554 * 1555 * @return a temporary topic identity 1556 * @throws JMSException if the session fails to create a temporary topic due 1557 * to some internal error. 1558 * @since 1.1 1559 */ 1560 @Override 1561 public TemporaryTopic createTemporaryTopic() throws JMSException { 1562 checkClosed(); 1563 return (TemporaryTopic)connection.createTempDestination(true); 1564 } 1565 1566 /** 1567 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1568 * the specified queue. 1569 * 1570 * @param queue the <CODE>Queue</CODE> to access 1571 * @return 1572 * @throws JMSException if the session fails to create a receiver due to 1573 * some internal error. 1574 * @throws JMSException 1575 * @throws InvalidDestinationException if an invalid queue is specified. 1576 */ 1577 @Override 1578 public QueueReceiver createReceiver(Queue queue) throws JMSException { 1579 checkClosed(); 1580 return createReceiver(queue, null); 1581 } 1582 1583 /** 1584 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1585 * the specified queue using a message selector. 1586 * 1587 * @param queue the <CODE>Queue</CODE> to access 1588 * @param messageSelector only messages with properties matching the message 1589 * selector expression are delivered. A value of null or an 1590 * empty string indicates that there is no message selector 1591 * for the message consumer. 1592 * @return QueueReceiver 1593 * @throws JMSException if the session fails to create a receiver due to 1594 * some internal error. 1595 * @throws InvalidDestinationException if an invalid queue is specified. 1596 * @throws InvalidSelectorException if the message selector is invalid. 1597 */ 1598 @Override 1599 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1600 checkClosed(); 1601 1602 if (queue instanceof CustomDestination) { 1603 CustomDestination customDestination = (CustomDestination)queue; 1604 return customDestination.createReceiver(this, messageSelector); 1605 } 1606 1607 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1608 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), 1609 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); 1610 } 1611 1612 /** 1613 * Creates a <CODE>QueueSender</CODE> object to send messages to the 1614 * specified queue. 1615 * 1616 * @param queue the <CODE>Queue</CODE> to access, or null if this is an 1617 * unidentified producer 1618 * @return QueueSender 1619 * @throws JMSException if the session fails to create a sender due to some 1620 * internal error. 1621 * @throws InvalidDestinationException if an invalid queue is specified. 1622 */ 1623 @Override 1624 public QueueSender createSender(Queue queue) throws JMSException { 1625 checkClosed(); 1626 if (queue instanceof CustomDestination) { 1627 CustomDestination customDestination = (CustomDestination)queue; 1628 return customDestination.createSender(this); 1629 } 1630 int timeSendOut = connection.getSendTimeout(); 1631 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut); 1632 } 1633 1634 /** 1635 * Creates a nondurable subscriber to the specified topic. <p/> 1636 * <P> 1637 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1638 * that have been published to a topic. <p/> 1639 * <P> 1640 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1641 * receive only messages that are published while they are active. <p/> 1642 * <P> 1643 * In some cases, a connection may both publish and subscribe to a topic. 1644 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1645 * inhibit the delivery of messages published by its own connection. The 1646 * default value for this attribute is false. 1647 * 1648 * @param topic the <CODE>Topic</CODE> to subscribe to 1649 * @return TopicSubscriber 1650 * @throws JMSException if the session fails to create a subscriber due to 1651 * some internal error. 1652 * @throws InvalidDestinationException if an invalid topic is specified. 1653 */ 1654 @Override 1655 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1656 checkClosed(); 1657 return createSubscriber(topic, null, false); 1658 } 1659 1660 /** 1661 * Creates a nondurable subscriber to the specified topic, using a message 1662 * selector or specifying whether messages published by its own connection 1663 * should be delivered to it. <p/> 1664 * <P> 1665 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1666 * that have been published to a topic. <p/> 1667 * <P> 1668 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1669 * receive only messages that are published while they are active. <p/> 1670 * <P> 1671 * Messages filtered out by a subscriber's message selector will never be 1672 * delivered to the subscriber. From the subscriber's perspective, they do 1673 * not exist. <p/> 1674 * <P> 1675 * In some cases, a connection may both publish and subscribe to a topic. 1676 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1677 * inhibit the delivery of messages published by its own connection. The 1678 * default value for this attribute is false. 1679 * 1680 * @param topic the <CODE>Topic</CODE> to subscribe to 1681 * @param messageSelector only messages with properties matching the message 1682 * selector expression are delivered. A value of null or an 1683 * empty string indicates that there is no message selector 1684 * for the message consumer. 1685 * @param noLocal if set, inhibits the delivery of messages published by its 1686 * own connection 1687 * @return TopicSubscriber 1688 * @throws JMSException if the session fails to create a subscriber due to 1689 * some internal error. 1690 * @throws InvalidDestinationException if an invalid topic is specified. 1691 * @throws InvalidSelectorException if the message selector is invalid. 1692 */ 1693 @Override 1694 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1695 checkClosed(); 1696 1697 if (topic instanceof CustomDestination) { 1698 CustomDestination customDestination = (CustomDestination)topic; 1699 return customDestination.createSubscriber(this, messageSelector, noLocal); 1700 } 1701 1702 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1703 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy 1704 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); 1705 } 1706 1707 /** 1708 * Creates a publisher for the specified topic. <p/> 1709 * <P> 1710 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages 1711 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on 1712 * a topic, it defines a new sequence of messages that have no ordering 1713 * relationship with the messages it has previously sent. 1714 * 1715 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is 1716 * an unidentified producer 1717 * @return TopicPublisher 1718 * @throws JMSException if the session fails to create a publisher due to 1719 * some internal error. 1720 * @throws InvalidDestinationException if an invalid topic is specified. 1721 */ 1722 @Override 1723 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1724 checkClosed(); 1725 1726 if (topic instanceof CustomDestination) { 1727 CustomDestination customDestination = (CustomDestination)topic; 1728 return customDestination.createPublisher(this); 1729 } 1730 int timeSendOut = connection.getSendTimeout(); 1731 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut); 1732 } 1733 1734 /** 1735 * Unsubscribes a durable subscription that has been created by a client. 1736 * <P> 1737 * This method deletes the state being maintained on behalf of the 1738 * subscriber by its provider. 1739 * <P> 1740 * It is erroneous for a client to delete a durable subscription while there 1741 * is an active <CODE>MessageConsumer </CODE> or 1742 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 1743 * message is part of a pending transaction or has not been acknowledged in 1744 * the session. 1745 * 1746 * @param name the name used to identify this subscription 1747 * @throws JMSException if the session fails to unsubscribe to the durable 1748 * subscription due to some internal error. 1749 * @throws InvalidDestinationException if an invalid subscription name is 1750 * specified. 1751 * @since 1.1 1752 */ 1753 @Override 1754 public void unsubscribe(String name) throws JMSException { 1755 checkClosed(); 1756 connection.unsubscribe(name); 1757 } 1758 1759 @Override 1760 public void dispatch(MessageDispatch messageDispatch) { 1761 try { 1762 executor.execute(messageDispatch); 1763 } catch (InterruptedException e) { 1764 Thread.currentThread().interrupt(); 1765 connection.onClientInternalException(e); 1766 } 1767 } 1768 1769 /** 1770 * Acknowledges all consumed messages of the session of this consumed 1771 * message. 1772 * <P> 1773 * All consumed JMS messages support the <CODE>acknowledge</CODE> method 1774 * for use when a client has specified that its JMS session's consumed 1775 * messages are to be explicitly acknowledged. By invoking 1776 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges 1777 * all messages consumed by the session that the message was delivered to. 1778 * <P> 1779 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted 1780 * sessions and sessions specified to use implicit acknowledgement modes. 1781 * <P> 1782 * A client may individually acknowledge each message as it is consumed, or 1783 * it may choose to acknowledge messages as an application-defined group 1784 * (which is done by calling acknowledge on the last received message of the 1785 * group, thereby acknowledging all messages consumed by the session.) 1786 * <P> 1787 * Messages that have been received but not acknowledged may be redelivered. 1788 * 1789 * @throws JMSException if the JMS provider fails to acknowledge the 1790 * messages due to some internal error. 1791 * @throws javax.jms.IllegalStateException if this method is called on a 1792 * closed session. 1793 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE 1794 */ 1795 public void acknowledge() throws JMSException { 1796 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1797 ActiveMQMessageConsumer c = iter.next(); 1798 c.acknowledge(); 1799 } 1800 } 1801 1802 /** 1803 * Add a message consumer. 1804 * 1805 * @param consumer - message consumer. 1806 * @throws JMSException 1807 */ 1808 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1809 this.consumers.add(consumer); 1810 if (consumer.isDurableSubscriber()) { 1811 stats.onCreateDurableSubscriber(); 1812 } 1813 this.connection.addDispatcher(consumer.getConsumerId(), this); 1814 } 1815 1816 /** 1817 * Remove the message consumer. 1818 * 1819 * @param consumer - consumer to be removed. 1820 * @throws JMSException 1821 */ 1822 protected void removeConsumer(ActiveMQMessageConsumer consumer) { 1823 this.connection.removeDispatcher(consumer.getConsumerId()); 1824 if (consumer.isDurableSubscriber()) { 1825 stats.onRemoveDurableSubscriber(); 1826 } 1827 this.consumers.remove(consumer); 1828 this.connection.removeDispatcher(consumer); 1829 } 1830 1831 /** 1832 * Adds a message producer. 1833 * 1834 * @param producer - message producer to be added. 1835 * @throws JMSException 1836 */ 1837 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1838 this.producers.add(producer); 1839 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); 1840 } 1841 1842 /** 1843 * Removes a message producer. 1844 * 1845 * @param producer - message producer to be removed. 1846 * @throws JMSException 1847 */ 1848 protected void removeProducer(ActiveMQMessageProducer producer) { 1849 this.connection.removeProducer(producer.getProducerInfo().getProducerId()); 1850 this.producers.remove(producer); 1851 } 1852 1853 /** 1854 * Start this Session. 1855 * 1856 * @throws JMSException 1857 */ 1858 protected void start() throws JMSException { 1859 started.set(true); 1860 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1861 ActiveMQMessageConsumer c = iter.next(); 1862 c.start(); 1863 } 1864 executor.start(); 1865 } 1866 1867 /** 1868 * Stops this session. 1869 * 1870 * @throws JMSException 1871 */ 1872 protected void stop() throws JMSException { 1873 1874 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1875 ActiveMQMessageConsumer c = iter.next(); 1876 c.stop(); 1877 } 1878 1879 started.set(false); 1880 executor.stop(); 1881 } 1882 1883 /** 1884 * Returns the session id. 1885 * 1886 * @return value - session id. 1887 */ 1888 protected SessionId getSessionId() { 1889 return info.getSessionId(); 1890 } 1891 1892 /** 1893 * @return 1894 */ 1895 protected ConsumerId getNextConsumerId() { 1896 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); 1897 } 1898 1899 /** 1900 * @return 1901 */ 1902 protected ProducerId getNextProducerId() { 1903 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); 1904 } 1905 1906 /** 1907 * Sends the message for dispatch by the broker. 1908 * 1909 * 1910 * @param producer - message producer. 1911 * @param destination - message destination. 1912 * @param message - message to be sent. 1913 * @param deliveryMode - JMS messsage delivery mode. 1914 * @param priority - message priority. 1915 * @param timeToLive - message expiration. 1916 * @param producerWindow 1917 * @param onComplete 1918 * @throws JMSException 1919 */ 1920 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 1921 MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { 1922 1923 checkClosed(); 1924 if (destination.isTemporary() && connection.isDeleted(destination)) { 1925 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); 1926 } 1927 synchronized (sendMutex) { 1928 // tell the Broker we are about to start a new transaction 1929 doStartTransaction(); 1930 if (transactionContext.isRollbackOnly()) { 1931 throw new IllegalStateException("transaction marked rollback only"); 1932 } 1933 TransactionId txid = transactionContext.getTransactionId(); 1934 long sequenceNumber = producer.getMessageSequence(); 1935 1936 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 1937 message.setJMSDeliveryMode(deliveryMode); 1938 long expiration = 0L; 1939 if (!producer.getDisableMessageTimestamp()) { 1940 long timeStamp = System.currentTimeMillis(); 1941 message.setJMSTimestamp(timeStamp); 1942 if (timeToLive > 0) { 1943 expiration = timeToLive + timeStamp; 1944 } 1945 } 1946 message.setJMSExpiration(expiration); 1947 message.setJMSPriority(priority); 1948 message.setJMSRedelivered(false); 1949 1950 // transform to our own message format here 1951 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); 1952 msg.setDestination(destination); 1953 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1954 1955 // Set the message id. 1956 if (msg != message) { 1957 message.setJMSMessageID(msg.getMessageId().toString()); 1958 // Make sure the JMS destination is set on the foreign messages too. 1959 message.setJMSDestination(destination); 1960 } 1961 //clear the brokerPath in case we are re-sending this message 1962 msg.setBrokerPath(null); 1963 1964 msg.setTransactionId(txid); 1965 if (connection.isCopyMessageOnSend()) { 1966 msg = (ActiveMQMessage)msg.copy(); 1967 } 1968 msg.setConnection(connection); 1969 msg.onSend(); 1970 msg.setProducerId(msg.getMessageId().getProducerId()); 1971 if (LOG.isTraceEnabled()) { 1972 LOG.trace(getSessionId() + " sending message: " + msg); 1973 } 1974 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { 1975 this.connection.asyncSendPacket(msg); 1976 if (producerWindow != null) { 1977 // Since we defer lots of the marshaling till we hit the 1978 // wire, this might not 1979 // provide and accurate size. We may change over to doing 1980 // more aggressive marshaling, 1981 // to get more accurate sizes.. this is more important once 1982 // users start using producer window 1983 // flow control. 1984 int size = msg.getSize(); 1985 producerWindow.increaseUsage(size); 1986 } 1987 } else { 1988 if (sendTimeout > 0 && onComplete==null) { 1989 this.connection.syncSendPacket(msg,sendTimeout); 1990 }else { 1991 this.connection.syncSendPacket(msg, onComplete); 1992 } 1993 } 1994 1995 } 1996 } 1997 1998 /** 1999 * Send TransactionInfo to indicate transaction has started 2000 * 2001 * @throws JMSException if some internal error occurs 2002 */ 2003 protected void doStartTransaction() throws JMSException { 2004 if (getTransacted() && !transactionContext.isInXATransaction()) { 2005 transactionContext.begin(); 2006 } 2007 } 2008 2009 /** 2010 * Checks whether the session has unconsumed messages. 2011 * 2012 * @return true - if there are unconsumed messages. 2013 */ 2014 public boolean hasUncomsumedMessages() { 2015 return executor.hasUncomsumedMessages(); 2016 } 2017 2018 /** 2019 * Checks whether the session uses transactions. 2020 * 2021 * @return true - if the session uses transactions. 2022 */ 2023 public boolean isTransacted() { 2024 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); 2025 } 2026 2027 /** 2028 * Checks whether the session used client acknowledgment. 2029 * 2030 * @return true - if the session uses client acknowledgment. 2031 */ 2032 protected boolean isClientAcknowledge() { 2033 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; 2034 } 2035 2036 /** 2037 * Checks whether the session used auto acknowledgment. 2038 * 2039 * @return true - if the session uses client acknowledgment. 2040 */ 2041 public boolean isAutoAcknowledge() { 2042 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; 2043 } 2044 2045 /** 2046 * Checks whether the session used dup ok acknowledgment. 2047 * 2048 * @return true - if the session uses client acknowledgment. 2049 */ 2050 public boolean isDupsOkAcknowledge() { 2051 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; 2052 } 2053 2054 public boolean isIndividualAcknowledge(){ 2055 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; 2056 } 2057 2058 /** 2059 * Returns the message delivery listener. 2060 * 2061 * @return deliveryListener - message delivery listener. 2062 */ 2063 public DeliveryListener getDeliveryListener() { 2064 return deliveryListener; 2065 } 2066 2067 /** 2068 * Sets the message delivery listener. 2069 * 2070 * @param deliveryListener - message delivery listener. 2071 */ 2072 public void setDeliveryListener(DeliveryListener deliveryListener) { 2073 this.deliveryListener = deliveryListener; 2074 } 2075 2076 /** 2077 * Returns the SessionInfo bean. 2078 * 2079 * @return info - SessionInfo bean. 2080 * @throws JMSException 2081 */ 2082 protected SessionInfo getSessionInfo() throws JMSException { 2083 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); 2084 return info; 2085 } 2086 2087 /** 2088 * Send the asynchronus command. 2089 * 2090 * @param command - command to be executed. 2091 * @throws JMSException 2092 */ 2093 public void asyncSendPacket(Command command) throws JMSException { 2094 connection.asyncSendPacket(command); 2095 } 2096 2097 /** 2098 * Send the synchronus command. 2099 * 2100 * @param command - command to be executed. 2101 * @return Response 2102 * @throws JMSException 2103 */ 2104 public Response syncSendPacket(Command command) throws JMSException { 2105 return connection.syncSendPacket(command); 2106 } 2107 2108 public long getNextDeliveryId() { 2109 return deliveryIdGenerator.getNextSequenceId(); 2110 } 2111 2112 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { 2113 2114 List<MessageDispatch> c = unconsumedMessages.removeAll(); 2115 Collections.reverse(c); 2116 2117 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) { 2118 MessageDispatch md = iter.next(); 2119 executor.executeFirst(md); 2120 } 2121 2122 } 2123 2124 public boolean isRunning() { 2125 return started.get(); 2126 } 2127 2128 public boolean isAsyncDispatch() { 2129 return asyncDispatch; 2130 } 2131 2132 public void setAsyncDispatch(boolean asyncDispatch) { 2133 this.asyncDispatch = asyncDispatch; 2134 } 2135 2136 /** 2137 * @return Returns the sessionAsyncDispatch. 2138 */ 2139 public boolean isSessionAsyncDispatch() { 2140 return sessionAsyncDispatch; 2141 } 2142 2143 /** 2144 * @param sessionAsyncDispatch The sessionAsyncDispatch to set. 2145 */ 2146 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) { 2147 this.sessionAsyncDispatch = sessionAsyncDispatch; 2148 } 2149 2150 public MessageTransformer getTransformer() { 2151 return transformer; 2152 } 2153 2154 public ActiveMQConnection getConnection() { 2155 return connection; 2156 } 2157 2158 /** 2159 * Sets the transformer used to transform messages before they are sent on 2160 * to the JMS bus or when they are received from the bus but before they are 2161 * delivered to the JMS client 2162 */ 2163 public void setTransformer(MessageTransformer transformer) { 2164 this.transformer = transformer; 2165 } 2166 2167 public BlobTransferPolicy getBlobTransferPolicy() { 2168 return blobTransferPolicy; 2169 } 2170 2171 /** 2172 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 2173 * OBjects) are transferred from producers to brokers to consumers 2174 */ 2175 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 2176 this.blobTransferPolicy = blobTransferPolicy; 2177 } 2178 2179 public List<MessageDispatch> getUnconsumedMessages() { 2180 return executor.getUnconsumedMessages(); 2181 } 2182 2183 @Override 2184 public String toString() { 2185 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + ",closed=" + closed + "} " + sendMutex; 2186 } 2187 2188 public void checkMessageListener() throws JMSException { 2189 if (messageListener != null) { 2190 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2191 } 2192 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) { 2193 ActiveMQMessageConsumer consumer = i.next(); 2194 if (consumer.hasMessageListener()) { 2195 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2196 } 2197 } 2198 } 2199 2200 protected void setOptimizeAcknowledge(boolean value) { 2201 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2202 ActiveMQMessageConsumer c = iter.next(); 2203 c.setOptimizeAcknowledge(value); 2204 } 2205 } 2206 2207 protected void setPrefetchSize(ConsumerId id, int prefetch) { 2208 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2209 ActiveMQMessageConsumer c = iter.next(); 2210 if (c.getConsumerId().equals(id)) { 2211 c.setPrefetchSize(prefetch); 2212 break; 2213 } 2214 } 2215 } 2216 2217 protected void close(ConsumerId id) { 2218 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2219 ActiveMQMessageConsumer c = iter.next(); 2220 if (c.getConsumerId().equals(id)) { 2221 try { 2222 c.close(); 2223 } catch (JMSException e) { 2224 LOG.warn("Exception closing consumer", e); 2225 } 2226 LOG.warn("Closed consumer on Command, " + id); 2227 break; 2228 } 2229 } 2230 } 2231 2232 public boolean isInUse(ActiveMQTempDestination destination) { 2233 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2234 ActiveMQMessageConsumer c = iter.next(); 2235 if (c.isInUse(destination)) { 2236 return true; 2237 } 2238 } 2239 return false; 2240 } 2241 2242 /** 2243 * highest sequence id of the last message delivered by this session. 2244 * Passed to the broker in the close command, maintained by dispose() 2245 * @return lastDeliveredSequenceId 2246 */ 2247 public long getLastDeliveredSequenceId() { 2248 return lastDeliveredSequenceId; 2249 } 2250 2251 protected void sendAck(MessageAck ack) throws JMSException { 2252 sendAck(ack,false); 2253 } 2254 2255 protected void sendAck(MessageAck ack, boolean lazy) throws JMSException { 2256 if (lazy || connection.isSendAcksAsync() || getTransacted()) { 2257 asyncSendPacket(ack); 2258 } else { 2259 syncSendPacket(ack); 2260 } 2261 } 2262 2263 protected Scheduler getScheduler() throws JMSException { 2264 return this.connection.getScheduler(); 2265 } 2266 2267 protected ThreadPoolExecutor getConnectionExecutor() { 2268 return this.connectionExecutor; 2269 } 2270}