001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.File; 020import java.io.InputStream; 021import java.io.Serializable; 022import java.net.URL; 023import java.util.Collections; 024import java.util.Iterator; 025import java.util.List; 026import java.util.concurrent.CopyOnWriteArrayList; 027import java.util.concurrent.ThreadPoolExecutor; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030 031import javax.jms.BytesMessage; 032import javax.jms.Destination; 033import javax.jms.IllegalStateException; 034import javax.jms.InvalidDestinationException; 035import javax.jms.InvalidSelectorException; 036import javax.jms.JMSException; 037import javax.jms.MapMessage; 038import javax.jms.Message; 039import javax.jms.MessageConsumer; 040import javax.jms.MessageListener; 041import javax.jms.MessageProducer; 042import javax.jms.ObjectMessage; 043import javax.jms.Queue; 044import javax.jms.QueueBrowser; 045import javax.jms.QueueReceiver; 046import javax.jms.QueueSender; 047import javax.jms.QueueSession; 048import javax.jms.Session; 049import javax.jms.StreamMessage; 050import 
javax.jms.TemporaryQueue; 051import javax.jms.TemporaryTopic; 052import javax.jms.TextMessage; 053import javax.jms.Topic; 054import javax.jms.TopicPublisher; 055import javax.jms.TopicSession; 056import javax.jms.TopicSubscriber; 057import javax.jms.TransactionRolledBackException; 058 059import org.apache.activemq.blob.BlobDownloader; 060import org.apache.activemq.blob.BlobTransferPolicy; 061import org.apache.activemq.blob.BlobUploader; 062import org.apache.activemq.command.ActiveMQBlobMessage; 063import org.apache.activemq.command.ActiveMQBytesMessage; 064import org.apache.activemq.command.ActiveMQDestination; 065import org.apache.activemq.command.ActiveMQMapMessage; 066import org.apache.activemq.command.ActiveMQMessage; 067import org.apache.activemq.command.ActiveMQObjectMessage; 068import org.apache.activemq.command.ActiveMQQueue; 069import org.apache.activemq.command.ActiveMQStreamMessage; 070import org.apache.activemq.command.ActiveMQTempDestination; 071import org.apache.activemq.command.ActiveMQTempQueue; 072import org.apache.activemq.command.ActiveMQTempTopic; 073import org.apache.activemq.command.ActiveMQTextMessage; 074import org.apache.activemq.command.ActiveMQTopic; 075import org.apache.activemq.command.Command; 076import org.apache.activemq.command.ConsumerId; 077import org.apache.activemq.command.MessageAck; 078import org.apache.activemq.command.MessageDispatch; 079import org.apache.activemq.command.MessageId; 080import org.apache.activemq.command.ProducerId; 081import org.apache.activemq.command.RemoveInfo; 082import org.apache.activemq.command.Response; 083import org.apache.activemq.command.SessionId; 084import org.apache.activemq.command.SessionInfo; 085import org.apache.activemq.command.TransactionId; 086import org.apache.activemq.management.JMSSessionStatsImpl; 087import org.apache.activemq.management.StatsCapable; 088import org.apache.activemq.management.StatsImpl; 089import org.apache.activemq.thread.Scheduler; 090import 
org.apache.activemq.transaction.Synchronization; 091import org.apache.activemq.usage.MemoryUsage; 092import org.apache.activemq.util.Callback; 093import org.apache.activemq.util.LongSequenceGenerator; 094import org.slf4j.Logger; 095import org.slf4j.LoggerFactory; 096 097/** 098 * <P> 099 * A <CODE>Session</CODE> object is a single-threaded context for producing 100 * and consuming messages. Although it may allocate provider resources outside 101 * the Java virtual machine (JVM), it is considered a lightweight JMS object. 102 * <P> 103 * A session serves several purposes: 104 * <UL> 105 * <LI>It is a factory for its message producers and consumers. 106 * <LI>It supplies provider-optimized message factories. 107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and 108 * <CODE>TemporaryQueues</CODE>. 109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> 110 * objects for those clients that need to dynamically manipulate 111 * provider-specific destination names. 112 * <LI>It supports a single series of transactions that combine work spanning 113 * its producers and consumers into atomic units. 114 * <LI>It defines a serial order for the messages it consumes and the messages 115 * it produces. 116 * <LI>It retains messages it consumes until they have been acknowledged. 117 * <LI>It serializes execution of message listeners registered with its message 118 * consumers. 119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>. 120 * </UL> 121 * <P> 122 * A session can create and service multiple message producers and consumers. 123 * <P> 124 * One typical use is to have a thread block on a synchronous 125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then 126 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s. 127 * <P> 128 * If a client desires to have one thread produce messages while others consume 129 * them, the client should use a separate session for its producing thread. 
130 * <P> 131 * Once a connection has been started, any session with one or more registered 132 * message listeners is dedicated to the thread of control that delivers 133 * messages to it. It is erroneous for client code to use this session or any of 134 * its constituent objects from another thread of control. The only exception to 135 * this rule is the use of the session or connection <CODE>close</CODE> 136 * method. 137 * <P> 138 * It should be easy for most clients to partition their work naturally into 139 * sessions. This model allows clients to start simply and incrementally add 140 * message processing complexity as their need for concurrency grows. 141 * <P> 142 * The <CODE>close</CODE> method is the only session method that can be called 143 * while some other session method is being executed in another thread. 144 * <P> 145 * A session may be specified as transacted. Each transacted session supports a 146 * single series of transactions. Each transaction groups a set of message sends 147 * and a set of message receives into an atomic unit of work. In effect, 148 * transactions organize a session's input message stream and output message 149 * stream into series of atomic units. When a transaction commits, its atomic 150 * unit of input is acknowledged and its associated atomic unit of output is 151 * sent. If a transaction rollback is done, the transaction's sent messages are 152 * destroyed and the session's input is automatically recovered. 153 * <P> 154 * The content of a transaction's input and output units is simply those 155 * messages that have been produced and consumed within the session's current 156 * transaction. 157 * <P> 158 * A transaction is completed using either its session's <CODE>commit</CODE> 159 * method or its session's <CODE>rollback </CODE> method. The completion of a 160 * session's current transaction automatically begins the next. 
The result is 161 * that a transacted session always has a current transaction within which its 162 * work is done. 163 * <P> 164 * The Java Transaction Service (JTS) or some other transaction monitor may be 165 * used to combine a session's transaction with transactions on other resources 166 * (databases, other JMS sessions, etc.). Since Java distributed transactions 167 * are controlled via the Java Transaction API (JTA), use of the session's 168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is 169 * prohibited. 170 * <P> 171 * The JMS API does not require support for JTA; however, it does define how a 172 * provider supplies this support. 173 * <P> 174 * Although it is also possible for a JMS client to handle distributed 175 * transactions directly, it is unlikely that many JMS clients will do this. 176 * Support for JTA in the JMS API is targeted at systems vendors who will be 177 * integrating the JMS API into their application server products. 178 * 179 * 180 * @see javax.jms.Session 181 * @see javax.jms.QueueSession 182 * @see javax.jms.TopicSession 183 * @see javax.jms.XASession 184 */ 185public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { 186 187 /** 188 * Only acknowledge an individual message - using message.acknowledge() 189 * as opposed to CLIENT_ACKNOWLEDGE which 190 * acknowledges all messages consumed by a session at when acknowledge() 191 * is called 192 */ 193 public static final int INDIVIDUAL_ACKNOWLEDGE = 4; 194 public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE; 195 196 public static interface DeliveryListener { 197 void beforeDelivery(ActiveMQSession session, Message msg); 198 199 void afterDelivery(ActiveMQSession session, Message msg); 200 } 201 202 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class); 203 private final ThreadPoolExecutor connectionExecutor; 204 205 protected int acknowledgementMode; 206 
protected final ActiveMQConnection connection; 207 protected final SessionInfo info; 208 protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 209 protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator(); 210 protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator(); 211 protected final ActiveMQSessionExecutor executor; 212 protected final AtomicBoolean started = new AtomicBoolean(false); 213 214 protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>(); 215 protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>(); 216 217 protected boolean closed; 218 private volatile boolean synchronizationRegistered; 219 protected boolean asyncDispatch; 220 protected boolean sessionAsyncDispatch; 221 protected final boolean debug; 222 protected final Object sendMutex = new Object(); 223 protected final Object redeliveryGuard = new Object(); 224 225 private final AtomicBoolean clearInProgress = new AtomicBoolean(); 226 227 private MessageListener messageListener; 228 private final JMSSessionStatsImpl stats; 229 private TransactionContext transactionContext; 230 private DeliveryListener deliveryListener; 231 private MessageTransformer transformer; 232 private BlobTransferPolicy blobTransferPolicy; 233 private long lastDeliveredSequenceId = -2; 234 235 /** 236 * Construct the Session 237 * 238 * @param connection 239 * @param sessionId 240 * @param acknowledgeMode n.b if transacted - the acknowledgeMode == 241 * Session.SESSION_TRANSACTED 242 * @param asyncDispatch 243 * @param sessionAsyncDispatch 244 * @throws JMSException on internal error 245 */ 246 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException { 247 this.debug = 
LOG.isDebugEnabled(); 248 this.connection = connection; 249 this.acknowledgementMode = acknowledgeMode; 250 this.asyncDispatch = asyncDispatch; 251 this.sessionAsyncDispatch = sessionAsyncDispatch; 252 this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue()); 253 setTransactionContext(new TransactionContext(connection)); 254 stats = new JMSSessionStatsImpl(producers, consumers); 255 this.connection.asyncSendPacket(info); 256 setTransformer(connection.getTransformer()); 257 setBlobTransferPolicy(connection.getBlobTransferPolicy()); 258 this.connectionExecutor=connection.getExecutor(); 259 this.executor = new ActiveMQSessionExecutor(this); 260 connection.addSession(this); 261 if (connection.isStarted()) { 262 start(); 263 } 264 265 } 266 267 protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException { 268 this(connection, sessionId, acknowledgeMode, asyncDispatch, true); 269 } 270 271 /** 272 * Sets the transaction context of the session. 273 * 274 * @param transactionContext - provides the means to control a JMS 275 * transaction. 276 */ 277 public void setTransactionContext(TransactionContext transactionContext) { 278 this.transactionContext = transactionContext; 279 } 280 281 /** 282 * Returns the transaction context of the session. 283 * 284 * @return transactionContext - session's transaction context. 285 */ 286 public TransactionContext getTransactionContext() { 287 return transactionContext; 288 } 289 290 /* 291 * (non-Javadoc) 292 * 293 * @see org.apache.activemq.management.StatsCapable#getStats() 294 */ 295 @Override 296 public StatsImpl getStats() { 297 return stats; 298 } 299 300 /** 301 * Returns the session's statistics. 302 * 303 * @return stats - session's statistics. 304 */ 305 public JMSSessionStatsImpl getSessionStats() { 306 return stats; 307 } 308 309 /** 310 * Creates a <CODE>BytesMessage</CODE> object. 
A <CODE>BytesMessage</CODE> 311 * object is used to send a message containing a stream of uninterpreted 312 * bytes. 313 * 314 * @return the an ActiveMQBytesMessage 315 * @throws JMSException if the JMS provider fails to create this message due 316 * to some internal error. 317 */ 318 @Override 319 public BytesMessage createBytesMessage() throws JMSException { 320 ActiveMQBytesMessage message = new ActiveMQBytesMessage(); 321 configureMessage(message); 322 return message; 323 } 324 325 /** 326 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> 327 * object is used to send a self-defining set of name-value pairs, where 328 * names are <CODE>String</CODE> objects and values are primitive values 329 * in the Java programming language. 330 * 331 * @return an ActiveMQMapMessage 332 * @throws JMSException if the JMS provider fails to create this message due 333 * to some internal error. 334 */ 335 @Override 336 public MapMessage createMapMessage() throws JMSException { 337 ActiveMQMapMessage message = new ActiveMQMapMessage(); 338 configureMessage(message); 339 return message; 340 } 341 342 /** 343 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> 344 * interface is the root interface of all JMS messages. A 345 * <CODE>Message</CODE> object holds all the standard message header 346 * information. It can be sent when a message containing only header 347 * information is sufficient. 348 * 349 * @return an ActiveMQMessage 350 * @throws JMSException if the JMS provider fails to create this message due 351 * to some internal error. 352 */ 353 @Override 354 public Message createMessage() throws JMSException { 355 ActiveMQMessage message = new ActiveMQMessage(); 356 configureMessage(message); 357 return message; 358 } 359 360 /** 361 * Creates an <CODE>ObjectMessage</CODE> object. An 362 * <CODE>ObjectMessage</CODE> object is used to send a message that 363 * contains a serializable Java object. 
364 * 365 * @return an ActiveMQObjectMessage 366 * @throws JMSException if the JMS provider fails to create this message due 367 * to some internal error. 368 */ 369 @Override 370 public ObjectMessage createObjectMessage() throws JMSException { 371 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 372 configureMessage(message); 373 return message; 374 } 375 376 /** 377 * Creates an initialized <CODE>ObjectMessage</CODE> object. An 378 * <CODE>ObjectMessage</CODE> object is used to send a message that 379 * contains a serializable Java object. 380 * 381 * @param object the object to use to initialize this message 382 * @return an ActiveMQObjectMessage 383 * @throws JMSException if the JMS provider fails to create this message due 384 * to some internal error. 385 */ 386 @Override 387 public ObjectMessage createObjectMessage(Serializable object) throws JMSException { 388 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 389 configureMessage(message); 390 message.setObject(object); 391 return message; 392 } 393 394 /** 395 * Creates a <CODE>StreamMessage</CODE> object. A 396 * <CODE>StreamMessage</CODE> object is used to send a self-defining 397 * stream of primitive values in the Java programming language. 398 * 399 * @return an ActiveMQStreamMessage 400 * @throws JMSException if the JMS provider fails to create this message due 401 * to some internal error. 402 */ 403 @Override 404 public StreamMessage createStreamMessage() throws JMSException { 405 ActiveMQStreamMessage message = new ActiveMQStreamMessage(); 406 configureMessage(message); 407 return message; 408 } 409 410 /** 411 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> 412 * object is used to send a message containing a <CODE>String</CODE> 413 * object. 414 * 415 * @return an ActiveMQTextMessage 416 * @throws JMSException if the JMS provider fails to create this message due 417 * to some internal error. 
418 */ 419 @Override 420 public TextMessage createTextMessage() throws JMSException { 421 ActiveMQTextMessage message = new ActiveMQTextMessage(); 422 configureMessage(message); 423 return message; 424 } 425 426 /** 427 * Creates an initialized <CODE>TextMessage</CODE> object. A 428 * <CODE>TextMessage</CODE> object is used to send a message containing a 429 * <CODE>String</CODE>. 430 * 431 * @param text the string used to initialize this message 432 * @return an ActiveMQTextMessage 433 * @throws JMSException if the JMS provider fails to create this message due 434 * to some internal error. 435 */ 436 @Override 437 public TextMessage createTextMessage(String text) throws JMSException { 438 ActiveMQTextMessage message = new ActiveMQTextMessage(); 439 message.setText(text); 440 configureMessage(message); 441 return message; 442 } 443 444 /** 445 * Creates an initialized <CODE>BlobMessage</CODE> object. A 446 * <CODE>BlobMessage</CODE> object is used to send a message containing a 447 * <CODE>URL</CODE> which points to some network addressible BLOB. 448 * 449 * @param url the network addressable URL used to pass directly to the 450 * consumer 451 * @return a BlobMessage 452 * @throws JMSException if the JMS provider fails to create this message due 453 * to some internal error. 454 */ 455 public BlobMessage createBlobMessage(URL url) throws JMSException { 456 return createBlobMessage(url, false); 457 } 458 459 /** 460 * Creates an initialized <CODE>BlobMessage</CODE> object. A 461 * <CODE>BlobMessage</CODE> object is used to send a message containing a 462 * <CODE>URL</CODE> which points to some network addressible BLOB. 
463 * 464 * @param url the network addressable URL used to pass directly to the 465 * consumer 466 * @param deletedByBroker indicates whether or not the resource is deleted 467 * by the broker when the message is acknowledged 468 * @return a BlobMessage 469 * @throws JMSException if the JMS provider fails to create this message due 470 * to some internal error. 471 */ 472 public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException { 473 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 474 configureMessage(message); 475 message.setURL(url); 476 message.setDeletedByBroker(deletedByBroker); 477 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 478 return message; 479 } 480 481 /** 482 * Creates an initialized <CODE>BlobMessage</CODE> object. A 483 * <CODE>BlobMessage</CODE> object is used to send a message containing 484 * the <CODE>File</CODE> content. Before the message is sent the file 485 * conent will be uploaded to the broker or some other remote repository 486 * depending on the {@link #getBlobTransferPolicy()}. 487 * 488 * @param file the file to be uploaded to some remote repo (or the broker) 489 * depending on the strategy 490 * @return a BlobMessage 491 * @throws JMSException if the JMS provider fails to create this message due 492 * to some internal error. 493 */ 494 public BlobMessage createBlobMessage(File file) throws JMSException { 495 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 496 configureMessage(message); 497 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file)); 498 message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy()))); 499 message.setDeletedByBroker(true); 500 message.setName(file.getName()); 501 return message; 502 } 503 504 /** 505 * Creates an initialized <CODE>BlobMessage</CODE> object. A 506 * <CODE>BlobMessage</CODE> object is used to send a message containing 507 * the <CODE>File</CODE> content. 
Before the message is sent the file 508 * conent will be uploaded to the broker or some other remote repository 509 * depending on the {@link #getBlobTransferPolicy()}. 510 * 511 * @param in the stream to be uploaded to some remote repo (or the broker) 512 * depending on the strategy 513 * @return a BlobMessage 514 * @throws JMSException if the JMS provider fails to create this message due 515 * to some internal error. 516 */ 517 public BlobMessage createBlobMessage(InputStream in) throws JMSException { 518 ActiveMQBlobMessage message = new ActiveMQBlobMessage(); 519 configureMessage(message); 520 message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in)); 521 message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy())); 522 message.setDeletedByBroker(true); 523 return message; 524 } 525 526 /** 527 * Indicates whether the session is in transacted mode. 528 * 529 * @return true if the session is in transacted mode 530 * @throws JMSException if there is some internal error. 531 */ 532 @Override 533 public boolean getTransacted() throws JMSException { 534 checkClosed(); 535 return isTransacted(); 536 } 537 538 /** 539 * Returns the acknowledgement mode of the session. The acknowledgement mode 540 * is set at the time that the session is created. If the session is 541 * transacted, the acknowledgement mode is ignored. 542 * 543 * @return If the session is not transacted, returns the current 544 * acknowledgement mode for the session. If the session is 545 * transacted, returns SESSION_TRANSACTED. 546 * @throws JMSException 547 * @see javax.jms.Connection#createSession(boolean,int) 548 * @since 1.1 exception JMSException if there is some internal error. 549 */ 550 @Override 551 public int getAcknowledgeMode() throws JMSException { 552 checkClosed(); 553 return this.acknowledgementMode; 554 } 555 556 /** 557 * Commits all messages done in this transaction and releases any locks 558 * currently held. 
559 * 560 * @throws JMSException if the JMS provider fails to commit the transaction 561 * due to some internal error. 562 * @throws TransactionRolledBackException if the transaction is rolled back 563 * due to some internal error during commit. 564 * @throws javax.jms.IllegalStateException if the method is not called by a 565 * transacted session. 566 */ 567 @Override 568 public void commit() throws JMSException { 569 checkClosed(); 570 if (!getTransacted()) { 571 throw new javax.jms.IllegalStateException("Not a transacted session"); 572 } 573 if (LOG.isDebugEnabled()) { 574 LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId()); 575 } 576 transactionContext.commit(); 577 } 578 579 /** 580 * Rolls back any messages done in this transaction and releases any locks 581 * currently held. 582 * 583 * @throws JMSException if the JMS provider fails to roll back the 584 * transaction due to some internal error. 585 * @throws javax.jms.IllegalStateException if the method is not called by a 586 * transacted session. 587 */ 588 @Override 589 public void rollback() throws JMSException { 590 checkClosed(); 591 if (!getTransacted()) { 592 throw new javax.jms.IllegalStateException("Not a transacted session"); 593 } 594 if (LOG.isDebugEnabled()) { 595 LOG.debug(getSessionId() + " Transaction Rollback, txid:" + transactionContext.getTransactionId()); 596 } 597 transactionContext.rollback(); 598 } 599 600 /** 601 * Closes the session. 602 * <P> 603 * Since a provider may allocate some resources on behalf of a session 604 * outside the JVM, clients should close the resources when they are not 605 * needed. Relying on garbage collection to eventually reclaim these 606 * resources may not be timely enough. 607 * <P> 608 * There is no need to close the producers and consumers of a closed 609 * session. 610 * <P> 611 * This call will block until a <CODE>receive</CODE> call or message 612 * listener in progress has completed. 
A blocked message consumer 613 * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session 614 * is closed. 615 * <P> 616 * Closing a transacted session must roll back the transaction in progress. 617 * <P> 618 * This method is the only <CODE>Session</CODE> method that can be called 619 * concurrently. 620 * <P> 621 * Invoking any other <CODE>Session</CODE> method on a closed session must 622 * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a 623 * closed session must <I>not </I> throw an exception. 624 * 625 * @throws JMSException if the JMS provider fails to close the session due 626 * to some internal error. 627 */ 628 @Override 629 public void close() throws JMSException { 630 if (!closed) { 631 if (getTransactionContext().isInXATransaction()) { 632 if (!synchronizationRegistered) { 633 synchronizationRegistered = true; 634 getTransactionContext().addSynchronization(new Synchronization() { 635 636 @Override 637 public void afterCommit() throws Exception { 638 doClose(); 639 synchronizationRegistered = false; 640 } 641 642 @Override 643 public void afterRollback() throws Exception { 644 doClose(); 645 synchronizationRegistered = false; 646 } 647 }); 648 } 649 650 } else { 651 doClose(); 652 } 653 } 654 } 655 656 private void doClose() throws JMSException { 657 dispose(); 658 RemoveInfo removeCommand = info.createRemoveCommand(); 659 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 660 connection.asyncSendPacket(removeCommand); 661 } 662 663 final AtomicInteger clearRequestsCounter = new AtomicInteger(0); 664 void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { 665 clearRequestsCounter.incrementAndGet(); 666 executor.clearMessagesInProgress(); 667 // we are called from inside the transport reconnection logic which involves us 668 // clearing all the connections' consumers dispatch and delivered lists. 
So rather 669 // than trying to grab a mutex (which could be already owned by the message listener 670 // calling the send or an ack) we allow it to complete in a separate thread via the 671 // scheduler and notify us via connection.transportInterruptionProcessingComplete() 672 // 673 // We must be careful though not to allow multiple calls to this method from a 674 // connection that is having issue becoming fully established from causing a large 675 // build up of scheduled tasks to clear the same consumers over and over. 676 if (consumers.isEmpty()) { 677 return; 678 } 679 680 if (clearInProgress.compareAndSet(false, true)) { 681 for (final ActiveMQMessageConsumer consumer : consumers) { 682 consumer.inProgressClearRequired(); 683 transportInterruptionProcessingComplete.incrementAndGet(); 684 try { 685 connection.getScheduler().executeAfterDelay(new Runnable() { 686 @Override 687 public void run() { 688 consumer.clearMessagesInProgress(); 689 }}, 0l); 690 } catch (JMSException e) { 691 connection.onClientInternalException(e); 692 } 693 } 694 695 try { 696 connection.getScheduler().executeAfterDelay(new Runnable() { 697 @Override 698 public void run() { 699 clearInProgress.set(false); 700 }}, 0l); 701 } catch (JMSException e) { 702 connection.onClientInternalException(e); 703 } 704 } 705 } 706 707 void deliverAcks() { 708 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 709 ActiveMQMessageConsumer consumer = iter.next(); 710 consumer.deliverAcks(); 711 } 712 } 713 714 public synchronized void dispose() throws JMSException { 715 if (!closed) { 716 717 try { 718 executor.close(); 719 720 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 721 ActiveMQMessageConsumer consumer = iter.next(); 722 consumer.setFailureError(connection.getFirstFailureError()); 723 consumer.dispose(); 724 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId()); 725 } 726 
consumers.clear(); 727 728 for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) { 729 ActiveMQMessageProducer producer = iter.next(); 730 producer.dispose(); 731 } 732 producers.clear(); 733 734 try { 735 if (getTransactionContext().isInLocalTransaction()) { 736 rollback(); 737 } 738 } catch (JMSException e) { 739 } 740 741 } finally { 742 connection.removeSession(this); 743 this.transactionContext = null; 744 closed = true; 745 } 746 } 747 } 748 749 /** 750 * Checks that the session is not closed then configures the message 751 */ 752 protected void configureMessage(ActiveMQMessage message) throws IllegalStateException { 753 checkClosed(); 754 message.setConnection(connection); 755 } 756 757 /** 758 * Check if the session is closed. It is used for ensuring that the session 759 * is open before performing various operations. 760 * 761 * @throws IllegalStateException if the Session is closed 762 */ 763 protected void checkClosed() throws IllegalStateException { 764 if (closed) { 765 throw new IllegalStateException("The Session is closed"); 766 } 767 } 768 769 /** 770 * Checks if the session is closed. 771 * 772 * @return true if the session is closed, false otherwise. 773 */ 774 public boolean isClosed() { 775 return closed; 776 } 777 778 /** 779 * Stops message delivery in this session, and restarts message delivery 780 * with the oldest unacknowledged message. 781 * <P> 782 * All consumers deliver messages in a serial order. Acknowledging a 783 * received message automatically acknowledges all messages that have been 784 * delivered to the client. 785 * <P> 786 * Restarting a session causes it to take the following actions: 787 * <UL> 788 * <LI>Stop message delivery 789 * <LI>Mark all messages that might have been delivered but not 790 * acknowledged as "redelivered" 791 * <LI>Restart the delivery sequence including all unacknowledged messages 792 * that had been previously delivered. 
Redelivered messages do not have to 793 * be delivered in exactly their original delivery order. 794 * </UL> 795 * 796 * @throws JMSException if the JMS provider fails to stop and restart 797 * message delivery due to some internal error. 798 * @throws IllegalStateException if the method is called by a transacted 799 * session. 800 */ 801 @Override 802 public void recover() throws JMSException { 803 804 checkClosed(); 805 if (getTransacted()) { 806 throw new IllegalStateException("This session is transacted"); 807 } 808 809 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 810 ActiveMQMessageConsumer c = iter.next(); 811 c.rollback(); 812 } 813 814 } 815 816 /** 817 * Returns the session's distinguished message listener (optional). 818 * 819 * @return the message listener associated with this session 820 * @throws JMSException if the JMS provider fails to get the message 821 * listener due to an internal error. 822 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener) 823 * @see javax.jms.ServerSessionPool 824 * @see javax.jms.ServerSession 825 */ 826 @Override 827 public MessageListener getMessageListener() throws JMSException { 828 checkClosed(); 829 return this.messageListener; 830 } 831 832 /** 833 * Sets the session's distinguished message listener (optional). 834 * <P> 835 * When the distinguished message listener is set, no other form of message 836 * receipt in the session can be used; however, all forms of sending 837 * messages are still supported. 838 * <P> 839 * If this session has been closed, then an {@link IllegalStateException} is 840 * thrown, if trying to set a new listener. However setting the listener 841 * to <tt>null</tt> is allowed, to clear the listener, even if this session 842 * has been closed prior. 843 * <P> 844 * This is an expert facility not used by regular JMS clients. 
*
     * @param listener the message listener to associate with this session
     * @throws JMSException if the JMS provider fails to set the message
     *                 listener due to an internal error.
     * @see javax.jms.Session#getMessageListener()
     * @see javax.jms.ServerSessionPool
     * @see javax.jms.ServerSession
     */
    @Override
    public void setMessageListener(MessageListener listener) throws JMSException {
        // only check for closed if we set a new listener, as we allow to clear
        // the listener, such as when an application is shutting down, and is
        // no longer using a message listener on this session
        if (listener != null) {
            checkClosed();
        }
        this.messageListener = listener;

        // A non-null listener switches the executor to session-pool dispatch;
        // clearing the listener does not switch it back.
        if (listener != null) {
            executor.setDispatchedBySessionPool(true);
        }
    }

    /**
     * Optional operation, intended to be used only by Application Servers, not
     * by ordinary JMS clients.
     * <P>
     * Drains the session executor: every dispatch returned by
     * <CODE>executor.dequeueNoWait()</CODE> is either acknowledged early
     * (expired or duplicate message) or handed to the session's message
     * listener. For transacted deliveries a synchronization is registered that
     * sends the ack before the transaction ends and handles redelivery or
     * poison acks after a rollback.
     *
     * @see javax.jms.ServerSession
     */
    @Override
    public void run() {
        MessageDispatch messageDispatch;
        while ((messageDispatch = executor.dequeueNoWait()) != null) {
            final MessageDispatch md = messageDispatch;
            final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();

            // Expired and duplicate messages are never passed to the listener;
            // they are answered with an expired ack or a poison ack instead.
            MessageAck earlyAck = null;
            if (message.isExpired()) {
                earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
                earlyAck.setFirstMessageId(message.getMessageId());
            } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
                LOG.debug("{} got duplicate: {}", this, message.getMessageId());
                earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                earlyAck.setFirstMessageId(md.getMessage().getMessageId());
                earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
            }
            if (earlyAck != null) {
                try {
                    asyncSendPacket(earlyAck);
                } catch (Throwable t) {
                    LOG.error("error dispatching ack: {} ", earlyAck, t);
                    connection.onClientInternalException(t);
                } finally {
                    // NOTE(review): 'continue' inside finally always moves on to
                    // the next dispatch and also discards any exception thrown
                    // by the catch block above.
                    continue;
                }
            }

            // Client/individual acknowledge modes get a no-op acknowledge callback.
            if (isClientAcknowledge()||isIndividualAcknowledge()) {
                message.setAcknowledgeCallback(new Callback() {
                    @Override
                    public void execute() throws Exception {
                    }
                });
            }

            if (deliveryListener != null) {
                deliveryListener.beforeDelivery(this, message);
            }

            md.setDeliverySequenceId(getNextDeliveryId());
            lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();

            final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);

            // Set when afterDelivery() throws; read by the scheduled redelivery task.
            final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
            /*
            * The redelivery guard is to allow the endpoint lifecycle to complete before the message is dispatched.
            * We don't want the after deliver being called after the redeliver as it may cause some weird stuff.
            * */
            synchronized (redeliveryGuard) {
                try {
                    ack.setFirstMessageId(md.getMessage().getMessageId());
                    doStartTransaction();
                    ack.setTransactionId(getTransactionContext().getTransactionId());
                    // Only transacted deliveries register a synchronization; the
                    // non-transacted ack is sent from the finally block below.
                    if (ack.getTransactionId() != null) {
                        getTransactionContext().addSynchronization(new Synchronization() {

                            // Snapshot of clearRequestsCounter taken when this
                            // synchronization is created. If the counter sits at
                            // Integer.MAX_VALUE it is incremented first (which wraps
                            // it) so the '>' comparison in afterRollback() can still fire.
                            final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());

                            @Override
                            public void beforeEnd() throws Exception {
                                // validate our consumer so we don't push stale acks that get ignored
                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
                                    LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
                                    throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
                                }
                                LOG.trace("beforeEnd ack {}", ack);
                                sendAck(ack);
                            }

                            @Override
                            public void afterRollback() throws Exception {
                                LOG.trace("rollback {}", ack, new Throwable("here"));
                                // ensure we don't filter this as a duplicate
                                connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());

                                // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
                                if (clearRequestsCounter.get() > clearRequestCount) {
                                    LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
                                    return;
                                }

                                // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
                                    LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
                                    return;
                                }

                                RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
                                int redeliveryCounter = md.getMessage().getRedeliveryCounter();
                                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
                                    && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
                                    // We need to NACK the messages so that they get
                                    // sent to the
                                    // DLQ.
                                    // Acknowledge the last message.
                                    // (This local 'ack' shadows the outer standard ack.)
                                    MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
                                    ack.setFirstMessageId(md.getMessage().getMessageId());
                                    ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
                                    asyncSendPacket(ack);

                                } else {

                                    // (This local 'ack' shadows the outer standard ack.)
                                    MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
                                    ack.setFirstMessageId(md.getMessage().getMessageId());
                                    asyncSendPacket(ack);

                                    // Figure out how long we should wait to resend
                                    // this message: start from the initial delay and
                                    // advance it once per previous redelivery.
                                    long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
                                    for (int i = 0; i < redeliveryCounter; i++) {
                                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
                                    }

                                    /*
                                    * If redelivery is blocking (i.e. NOT non-blocking) then we need to stop the executor
                                    * to avoid more messages being delivered; once the message is redelivered we can restart it.
                                    * */
                                    if (!connection.isNonBlockingRedelivery()) {
                                        LOG.debug("Blocking session until re-delivery...");
                                        executor.stop();
                                    }

                                    connection.getScheduler().executeAfterDelay(new Runnable() {

                                        @Override
                                        public void run() {
                                            /*
                                            * wait for the first delivery to be complete, i.e. after delivery has been called.
                                            * */
                                            synchronized (redeliveryGuard) {
                                                /*
                                                * If it's non blocking then we can just dispatch in a new session.
                                                * */
                                                if (connection.isNonBlockingRedelivery()) {
                                                    ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
                                                } else {
                                                    /*
                                                    * If there has been an error thrown during afterDelivery then the
                                                    * endpoint will be marked as dead so redelivery will fail (and eventually
                                                    * the session marked as stale), in this case we can only call dispatch
                                                    * which will create a new session with a new endpoint.
                                                    * */
                                                    if (afterDeliveryError.get()) {
                                                        ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
                                                    } else {
                                                        // Put the message back at the head of the
                                                        // queue and resume the stopped executor.
                                                        executor.executeFirst(md);
                                                        executor.start();
                                                    }
                                                }
                                            }
                                        }
                                    }, redeliveryDelay);
                                }
                                md.getMessage().onMessageRolledBack();
                            }
                        });
                    }

                    LOG.trace("{} onMessage({})", this, message.getMessageId());
                    messageListener.onMessage(message);

                } catch (Throwable e) {
                    LOG.error("error dispatching message: ", e);

                    if (getTransactionContext().isInXATransaction()) {
                        LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
                        getTransactionContext().setRollbackOnly(true);
                    }

                    // A problem while invoking the MessageListener does not
                    // in general indicate a problem with the connection to the broker, i.e.
                    // it will usually be sufficient to let the afterDelivery() method either
                    // commit or roll back in order to deal with the exception.
                    // However, we notify any registered client internal exception listener
                    // of the problem.
                    connection.onClientInternalException(e);
                } finally {
                    // Non-transacted delivery: send the standard ack now, whether
                    // or not the listener threw.
                    if (ack.getTransactionId() == null) {
                        try {
                            asyncSendPacket(ack);
                        } catch (Throwable e) {
                            connection.onClientInternalException(e);
                        }
                    }
                }

                if (deliveryListener != null) {
                    try {
                        deliveryListener.afterDelivery(this, message);
                    } catch (Throwable t) {
                        LOG.debug("Unable to call after delivery", t);
                        // Record the failure for the scheduled redelivery task, then
                        // abort this run() with an unchecked exception.
                        afterDeliveryError.set(true);
                        throw new RuntimeException(t);
                    }
                }
            }
            /*
             * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
             * It also needs to be outside the redelivery guard.
             * */
            try {
                executor.waitForQueueRestart();
            } catch (InterruptedException ex) {
                // NOTE(review): the thread's interrupt status is not restored
                // here; the exception is only reported to the connection.
                connection.onClientInternalException(ex);
            }
        }
    }

    /**
     * Creates a <CODE>MessageProducer</CODE> to send messages to the
     * specified destination.
     * <P>
     * A client uses a <CODE>MessageProducer</CODE> object to send messages to
     * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
     * inherit from <CODE>Destination</CODE>, they can be used in the
     * destination parameter to create a <CODE>MessageProducer</CODE> object.
     * <P>
     * A <CODE>CustomDestination</CODE> creates its own producer; otherwise the
     * producer is created with the connection's send timeout.
     *
     * @param destination the <CODE>Destination</CODE> to send to, or null if
     *                this is a producer which does not have a specified
     *                destination.
     * @return the MessageProducer
     * @throws JMSException if the session is closed or fails to create a
     *                 MessageProducer due to some internal error.
     * @throws InvalidDestinationException if an invalid destination is
     *                 specified.
     * @since 1.1
     */
    @Override
    public MessageProducer createProducer(Destination destination) throws JMSException {
        checkClosed();
        // Custom destinations are responsible for building their own producer.
        if (destination instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)destination;
            return customDestination.createProducer(this);
        }
        int timeSendOut = connection.getSendTimeout();
        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
    }

    /**
     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
     * <CODE>Destination</CODE>, they can be used in the destination
     * parameter to create a <CODE>MessageConsumer</CODE>.
     *
     * @param destination the <CODE>Destination</CODE> to access.
1121 * @return the MessageConsumer 1122 * @throws JMSException if the session fails to create a consumer due to 1123 * some internal error. 1124 * @throws InvalidDestinationException if an invalid destination is 1125 * specified. 1126 * @since 1.1 1127 */ 1128 @Override 1129 public MessageConsumer createConsumer(Destination destination) throws JMSException { 1130 return createConsumer(destination, (String) null); 1131 } 1132 1133 /** 1134 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1135 * using a message selector. Since <CODE> Queue</CODE> and 1136 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1137 * can be used in the destination parameter to create a 1138 * <CODE>MessageConsumer</CODE>. 1139 * <P> 1140 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1141 * that have been sent to a destination. 1142 * 1143 * @param destination the <CODE>Destination</CODE> to access 1144 * @param messageSelector only messages with properties matching the message 1145 * selector expression are delivered. A value of null or an 1146 * empty string indicates that there is no message selector 1147 * for the message consumer. 1148 * @return the MessageConsumer 1149 * @throws JMSException if the session fails to create a MessageConsumer due 1150 * to some internal error. 1151 * @throws InvalidDestinationException if an invalid destination is 1152 * specified. 1153 * @throws InvalidSelectorException if the message selector is invalid. 1154 * @since 1.1 1155 */ 1156 @Override 1157 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { 1158 return createConsumer(destination, messageSelector, false); 1159 } 1160 1161 /** 1162 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. 
1163 * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from 1164 * <CODE>Destination</CODE>, they can be used in the destination 1165 * parameter to create a <CODE>MessageConsumer</CODE>. 1166 * 1167 * @param destination the <CODE>Destination</CODE> to access. 1168 * @param messageListener the listener to use for async consumption of messages 1169 * @return the MessageConsumer 1170 * @throws JMSException if the session fails to create a consumer due to 1171 * some internal error. 1172 * @throws InvalidDestinationException if an invalid destination is 1173 * specified. 1174 * @since 1.1 1175 */ 1176 public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException { 1177 return createConsumer(destination, null, messageListener); 1178 } 1179 1180 /** 1181 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, 1182 * using a message selector. Since <CODE> Queue</CODE> and 1183 * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they 1184 * can be used in the destination parameter to create a 1185 * <CODE>MessageConsumer</CODE>. 1186 * <P> 1187 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1188 * that have been sent to a destination. 1189 * 1190 * @param destination the <CODE>Destination</CODE> to access 1191 * @param messageSelector only messages with properties matching the message 1192 * selector expression are delivered. A value of null or an 1193 * empty string indicates that there is no message selector 1194 * for the message consumer. 1195 * @param messageListener the listener to use for async consumption of messages 1196 * @return the MessageConsumer 1197 * @throws JMSException if the session fails to create a MessageConsumer due 1198 * to some internal error. 1199 * @throws InvalidDestinationException if an invalid destination is 1200 * specified. 1201 * @throws InvalidSelectorException if the message selector is invalid. 
1202 * @since 1.1 1203 */ 1204 public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException { 1205 return createConsumer(destination, messageSelector, false, messageListener); 1206 } 1207 1208 /** 1209 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1210 * using a message selector. This method can specify whether messages 1211 * published by its own connection should be delivered to it, if the 1212 * destination is a topic. 1213 * <P> 1214 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1215 * <CODE>Destination</CODE>, they can be used in the destination 1216 * parameter to create a <CODE>MessageConsumer</CODE>. 1217 * <P> 1218 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1219 * that have been published to a destination. 1220 * <P> 1221 * In some cases, a connection may both publish and subscribe to a topic. 1222 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1223 * inhibit the delivery of messages published by its own connection. The 1224 * default value for this attribute is False. The <CODE>noLocal</CODE> 1225 * value must be supported by destinations that are topics. 1226 * 1227 * @param destination the <CODE>Destination</CODE> to access 1228 * @param messageSelector only messages with properties matching the message 1229 * selector expression are delivered. A value of null or an 1230 * empty string indicates that there is no message selector 1231 * for the message consumer. 1232 * @param noLocal - if true, and the destination is a topic, inhibits the 1233 * delivery of messages published by its own connection. The 1234 * behavior for <CODE>NoLocal</CODE> is not specified if 1235 * the destination is a queue. 1236 * @return the MessageConsumer 1237 * @throws JMSException if the session fails to create a MessageConsumer due 1238 * to some internal error. 
1239 * @throws InvalidDestinationException if an invalid destination is 1240 * specified. 1241 * @throws InvalidSelectorException if the message selector is invalid. 1242 * @since 1.1 1243 */ 1244 @Override 1245 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { 1246 return createConsumer(destination, messageSelector, noLocal, null); 1247 } 1248 1249 /** 1250 * Creates <CODE>MessageConsumer</CODE> for the specified destination, 1251 * using a message selector. This method can specify whether messages 1252 * published by its own connection should be delivered to it, if the 1253 * destination is a topic. 1254 * <P> 1255 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from 1256 * <CODE>Destination</CODE>, they can be used in the destination 1257 * parameter to create a <CODE>MessageConsumer</CODE>. 1258 * <P> 1259 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages 1260 * that have been published to a destination. 1261 * <P> 1262 * In some cases, a connection may both publish and subscribe to a topic. 1263 * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to 1264 * inhibit the delivery of messages published by its own connection. The 1265 * default value for this attribute is False. The <CODE>noLocal</CODE> 1266 * value must be supported by destinations that are topics. 1267 * 1268 * @param destination the <CODE>Destination</CODE> to access 1269 * @param messageSelector only messages with properties matching the message 1270 * selector expression are delivered. A value of null or an 1271 * empty string indicates that there is no message selector 1272 * for the message consumer. 1273 * @param noLocal - if true, and the destination is a topic, inhibits the 1274 * delivery of messages published by its own connection. The 1275 * behavior for <CODE>NoLocal</CODE> is not specified if 1276 * the destination is a queue. 
1277 * @param messageListener the listener to use for async consumption of messages 1278 * @return the MessageConsumer 1279 * @throws JMSException if the session fails to create a MessageConsumer due 1280 * to some internal error. 1281 * @throws InvalidDestinationException if an invalid destination is 1282 * specified. 1283 * @throws InvalidSelectorException if the message selector is invalid. 1284 * @since 1.1 1285 */ 1286 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException { 1287 checkClosed(); 1288 1289 if (destination instanceof CustomDestination) { 1290 CustomDestination customDestination = (CustomDestination)destination; 1291 return customDestination.createConsumer(this, messageSelector, noLocal); 1292 } 1293 1294 ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy(); 1295 int prefetch = 0; 1296 if (destination instanceof Topic) { 1297 prefetch = prefetchPolicy.getTopicPrefetch(); 1298 } else { 1299 prefetch = prefetchPolicy.getQueuePrefetch(); 1300 } 1301 ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination); 1302 return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector, 1303 prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener); 1304 } 1305 1306 /** 1307 * Creates a queue identity given a <CODE>Queue</CODE> name. 1308 * <P> 1309 * This facility is provided for the rare cases where clients need to 1310 * dynamically manipulate queue identity. It allows the creation of a queue 1311 * identity with a provider-specific name. Clients that depend on this 1312 * ability are not portable. 1313 * <P> 1314 * Note that this method is not for creating the physical queue. The 1315 * physical creation of queues is an administrative task and is not to be 1316 * initiated by the JMS API. 
The one exception is the creation of temporary 1317 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> 1318 * method. 1319 * 1320 * @param queueName the name of this <CODE>Queue</CODE> 1321 * @return a <CODE>Queue</CODE> with the given name 1322 * @throws JMSException if the session fails to create a queue due to some 1323 * internal error. 1324 * @since 1.1 1325 */ 1326 @Override 1327 public Queue createQueue(String queueName) throws JMSException { 1328 checkClosed(); 1329 if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1330 return new ActiveMQTempQueue(queueName); 1331 } 1332 return new ActiveMQQueue(queueName); 1333 } 1334 1335 /** 1336 * Creates a topic identity given a <CODE>Topic</CODE> name. 1337 * <P> 1338 * This facility is provided for the rare cases where clients need to 1339 * dynamically manipulate topic identity. This allows the creation of a 1340 * topic identity with a provider-specific name. Clients that depend on this 1341 * ability are not portable. 1342 * <P> 1343 * Note that this method is not for creating the physical topic. The 1344 * physical creation of topics is an administrative task and is not to be 1345 * initiated by the JMS API. The one exception is the creation of temporary 1346 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> 1347 * method. 1348 * 1349 * @param topicName the name of this <CODE>Topic</CODE> 1350 * @return a <CODE>Topic</CODE> with the given name 1351 * @throws JMSException if the session fails to create a topic due to some 1352 * internal error. 
1353 * @since 1.1 1354 */ 1355 @Override 1356 public Topic createTopic(String topicName) throws JMSException { 1357 checkClosed(); 1358 if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { 1359 return new ActiveMQTempTopic(topicName); 1360 } 1361 return new ActiveMQTopic(topicName); 1362 } 1363 1364 /** 1365 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1366 * the specified queue. 1367 * 1368 * @param queue the <CODE>queue</CODE> to access 1369 * @exception InvalidDestinationException if an invalid destination is 1370 * specified 1371 * @since 1.1 1372 */ 1373 /** 1374 * Creates a durable subscriber to the specified topic. 1375 * <P> 1376 * If a client needs to receive all the messages published on a topic, 1377 * including the ones published while the subscriber is inactive, it uses a 1378 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1379 * record of this durable subscription and insures that all messages from 1380 * the topic's publishers are retained until they are acknowledged by this 1381 * durable subscriber or they have expired. 1382 * <P> 1383 * Sessions with durable subscribers must always provide the same client 1384 * identifier. In addition, each client must specify a name that uniquely 1385 * identifies (within client identifier) each durable subscription it 1386 * creates. Only one session at a time can have a 1387 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. 1388 * <P> 1389 * A client can change an existing durable subscription by creating a 1390 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1391 * and/or message selector. Changing a durable subscriber is equivalent to 1392 * unsubscribing (deleting) the old one and creating a new one. 1393 * <P> 1394 * In some cases, a connection may both publish and subscribe to a topic. 
1395 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1396 * inhibit the delivery of messages published by its own connection. The 1397 * default value for this attribute is false. 1398 * 1399 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1400 * @param name the name used to identify this subscription 1401 * @return the TopicSubscriber 1402 * @throws JMSException if the session fails to create a subscriber due to 1403 * some internal error. 1404 * @throws InvalidDestinationException if an invalid topic is specified. 1405 * @since 1.1 1406 */ 1407 @Override 1408 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { 1409 checkClosed(); 1410 return createDurableSubscriber(topic, name, null, false); 1411 } 1412 1413 /** 1414 * Creates a durable subscriber to the specified topic, using a message 1415 * selector and specifying whether messages published by its own connection 1416 * should be delivered to it. 1417 * <P> 1418 * If a client needs to receive all the messages published on a topic, 1419 * including the ones published while the subscriber is inactive, it uses a 1420 * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a 1421 * record of this durable subscription and insures that all messages from 1422 * the topic's publishers are retained until they are acknowledged by this 1423 * durable subscriber or they have expired. 1424 * <P> 1425 * Sessions with durable subscribers must always provide the same client 1426 * identifier. In addition, each client must specify a name which uniquely 1427 * identifies (within client identifier) each durable subscription it 1428 * creates. Only one session at a time can have a 1429 * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An 1430 * inactive durable subscriber is one that exists but does not currently 1431 * have a message consumer associated with it. 
1432 * <P> 1433 * A client can change an existing durable subscription by creating a 1434 * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic 1435 * and/or message selector. Changing a durable subscriber is equivalent to 1436 * unsubscribing (deleting) the old one and creating a new one. 1437 * 1438 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to 1439 * @param name the name used to identify this subscription 1440 * @param messageSelector only messages with properties matching the message 1441 * selector expression are delivered. A value of null or an 1442 * empty string indicates that there is no message selector 1443 * for the message consumer. 1444 * @param noLocal if set, inhibits the delivery of messages published by its 1445 * own connection 1446 * @return the Queue Browser 1447 * @throws JMSException if the session fails to create a subscriber due to 1448 * some internal error. 1449 * @throws InvalidDestinationException if an invalid topic is specified. 1450 * @throws InvalidSelectorException if the message selector is invalid. 1451 * @since 1.1 1452 */ 1453 @Override 1454 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { 1455 checkClosed(); 1456 1457 if (topic == null) { 1458 throw new InvalidDestinationException("Topic cannot be null"); 1459 } 1460 1461 if (topic instanceof CustomDestination) { 1462 CustomDestination customDestination = (CustomDestination)topic; 1463 return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); 1464 } 1465 1466 connection.checkClientIDWasManuallySpecified(); 1467 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1468 int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? 
prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch(); 1469 int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit(); 1470 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit, 1471 noLocal, false, asyncDispatch); 1472 } 1473 1474 /** 1475 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1476 * the specified queue. 1477 * 1478 * @param queue the <CODE>queue</CODE> to access 1479 * @return the Queue Browser 1480 * @throws JMSException if the session fails to create a browser due to some 1481 * internal error. 1482 * @throws InvalidDestinationException if an invalid destination is 1483 * specified 1484 * @since 1.1 1485 */ 1486 @Override 1487 public QueueBrowser createBrowser(Queue queue) throws JMSException { 1488 checkClosed(); 1489 return createBrowser(queue, null); 1490 } 1491 1492 /** 1493 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on 1494 * the specified queue using a message selector. 1495 * 1496 * @param queue the <CODE>queue</CODE> to access 1497 * @param messageSelector only messages with properties matching the message 1498 * selector expression are delivered. A value of null or an 1499 * empty string indicates that there is no message selector 1500 * for the message consumer. 1501 * @return the Queue Browser 1502 * @throws JMSException if the session fails to create a browser due to some 1503 * internal error. 1504 * @throws InvalidDestinationException if an invalid destination is 1505 * specified 1506 * @throws InvalidSelectorException if the message selector is invalid. 
1507 * @since 1.1 1508 */ 1509 @Override 1510 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { 1511 checkClosed(); 1512 return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); 1513 } 1514 1515 /** 1516 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that 1517 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1518 * 1519 * @return a temporary queue identity 1520 * @throws JMSException if the session fails to create a temporary queue due 1521 * to some internal error. 1522 * @since 1.1 1523 */ 1524 @Override 1525 public TemporaryQueue createTemporaryQueue() throws JMSException { 1526 checkClosed(); 1527 return (TemporaryQueue)connection.createTempDestination(false); 1528 } 1529 1530 /** 1531 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that 1532 * of the <CODE>Connection</CODE> unless it is deleted earlier. 1533 * 1534 * @return a temporary topic identity 1535 * @throws JMSException if the session fails to create a temporary topic due 1536 * to some internal error. 1537 * @since 1.1 1538 */ 1539 @Override 1540 public TemporaryTopic createTemporaryTopic() throws JMSException { 1541 checkClosed(); 1542 return (TemporaryTopic)connection.createTempDestination(true); 1543 } 1544 1545 /** 1546 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1547 * the specified queue. 1548 * 1549 * @param queue the <CODE>Queue</CODE> to access 1550 * @return 1551 * @throws JMSException if the session fails to create a receiver due to 1552 * some internal error. 1553 * @throws JMSException 1554 * @throws InvalidDestinationException if an invalid queue is specified. 
1555 */ 1556 @Override 1557 public QueueReceiver createReceiver(Queue queue) throws JMSException { 1558 checkClosed(); 1559 return createReceiver(queue, null); 1560 } 1561 1562 /** 1563 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from 1564 * the specified queue using a message selector. 1565 * 1566 * @param queue the <CODE>Queue</CODE> to access 1567 * @param messageSelector only messages with properties matching the message 1568 * selector expression are delivered. A value of null or an 1569 * empty string indicates that there is no message selector 1570 * for the message consumer. 1571 * @return QueueReceiver 1572 * @throws JMSException if the session fails to create a receiver due to 1573 * some internal error. 1574 * @throws InvalidDestinationException if an invalid queue is specified. 1575 * @throws InvalidSelectorException if the message selector is invalid. 1576 */ 1577 @Override 1578 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { 1579 checkClosed(); 1580 1581 if (queue instanceof CustomDestination) { 1582 CustomDestination customDestination = (CustomDestination)queue; 1583 return customDestination.createReceiver(this, messageSelector); 1584 } 1585 1586 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1587 return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(), 1588 prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch); 1589 } 1590 1591 /** 1592 * Creates a <CODE>QueueSender</CODE> object to send messages to the 1593 * specified queue. 1594 * 1595 * @param queue the <CODE>Queue</CODE> to access, or null if this is an 1596 * unidentified producer 1597 * @return QueueSender 1598 * @throws JMSException if the session fails to create a sender due to some 1599 * internal error. 
1600 * @throws InvalidDestinationException if an invalid queue is specified. 1601 */ 1602 @Override 1603 public QueueSender createSender(Queue queue) throws JMSException { 1604 checkClosed(); 1605 if (queue instanceof CustomDestination) { 1606 CustomDestination customDestination = (CustomDestination)queue; 1607 return customDestination.createSender(this); 1608 } 1609 int timeSendOut = connection.getSendTimeout(); 1610 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut); 1611 } 1612 1613 /** 1614 * Creates a nondurable subscriber to the specified topic. <p/> 1615 * <P> 1616 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1617 * that have been published to a topic. <p/> 1618 * <P> 1619 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1620 * receive only messages that are published while they are active. <p/> 1621 * <P> 1622 * In some cases, a connection may both publish and subscribe to a topic. 1623 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1624 * inhibit the delivery of messages published by its own connection. The 1625 * default value for this attribute is false. 1626 * 1627 * @param topic the <CODE>Topic</CODE> to subscribe to 1628 * @return TopicSubscriber 1629 * @throws JMSException if the session fails to create a subscriber due to 1630 * some internal error. 1631 * @throws InvalidDestinationException if an invalid topic is specified. 1632 */ 1633 @Override 1634 public TopicSubscriber createSubscriber(Topic topic) throws JMSException { 1635 checkClosed(); 1636 return createSubscriber(topic, null, false); 1637 } 1638 1639 /** 1640 * Creates a nondurable subscriber to the specified topic, using a message 1641 * selector or specifying whether messages published by its own connection 1642 * should be delivered to it. 
<p/> 1643 * <P> 1644 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages 1645 * that have been published to a topic. <p/> 1646 * <P> 1647 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They 1648 * receive only messages that are published while they are active. <p/> 1649 * <P> 1650 * Messages filtered out by a subscriber's message selector will never be 1651 * delivered to the subscriber. From the subscriber's perspective, they do 1652 * not exist. <p/> 1653 * <P> 1654 * In some cases, a connection may both publish and subscribe to a topic. 1655 * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to 1656 * inhibit the delivery of messages published by its own connection. The 1657 * default value for this attribute is false. 1658 * 1659 * @param topic the <CODE>Topic</CODE> to subscribe to 1660 * @param messageSelector only messages with properties matching the message 1661 * selector expression are delivered. A value of null or an 1662 * empty string indicates that there is no message selector 1663 * for the message consumer. 1664 * @param noLocal if set, inhibits the delivery of messages published by its 1665 * own connection 1666 * @return TopicSubscriber 1667 * @throws JMSException if the session fails to create a subscriber due to 1668 * some internal error. 1669 * @throws InvalidDestinationException if an invalid topic is specified. 1670 * @throws InvalidSelectorException if the message selector is invalid. 
1671 */ 1672 @Override 1673 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { 1674 checkClosed(); 1675 1676 if (topic instanceof CustomDestination) { 1677 CustomDestination customDestination = (CustomDestination)topic; 1678 return customDestination.createSubscriber(this, messageSelector, noLocal); 1679 } 1680 1681 ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy(); 1682 return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy 1683 .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch); 1684 } 1685 1686 /** 1687 * Creates a publisher for the specified topic. <p/> 1688 * <P> 1689 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages 1690 * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on 1691 * a topic, it defines a new sequence of messages that have no ordering 1692 * relationship with the messages it has previously sent. 1693 * 1694 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is 1695 * an unidentified producer 1696 * @return TopicPublisher 1697 * @throws JMSException if the session fails to create a publisher due to 1698 * some internal error. 1699 * @throws InvalidDestinationException if an invalid topic is specified. 
1700 */ 1701 @Override 1702 public TopicPublisher createPublisher(Topic topic) throws JMSException { 1703 checkClosed(); 1704 1705 if (topic instanceof CustomDestination) { 1706 CustomDestination customDestination = (CustomDestination)topic; 1707 return customDestination.createPublisher(this); 1708 } 1709 int timeSendOut = connection.getSendTimeout(); 1710 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut); 1711 } 1712 1713 /** 1714 * Unsubscribes a durable subscription that has been created by a client. 1715 * <P> 1716 * This method deletes the state being maintained on behalf of the 1717 * subscriber by its provider. 1718 * <P> 1719 * It is erroneous for a client to delete a durable subscription while there 1720 * is an active <CODE>MessageConsumer </CODE> or 1721 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 1722 * message is part of a pending transaction or has not been acknowledged in 1723 * the session. 1724 * 1725 * @param name the name used to identify this subscription 1726 * @throws JMSException if the session fails to unsubscribe to the durable 1727 * subscription due to some internal error. 1728 * @throws InvalidDestinationException if an invalid subscription name is 1729 * specified. 1730 * @since 1.1 1731 */ 1732 @Override 1733 public void unsubscribe(String name) throws JMSException { 1734 checkClosed(); 1735 connection.unsubscribe(name); 1736 } 1737 1738 @Override 1739 public void dispatch(MessageDispatch messageDispatch) { 1740 try { 1741 executor.execute(messageDispatch); 1742 } catch (InterruptedException e) { 1743 Thread.currentThread().interrupt(); 1744 connection.onClientInternalException(e); 1745 } 1746 } 1747 1748 /** 1749 * Acknowledges all consumed messages of the session of this consumed 1750 * message. 
1751 * <P> 1752 * All consumed JMS messages support the <CODE>acknowledge</CODE> method 1753 * for use when a client has specified that its JMS session's consumed 1754 * messages are to be explicitly acknowledged. By invoking 1755 * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges 1756 * all messages consumed by the session that the message was delivered to. 1757 * <P> 1758 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted 1759 * sessions and sessions specified to use implicit acknowledgement modes. 1760 * <P> 1761 * A client may individually acknowledge each message as it is consumed, or 1762 * it may choose to acknowledge messages as an application-defined group 1763 * (which is done by calling acknowledge on the last received message of the 1764 * group, thereby acknowledging all messages consumed by the session.) 1765 * <P> 1766 * Messages that have been received but not acknowledged may be redelivered. 1767 * 1768 * @throws JMSException if the JMS provider fails to acknowledge the 1769 * messages due to some internal error. 1770 * @throws javax.jms.IllegalStateException if this method is called on a 1771 * closed session. 1772 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE 1773 */ 1774 public void acknowledge() throws JMSException { 1775 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1776 ActiveMQMessageConsumer c = iter.next(); 1777 c.acknowledge(); 1778 } 1779 } 1780 1781 /** 1782 * Add a message consumer. 1783 * 1784 * @param consumer - message consumer. 1785 * @throws JMSException 1786 */ 1787 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException { 1788 this.consumers.add(consumer); 1789 if (consumer.isDurableSubscriber()) { 1790 stats.onCreateDurableSubscriber(); 1791 } 1792 this.connection.addDispatcher(consumer.getConsumerId(), this); 1793 } 1794 1795 /** 1796 * Remove the message consumer. 1797 * 1798 * @param consumer - consumer to be removed. 
1799 * @throws JMSException 1800 */ 1801 protected void removeConsumer(ActiveMQMessageConsumer consumer) { 1802 this.connection.removeDispatcher(consumer.getConsumerId()); 1803 if (consumer.isDurableSubscriber()) { 1804 stats.onRemoveDurableSubscriber(); 1805 } 1806 this.consumers.remove(consumer); 1807 this.connection.removeDispatcher(consumer); 1808 } 1809 1810 /** 1811 * Adds a message producer. 1812 * 1813 * @param producer - message producer to be added. 1814 * @throws JMSException 1815 */ 1816 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException { 1817 this.producers.add(producer); 1818 this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer); 1819 } 1820 1821 /** 1822 * Removes a message producer. 1823 * 1824 * @param producer - message producer to be removed. 1825 * @throws JMSException 1826 */ 1827 protected void removeProducer(ActiveMQMessageProducer producer) { 1828 this.connection.removeProducer(producer.getProducerInfo().getProducerId()); 1829 this.producers.remove(producer); 1830 } 1831 1832 /** 1833 * Start this Session. 1834 * 1835 * @throws JMSException 1836 */ 1837 protected void start() throws JMSException { 1838 started.set(true); 1839 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1840 ActiveMQMessageConsumer c = iter.next(); 1841 c.start(); 1842 } 1843 executor.start(); 1844 } 1845 1846 /** 1847 * Stops this session. 1848 * 1849 * @throws JMSException 1850 */ 1851 protected void stop() throws JMSException { 1852 1853 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 1854 ActiveMQMessageConsumer c = iter.next(); 1855 c.stop(); 1856 } 1857 1858 started.set(false); 1859 executor.stop(); 1860 } 1861 1862 /** 1863 * Returns the session id. 1864 * 1865 * @return value - session id. 
1866 */ 1867 protected SessionId getSessionId() { 1868 return info.getSessionId(); 1869 } 1870 1871 /** 1872 * @return 1873 */ 1874 protected ConsumerId getNextConsumerId() { 1875 return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId()); 1876 } 1877 1878 /** 1879 * @return 1880 */ 1881 protected ProducerId getNextProducerId() { 1882 return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId()); 1883 } 1884 1885 /** 1886 * Sends the message for dispatch by the broker. 1887 * 1888 * 1889 * @param producer - message producer. 1890 * @param destination - message destination. 1891 * @param message - message to be sent. 1892 * @param deliveryMode - JMS messsage delivery mode. 1893 * @param priority - message priority. 1894 * @param timeToLive - message expiration. 1895 * @param producerWindow 1896 * @param onComplete 1897 * @throws JMSException 1898 */ 1899 protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, 1900 MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { 1901 1902 checkClosed(); 1903 if (destination.isTemporary() && connection.isDeleted(destination)) { 1904 throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination); 1905 } 1906 synchronized (sendMutex) { 1907 // tell the Broker we are about to start a new transaction 1908 doStartTransaction(); 1909 TransactionId txid = transactionContext.getTransactionId(); 1910 long sequenceNumber = producer.getMessageSequence(); 1911 1912 //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11 1913 message.setJMSDeliveryMode(deliveryMode); 1914 long expiration = 0L; 1915 if (!producer.getDisableMessageTimestamp()) { 1916 long timeStamp = System.currentTimeMillis(); 1917 message.setJMSTimestamp(timeStamp); 1918 if (timeToLive > 0) { 1919 expiration = timeToLive + timeStamp; 
1920 } 1921 } 1922 message.setJMSExpiration(expiration); 1923 message.setJMSPriority(priority); 1924 message.setJMSRedelivered(false); 1925 1926 // transform to our own message format here 1927 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection); 1928 msg.setDestination(destination); 1929 msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber)); 1930 1931 // Set the message id. 1932 if (msg != message) { 1933 message.setJMSMessageID(msg.getMessageId().toString()); 1934 // Make sure the JMS destination is set on the foreign messages too. 1935 message.setJMSDestination(destination); 1936 } 1937 //clear the brokerPath in case we are re-sending this message 1938 msg.setBrokerPath(null); 1939 1940 msg.setTransactionId(txid); 1941 if (connection.isCopyMessageOnSend()) { 1942 msg = (ActiveMQMessage)msg.copy(); 1943 } 1944 msg.setConnection(connection); 1945 msg.onSend(); 1946 msg.setProducerId(msg.getMessageId().getProducerId()); 1947 if (LOG.isTraceEnabled()) { 1948 LOG.trace(getSessionId() + " sending message: " + msg); 1949 } 1950 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { 1951 this.connection.asyncSendPacket(msg); 1952 if (producerWindow != null) { 1953 // Since we defer lots of the marshaling till we hit the 1954 // wire, this might not 1955 // provide and accurate size. We may change over to doing 1956 // more aggressive marshaling, 1957 // to get more accurate sizes.. this is more important once 1958 // users start using producer window 1959 // flow control. 
1960 int size = msg.getSize(); 1961 producerWindow.increaseUsage(size); 1962 } 1963 } else { 1964 if (sendTimeout > 0 && onComplete==null) { 1965 this.connection.syncSendPacket(msg,sendTimeout); 1966 }else { 1967 this.connection.syncSendPacket(msg, onComplete); 1968 } 1969 } 1970 1971 } 1972 } 1973 1974 /** 1975 * Send TransactionInfo to indicate transaction has started 1976 * 1977 * @throws JMSException if some internal error occurs 1978 */ 1979 protected void doStartTransaction() throws JMSException { 1980 if (getTransacted() && !transactionContext.isInXATransaction()) { 1981 transactionContext.begin(); 1982 } 1983 } 1984 1985 /** 1986 * Checks whether the session has unconsumed messages. 1987 * 1988 * @return true - if there are unconsumed messages. 1989 */ 1990 public boolean hasUncomsumedMessages() { 1991 return executor.hasUncomsumedMessages(); 1992 } 1993 1994 /** 1995 * Checks whether the session uses transactions. 1996 * 1997 * @return true - if the session uses transactions. 1998 */ 1999 public boolean isTransacted() { 2000 return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction()); 2001 } 2002 2003 /** 2004 * Checks whether the session used client acknowledgment. 2005 * 2006 * @return true - if the session uses client acknowledgment. 2007 */ 2008 protected boolean isClientAcknowledge() { 2009 return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE; 2010 } 2011 2012 /** 2013 * Checks whether the session used auto acknowledgment. 2014 * 2015 * @return true - if the session uses client acknowledgment. 2016 */ 2017 public boolean isAutoAcknowledge() { 2018 return acknowledgementMode == Session.AUTO_ACKNOWLEDGE; 2019 } 2020 2021 /** 2022 * Checks whether the session used dup ok acknowledgment. 2023 * 2024 * @return true - if the session uses client acknowledgment. 
2025 */ 2026 public boolean isDupsOkAcknowledge() { 2027 return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE; 2028 } 2029 2030 public boolean isIndividualAcknowledge(){ 2031 return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE; 2032 } 2033 2034 /** 2035 * Returns the message delivery listener. 2036 * 2037 * @return deliveryListener - message delivery listener. 2038 */ 2039 public DeliveryListener getDeliveryListener() { 2040 return deliveryListener; 2041 } 2042 2043 /** 2044 * Sets the message delivery listener. 2045 * 2046 * @param deliveryListener - message delivery listener. 2047 */ 2048 public void setDeliveryListener(DeliveryListener deliveryListener) { 2049 this.deliveryListener = deliveryListener; 2050 } 2051 2052 /** 2053 * Returns the SessionInfo bean. 2054 * 2055 * @return info - SessionInfo bean. 2056 * @throws JMSException 2057 */ 2058 protected SessionInfo getSessionInfo() throws JMSException { 2059 SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue()); 2060 return info; 2061 } 2062 2063 /** 2064 * Send the asynchronus command. 2065 * 2066 * @param command - command to be executed. 2067 * @throws JMSException 2068 */ 2069 public void asyncSendPacket(Command command) throws JMSException { 2070 connection.asyncSendPacket(command); 2071 } 2072 2073 /** 2074 * Send the synchronus command. 2075 * 2076 * @param command - command to be executed. 
2077 * @return Response 2078 * @throws JMSException 2079 */ 2080 public Response syncSendPacket(Command command) throws JMSException { 2081 return connection.syncSendPacket(command); 2082 } 2083 2084 public long getNextDeliveryId() { 2085 return deliveryIdGenerator.getNextSequenceId(); 2086 } 2087 2088 public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException { 2089 2090 List<MessageDispatch> c = unconsumedMessages.removeAll(); 2091 for (MessageDispatch md : c) { 2092 this.connection.rollbackDuplicate(dispatcher, md.getMessage()); 2093 } 2094 Collections.reverse(c); 2095 2096 for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) { 2097 MessageDispatch md = iter.next(); 2098 executor.executeFirst(md); 2099 } 2100 2101 } 2102 2103 public boolean isRunning() { 2104 return started.get(); 2105 } 2106 2107 public boolean isAsyncDispatch() { 2108 return asyncDispatch; 2109 } 2110 2111 public void setAsyncDispatch(boolean asyncDispatch) { 2112 this.asyncDispatch = asyncDispatch; 2113 } 2114 2115 /** 2116 * @return Returns the sessionAsyncDispatch. 2117 */ 2118 public boolean isSessionAsyncDispatch() { 2119 return sessionAsyncDispatch; 2120 } 2121 2122 /** 2123 * @param sessionAsyncDispatch The sessionAsyncDispatch to set. 
2124 */ 2125 public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) { 2126 this.sessionAsyncDispatch = sessionAsyncDispatch; 2127 } 2128 2129 public MessageTransformer getTransformer() { 2130 return transformer; 2131 } 2132 2133 public ActiveMQConnection getConnection() { 2134 return connection; 2135 } 2136 2137 /** 2138 * Sets the transformer used to transform messages before they are sent on 2139 * to the JMS bus or when they are received from the bus but before they are 2140 * delivered to the JMS client 2141 */ 2142 public void setTransformer(MessageTransformer transformer) { 2143 this.transformer = transformer; 2144 } 2145 2146 public BlobTransferPolicy getBlobTransferPolicy() { 2147 return blobTransferPolicy; 2148 } 2149 2150 /** 2151 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 2152 * OBjects) are transferred from producers to brokers to consumers 2153 */ 2154 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 2155 this.blobTransferPolicy = blobTransferPolicy; 2156 } 2157 2158 public List<MessageDispatch> getUnconsumedMessages() { 2159 return executor.getUnconsumedMessages(); 2160 } 2161 2162 @Override 2163 public String toString() { 2164 return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "} " + sendMutex; 2165 } 2166 2167 public void checkMessageListener() throws JMSException { 2168 if (messageListener != null) { 2169 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2170 } 2171 for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) { 2172 ActiveMQMessageConsumer consumer = i.next(); 2173 if (consumer.hasMessageListener()) { 2174 throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set"); 2175 } 2176 } 2177 } 2178 2179 protected void setOptimizeAcknowledge(boolean value) { 2180 for (Iterator<ActiveMQMessageConsumer> iter = 
consumers.iterator(); iter.hasNext();) { 2181 ActiveMQMessageConsumer c = iter.next(); 2182 c.setOptimizeAcknowledge(value); 2183 } 2184 } 2185 2186 protected void setPrefetchSize(ConsumerId id, int prefetch) { 2187 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2188 ActiveMQMessageConsumer c = iter.next(); 2189 if (c.getConsumerId().equals(id)) { 2190 c.setPrefetchSize(prefetch); 2191 break; 2192 } 2193 } 2194 } 2195 2196 protected void close(ConsumerId id) { 2197 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2198 ActiveMQMessageConsumer c = iter.next(); 2199 if (c.getConsumerId().equals(id)) { 2200 try { 2201 c.close(); 2202 } catch (JMSException e) { 2203 LOG.warn("Exception closing consumer", e); 2204 } 2205 LOG.warn("Closed consumer on Command, " + id); 2206 break; 2207 } 2208 } 2209 } 2210 2211 public boolean isInUse(ActiveMQTempDestination destination) { 2212 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 2213 ActiveMQMessageConsumer c = iter.next(); 2214 if (c.isInUse(destination)) { 2215 return true; 2216 } 2217 } 2218 return false; 2219 } 2220 2221 /** 2222 * highest sequence id of the last message delivered by this session. 
* Passed to the broker in the close command, maintained by dispose()
     *
     * @return lastDeliveredSequenceId
     */
    public long getLastDeliveredSequenceId() {
        return this.lastDeliveredSequenceId;
    }

    /**
     * Sends the acknowledgement without requesting lazy delivery.
     *
     * @param ack the acknowledgement to send.
     * @throws JMSException
     */
    protected void sendAck(MessageAck ack) throws JMSException {
        final boolean lazy = false;
        sendAck(ack, lazy);
    }

    /**
     * Sends the acknowledgement, synchronously only when none of the
     * following holds: the caller asked for lazy delivery, the connection
     * sends acks asynchronously, or the session is transacted.
     *
     * @param ack the acknowledgement to send.
     * @param lazy if true the acknowledgement is sent asynchronously.
     * @throws JMSException
     */
    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
        final boolean waitForBroker = !lazy && !connection.isSendAcksAsync() && !getTransacted();
        if (waitForBroker) {
            syncSendPacket(ack);
        } else {
            asyncSendPacket(ack);
        }
    }

    /**
     * @return the scheduler provided by the connection.
     * @throws JMSException
     */
    protected Scheduler getScheduler() throws JMSException {
        return connection.getScheduler();
    }

    /**
     * @return the connection executor held by this session.
     */
    protected ThreadPoolExecutor getConnectionExecutor() {
        return connectionExecutor;
    }
}