001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.File; 020import java.io.InputStream; 021import java.io.Serializable; 022import java.net.URL; 023import java.util.Collections; 024import java.util.Iterator; 025import java.util.List; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030 031import javax.jms.BytesMessage; 032import javax.jms.Destination; 033import javax.jms.IllegalStateException; 034import javax.jms.InvalidDestinationException; 035import javax.jms.InvalidSelectorException; 036import javax.jms.JMSException; 037import javax.jms.MapMessage; 038import javax.jms.Message; 039import javax.jms.MessageConsumer; 040import javax.jms.MessageListener; 041import javax.jms.MessageProducer; 042import javax.jms.ObjectMessage; 043import javax.jms.Queue; 044import javax.jms.QueueBrowser; 045import javax.jms.QueueReceiver; 046import javax.jms.QueueSender; 047import javax.jms.QueueSession; 048import javax.jms.Session; 049import javax.jms.StreamMessage; 050import 
javax.jms.TemporaryQueue; 051import javax.jms.TemporaryTopic; 052import javax.jms.TextMessage; 053import javax.jms.Topic; 054import javax.jms.TopicPublisher; 055import javax.jms.TopicSession; 056import javax.jms.TopicSubscriber; 057import javax.jms.TransactionRolledBackException; 058 059import org.apache.activemq.blob.BlobDownloader; 060import org.apache.activemq.blob.BlobTransferPolicy; 061import org.apache.activemq.blob.BlobUploader; 062import org.apache.activemq.command.ActiveMQBlobMessage; 063import org.apache.activemq.command.ActiveMQBytesMessage; 064import org.apache.activemq.command.ActiveMQDestination; 065import org.apache.activemq.command.ActiveMQMapMessage; 066import org.apache.activemq.command.ActiveMQMessage; 067import org.apache.activemq.command.ActiveMQObjectMessage; 068import org.apache.activemq.command.ActiveMQQueue; 069import org.apache.activemq.command.ActiveMQStreamMessage; 070import org.apache.activemq.command.ActiveMQTempDestination; 071import org.apache.activemq.command.ActiveMQTempQueue; 072import org.apache.activemq.command.ActiveMQTempTopic; 073import org.apache.activemq.command.ActiveMQTextMessage; 074import org.apache.activemq.command.ActiveMQTopic; 075import org.apache.activemq.command.Command; 076import org.apache.activemq.command.ConsumerId; 077import org.apache.activemq.command.MessageAck; 078import org.apache.activemq.command.MessageDispatch; 079import org.apache.activemq.command.MessageId; 080import org.apache.activemq.command.ProducerId; 081import org.apache.activemq.command.RemoveInfo; 082import org.apache.activemq.command.Response; 083import org.apache.activemq.command.SessionId; 084import org.apache.activemq.command.SessionInfo; 085import org.apache.activemq.command.TransactionId; 086import org.apache.activemq.management.JMSSessionStatsImpl; 087import org.apache.activemq.management.StatsCapable; 088import org.apache.activemq.management.StatsImpl; 089import org.apache.activemq.thread.Scheduler; 090import 
org.apache.activemq.transaction.Synchronization; 091import org.apache.activemq.usage.MemoryUsage; 092import org.apache.activemq.util.Callback; 093import org.apache.activemq.util.LongSequenceGenerator; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097/** 098 * <P> 099 * A <CODE>Session</CODE> object is a single-threaded context for producing 100 * and consuming messages. Although it may allocate provider resources outside 101 * the Java virtual machine (JVM), it is considered a lightweight JMS object. 102 * <P> 103 * A session serves several purposes: 104 * <UL> 105 * <LI>It is a factory for its message producers and consumers. 106 * <LI>It supplies provider-optimized message factories. 107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and 108 * <CODE>TemporaryQueues</CODE>. 109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> 110 * objects for those clients that need to dynamically manipulate 111 * provider-specific destination names. 112 * <LI>It supports a single series of transactions that combine work spanning 113 * its producers and consumers into atomic units. 114 * <LI>It defines a serial order for the messages it consumes and the messages 115 * it produces. 116 * <LI>It retains messages it consumes until they have been acknowledged. 117 * <LI>It serializes execution of message listeners registered with its message 118 * consumers. 119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>. 120 * </UL> 121 * <P> 122 * A session can create and service multiple message producers and consumers. 123 * <P> 124 * One typical use is to have a thread block on a synchronous 125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then 126 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s. 127 * <P> 128 * If a client desires to have one thread produce messages while others consume 129 * them, the client should use a separate session for its producing thread. 
130 * <P> 131 * Once a connection has been started, any session with one or more registered 132 * message listeners is dedicated to the thread of control that delivers 133 * messages to it. It is erroneous for client code to use this session or any of 134 * its constituent objects from another thread of control. The only exception to 135 * this rule is the use of the session or connection <CODE>close</CODE> 136 * method. 137 * <P> 138 * It should be easy for most clients to partition their work naturally into 139 * sessions. This model allows clients to start simply and incrementally add 140 * message processing complexity as their need for concurrency grows. 141 * <P> 142 * The <CODE>close</CODE> method is the only session method that can be called 143 * while some other session method is being executed in another thread. 144 * <P> 145 * A session may be specified as transacted. Each transacted session supports a 146 * single series of transactions. Each transaction groups a set of message sends 147 * and a set of message receives into an atomic unit of work. In effect, 148 * transactions organize a session's input message stream and output message 149 * stream into series of atomic units. When a transaction commits, its atomic 150 * unit of input is acknowledged and its associated atomic unit of output is 151 * sent. If a transaction rollback is done, the transaction's sent messages are 152 * destroyed and the session's input is automatically recovered. 153 * <P> 154 * The content of a transaction's input and output units is simply those 155 * messages that have been produced and consumed within the session's current 156 * transaction. 157 * <P> 158 * A transaction is completed using either its session's <CODE>commit</CODE> 159 * method or its session's <CODE>rollback </CODE> method. The completion of a 160 * session's current transaction automatically begins the next. 
The result is 161 * that a transacted session always has a current transaction within which its 162 * work is done. 163 * <P> 164 * The Java Transaction Service (JTS) or some other transaction monitor may be 165 * used to combine a session's transaction with transactions on other resources 166 * (databases, other JMS sessions, etc.). Since Java distributed transactions 167 * are controlled via the Java Transaction API (JTA), use of the session's 168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is 169 * prohibited. 170 * <P> 171 * The JMS API does not require support for JTA; however, it does define how a 172 * provider supplies this support. 173 * <P> 174 * Although it is also possible for a JMS client to handle distributed 175 * transactions directly, it is unlikely that many JMS clients will do this. 176 * Support for JTA in the JMS API is targeted at systems vendors who will be 177 * integrating the JMS API into their application server products. 178 * 179 * 180 * @see javax.jms.Session 181 * @see javax.jms.QueueSession 182 * @see javax.jms.TopicSession 183 * @see javax.jms.XASession 184 */ 185public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { 186 187 /** 188 * Only acknowledge an individual message - using message.acknowledge() 189 * as opposed to CLIENT_ACKNOWLEDGE which 190 * acknowledges all messages consumed by a session at when acknowledge() 191 * is called 192 */ 193 public static final int INDIVIDUAL_ACKNOWLEDGE = 4; 194 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; 195 196 public static interface DeliveryListener { 197 void beforeDelivery(ActiveMQSession session, Message msg); 198 199 void afterDelivery(ActiveMQSession session, Message msg); 200 } 201 202 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); 203 private final ThreadPoolExecutor connectionExecutor; 204 205 protected int acknowledgementMode; 206 
protected final ActiveMQConnection connection; 207 protected final SessionInfo info; 208 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 209 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 210 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); 211 protected final ActiveMQSessionExecutor executor; 212 protected final AtomicBoolean started = new AtomicBoolean(false); 213 214 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); 215 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); 216 217 protected boolean closed; 218 private volatile boolean synchronizationRegistered; 219 protected boolean asyncDispatch; 220 protected boolean sessionAsyncDispatch; 221 protected final boolean debug; 222 protected final Object sendMutex = new Object(); 223 protected final Object redeliveryGuard = new Object(); 224 225 private final AtomicBoolean clearInProgress = new AtomicBoolean(); 226 227 private MessageListener messageListener; 228 private final JMSSessionStatsImpl stats; 229 private TransactionContext transactionContext; 230 private DeliveryListener deliveryListener; 231 private MessageTransformer transformer; 232 private BlobTransferPolicy blobTransferPolicy; 233 private long lastDeliveredSequenceId = -2; 234 235 /** 236 * Construct the Session 237 * 238 * @param connection 239 * @param sessionId 240 * @param acknowledgeMode n.b if transacted - the acknowledgeMode == 241 * Session.SESSION_TRANSACTED 242 * @param asyncDispatch 243 * @param sessionAsyncDispatch 244 * @throws JMSException on internal error 245 */ 246 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { 247 this.debug = 
LOG.isDebugEnabled(); 248 this.connection = connection; 249 this.acknowledgementMode = acknowledgeMode; 250 this.asyncDispatch = asyncDispatch; 251 this.sessionAsyncDispatch = sessionAsyncDispatch; 252 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); 253 setTransactionContext(new TransactionContext(connection)); 254 stats = new JMSSessionStatsImpl(producers, consumers); 255 this.connection.asyncSendPacket(info); 256 setTransformer(connection.getTransformer()); 257 setBlobTransferPolicy(connection.getBlobTransferPolicy()); 258 this.connectionExecutor=connection.getExecutor(); 259 this.executor = new ActiveMQSessionExecutor(this); 260 connection.addSession(this); 261 if (connection.isStarted()) { 262 start(); 263 } 264 265 } 266 267 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { 268 this(connection, sessionId, acknowledgeMode, asyncDispatch, true); 269 } 270 271 /** 272 * Sets the transaction context of the session. 273 * 274 * @param transactionContext - provides the means to control a JMS 275 * transaction. 276 */ 277 public void setTransactionContext(TransactionContext transactionContext) { 278 this.transactionContext = transactionContext; 279 } 280 281 /** 282 * Returns the transaction context of the session. 283 * 284 * @return transactionContext - session's transaction context. 285 */ 286 public TransactionContext getTransactionContext() { 287 return transactionContext; 288 } 289 290 /* 291 * (non-Javadoc) 292 * 293 * @see org.apache.activemq.management.StatsCapable#getStats() 294 */ 295 @Override 296 public StatsImpl getStats() { 297 return stats; 298 } 299 300 /** 301 * Returns the session's statistics. 302 * 303 * @return stats - session's statistics. 304 */ 305 public JMSSessionStatsImpl getSessionStats() { 306 return stats; 307 } 308 309 /** 310 * Creates a <CODE>BytesMessage</CODE> object. 
A <CODE>BytesMessage</CODE> 311 * object is used to send a message containing a stream of uninterpreted 312 * bytes. 313 * 314 * @return the an ActiveMQBytesMessage 315 * @throws JMSException if the JMS provider fails to create this message due 316 * to some internal error. 317 */ 318 @Override 319 public BytesMessage createBytesMessage() throws JMSException { 320 ActiveMQBytesMessage message = new ActiveMQBytesMessage(); 321 configureMessage(message); 322 return message; 323 } 324 325 /** 326 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> 327 * object is used to send a self-defining set of name-value pairs, where 328 * names are <CODE>String</CODE> objects and values are primitive values 329 * in the Java programming language. 330 * 331 * @return an ActiveMQMapMessage 332 * @throws JMSException if the JMS provider fails to create this message due 333 * to some internal error. 334 */ 335 @Override 336 public MapMessage createMapMessage() throws JMSException { 337 ActiveMQMapMessage message = new ActiveMQMapMessage(); 338 configureMessage(message); 339 return message; 340 } 341 342 /** 343 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> 344 * interface is the root interface of all JMS messages. A 345 * <CODE>Message</CODE> object holds all the standard message header 346 * information. It can be sent when a message containing only header 347 * information is sufficient. 348 * 349 * @return an ActiveMQMessage 350 * @throws JMSException if the JMS provider fails to create this message due 351 * to some internal error. 352 */ 353 @Override 354 public Message createMessage() throws JMSException { 355 ActiveMQMessage message = new ActiveMQMessage(); 356 configureMessage(message); 357 return message; 358 } 359 360 /** 361 * Creates an <CODE>ObjectMessage</CODE> object. An 362 * <CODE>ObjectMessage</CODE> object is used to send a message that 363 * contains a serializable Java object. 
364 * 365 * @return an ActiveMQObjectMessage 366 * @throws JMSException if the JMS provider fails to create this message due 367 * to some internal error. 368 */ 369 @Override 370 public ObjectMessage createObjectMessage() throws JMSException { 371 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 372 configureMessage(message); 373 return message; 374 } 375 376 /** 377 * Creates an initialized <CODE>ObjectMessage</CODE> object. An 378 * <CODE>ObjectMessage</CODE> object is used to send a message that 379 * contains a serializable Java object. 380 * 381 * @param object the object to use to initialize this message 382 * @return an ActiveMQObjectMessage 383 * @throws JMSException if the JMS provider fails to create this message due 384 * to some internal error. 385 */ 386 @Override 387 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 388 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 389 configureMessage(message); 390 message.setObject(object); 391 return message; 392 } 393 394 /** 395 * Creates a <CODE>StreamMessage</CODE> object. A 396 * <CODE>StreamMessage</CODE> object is used to send a self-defining 397 * stream of primitive values in the Java programming language. 398 * 399 * @return an ActiveMQStreamMessage 400 * @throws JMSException if the JMS provider fails to create this message due 401 * to some internal error. 402 */ 403 @Override 404 public StreamMessage createStreamMessage() throws JMSException { 405 ActiveMQStreamMessage message = new ActiveMQStreamMessage(); 406 configureMessage(message); 407 return message; 408 } 409 410 /** 411 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> 412 * object is used to send a message containing a <CODE>String</CODE> 413 * object. 414 * 415 * @return an ActiveMQTextMessage 416 * @throws JMSException if the JMS provider fails to create this message due 417 * to some internal error. 
418 */ 419 @Override 420 public TextMessage createTextMessage() throws JMSException { 421 ActiveMQTextMessage message = new ActiveMQTextMessage(); 422 configureMessage(message); 423 return message; 424 } 425 426 /** 427 * Creates an initialized <CODE>TextMessage</CODE> object. A 428 * <CODE>TextMessage</CODE> object is used to send a message containing a 429 * <CODE>String</CODE>. 430 * 431 * @param text the string used to initialize this message 432 * @return an ActiveMQTextMessage 433 * @throws JMSException if the JMS provider fails to create this message due 434 * to some internal error. 435 */ 436 @Override 437 public TextMessage createTextMessage(String text) throws JMSException { 438 ActiveMQTextMessage message = new ActiveMQTextMessage(); 439 message.setText(text); 440 configureMessage(message); 441 return message; 442 } 443 444 /** 445 * Creates an initialized <CODE>BlobMessage</CODE> object. A 446 * <CODE>BlobMessage</CODE> object is used to send a message containing a 447 * <CODE>URL</CODE> which points to some network addressible BLOB. 448 * 449 * @param url the network addressable URL used to pass directly to the 450 * consumer 451 * @return a BlobMessage 452 * @throws JMSException if the JMS provider fails to create this message due 453 * to some internal error. 454 */ 455 public BlobMessage createBlobMessage(URL url) throws JMSException { 456 return createBlobMessage(url, false); 457 } 458 459 /** 460 * Creates an initialized <CODE>BlobMessage</CODE> object. A 461 * <CODE>BlobMessage</CODE> object is used to send a message containing a 462 * <CODE>URL</CODE> which points to some network addressible BLOB. 
463 * 464 * @param url the network addressable URL used to pass directly to the 465 * consumer 466 * @param deletedByBroker indicates whether or not the resource is deleted 467 * by the broker when the message is acknowledged 468 * @return a BlobMessage 469 * @throws JMSException if the JMS provider fails to create this message due 470 * to some internal error. 471 */ 472 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { 473 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 474 configureMessage(message); 475 message.setURL(url); 476 message.setDeletedByBroker(deletedByBroker); 477 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 478 return message; 479 } 480 481 /** 482 * Creates an initialized <CODE>BlobMessage</CODE> object. A 483 * <CODE>BlobMessage</CODE> object is used to send a message containing 484 * the <CODE>File</CODE> content. Before the message is sent the file 485 * conent will be uploaded to the broker or some other remote repository 486 * depending on the {@link #getBlobTransferPolicy()}. 487 * 488 * @param file the file to be uploaded to some remote repo (or the broker) 489 * depending on the strategy 490 * @return a BlobMessage 491 * @throws JMSException if the JMS provider fails to create this message due 492 * to some internal error. 493 */ 494 public BlobMessage createBlobMessage(File file) throws JMSException { 495 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 496 configureMessage(message); 497 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); 498 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy()))); 499 message.setDeletedByBroker(true); 500 message.setName(file.getName()); 501 return message; 502 } 503 504 /** 505 * Creates an initialized <CODE>BlobMessage</CODE> object. A 506 * <CODE>BlobMessage</CODE> object is used to send a message containing 507 * the <CODE>File</CODE> content. 
Before the message is sent the file 508 * conent will be uploaded to the broker or some other remote repository 509 * depending on the {@link #getBlobTransferPolicy()}. 510 * 511 * @param in the stream to be uploaded to some remote repo (or the broker) 512 * depending on the strategy 513 * @return a BlobMessage 514 * @throws JMSException if the JMS provider fails to create this message due 515 * to some internal error. 516 */ 517 public BlobMessage createBlobMessage(InputStream in) throws JMSException { 518 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 519 configureMessage(message); 520 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); 521 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 522 message.setDeletedByBroker(true); 523 return message; 524 } 525 526 /** 527 * Indicates whether the session is in transacted mode. 528 * 529 * @return true if the session is in transacted mode 530 * @throws JMSException if there is some internal error. 531 */ 532 @Override 533 public boolean getTransacted() throws JMSException { 534 checkClosed(); 535 return isTransacted(); 536 } 537 538 /** 539 * Returns the acknowledgement mode of the session. The acknowledgement mode 540 * is set at the time that the session is created. If the session is 541 * transacted, the acknowledgement mode is ignored. 542 * 543 * @return If the session is not transacted, returns the current 544 * acknowledgement mode for the session. If the session is 545 * transacted, returns SESSION_TRANSACTED. 546 * @throws JMSException 547 * @see javax.jms.Connection#createSession(boolean,int) 548 * @since 1.1 exception JMSException if there is some internal error. 549 */ 550 @Override 551 public int getAcknowledgeMode() throws JMSException { 552 checkClosed(); 553 return this.acknowledgementMode; 554 } 555 556 /** 557 * Commits all messages done in this transaction and releases any locks 558 * currently held. 
559 * 560 * @throws JMSException if the JMS provider fails to commit the transaction 561 * due to some internal error. 562 * @throws TransactionRolledBackException if the transaction is rolled back 563 * due to some internal error during commit. 564 * @throws javax.jms.IllegalStateException if the method is not called by a 565 * transacted session. 566 */ 567 @Override 568 public void commit() throws JMSException { 569 checkClosed(); 570 if (!getTransacted()) { 571 throw new javax.jms.IllegalStateException("Not a transacted session"); 572 } 573 if (LOG.isDebugEnabled()) { 574 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); 575 } 576 transactionContext.commit(); 577 } 578 579 /** 580 * Rolls back any messages done in this transaction and releases any locks 581 * currently held. 582 * 583 * @throws JMSException if the JMS provider fails to roll back the 584 * transaction due to some internal error. 585 * @throws javax.jms.IllegalStateException if the method is not called by a 586 * transacted session. 587 */ 588 @Override 589 public void rollback() throws JMSException { 590 checkClosed(); 591 if (!getTransacted()) { 592 throw new javax.jms.IllegalStateException("Not a transacted session"); 593 } 594 if (LOG.isDebugEnabled()) { 595 LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId()); 596 } 597 transactionContext.rollback(); 598 } 599 600 /** 601 * Closes the session. 602 * <P> 603 * Since a provider may allocate some resources on behalf of a session 604 * outside the JVM, clients should close the resources when they are not 605 * needed. Relying on garbage collection to eventually reclaim these 606 * resources may not be timely enough. 607 * <P> 608 * There is no need to close the producers and consumers of a closed 609 * session. 610 * <P> 611 * This call will block until a <CODE>receive</CODE> call or message 612 * listener in progress has completed. 
A blocked message consumer 613 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session 614 * is closed. 615 * <P> 616 * Closing a transacted session must roll back the transaction in progress. 617 * <P> 618 * This method is the only <CODE>Session</CODE> method that can be called 619 * concurrently. 620 * <P> 621 * Invoking any other <CODE>Session</CODE> method on a closed session must 622 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a 623 * closed session must <I>not </I> throw an exception. 624 * 625 * @throws JMSException if the JMS provider fails to close the session due 626 * to some internal error. 627 */ 628 @Override 629 public void close() throws JMSException { 630 if (!closed) { 631 if (getTransactionContext().isInXATransaction()) { 632 if (!synchronizationRegistered) { 633 synchronizationRegistered = true; 634 getTransactionContext().addSynchronization(new Synchronization() { 635 636 @Override 637 public void afterCommit() throws Exception { 638 doClose(); 639 synchronizationRegistered = false; 640 } 641 642 @Override 643 public void afterRollback() throws Exception { 644 doClose(); 645 synchronizationRegistered = false; 646 } 647 }); 648 } 649 650 } else { 651 doClose(); 652 } 653 } 654 } 655 656 private void doClose() throws JMSException { 657 dispose(); 658 RemoveInfo removeCommand = info.createRemoveCommand(); 659 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 660 connection.asyncSendPacket(removeCommand); 661 } 662 663 final AtomicInteger clearRequestsCounter = new AtomicInteger(0); 664 void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { 665 clearRequestsCounter.incrementAndGet(); 666 executor.clearMessagesInProgress(); 667 // we are called from inside the transport reconnection logic which involves us 668 // clearing all the connections' consumers dispatch and delivered lists. 
So rather 669 // than trying to grab a mutex (which could be already owned by the message listener 670 // calling the send or an ack) we allow it to complete in a separate thread via the 671 // scheduler and notify us via connection.transportInterruptionProcessingComplete() 672 // 673 // We must be careful though not to allow multiple calls to this method from a 674 // connection that is having issue becoming fully established from causing a large 675 // build up of scheduled tasks to clear the same consumers over and over. 676 if (consumers.isEmpty()) { 677 return; 678 } 679 680 if (clearInProgress.compareAndSet(false, true)) { 681 for (final ActiveMQMessageConsumer consumer : consumers) { 682 consumer.inProgressClearRequired(); 683 transportInterruptionProcessingComplete.incrementAndGet(); 684 try { 685 connection.getScheduler().executeAfterDelay(new Runnable() { 686 @Override 687 public void run() { 688 consumer.clearMessagesInProgress(); 689 }}, 0l); 690 } catch (JMSException e) { 691 connection.onClientInternalException(e); 692 } 693 } 694 695 try { 696 connection.getScheduler().executeAfterDelay(new Runnable() { 697 @Override 698 public void run() { 699 clearInProgress.set(false); 700 }}, 0l); 701 } catch (JMSException e) { 702 connection.onClientInternalException(e); 703 } 704 } 705 } 706 707 void deliverAcks() { 708 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 709 ActiveMQMessageConsumer consumer = iter.next(); 710 consumer.deliverAcks(); 711 } 712 } 713 714 public synchronized void dispose() throws JMSException { 715 if (!closed) { 716 717 try { 718 executor.close(); 719 720 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 721 ActiveMQMessageConsumer consumer = iter.next(); 722 consumer.setFailureError(connection.getFirstFailureError()); 723 consumer.dispose(); 724 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); 725 } 726 
consumers.clear(); 727 728 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) { 729 ActiveMQMessageProducer producer = iter.next(); 730 producer.dispose(); 731 } 732 producers.clear(); 733 734 try { 735 if (getTransactionContext().isInLocalTransaction()) { 736 rollback(); 737 } 738 } catch (JMSException e) { 739 } 740 741 } finally { 742 connection.removeSession(this); 743 this.transactionContext = null; 744 closed = true; 745 } 746 } 747 } 748 749 /** 750 * Checks that the session is not closed then configures the message 751 */ 752 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { 753 checkClosed(); 754 message.setConnection(connection); 755 } 756 757 /** 758 * Check if the session is closed. It is used for ensuring that the session 759 * is open before performing various operations. 760 * 761 * @throws IllegalStateException if the Session is closed 762 */ 763 protected void checkClosed() throws IllegalStateException { 764 if (closed) { 765 throw new IllegalStateException("The Session is closed"); 766 } 767 } 768 769 /** 770 * Checks if the session is closed. 771 * 772 * @return true if the session is closed, false otherwise. 773 */ 774 public boolean isClosed() { 775 return closed; 776 } 777 778 /** 779 * Stops message delivery in this session, and restarts message delivery 780 * with the oldest unacknowledged message. 781 * <P> 782 * All consumers deliver messages in a serial order. Acknowledging a 783 * received message automatically acknowledges all messages that have been 784 * delivered to the client. 785 * <P> 786 * Restarting a session causes it to take the following actions: 787 * <UL> 788 * <LI>Stop message delivery 789 * <LI>Mark all messages that might have been delivered but not 790 * acknowledged as "redelivered" 791 * <LI>Restart the delivery sequence including all unacknowledged messages 792 * that had been previously delivered. 
Redelivered messages do not have to 793 * be delivered in exactly their original delivery order. 794 * </UL> 795 * 796 * @throws JMSException if the JMS provider fails to stop and restart 797 * message delivery due to some internal error. 798 * @throws IllegalStateException if the method is called by a transacted 799 * session. 800 */ 801 @Override 802 public void recover() throws JMSException { 803 804 checkClosed(); 805 if (getTransacted()) { 806 throw new IllegalStateException("This session is transacted"); 807 } 808 809 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 810 ActiveMQMessageConsumer c = iter.next(); 811 c.rollback(); 812 } 813 814 } 815 816 /** 817 * Returns the session's distinguished message listener (optional). 818 * 819 * @return the message listener associated with this session 820 * @throws JMSException if the JMS provider fails to get the message 821 * listener due to an internal error. 822 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) 823 * @see javax.jms.ServerSessionPool 824 * @see javax.jms.ServerSession 825 */ 826 @Override 827 public MessageListener getMessageListener() throws JMSException { 828 checkClosed(); 829 return this.messageListener; 830 } 831 832 /** 833 * Sets the session's distinguished message listener (optional). 834 * <P> 835 * When the distinguished message listener is set, no other form of message 836 * receipt in the session can be used; however, all forms of sending 837 * messages are still supported. 838 * <P> 839 * If this session has been closed, then an {@link IllegalStateException} is 840 * thrown, if trying to set a new listener. However setting the listener 841 * to <tt>null</tt> is allowed, to clear the listener, even if this session 842 * has been closed prior. 843 * <P> 844 * This is an expert facility not used by regular JMS clients. 
*
     * @param listener the message listener to associate with this session
     * @throws JMSException if the JMS provider fails to set the message
     *                 listener due to an internal error.
     * @see javax.jms.Session#getMessageListener()
     * @see javax.jms.ServerSessionPool
     * @see javax.jms.ServerSession
     */
    @Override
    public void setMessageListener(MessageListener listener) throws JMSException {
        // only check for closed if we set a new listener, as we allow to clear
        // the listener, such as when an application is shutting down, and is
        // no longer using a message listener on this session
        if (listener != null) {
            checkClosed();
        }
        this.messageListener = listener;

        // Only a non-null listener flips the executor into session-pool dispatch;
        // clearing the listener does not reset that flag here.
        if (listener != null) {
            executor.setDispatchedBySessionPool(true);
        }
    }

    /**
     * Optional operation, intended to be used only by Application Servers, not
     * by ordinary JMS clients.
     * <P>
     * Drains the session executor without blocking and passes each dequeued
     * message to the session's distinguished message listener. Expired and
     * duplicate messages are acknowledged immediately and never reach the
     * listener. When the delivery is transacted, a synchronization is
     * registered that sends the ack before the transaction ends and handles
     * redelivery (or a poison ack once the redelivery limit is reached) after
     * a rollback.
     *
     * @see javax.jms.ServerSession
     */
    @Override
    public void run() {
        MessageDispatch messageDispatch;
        while ((messageDispatch = executor.dequeueNoWait()) != null) {
            final MessageDispatch md = messageDispatch;
            final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();

            // Expired or duplicate messages are acked straight away and skipped.
            MessageAck earlyAck = null;
            if (message.isExpired()) {
                earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
                earlyAck.setFirstMessageId(message.getMessageId());
            } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
                LOG.debug("{} got duplicate: {}", this, message.getMessageId());
                earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                earlyAck.setFirstMessageId(md.getMessage().getMessageId());
                earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
            }
            if (earlyAck != null) {
                try {
                    asyncSendPacket(earlyAck);
                } catch (Throwable t) {
                    LOG.error("error dispatching ack: {} ", earlyAck, t);
                    connection.onClientInternalException(t);
                } finally {
                    // NOTE(review): 'continue' inside finally moves on to the next
                    // dispatch unconditionally; it also discards anything thrown
                    // from the catch handler above.
                    continue;
                }
            }

            // For client/individual acknowledge modes a no-op callback is installed,
            // so acknowledging the message from the listener does nothing by itself.
            if (isClientAcknowledge()||isIndividualAcknowledge()) {
                message.setAcknowledgeCallback(new Callback() {
                    @Override
                    public void execute() throws Exception {
                    }
                });
            }

            if (deliveryListener != null) {
                deliveryListener.beforeDelivery(this, message);
            }

            md.setDeliverySequenceId(getNextDeliveryId());
            lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();

            final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);

            // Set when afterDelivery() throws; read later by the scheduled redelivery task.
            final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
            /*
            * The redelivery guard is to allow the endpoint lifecycle to complete before the message is dispatched.
            * We don't want afterDelivery being called after the redelivery, as that may cause unexpected behaviour.
            * */
            synchronized (redeliveryGuard) {
                try {
                    ack.setFirstMessageId(md.getMessage().getMessageId());
                    doStartTransaction();
                    ack.setTransactionId(getTransactionContext().getTransactionId());
                    if (ack.getTransactionId() != null) {
                        getTransactionContext().addSynchronization(new Synchronization() {

                            // Snapshot of clearRequestsCounter taken when this synchronization is
                            // created; afterRollback() compares against it. If the counter sits at
                            // Integer.MAX_VALUE it is incremented first (wrapping around).
                            // NOTE(review): presumably the counter is bumped on failover/interruption - confirm.
                            final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());

                            @Override
                            public void beforeEnd() throws Exception {
                                // validate our consumer so we don't push stale acks that get ignored
                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
                                    LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
                                    throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
                                }
                                LOG.trace("beforeEnd ack {}", ack);
                                sendAck(ack);
                            }

                            @Override
                            public void afterRollback() throws Exception {
                                // The Throwable is passed only so the trace log records the calling stack.
                                LOG.trace("rollback {}", ack, new Throwable("here"));
                                // ensure we don't filter this as a duplicate
                                connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());

                                // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
                                if (clearRequestsCounter.get() > clearRequestCount) {
                                    LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
                                    return;
                                }

                                // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
                                    LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
                                    return;
                                }

                                RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                                int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                                    && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
                                    // Redelivery limit reached: send a poison ack instead of
                                    // redelivering, so that the message gets sent to the DLQ.
                                    // Note: this local 'ack' shadows the outer standard ack.
                                    MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                                    ack.setFirstMessageId(md.getMessage().getMessageId());
                                    ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
                                    asyncSendPacket(ack);

                                } else {

                                    // Note: this local 'ack' shadows the outer standard ack.
                                    MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
                                    ack.setFirstMessageId(md.getMessage().getMessageId());
                                    asyncSendPacket(ack);

                                    // Figure out how long we should wait to resend this message:
                                    // start from the initial delay and apply the policy's
                                    // next-delay step once per previous redelivery.
                                    long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                                    for (int i = 0; i < redeliveryCounter; i++) {
                                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                                    }

                                    /*
                                    * If redelivery is blocking (i.e. NOT non-blocking) then we need to stop the executor
                                    * to avoid more messages being delivered; once the message is redelivered it is restarted.
                                    * */
                                    if (!connection.isNonBlockingRedelivery()) {
                                        LOG.debug("Blocking session until re-delivery...");
                                        executor.stop();
                                    }

                                    connection.getScheduler().executeAfterDelay(new Runnable() {

                                        @Override
                                        public void run() {
                                            /*
                                            * wait for the first delivery to be complete, i.e. after delivery has been called.
                                            * */
                                            synchronized (redeliveryGuard) {
                                                /*
                                                * If it is non-blocking then we can just dispatch in a new session.
                                                * */
                                                if (connection.isNonBlockingRedelivery()) {
                                                    ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
                                                } else {
                                                    /*
                                                    * If there has been an error thrown during afterDelivery then the
                                                    * endpoint will be marked as dead so redelivery will fail (and eventually
                                                    * the session marked as stale), in this case we can only call dispatch
                                                    * which will create a new session with a new endpoint.
                                                    * */
                                                    if (afterDeliveryError.get()) {
                                                        ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
                                                    } else {
                                                        // Put the message back at the head of the queue and
                                                        // restart the executor that was stopped above.
                                                        executor.executeFirst(md);
                                                        executor.start();
                                                    }
                                                }
                                            }
                                        }
                                    }, redeliveryDelay);
                                }
                                // Runs for both the poison-ack and the redelivery branch.
                                md.getMessage().onMessageRolledBack();
                            }
                        });
                    }

                    LOG.trace("{} onMessage({})", this, message.getMessageId());
                    messageListener.onMessage(message);

                } catch (Throwable e) {
                    LOG.error("error dispatching message: ", e);

                    // A problem while invoking the MessageListener does not
                    // in general indicate a problem with the connection to the broker, i.e.
                    // it will usually be sufficient to let the afterDelivery() method either
                    // commit or roll back in order to deal with the exception.
                    // However, we notify any registered client internal exception listener
                    // of the problem.
                    connection.onClientInternalException(e);
                } finally {
                    // Non-transacted delivery: the standard ack is sent here, whether or not
                    // the listener threw. Transacted acks are sent from beforeEnd() instead.
                    if (ack.getTransactionId() == null) {
                        try {
                            asyncSendPacket(ack);
                        } catch (Throwable e) {
                            connection.onClientInternalException(e);
                        }
                    }
                }

                if (deliveryListener != null) {
                    try {
                        deliveryListener.afterDelivery(this, message);
                    } catch (Throwable t) {
                        LOG.debug("Unable to call after delivery", t);
                        // Record the failure for the scheduled redelivery task, then abort run().
                        afterDeliveryError.set(true);
                        throw new RuntimeException(t);
                    }
                }
            }
            /*
             * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
             * It also needs to be outside the redelivery guard.
             * */
            try {
                executor.waitForQueueRestart();
            } catch (InterruptedException ex) {
                // NOTE(review): the interrupt is reported to the connection but the
                // thread's interrupt status is not restored here.
                connection.onClientInternalException(ex);
            }
        }
    }

    /**
     * Creates a <CODE>MessageProducer</CODE> to send messages to the
     * specified destination.
     * <P>
     * A client uses a <CODE>MessageProducer</CODE> object to send messages to
     * a destination.
Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both 1085 * inherit from <CODE>Destination</CODE>, they can be used in the 1086 * destination parameter to create a <CODE>MessageProducer</CODE> object. 1087 * 1088 * @param destination the <CODE>Destination</CODE> to send to, or null if 1089 * this is a producer which does not have a specified 1090 * destination. 1091 * @return the MessageProducer 1092 * @throws JMSException if the session fails to create a MessageProducer due 1093 * to some internal error. 1094 * @throws InvalidDestinationException if an invalid destination is 1095 * specified. 1096 * @since 1.1 1097 */ 1098 @Override 1099 public MessageProducer createProducer(Destination destination) throws JMSException { 1100 checkClosed(); 1101 if (destination instanceof CustomDestination) { 1102 CustomDestination customDestination = (CustomDestination)destination; 1103 return customDestination.createProducer(this); 1104 } 1105 int timeSendOut = connection.getSendTimeout(); 1106 return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut); 1107 } 1108 1109 /** 1110 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 1111 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1112 * <CODE>Destination</CODE>, they can be used in the destination 1113 * parameter to create a <CODE>MessageConsumer</CODE>. 1114 * 1115 * @param destination the <CODE>Destination</CODE> to access. 1116 * @return the MessageConsumer 1117 * @throws JMSException if the session fails to create a consumer due to 1118 * some internal error. 1119 * @throws InvalidDestinationException if an invalid destination is 1120 * specified. 
1121 * @since 1.1 1122 */ 1123 @Override 1124 public MessageConsumer createConsumer(Destination destination) throws JMSException { 1125 return createConsumer(destination, (String) null); 1126 } 1127 1128 /** 1129 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1130 * using a message selector. Since <CODE> Queue</CODE> and 1131 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1132 * can be used in the destination parameter to create a 1133 * <CODE>MessageConsumer</CODE>. 1134 * <P> 1135 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1136 * that have been sent to a destination. 1137 * 1138 * @param destination the <CODE>Destination</CODE> to access 1139 * @param messageSelector only messages with properties matching the message 1140 * selector expression are delivered. A value of null or an 1141 * empty string indicates that there is no message selector 1142 * for the message consumer. 1143 * @return the MessageConsumer 1144 * @throws JMSException if the session fails to create a MessageConsumer due 1145 * to some internal error. 1146 * @throws InvalidDestinationException if an invalid destination is 1147 * specified. 1148 * @throws InvalidSelectorException if the message selector is invalid. 1149 * @since 1.1 1150 */ 1151 @Override 1152 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 1153 return createConsumer(destination, messageSelector, false); 1154 } 1155 1156 /** 1157 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 1158 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1159 * <CODE>Destination</CODE>, they can be used in the destination 1160 * parameter to create a <CODE>MessageConsumer</CODE>. 1161 * 1162 * @param destination the <CODE>Destination</CODE> to access. 
1163 * @param messageListener the listener to use for async consumption of messages 1164 * @return the MessageConsumer 1165 * @throws JMSException if the session fails to create a consumer due to 1166 * some internal error. 1167 * @throws InvalidDestinationException if an invalid destination is 1168 * specified. 1169 * @since 1.1 1170 */ 1171 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { 1172 return createConsumer(destination, null, messageListener); 1173 } 1174 1175 /** 1176 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1177 * using a message selector. Since <CODE> Queue</CODE> and 1178 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1179 * can be used in the destination parameter to create a 1180 * <CODE>MessageConsumer</CODE>. 1181 * <P> 1182 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1183 * that have been sent to a destination. 1184 * 1185 * @param destination the <CODE>Destination</CODE> to access 1186 * @param messageSelector only messages with properties matching the message 1187 * selector expression are delivered. A value of null or an 1188 * empty string indicates that there is no message selector 1189 * for the message consumer. 1190 * @param messageListener the listener to use for async consumption of messages 1191 * @return the MessageConsumer 1192 * @throws JMSException if the session fails to create a MessageConsumer due 1193 * to some internal error. 1194 * @throws InvalidDestinationException if an invalid destination is 1195 * specified. 1196 * @throws InvalidSelectorException if the message selector is invalid. 
1197 * @since 1.1 1198 */ 1199 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { 1200 return createConsumer(destination, messageSelector, false, messageListener); 1201 } 1202 1203 /** 1204 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1205 * using a message selector. This method can specify whether messages 1206 * published by its own connection should be delivered to it, if the 1207 * destination is a topic. 1208 * <P> 1209 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1210 * <CODE>Destination</CODE>, they can be used in the destination 1211 * parameter to create a <CODE>MessageConsumer</CODE>. 1212 * <P> 1213 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1214 * that have been published to a destination. 1215 * <P> 1216 * In some cases, a connection may both publish and subscribe to a topic. 1217 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1218 * inhibit the delivery of messages published by its own connection. The 1219 * default value for this attribute is False. The <CODE>noLocal</CODE> 1220 * value must be supported by destinations that are topics. 1221 * 1222 * @param destination the <CODE>Destination</CODE> to access 1223 * @param messageSelector only messages with properties matching the message 1224 * selector expression are delivered. A value of null or an 1225 * empty string indicates that there is no message selector 1226 * for the message consumer. 1227 * @param noLocal - if true, and the destination is a topic, inhibits the 1228 * delivery of messages published by its own connection. The 1229 * behavior for <CODE>NoLocal</CODE> is not specified if 1230 * the destination is a queue. 1231 * @return the MessageConsumer 1232 * @throws JMSException if the session fails to create a MessageConsumer due 1233 * to some internal error. 
1234 * @throws InvalidDestinationException if an invalid destination is 1235 * specified. 1236 * @throws InvalidSelectorException if the message selector is invalid. 1237 * @since 1.1 1238 */ 1239 @Override 1240 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { 1241 return createConsumer(destination, messageSelector, noLocal, null); 1242 } 1243 1244 /** 1245 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1246 * using a message selector. This method can specify whether messages 1247 * published by its own connection should be delivered to it, if the 1248 * destination is a topic. 1249 * <P> 1250 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1251 * <CODE>Destination</CODE>, they can be used in the destination 1252 * parameter to create a <CODE>MessageConsumer</CODE>. 1253 * <P> 1254 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1255 * that have been published to a destination. 1256 * <P> 1257 * In some cases, a connection may both publish and subscribe to a topic. 1258 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1259 * inhibit the delivery of messages published by its own connection. The 1260 * default value for this attribute is False. The <CODE>noLocal</CODE> 1261 * value must be supported by destinations that are topics. 1262 * 1263 * @param destination the <CODE>Destination</CODE> to access 1264 * @param messageSelector only messages with properties matching the message 1265 * selector expression are delivered. A value of null or an 1266 * empty string indicates that there is no message selector 1267 * for the message consumer. 1268 * @param noLocal - if true, and the destination is a topic, inhibits the 1269 * delivery of messages published by its own connection. The 1270 * behavior for <CODE>NoLocal</CODE> is not specified if 1271 * the destination is a queue. 
1272 * @param messageListener the listener to use for async consumption of messages 1273 * @return the MessageConsumer 1274 * @throws JMSException if the session fails to create a MessageConsumer due 1275 * to some internal error. 1276 * @throws InvalidDestinationException if an invalid destination is 1277 * specified. 1278 * @throws InvalidSelectorException if the message selector is invalid. 1279 * @since 1.1 1280 */ 1281 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { 1282 checkClosed(); 1283 1284 if (destination instanceof CustomDestination) { 1285 CustomDestination customDestination = (CustomDestination)destination; 1286 return customDestination.createConsumer(this, messageSelector, noLocal); 1287 } 1288 1289 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 1290 int prefetch = 0; 1291 if (destination instanceof Topic) { 1292 prefetch = prefetchPolicy.getTopicPrefetch(); 1293 } else { 1294 prefetch = prefetchPolicy.getQueuePrefetch(); 1295 } 1296 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); 1297 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, 1298 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); 1299 } 1300 1301 /** 1302 * Creates a queue identity given a <CODE>Queue</CODE> name. 1303 * <P> 1304 * This facility is provided for the rare cases where clients need to 1305 * dynamically manipulate queue identity. It allows the creation of a queue 1306 * identity with a provider-specific name. Clients that depend on this 1307 * ability are not portable. 1308 * <P> 1309 * Note that this method is not for creating the physical queue. The 1310 * physical creation of queues is an administrative task and is not to be 1311 * initiated by the JMS API. 
The one exception is the creation of temporary 1312 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> 1313 * method. 1314 * 1315 * @param queueName the name of this <CODE>Queue</CODE> 1316 * @return a <CODE>Queue</CODE> with the given name 1317 * @throws JMSException if the session fails to create a queue due to some 1318 * internal error. 1319 * @since 1.1 1320 */ 1321 @Override 1322 public Queue createQueue(String queueName) throws JMSException { 1323 checkClosed(); 1324 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1325 return new ActiveMQTempQueue(queueName); 1326 } 1327 return new ActiveMQQueue(queueName); 1328 } 1329 1330 /** 1331 * Creates a topic identity given a <CODE>Topic</CODE> name. 1332 * <P> 1333 * This facility is provided for the rare cases where clients need to 1334 * dynamically manipulate topic identity. This allows the creation of a 1335 * topic identity with a provider-specific name. Clients that depend on this 1336 * ability are not portable. 1337 * <P> 1338 * Note that this method is not for creating the physical topic. The 1339 * physical creation of topics is an administrative task and is not to be 1340 * initiated by the JMS API. The one exception is the creation of temporary 1341 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> 1342 * method. 1343 * 1344 * @param topicName the name of this <CODE>Topic</CODE> 1345 * @return a <CODE>Topic</CODE> with the given name 1346 * @throws JMSException if the session fails to create a topic due to some 1347 * internal error. 
1348 * @since 1.1 1349 */ 1350 @Override 1351 public Topic createTopic(String topicName) throws JMSException { 1352 checkClosed(); 1353 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1354 return new ActiveMQTempTopic(topicName); 1355 } 1356 return new ActiveMQTopic(topicName); 1357 } 1358 1359 /** 1360 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1361 * the specified queue. 1362 * 1363 * @param queue the <CODE>queue</CODE> to access 1364 * @exception InvalidDestinationException if an invalid destination is 1365 * specified 1366 * @since 1.1 1367 */ 1368 /** 1369 * Creates a durable subscriber to the specified topic. 1370 * <P> 1371 * If a client needs to receive all the messages published on a topic, 1372 * including the ones published while the subscriber is inactive, it uses a 1373 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1374 * record of this durable subscription and insures that all messages from 1375 * the topic's publishers are retained until they are acknowledged by this 1376 * durable subscriber or they have expired. 1377 * <P> 1378 * Sessions with durable subscribers must always provide the same client 1379 * identifier. In addition, each client must specify a name that uniquely 1380 * identifies (within client identifier) each durable subscription it 1381 * creates. Only one session at a time can have a 1382 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. 1383 * <P> 1384 * A client can change an existing durable subscription by creating a 1385 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1386 * and/or message selector. Changing a durable subscriber is equivalent to 1387 * unsubscribing (deleting) the old one and creating a new one. 1388 * <P> 1389 * In some cases, a connection may both publish and subscribe to a topic. 
1390 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1391 * inhibit the delivery of messages published by its own connection. The 1392 * default value for this attribute is false. 1393 * 1394 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1395 * @param name the name used to identify this subscription 1396 * @return the TopicSubscriber 1397 * @throws JMSException if the session fails to create a subscriber due to 1398 * some internal error. 1399 * @throws InvalidDestinationException if an invalid topic is specified. 1400 * @since 1.1 1401 */ 1402 @Override 1403 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1404 checkClosed(); 1405 return createDurableSubscriber(topic, name, null, false); 1406 } 1407 1408 /** 1409 * Creates a durable subscriber to the specified topic, using a message 1410 * selector and specifying whether messages published by its own connection 1411 * should be delivered to it. 1412 * <P> 1413 * If a client needs to receive all the messages published on a topic, 1414 * including the ones published while the subscriber is inactive, it uses a 1415 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1416 * record of this durable subscription and insures that all messages from 1417 * the topic's publishers are retained until they are acknowledged by this 1418 * durable subscriber or they have expired. 1419 * <P> 1420 * Sessions with durable subscribers must always provide the same client 1421 * identifier. In addition, each client must specify a name which uniquely 1422 * identifies (within client identifier) each durable subscription it 1423 * creates. Only one session at a time can have a 1424 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An 1425 * inactive durable subscriber is one that exists but does not currently 1426 * have a message consumer associated with it. 
1427 * <P> 1428 * A client can change an existing durable subscription by creating a 1429 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1430 * and/or message selector. Changing a durable subscriber is equivalent to 1431 * unsubscribing (deleting) the old one and creating a new one. 1432 * 1433 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1434 * @param name the name used to identify this subscription 1435 * @param messageSelector only messages with properties matching the message 1436 * selector expression are delivered. A value of null or an 1437 * empty string indicates that there is no message selector 1438 * for the message consumer. 1439 * @param noLocal if set, inhibits the delivery of messages published by its 1440 * own connection 1441 * @return the Queue Browser 1442 * @throws JMSException if the session fails to create a subscriber due to 1443 * some internal error. 1444 * @throws InvalidDestinationException if an invalid topic is specified. 1445 * @throws InvalidSelectorException if the message selector is invalid. 1446 * @since 1.1 1447 */ 1448 @Override 1449 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1450 checkClosed(); 1451 1452 if (topic == null) { 1453 throw new InvalidDestinationException("Topic cannot be null"); 1454 } 1455 1456 if (topic instanceof CustomDestination) { 1457 CustomDestination customDestination = (CustomDestination)topic; 1458 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); 1459 } 1460 1461 connection.checkClientIDWasManuallySpecified(); 1462 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1463 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? 
prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch(); 1464 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit(); 1465 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit, 1466 noLocal, false, asyncDispatch); 1467 } 1468 1469 /** 1470 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1471 * the specified queue. 1472 * 1473 * @param queue the <CODE>queue</CODE> to access 1474 * @return the Queue Browser 1475 * @throws JMSException if the session fails to create a browser due to some 1476 * internal error. 1477 * @throws InvalidDestinationException if an invalid destination is 1478 * specified 1479 * @since 1.1 1480 */ 1481 @Override 1482 public QueueBrowser createBrowser(Queue queue) throws JMSException { 1483 checkClosed(); 1484 return createBrowser(queue, null); 1485 } 1486 1487 /** 1488 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1489 * the specified queue using a message selector. 1490 * 1491 * @param queue the <CODE>queue</CODE> to access 1492 * @param messageSelector only messages with properties matching the message 1493 * selector expression are delivered. A value of null or an 1494 * empty string indicates that there is no message selector 1495 * for the message consumer. 1496 * @return the Queue Browser 1497 * @throws JMSException if the session fails to create a browser due to some 1498 * internal error. 1499 * @throws InvalidDestinationException if an invalid destination is 1500 * specified 1501 * @throws InvalidSelectorException if the message selector is invalid. 
1502 * @since 1.1 1503 */ 1504 @Override 1505 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 1506 checkClosed(); 1507 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); 1508 } 1509 1510 /** 1511 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that 1512 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1513 * 1514 * @return a temporary queue identity 1515 * @throws JMSException if the session fails to create a temporary queue due 1516 * to some internal error. 1517 * @since 1.1 1518 */ 1519 @Override 1520 public TemporaryQueue createTemporaryQueue() throws JMSException { 1521 checkClosed(); 1522 return (TemporaryQueue)connection.createTempDestination(false); 1523 } 1524 1525 /** 1526 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that 1527 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1528 * 1529 * @return a temporary topic identity 1530 * @throws JMSException if the session fails to create a temporary topic due 1531 * to some internal error. 1532 * @since 1.1 1533 */ 1534 @Override 1535 public TemporaryTopic createTemporaryTopic() throws JMSException { 1536 checkClosed(); 1537 return (TemporaryTopic)connection.createTempDestination(true); 1538 } 1539 1540 /** 1541 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1542 * the specified queue. 1543 * 1544 * @param queue the <CODE>Queue</CODE> to access 1545 * @return 1546 * @throws JMSException if the session fails to create a receiver due to 1547 * some internal error. 1548 * @throws JMSException 1549 * @throws InvalidDestinationException if an invalid queue is specified. 
1550 */ 1551 @Override 1552 public QueueReceiver createReceiver(Queue queue) throws JMSException { 1553 checkClosed(); 1554 return createReceiver(queue, null); 1555 } 1556 1557 /** 1558 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1559 * the specified queue using a message selector. 1560 * 1561 * @param queue the <CODE>Queue</CODE> to access 1562 * @param messageSelector only messages with properties matching the message 1563 * selector expression are delivered. A value of null or an 1564 * empty string indicates that there is no message selector 1565 * for the message consumer. 1566 * @return QueueReceiver 1567 * @throws JMSException if the session fails to create a receiver due to 1568 * some internal error. 1569 * @throws InvalidDestinationException if an invalid queue is specified. 1570 * @throws InvalidSelectorException if the message selector is invalid. 1571 */ 1572 @Override 1573 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1574 checkClosed(); 1575 1576 if (queue instanceof CustomDestination) { 1577 CustomDestination customDestination = (CustomDestination)queue; 1578 return customDestination.createReceiver(this, messageSelector); 1579 } 1580 1581 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1582 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), 1583 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); 1584 } 1585 1586 /** 1587 * Creates a <CODE>QueueSender</CODE> object to send messages to the 1588 * specified queue. 1589 * 1590 * @param queue the <CODE>Queue</CODE> to access, or null if this is an 1591 * unidentified producer 1592 * @return QueueSender 1593 * @throws JMSException if the session fails to create a sender due to some 1594 * internal error. 
1595 * @throws InvalidDestinationException if an invalid queue is specified. 1596 */ 1597 @Override 1598 public QueueSender createSender(Queue queue) throws JMSException { 1599 checkClosed(); 1600 if (queue instanceof CustomDestination) { 1601 CustomDestination customDestination = (CustomDestination)queue; 1602 return customDestination.createSender(this); 1603 } 1604 int timeSendOut = connection.getSendTimeout(); 1605 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut); 1606 } 1607 1608 /** 1609 * Creates a nondurable subscriber to the specified topic. <p/> 1610 * <P> 1611 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1612 * that have been published to a topic. <p/> 1613 * <P> 1614 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1615 * receive only messages that are published while they are active. <p/> 1616 * <P> 1617 * In some cases, a connection may both publish and subscribe to a topic. 1618 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1619 * inhibit the delivery of messages published by its own connection. The 1620 * default value for this attribute is false. 1621 * 1622 * @param topic the <CODE>Topic</CODE> to subscribe to 1623 * @return TopicSubscriber 1624 * @throws JMSException if the session fails to create a subscriber due to 1625 * some internal error. 1626 * @throws InvalidDestinationException if an invalid topic is specified. 1627 */ 1628 @Override 1629 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1630 checkClosed(); 1631 return createSubscriber(topic, null, false); 1632 } 1633 1634 /** 1635 * Creates a nondurable subscriber to the specified topic, using a message 1636 * selector or specifying whether messages published by its own connection 1637 * should be delivered to it. 
<p/> 1638 * <P> 1639 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1640 * that have been published to a topic. <p/> 1641 * <P> 1642 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1643 * receive only messages that are published while they are active. <p/> 1644 * <P> 1645 * Messages filtered out by a subscriber's message selector will never be 1646 * delivered to the subscriber. From the subscriber's perspective, they do 1647 * not exist. <p/> 1648 * <P> 1649 * In some cases, a connection may both publish and subscribe to a topic. 1650 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1651 * inhibit the delivery of messages published by its own connection. The 1652 * default value for this attribute is false. 1653 * 1654 * @param topic the <CODE>Topic</CODE> to subscribe to 1655 * @param messageSelector only messages with properties matching the message 1656 * selector expression are delivered. A value of null or an 1657 * empty string indicates that there is no message selector 1658 * for the message consumer. 1659 * @param noLocal if set, inhibits the delivery of messages published by its 1660 * own connection 1661 * @return TopicSubscriber 1662 * @throws JMSException if the session fails to create a subscriber due to 1663 * some internal error. 1664 * @throws InvalidDestinationException if an invalid topic is specified. 1665 * @throws InvalidSelectorException if the message selector is invalid. 
1666 */ 1667 @Override 1668 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1669 checkClosed(); 1670 1671 if (topic instanceof CustomDestination) { 1672 CustomDestination customDestination = (CustomDestination)topic; 1673 return customDestination.createSubscriber(this, messageSelector, noLocal); 1674 } 1675 1676 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1677 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy 1678 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); 1679 } 1680 1681 /** 1682 * Creates a publisher for the specified topic. <p/> 1683 * <P> 1684 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages 1685 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on 1686 * a topic, it defines a new sequence of messages that have no ordering 1687 * relationship with the messages it has previously sent. 1688 * 1689 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is 1690 * an unidentified producer 1691 * @return TopicPublisher 1692 * @throws JMSException if the session fails to create a publisher due to 1693 * some internal error. 1694 * @throws InvalidDestinationException if an invalid topic is specified. 
1695 */ 1696 @Override 1697 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1698 checkClosed(); 1699 1700 if (topic instanceof CustomDestination) { 1701 CustomDestination customDestination = (CustomDestination)topic; 1702 return customDestination.createPublisher(this); 1703 } 1704 int timeSendOut = connection.getSendTimeout(); 1705 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut); 1706 } 1707 1708 /** 1709 * Unsubscribes a durable subscription that has been created by a client. 1710 * <P> 1711 * This method deletes the state being maintained on behalf of the 1712 * subscriber by its provider. 1713 * <P> 1714 * It is erroneous for a client to delete a durable subscription while there 1715 * is an active <CODE>MessageConsumer </CODE> or 1716 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 1717 * message is part of a pending transaction or has not been acknowledged in 1718 * the session. 1719 * 1720 * @param name the name used to identify this subscription 1721 * @throws JMSException if the session fails to unsubscribe to the durable 1722 * subscription due to some internal error. 1723 * @throws InvalidDestinationException if an invalid subscription name is 1724 * specified. 1725 * @since 1.1 1726 */ 1727 @Override 1728 public void unsubscribe(String name) throws JMSException { 1729 checkClosed(); 1730 connection.unsubscribe(name); 1731 } 1732 1733 @Override 1734 public void dispatch(MessageDispatch messageDispatch) { 1735 try { 1736 executor.execute(messageDispatch); 1737 } catch (InterruptedException e) { 1738 Thread.currentThread().interrupt(); 1739 connection.onClientInternalException(e); 1740 } 1741 } 1742 1743 /** 1744 * Acknowledges all consumed messages of the session of this consumed 1745 * message. 
1746 * <P> 1747 * All consumed JMS messages support the <CODE>acknowledge</CODE> method 1748 * for use when a client has specified that its JMS session's consumed 1749 * messages are to be explicitly acknowledged. By invoking 1750 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges 1751 * all messages consumed by the session that the message was delivered to. 1752 * <P> 1753 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted 1754 * sessions and sessions specified to use implicit acknowledgement modes. 1755 * <P> 1756 * A client may individually acknowledge each message as it is consumed, or 1757 * it may choose to acknowledge messages as an application-defined group 1758 * (which is done by calling acknowledge on the last received message of the 1759 * group, thereby acknowledging all messages consumed by the session.) 1760 * <P> 1761 * Messages that have been received but not acknowledged may be redelivered. 1762 * 1763 * @throws JMSException if the JMS provider fails to acknowledge the 1764 * messages due to some internal error. 1765 * @throws javax.jms.IllegalStateException if this method is called on a 1766 * closed session. 1767 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE 1768 */ 1769 public void acknowledge() throws JMSException { 1770 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1771 ActiveMQMessageConsumer c = iter.next(); 1772 c.acknowledge(); 1773 } 1774 } 1775 1776 /** 1777 * Add a message consumer. 1778 * 1779 * @param consumer - message consumer. 1780 * @throws JMSException 1781 */ 1782 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1783 this.consumers.add(consumer); 1784 if (consumer.isDurableSubscriber()) { 1785 stats.onCreateDurableSubscriber(); 1786 } 1787 this.connection.addDispatcher(consumer.getConsumerId(), this); 1788 } 1789 1790 /** 1791 * Remove the message consumer. 1792 * 1793 * @param consumer - consumer to be removed. 
1794 * @throws JMSException 1795 */ 1796 protected void removeConsumer(ActiveMQMessageConsumer consumer) { 1797 this.connection.removeDispatcher(consumer.getConsumerId()); 1798 if (consumer.isDurableSubscriber()) { 1799 stats.onRemoveDurableSubscriber(); 1800 } 1801 this.consumers.remove(consumer); 1802 this.connection.removeDispatcher(consumer); 1803 } 1804 1805 /** 1806 * Adds a message producer. 1807 * 1808 * @param producer - message producer to be added. 1809 * @throws JMSException 1810 */ 1811 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1812 this.producers.add(producer); 1813 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); 1814 } 1815 1816 /** 1817 * Removes a message producer. 1818 * 1819 * @param producer - message producer to be removed. 1820 * @throws JMSException 1821 */ 1822 protected void removeProducer(ActiveMQMessageProducer producer) { 1823 this.connection.removeProducer(producer.getProducerInfo().getProducerId()); 1824 this.producers.remove(producer); 1825 } 1826 1827 /** 1828 * Start this Session. 1829 * 1830 * @throws JMSException 1831 */ 1832 protected void start() throws JMSException { 1833 started.set(true); 1834 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1835 ActiveMQMessageConsumer c = iter.next(); 1836 c.start(); 1837 } 1838 executor.start(); 1839 } 1840 1841 /** 1842 * Stops this session. 1843 * 1844 * @throws JMSException 1845 */ 1846 protected void stop() throws JMSException { 1847 1848 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1849 ActiveMQMessageConsumer c = iter.next(); 1850 c.stop(); 1851 } 1852 1853 started.set(false); 1854 executor.stop(); 1855 } 1856 1857 /** 1858 * Returns the session id. 1859 * 1860 * @return value - session id. 
1861 */ 1862 protected SessionId getSessionId() { 1863 return info.getSessionId(); 1864 } 1865 1866 /** 1867 * @return 1868 */ 1869 protected ConsumerId getNextConsumerId() { 1870 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); 1871 } 1872 1873 /** 1874 * @return 1875 */ 1876 protected ProducerId getNextProducerId() { 1877 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); 1878 } 1879 1880 /** 1881 * Sends the message for dispatch by the broker. 1882 * 1883 * 1884 * @param producer - message producer. 1885 * @param destination - message destination. 1886 * @param message - message to be sent. 1887 * @param deliveryMode - JMS messsage delivery mode. 1888 * @param priority - message priority. 1889 * @param timeToLive - message expiration. 1890 * @param producerWindow 1891 * @param onComplete 1892 * @throws JMSException 1893 */ 1894 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 1895 MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { 1896 1897 checkClosed(); 1898 if (destination.isTemporary() && connection.isDeleted(destination)) { 1899 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); 1900 } 1901 synchronized (sendMutex) { 1902 // tell the Broker we are about to start a new transaction 1903 doStartTransaction(); 1904 TransactionId txid = transactionContext.getTransactionId(); 1905 long sequenceNumber = producer.getMessageSequence(); 1906 1907 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 1908 message.setJMSDeliveryMode(deliveryMode); 1909 long expiration = 0L; 1910 if (!producer.getDisableMessageTimestamp()) { 1911 long timeStamp = System.currentTimeMillis(); 1912 message.setJMSTimestamp(timeStamp); 1913 if (timeToLive > 0) { 1914 expiration = timeToLive + timeStamp; 
1915 } 1916 } 1917 message.setJMSExpiration(expiration); 1918 message.setJMSPriority(priority); 1919 message.setJMSRedelivered(false); 1920 1921 // transform to our own message format here 1922 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); 1923 msg.setDestination(destination); 1924 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1925 1926 // Set the message id. 1927 if (msg != message) { 1928 message.setJMSMessageID(msg.getMessageId().toString()); 1929 // Make sure the JMS destination is set on the foreign messages too. 1930 message.setJMSDestination(destination); 1931 } 1932 //clear the brokerPath in case we are re-sending this message 1933 msg.setBrokerPath(null); 1934 1935 msg.setTransactionId(txid); 1936 if (connection.isCopyMessageOnSend()) { 1937 msg = (ActiveMQMessage)msg.copy(); 1938 } 1939 msg.setConnection(connection); 1940 msg.onSend(); 1941 msg.setProducerId(msg.getMessageId().getProducerId()); 1942 if (LOG.isTraceEnabled()) { 1943 LOG.trace(getSessionId() + " sending message: " + msg); 1944 } 1945 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { 1946 this.connection.asyncSendPacket(msg); 1947 if (producerWindow != null) { 1948 // Since we defer lots of the marshaling till we hit the 1949 // wire, this might not 1950 // provide and accurate size. We may change over to doing 1951 // more aggressive marshaling, 1952 // to get more accurate sizes.. this is more important once 1953 // users start using producer window 1954 // flow control. 
1955 int size = msg.getSize(); 1956 producerWindow.increaseUsage(size); 1957 } 1958 } else { 1959 if (sendTimeout > 0 && onComplete==null) { 1960 this.connection.syncSendPacket(msg,sendTimeout); 1961 }else { 1962 this.connection.syncSendPacket(msg, onComplete); 1963 } 1964 } 1965 1966 } 1967 } 1968 1969 /** 1970 * Send TransactionInfo to indicate transaction has started 1971 * 1972 * @throws JMSException if some internal error occurs 1973 */ 1974 protected void doStartTransaction() throws JMSException { 1975 if (getTransacted() && !transactionContext.isInXATransaction()) { 1976 transactionContext.begin(); 1977 } 1978 } 1979 1980 /** 1981 * Checks whether the session has unconsumed messages. 1982 * 1983 * @return true - if there are unconsumed messages. 1984 */ 1985 public boolean hasUncomsumedMessages() { 1986 return executor.hasUncomsumedMessages(); 1987 } 1988 1989 /** 1990 * Checks whether the session uses transactions. 1991 * 1992 * @return true - if the session uses transactions. 1993 */ 1994 public boolean isTransacted() { 1995 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); 1996 } 1997 1998 /** 1999 * Checks whether the session used client acknowledgment. 2000 * 2001 * @return true - if the session uses client acknowledgment. 2002 */ 2003 protected boolean isClientAcknowledge() { 2004 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; 2005 } 2006 2007 /** 2008 * Checks whether the session used auto acknowledgment. 2009 * 2010 * @return true - if the session uses client acknowledgment. 2011 */ 2012 public boolean isAutoAcknowledge() { 2013 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; 2014 } 2015 2016 /** 2017 * Checks whether the session used dup ok acknowledgment. 2018 * 2019 * @return true - if the session uses client acknowledgment. 
2020 */ 2021 public boolean isDupsOkAcknowledge() { 2022 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; 2023 } 2024 2025 public boolean isIndividualAcknowledge(){ 2026 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; 2027 } 2028 2029 /** 2030 * Returns the message delivery listener. 2031 * 2032 * @return deliveryListener - message delivery listener. 2033 */ 2034 public DeliveryListener getDeliveryListener() { 2035 return deliveryListener; 2036 } 2037 2038 /** 2039 * Sets the message delivery listener. 2040 * 2041 * @param deliveryListener - message delivery listener. 2042 */ 2043 public void setDeliveryListener(DeliveryListener deliveryListener) { 2044 this.deliveryListener = deliveryListener; 2045 } 2046 2047 /** 2048 * Returns the SessionInfo bean. 2049 * 2050 * @return info - SessionInfo bean. 2051 * @throws JMSException 2052 */ 2053 protected SessionInfo getSessionInfo() throws JMSException { 2054 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); 2055 return info; 2056 } 2057 2058 /** 2059 * Send the asynchronus command. 2060 * 2061 * @param command - command to be executed. 2062 * @throws JMSException 2063 */ 2064 public void asyncSendPacket(Command command) throws JMSException { 2065 connection.asyncSendPacket(command); 2066 } 2067 2068 /** 2069 * Send the synchronus command. 2070 * 2071 * @param command - command to be executed. 
2072 * @return Response 2073 * @throws JMSException 2074 */ 2075 public Response syncSendPacket(Command command) throws JMSException { 2076 return connection.syncSendPacket(command); 2077 } 2078 2079 public long getNextDeliveryId() { 2080 return deliveryIdGenerator.getNextSequenceId(); 2081 } 2082 2083 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { 2084 2085 List<MessageDispatch> c = unconsumedMessages.removeAll(); 2086 for (MessageDispatch md : c) { 2087 this.connection.rollbackDuplicate(dispatcher, md.getMessage()); 2088 } 2089 Collections.reverse(c); 2090 2091 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) { 2092 MessageDispatch md = iter.next(); 2093 executor.executeFirst(md); 2094 } 2095 2096 } 2097 2098 public boolean isRunning() { 2099 return started.get(); 2100 } 2101 2102 public boolean isAsyncDispatch() { 2103 return asyncDispatch; 2104 } 2105 2106 public void setAsyncDispatch(boolean asyncDispatch) { 2107 this.asyncDispatch = asyncDispatch; 2108 } 2109 2110 /** 2111 * @return Returns the sessionAsyncDispatch. 2112 */ 2113 public boolean isSessionAsyncDispatch() { 2114 return sessionAsyncDispatch; 2115 } 2116 2117 /** 2118 * @param sessionAsyncDispatch The sessionAsyncDispatch to set. 
2119 */ 2120 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) { 2121 this.sessionAsyncDispatch = sessionAsyncDispatch; 2122 } 2123 2124 public MessageTransformer getTransformer() { 2125 return transformer; 2126 } 2127 2128 public ActiveMQConnection getConnection() { 2129 return connection; 2130 } 2131 2132 /** 2133 * Sets the transformer used to transform messages before they are sent on 2134 * to the JMS bus or when they are received from the bus but before they are 2135 * delivered to the JMS client 2136 */ 2137 public void setTransformer(MessageTransformer transformer) { 2138 this.transformer = transformer; 2139 } 2140 2141 public BlobTransferPolicy getBlobTransferPolicy() { 2142 return blobTransferPolicy; 2143 } 2144 2145 /** 2146 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 2147 * OBjects) are transferred from producers to brokers to consumers 2148 */ 2149 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 2150 this.blobTransferPolicy = blobTransferPolicy; 2151 } 2152 2153 public List<MessageDispatch> getUnconsumedMessages() { 2154 return executor.getUnconsumedMessages(); 2155 } 2156 2157 @Override 2158 public String toString() { 2159 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "} " + sendMutex; 2160 } 2161 2162 public void checkMessageListener() throws JMSException { 2163 if (messageListener != null) { 2164 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2165 } 2166 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) { 2167 ActiveMQMessageConsumer consumer = i.next(); 2168 if (consumer.hasMessageListener()) { 2169 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2170 } 2171 } 2172 } 2173 2174 protected void setOptimizeAcknowledge(boolean value) { 2175 for (Iterator<ActiveMQMessageConsumer> iter = 
consumers.iterator(); iter.hasNext();) { 2176 ActiveMQMessageConsumer c = iter.next(); 2177 c.setOptimizeAcknowledge(value); 2178 } 2179 } 2180 2181 protected void setPrefetchSize(ConsumerId id, int prefetch) { 2182 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2183 ActiveMQMessageConsumer c = iter.next(); 2184 if (c.getConsumerId().equals(id)) { 2185 c.setPrefetchSize(prefetch); 2186 break; 2187 } 2188 } 2189 } 2190 2191 protected void close(ConsumerId id) { 2192 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2193 ActiveMQMessageConsumer c = iter.next(); 2194 if (c.getConsumerId().equals(id)) { 2195 try { 2196 c.close(); 2197 } catch (JMSException e) { 2198 LOG.warn("Exception closing consumer", e); 2199 } 2200 LOG.warn("Closed consumer on Command, " + id); 2201 break; 2202 } 2203 } 2204 } 2205 2206 public boolean isInUse(ActiveMQTempDestination destination) { 2207 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2208 ActiveMQMessageConsumer c = iter.next(); 2209 if (c.isInUse(destination)) { 2210 return true; 2211 } 2212 } 2213 return false; 2214 } 2215 2216 /** 2217 * highest sequence id of the last message delivered by this session. 
* Passed to the broker in the close command, maintained by dispose()
     * @return lastDeliveredSequenceId
     */
    public long getLastDeliveredSequenceId() {
        return this.lastDeliveredSequenceId;
    }

    /**
     * Sends the acknowledgement without requesting lazy delivery.
     *
     * @param ack the acknowledgement to send
     * @throws JMSException
     */
    protected void sendAck(MessageAck ack) throws JMSException {
        final boolean lazy = false;
        sendAck(ack, lazy);
    }

    /**
     * Sends the acknowledgement. It goes out asynchronously when the caller
     * asks for lazy delivery, when the connection sends acks asynchronously,
     * or when the session is transacted; otherwise it is sent synchronously.
     *
     * @param ack the acknowledgement to send
     * @param lazy true to force the asynchronous path
     * @throws JMSException
     */
    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
        boolean sendAsync = lazy || connection.isSendAcksAsync() || getTransacted();
        if (!sendAsync) {
            syncSendPacket(ack);
            return;
        }
        asyncSendPacket(ack);
    }

    /**
     * @return the scheduler obtained from the connection
     * @throws JMSException
     */
    protected Scheduler getScheduler() throws JMSException {
        Scheduler shared = this.connection.getScheduler();
        return shared;
    }

    /**
     * @return the connection executor held by this session
     */
    protected ThreadPoolExecutor getConnectionExecutor() {
        ThreadPoolExecutor pool = this.connectionExecutor;
        return pool;
    }
}