001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.File;
020import java.io.InputStream;
021import java.io.Serializable;
022import java.net.URL;
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.List;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030
031import javax.jms.BytesMessage;
032import javax.jms.Destination;
033import javax.jms.IllegalStateException;
034import javax.jms.InvalidDestinationException;
035import javax.jms.InvalidSelectorException;
036import javax.jms.JMSException;
037import javax.jms.MapMessage;
038import javax.jms.Message;
039import javax.jms.MessageConsumer;
040import javax.jms.MessageListener;
041import javax.jms.MessageProducer;
042import javax.jms.ObjectMessage;
043import javax.jms.Queue;
044import javax.jms.QueueBrowser;
045import javax.jms.QueueReceiver;
046import javax.jms.QueueSender;
047import javax.jms.QueueSession;
048import javax.jms.Session;
049import javax.jms.StreamMessage;
050import javax.jms.TemporaryQueue;
051import javax.jms.TemporaryTopic;
052import javax.jms.TextMessage;
053import javax.jms.Topic;
054import javax.jms.TopicPublisher;
055import javax.jms.TopicSession;
056import javax.jms.TopicSubscriber;
057import javax.jms.TransactionRolledBackException;
058
059import org.apache.activemq.blob.BlobDownloader;
060import org.apache.activemq.blob.BlobTransferPolicy;
061import org.apache.activemq.blob.BlobUploader;
062import org.apache.activemq.command.ActiveMQBlobMessage;
063import org.apache.activemq.command.ActiveMQBytesMessage;
064import org.apache.activemq.command.ActiveMQDestination;
065import org.apache.activemq.command.ActiveMQMapMessage;
066import org.apache.activemq.command.ActiveMQMessage;
067import org.apache.activemq.command.ActiveMQObjectMessage;
068import org.apache.activemq.command.ActiveMQQueue;
069import org.apache.activemq.command.ActiveMQStreamMessage;
070import org.apache.activemq.command.ActiveMQTempDestination;
071import org.apache.activemq.command.ActiveMQTempQueue;
072import org.apache.activemq.command.ActiveMQTempTopic;
073import org.apache.activemq.command.ActiveMQTextMessage;
074import org.apache.activemq.command.ActiveMQTopic;
075import org.apache.activemq.command.Command;
076import org.apache.activemq.command.ConsumerId;
077import org.apache.activemq.command.MessageAck;
078import org.apache.activemq.command.MessageDispatch;
079import org.apache.activemq.command.MessageId;
080import org.apache.activemq.command.ProducerId;
081import org.apache.activemq.command.RemoveInfo;
082import org.apache.activemq.command.Response;
083import org.apache.activemq.command.SessionId;
084import org.apache.activemq.command.SessionInfo;
085import org.apache.activemq.command.TransactionId;
086import org.apache.activemq.management.JMSSessionStatsImpl;
087import org.apache.activemq.management.StatsCapable;
088import org.apache.activemq.management.StatsImpl;
089import org.apache.activemq.thread.Scheduler;
090import org.apache.activemq.transaction.Synchronization;
091import org.apache.activemq.usage.MemoryUsage;
092import org.apache.activemq.util.Callback;
093import org.apache.activemq.util.LongSequenceGenerator;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097/**
098 * <P>
099 * A <CODE>Session</CODE> object is a single-threaded context for producing
100 * and consuming messages. Although it may allocate provider resources outside
101 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
102 * <P>
103 * A session serves several purposes:
104 * <UL>
105 * <LI>It is a factory for its message producers and consumers.
106 * <LI>It supplies provider-optimized message factories.
107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
108 * <CODE>TemporaryQueues</CODE>.
109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
110 * objects for those clients that need to dynamically manipulate
111 * provider-specific destination names.
112 * <LI>It supports a single series of transactions that combine work spanning
113 * its producers and consumers into atomic units.
114 * <LI>It defines a serial order for the messages it consumes and the messages
115 * it produces.
116 * <LI>It retains messages it consumes until they have been acknowledged.
117 * <LI>It serializes execution of message listeners registered with its message
118 * consumers.
119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
120 * </UL>
121 * <P>
122 * A session can create and service multiple message producers and consumers.
123 * <P>
124 * One typical use is to have a thread block on a synchronous
125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
 * use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
127 * <P>
128 * If a client desires to have one thread produce messages while others consume
129 * them, the client should use a separate session for its producing thread.
130 * <P>
131 * Once a connection has been started, any session with one or more registered
132 * message listeners is dedicated to the thread of control that delivers
133 * messages to it. It is erroneous for client code to use this session or any of
134 * its constituent objects from another thread of control. The only exception to
135 * this rule is the use of the session or connection <CODE>close</CODE>
136 * method.
137 * <P>
138 * It should be easy for most clients to partition their work naturally into
139 * sessions. This model allows clients to start simply and incrementally add
140 * message processing complexity as their need for concurrency grows.
141 * <P>
142 * The <CODE>close</CODE> method is the only session method that can be called
143 * while some other session method is being executed in another thread.
144 * <P>
145 * A session may be specified as transacted. Each transacted session supports a
146 * single series of transactions. Each transaction groups a set of message sends
147 * and a set of message receives into an atomic unit of work. In effect,
148 * transactions organize a session's input message stream and output message
149 * stream into series of atomic units. When a transaction commits, its atomic
150 * unit of input is acknowledged and its associated atomic unit of output is
151 * sent. If a transaction rollback is done, the transaction's sent messages are
152 * destroyed and the session's input is automatically recovered.
153 * <P>
154 * The content of a transaction's input and output units is simply those
155 * messages that have been produced and consumed within the session's current
156 * transaction.
157 * <P>
158 * A transaction is completed using either its session's <CODE>commit</CODE>
159 * method or its session's <CODE>rollback </CODE> method. The completion of a
160 * session's current transaction automatically begins the next. The result is
161 * that a transacted session always has a current transaction within which its
162 * work is done.
163 * <P>
164 * The Java Transaction Service (JTS) or some other transaction monitor may be
165 * used to combine a session's transaction with transactions on other resources
166 * (databases, other JMS sessions, etc.). Since Java distributed transactions
167 * are controlled via the Java Transaction API (JTA), use of the session's
168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
169 * prohibited.
170 * <P>
171 * The JMS API does not require support for JTA; however, it does define how a
172 * provider supplies this support.
173 * <P>
174 * Although it is also possible for a JMS client to handle distributed
175 * transactions directly, it is unlikely that many JMS clients will do this.
176 * Support for JTA in the JMS API is targeted at systems vendors who will be
177 * integrating the JMS API into their application server products.
178 *
179 *
180 * @see javax.jms.Session
181 * @see javax.jms.QueueSession
182 * @see javax.jms.TopicSession
183 * @see javax.jms.XASession
184 */
185public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
186
    /**
     * Only acknowledge an individual message - using message.acknowledge() -
     * as opposed to CLIENT_ACKNOWLEDGE, which acknowledges all messages
     * consumed by a session at the time acknowledge() is called.
     */
193    public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
194    public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
195
196    public static interface DeliveryListener {
197        void beforeDelivery(ActiveMQSession session, Message msg);
198
199        void afterDelivery(ActiveMQSession session, Message msg);
200    }
201
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
    // Executor obtained from the owning connection (connection.getExecutor()) in the constructor.
    private final ThreadPoolExecutor connectionExecutor;

    // Acknowledgement mode given at construction; returned by getAcknowledgeMode().
    protected int acknowledgementMode;
    // Connection that created this session and through which its commands are sent.
    protected final ActiveMQConnection connection;
    // Session descriptor sent to the connection on construction; also the source of the
    // remove command sent on close.
    protected final SessionInfo info;
    // Sequence generators; presumably used to build consumer, producer and delivery ids
    // (their use is outside this part of the file - confirm).
    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
    protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
    // Per-session executor; closed in dispose() and asked to clear in-progress
    // messages in clearMessagesInProgress().
    protected final ActiveMQSessionExecutor executor;
    protected final AtomicBoolean started = new AtomicBoolean(false);

    // Consumers and producers owned by this session; disposed and cleared in dispose().
    protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
    protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();

    // Set to true at the end of dispose(); checked by close() and dispose().
    protected boolean closed;
    // True while close() has registered a Synchronization that will perform the
    // deferred close once the XA transaction commits or rolls back.
    private volatile boolean synchronizationRegistered;
    protected boolean asyncDispatch;
    protected boolean sessionAsyncDispatch;
    // Snapshot of LOG.isDebugEnabled() taken once in the constructor.
    protected final boolean debug;
    protected final Object sendMutex = new Object();
    protected final Object redeliveryGuard = new Object();

    // Guard used by clearMessagesInProgress() so that only one round of consumer
    // clear tasks is scheduled at a time.
    private final AtomicBoolean clearInProgress = new AtomicBoolean();

    private MessageListener messageListener;
    private final JMSSessionStatsImpl stats;
    private TransactionContext transactionContext;
    private DeliveryListener deliveryListener;
    private MessageTransformer transformer;
    private BlobTransferPolicy blobTransferPolicy;
    // Highest last-delivered sequence id among the consumers, computed in dispose()
    // and sent with the remove command in doClose().
    // NOTE(review): the meaning of the initial value -2 is not visible here - confirm.
    private long lastDeliveredSequenceId = -2;
234
    /**
     * Construct the Session.
     * <P>
     * Besides storing its arguments, the constructor sends the new
     * {@link SessionInfo} to the connection, copies the connection's
     * transformer and blob transfer policy, creates the session executor,
     * adds the session to the connection and, if the connection is already
     * started, starts the session.
     *
     * @param connection the connection that owns this session
     * @param sessionId id whose value is used to build this session's
     *                {@link SessionInfo}
     * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
     *                Session.SESSION_TRANSACTED
     * @param asyncDispatch stored in the <CODE>asyncDispatch</CODE> field
     * @param sessionAsyncDispatch stored in the
     *                <CODE>sessionAsyncDispatch</CODE> field
     * @throws JMSException on internal error
     */
    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
        this.debug = LOG.isDebugEnabled();
        this.connection = connection;
        this.acknowledgementMode = acknowledgeMode;
        this.asyncDispatch = asyncDispatch;
        this.sessionAsyncDispatch = sessionAsyncDispatch;
        this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
        setTransactionContext(new TransactionContext(connection));
        // The stats object is handed the session's own producer and consumer lists.
        stats = new JMSSessionStatsImpl(producers, consumers);
        // The session info is sent before the session is added to the connection below.
        this.connection.asyncSendPacket(info);
        setTransformer(connection.getTransformer());
        setBlobTransferPolicy(connection.getBlobTransferPolicy());
        this.connectionExecutor=connection.getExecutor();
        this.executor = new ActiveMQSessionExecutor(this);
        connection.addSession(this);
        // A session created on an already-started connection is started immediately.
        if (connection.isStarted()) {
            start();
        }

    }
266
    /**
     * Construct the Session with <CODE>sessionAsyncDispatch</CODE> set to
     * <CODE>true</CODE>; all other arguments are passed to the five-argument
     * constructor unchanged.
     *
     * @param connection the connection that owns this session
     * @param sessionId id used to build this session's {@link SessionInfo}
     * @param acknowledgeMode the acknowledgement mode of the session
     * @param asyncDispatch stored in the <CODE>asyncDispatch</CODE> field
     * @throws JMSException on internal error
     */
    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
        this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
    }
270
    /**
     * Sets the transaction context of the session. The constructor calls this
     * with a new {@link TransactionContext} for the owning connection.
     *
     * @param transactionContext - provides the means to control a JMS
     *                transaction.
     */
    public void setTransactionContext(TransactionContext transactionContext) {
        this.transactionContext = transactionContext;
    }
280
281    /**
282     * Returns the transaction context of the session.
283     *
284     * @return transactionContext - session's transaction context.
285     */
286    public TransactionContext getTransactionContext() {
287        return transactionContext;
288    }
289
290    /*
291     * (non-Javadoc)
292     *
293     * @see org.apache.activemq.management.StatsCapable#getStats()
294     */
295    @Override
296    public StatsImpl getStats() {
297        return stats;
298    }
299
300    /**
301     * Returns the session's statistics.
302     *
303     * @return stats - session's statistics.
304     */
305    public JMSSessionStatsImpl getSessionStats() {
306        return stats;
307    }
308
309    /**
310     * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
311     * object is used to send a message containing a stream of uninterpreted
312     * bytes.
313     *
314     * @return the an ActiveMQBytesMessage
315     * @throws JMSException if the JMS provider fails to create this message due
316     *                 to some internal error.
317     */
318    @Override
319    public BytesMessage createBytesMessage() throws JMSException {
320        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
321        configureMessage(message);
322        return message;
323    }
324
325    /**
326     * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
327     * object is used to send a self-defining set of name-value pairs, where
328     * names are <CODE>String</CODE> objects and values are primitive values
329     * in the Java programming language.
330     *
331     * @return an ActiveMQMapMessage
332     * @throws JMSException if the JMS provider fails to create this message due
333     *                 to some internal error.
334     */
335    @Override
336    public MapMessage createMapMessage() throws JMSException {
337        ActiveMQMapMessage message = new ActiveMQMapMessage();
338        configureMessage(message);
339        return message;
340    }
341
342    /**
343     * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
344     * interface is the root interface of all JMS messages. A
345     * <CODE>Message</CODE> object holds all the standard message header
346     * information. It can be sent when a message containing only header
347     * information is sufficient.
348     *
349     * @return an ActiveMQMessage
350     * @throws JMSException if the JMS provider fails to create this message due
351     *                 to some internal error.
352     */
353    @Override
354    public Message createMessage() throws JMSException {
355        ActiveMQMessage message = new ActiveMQMessage();
356        configureMessage(message);
357        return message;
358    }
359
360    /**
361     * Creates an <CODE>ObjectMessage</CODE> object. An
362     * <CODE>ObjectMessage</CODE> object is used to send a message that
363     * contains a serializable Java object.
364     *
365     * @return an ActiveMQObjectMessage
366     * @throws JMSException if the JMS provider fails to create this message due
367     *                 to some internal error.
368     */
369    @Override
370    public ObjectMessage createObjectMessage() throws JMSException {
371        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
372        configureMessage(message);
373        return message;
374    }
375
376    /**
377     * Creates an initialized <CODE>ObjectMessage</CODE> object. An
378     * <CODE>ObjectMessage</CODE> object is used to send a message that
379     * contains a serializable Java object.
380     *
381     * @param object the object to use to initialize this message
382     * @return an ActiveMQObjectMessage
383     * @throws JMSException if the JMS provider fails to create this message due
384     *                 to some internal error.
385     */
386    @Override
387    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
388        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
389        configureMessage(message);
390        message.setObject(object);
391        return message;
392    }
393
394    /**
395     * Creates a <CODE>StreamMessage</CODE> object. A
396     * <CODE>StreamMessage</CODE> object is used to send a self-defining
397     * stream of primitive values in the Java programming language.
398     *
399     * @return an ActiveMQStreamMessage
400     * @throws JMSException if the JMS provider fails to create this message due
401     *                 to some internal error.
402     */
403    @Override
404    public StreamMessage createStreamMessage() throws JMSException {
405        ActiveMQStreamMessage message = new ActiveMQStreamMessage();
406        configureMessage(message);
407        return message;
408    }
409
410    /**
411     * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
412     * object is used to send a message containing a <CODE>String</CODE>
413     * object.
414     *
415     * @return an ActiveMQTextMessage
416     * @throws JMSException if the JMS provider fails to create this message due
417     *                 to some internal error.
418     */
419    @Override
420    public TextMessage createTextMessage() throws JMSException {
421        ActiveMQTextMessage message = new ActiveMQTextMessage();
422        configureMessage(message);
423        return message;
424    }
425
426    /**
427     * Creates an initialized <CODE>TextMessage</CODE> object. A
428     * <CODE>TextMessage</CODE> object is used to send a message containing a
429     * <CODE>String</CODE>.
430     *
431     * @param text the string used to initialize this message
432     * @return an ActiveMQTextMessage
433     * @throws JMSException if the JMS provider fails to create this message due
434     *                 to some internal error.
435     */
436    @Override
437    public TextMessage createTextMessage(String text) throws JMSException {
438        ActiveMQTextMessage message = new ActiveMQTextMessage();
439        message.setText(text);
440        configureMessage(message);
441        return message;
442    }
443
444    /**
445     * Creates an initialized <CODE>BlobMessage</CODE> object. A
446     * <CODE>BlobMessage</CODE> object is used to send a message containing a
447     * <CODE>URL</CODE> which points to some network addressible BLOB.
448     *
449     * @param url the network addressable URL used to pass directly to the
450     *                consumer
451     * @return a BlobMessage
452     * @throws JMSException if the JMS provider fails to create this message due
453     *                 to some internal error.
454     */
455    public BlobMessage createBlobMessage(URL url) throws JMSException {
456        return createBlobMessage(url, false);
457    }
458
459    /**
460     * Creates an initialized <CODE>BlobMessage</CODE> object. A
461     * <CODE>BlobMessage</CODE> object is used to send a message containing a
462     * <CODE>URL</CODE> which points to some network addressible BLOB.
463     *
464     * @param url the network addressable URL used to pass directly to the
465     *                consumer
466     * @param deletedByBroker indicates whether or not the resource is deleted
467     *                by the broker when the message is acknowledged
468     * @return a BlobMessage
469     * @throws JMSException if the JMS provider fails to create this message due
470     *                 to some internal error.
471     */
472    public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
473        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
474        configureMessage(message);
475        message.setURL(url);
476        message.setDeletedByBroker(deletedByBroker);
477        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
478        return message;
479    }
480
481    /**
482     * Creates an initialized <CODE>BlobMessage</CODE> object. A
483     * <CODE>BlobMessage</CODE> object is used to send a message containing
484     * the <CODE>File</CODE> content. Before the message is sent the file
485     * conent will be uploaded to the broker or some other remote repository
486     * depending on the {@link #getBlobTransferPolicy()}.
487     *
488     * @param file the file to be uploaded to some remote repo (or the broker)
489     *                depending on the strategy
490     * @return a BlobMessage
491     * @throws JMSException if the JMS provider fails to create this message due
492     *                 to some internal error.
493     */
494    public BlobMessage createBlobMessage(File file) throws JMSException {
495        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
496        configureMessage(message);
497        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
498        message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
499        message.setDeletedByBroker(true);
500        message.setName(file.getName());
501        return message;
502    }
503
504    /**
505     * Creates an initialized <CODE>BlobMessage</CODE> object. A
506     * <CODE>BlobMessage</CODE> object is used to send a message containing
507     * the <CODE>File</CODE> content. Before the message is sent the file
508     * conent will be uploaded to the broker or some other remote repository
509     * depending on the {@link #getBlobTransferPolicy()}. <br/>
510     * <p>
511     * The caller of this method is responsible for closing the
512     * input stream that is used, however the stream can not be closed
513     * until <b>after</b> the message has been sent.  To have this class
514     * manage the stream and close it automatically, use the method
515     * {@link ActiveMQSession#createBlobMessage(File)}
516     *
517     * @param in the stream to be uploaded to some remote repo (or the broker)
518     *                depending on the strategy
519     * @return a BlobMessage
520     * @throws JMSException if the JMS provider fails to create this message due
521     *                 to some internal error.
522     */
523    public BlobMessage createBlobMessage(InputStream in) throws JMSException {
524        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
525        configureMessage(message);
526        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
527        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
528        message.setDeletedByBroker(true);
529        return message;
530    }
531
532    /**
533     * Indicates whether the session is in transacted mode.
534     *
535     * @return true if the session is in transacted mode
536     * @throws JMSException if there is some internal error.
537     */
538    @Override
539    public boolean getTransacted() throws JMSException {
540        checkClosed();
541        return isTransacted();
542    }
543
544    /**
545     * Returns the acknowledgement mode of the session. The acknowledgement mode
546     * is set at the time that the session is created. If the session is
547     * transacted, the acknowledgement mode is ignored.
548     *
549     * @return If the session is not transacted, returns the current
550     *         acknowledgement mode for the session. If the session is
551     *         transacted, returns SESSION_TRANSACTED.
552     * @throws JMSException
553     * @see javax.jms.Connection#createSession(boolean,int)
554     * @since 1.1 exception JMSException if there is some internal error.
555     */
556    @Override
557    public int getAcknowledgeMode() throws JMSException {
558        checkClosed();
559        return this.acknowledgementMode;
560    }
561
562    /**
563     * Commits all messages done in this transaction and releases any locks
564     * currently held.
565     *
566     * @throws JMSException if the JMS provider fails to commit the transaction
567     *                 due to some internal error.
568     * @throws TransactionRolledBackException if the transaction is rolled back
569     *                 due to some internal error during commit.
570     * @throws javax.jms.IllegalStateException if the method is not called by a
571     *                 transacted session.
572     */
573    @Override
574    public void commit() throws JMSException {
575        checkClosed();
576        if (!getTransacted()) {
577            throw new javax.jms.IllegalStateException("Not a transacted session");
578        }
579        if (LOG.isDebugEnabled()) {
580            LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
581        }
582        transactionContext.commit();
583    }
584
585    /**
586     * Rolls back any messages done in this transaction and releases any locks
587     * currently held.
588     *
589     * @throws JMSException if the JMS provider fails to roll back the
590     *                 transaction due to some internal error.
591     * @throws javax.jms.IllegalStateException if the method is not called by a
592     *                 transacted session.
593     */
594    @Override
595    public void rollback() throws JMSException {
596        checkClosed();
597        if (!getTransacted()) {
598            throw new javax.jms.IllegalStateException("Not a transacted session");
599        }
600        if (LOG.isDebugEnabled()) {
601            LOG.debug(getSessionId() + " Transaction Rollback, txid:"  + transactionContext.getTransactionId());
602        }
603        transactionContext.rollback();
604    }
605
606    /**
607     * Closes the session.
608     * <P>
609     * Since a provider may allocate some resources on behalf of a session
610     * outside the JVM, clients should close the resources when they are not
611     * needed. Relying on garbage collection to eventually reclaim these
612     * resources may not be timely enough.
613     * <P>
614     * There is no need to close the producers and consumers of a closed
615     * session.
616     * <P>
617     * This call will block until a <CODE>receive</CODE> call or message
618     * listener in progress has completed. A blocked message consumer
619     * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
620     * is closed.
621     * <P>
622     * Closing a transacted session must roll back the transaction in progress.
623     * <P>
624     * This method is the only <CODE>Session</CODE> method that can be called
625     * concurrently.
626     * <P>
627     * Invoking any other <CODE>Session</CODE> method on a closed session must
628     * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
629     * closed session must <I>not </I> throw an exception.
630     *
631     * @throws JMSException if the JMS provider fails to close the session due
632     *                 to some internal error.
633     */
634    @Override
635    public void close() throws JMSException {
636        if (!closed) {
637            if (getTransactionContext().isInXATransaction()) {
638                if (!synchronizationRegistered) {
639                    synchronizationRegistered = true;
640                    getTransactionContext().addSynchronization(new Synchronization() {
641
642                                        @Override
643                                        public void afterCommit() throws Exception {
644                                            doClose();
645                                            synchronizationRegistered = false;
646                                        }
647
648                                        @Override
649                                        public void afterRollback() throws Exception {
650                                            doClose();
651                                            synchronizationRegistered = false;
652                                        }
653                                    });
654                }
655
656            } else {
657                doClose();
658            }
659        }
660    }
661
662    private void doClose() throws JMSException {
663        dispose();
664        RemoveInfo removeCommand = info.createRemoveCommand();
665        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
666        connection.asyncSendPacket(removeCommand);
667    }
668
669    final AtomicInteger clearRequestsCounter = new AtomicInteger(0);
670    void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
671        clearRequestsCounter.incrementAndGet();
672        executor.clearMessagesInProgress();
673        // we are called from inside the transport reconnection logic which involves us
674        // clearing all the connections' consumers dispatch and delivered lists. So rather
675        // than trying to grab a mutex (which could be already owned by the message listener
676        // calling the send or an ack) we allow it to complete in a separate thread via the
677        // scheduler and notify us via connection.transportInterruptionProcessingComplete()
678        //
679        // We must be careful though not to allow multiple calls to this method from a
680        // connection that is having issue becoming fully established from causing a large
681        // build up of scheduled tasks to clear the same consumers over and over.
682        if (consumers.isEmpty()) {
683            return;
684        }
685
686        if (clearInProgress.compareAndSet(false, true)) {
687            for (final ActiveMQMessageConsumer consumer : consumers) {
688                consumer.inProgressClearRequired();
689                transportInterruptionProcessingComplete.incrementAndGet();
690                try {
691                    connection.getScheduler().executeAfterDelay(new Runnable() {
692                        @Override
693                        public void run() {
694                            consumer.clearMessagesInProgress();
695                        }}, 0l);
696                } catch (JMSException e) {
697                    connection.onClientInternalException(e);
698                }
699            }
700
701            try {
702                connection.getScheduler().executeAfterDelay(new Runnable() {
703                    @Override
704                    public void run() {
705                        clearInProgress.set(false);
706                    }}, 0l);
707            } catch (JMSException e) {
708                connection.onClientInternalException(e);
709            }
710        }
711    }
712
713    void deliverAcks() {
714        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
715            ActiveMQMessageConsumer consumer = iter.next();
716            consumer.deliverAcks();
717        }
718    }
719
720    public synchronized void dispose() throws JMSException {
721        if (!closed) {
722
723            try {
724                executor.close();
725
726                for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
727                    ActiveMQMessageConsumer consumer = iter.next();
728                    consumer.setFailureError(connection.getFirstFailureError());
729                    consumer.dispose();
730                    lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
731                }
732                consumers.clear();
733
734                for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
735                    ActiveMQMessageProducer producer = iter.next();
736                    producer.dispose();
737                }
738                producers.clear();
739
740                try {
741                    if (getTransactionContext().isInLocalTransaction()) {
742                        rollback();
743                    }
744                } catch (JMSException e) {
745                }
746
747            } finally {
748                connection.removeSession(this);
749                this.transactionContext = null;
750                closed = true;
751            }
752        }
753    }
754
755    /**
756     * Checks that the session is not closed then configures the message
757     */
758    protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
759        checkClosed();
760        message.setConnection(connection);
761    }
762
763    /**
764     * Check if the session is closed. It is used for ensuring that the session
765     * is open before performing various operations.
766     *
767     * @throws IllegalStateException if the Session is closed
768     */
769    protected void checkClosed() throws IllegalStateException {
770        if (closed) {
771            throw new IllegalStateException("The Session is closed");
772        }
773    }
774
775    /**
776     * Checks if the session is closed.
777     *
778     * @return true if the session is closed, false otherwise.
779     */
780    public boolean isClosed() {
781        return closed;
782    }
783
784    /**
785     * Stops message delivery in this session, and restarts message delivery
786     * with the oldest unacknowledged message.
787     * <P>
788     * All consumers deliver messages in a serial order. Acknowledging a
789     * received message automatically acknowledges all messages that have been
790     * delivered to the client.
791     * <P>
792     * Restarting a session causes it to take the following actions:
793     * <UL>
794     * <LI>Stop message delivery
795     * <LI>Mark all messages that might have been delivered but not
796     * acknowledged as "redelivered"
797     * <LI>Restart the delivery sequence including all unacknowledged messages
798     * that had been previously delivered. Redelivered messages do not have to
799     * be delivered in exactly their original delivery order.
800     * </UL>
801     *
802     * @throws JMSException if the JMS provider fails to stop and restart
803     *                 message delivery due to some internal error.
804     * @throws IllegalStateException if the method is called by a transacted
805     *                 session.
806     */
807    @Override
808    public void recover() throws JMSException {
809
810        checkClosed();
811        if (getTransacted()) {
812            throw new IllegalStateException("This session is transacted");
813        }
814
815        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
816            ActiveMQMessageConsumer c = iter.next();
817            c.rollback();
818        }
819
820    }
821
822    /**
823     * Returns the session's distinguished message listener (optional).
824     *
825     * @return the message listener associated with this session
826     * @throws JMSException if the JMS provider fails to get the message
827     *                 listener due to an internal error.
828     * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
829     * @see javax.jms.ServerSessionPool
830     * @see javax.jms.ServerSession
831     */
832    @Override
833    public MessageListener getMessageListener() throws JMSException {
834        checkClosed();
835        return this.messageListener;
836    }
837
838    /**
839     * Sets the session's distinguished message listener (optional).
840     * <P>
841     * When the distinguished message listener is set, no other form of message
842     * receipt in the session can be used; however, all forms of sending
843     * messages are still supported.
844     * <P>
845     * If this session has been closed, then an {@link IllegalStateException} is
846     * thrown, if trying to set a new listener. However setting the listener
847     * to <tt>null</tt> is allowed, to clear the listener, even if this session
848     * has been closed prior.
849     * <P>
850     * This is an expert facility not used by regular JMS clients.
851     *
852     * @param listener the message listener to associate with this session
853     * @throws JMSException if the JMS provider fails to set the message
854     *                 listener due to an internal error.
855     * @see javax.jms.Session#getMessageListener()
856     * @see javax.jms.ServerSessionPool
857     * @see javax.jms.ServerSession
858     */
859    @Override
860    public void setMessageListener(MessageListener listener) throws JMSException {
861        // only check for closed if we set a new listener, as we allow to clear
862        // the listener, such as when an application is shutting down, and is
863        // no longer using a message listener on this session
864        if (listener != null) {
865            checkClosed();
866        }
867        this.messageListener = listener;
868
869        if (listener != null) {
870            executor.setDispatchedBySessionPool(true);
871        }
872    }
873
874    /**
875     * Optional operation, intended to be used only by Application Servers, not
876     * by ordinary JMS clients.
877     *
878     * @see javax.jms.ServerSession
879     */
880    @Override
881    public void run() {
882        MessageDispatch messageDispatch;
883        while ((messageDispatch = executor.dequeueNoWait()) != null) {
884            final MessageDispatch md = messageDispatch;
885            final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
886
887            MessageAck earlyAck = null;
888            if (message.isExpired()) {
889                earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
890                earlyAck.setFirstMessageId(message.getMessageId());
891            } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
892                LOG.debug("{} got duplicate: {}", this, message.getMessageId());
893                earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
894                earlyAck.setFirstMessageId(md.getMessage().getMessageId());
895                earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
896            }
897            if (earlyAck != null) {
898                try {
899                    asyncSendPacket(earlyAck);
900                } catch (Throwable t) {
901                    LOG.error("error dispatching ack: {} ", earlyAck, t);
902                    connection.onClientInternalException(t);
903                } finally {
904                    continue;
905                }
906            }
907
908            if (isClientAcknowledge()||isIndividualAcknowledge()) {
909                message.setAcknowledgeCallback(new Callback() {
910                    @Override
911                    public void execute() throws Exception {
912                    }
913                });
914            }
915
916            if (deliveryListener != null) {
917                deliveryListener.beforeDelivery(this, message);
918            }
919
920            md.setDeliverySequenceId(getNextDeliveryId());
921            lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
922
923            final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
924
925            final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
926            /*
927            * The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched.
928            * We dont want the after deliver being called after the redeliver as it may cause some weird stuff.
929            * */
930            synchronized (redeliveryGuard) {
931                try {
932                    ack.setFirstMessageId(md.getMessage().getMessageId());
933                    doStartTransaction();
934                    ack.setTransactionId(getTransactionContext().getTransactionId());
935                    if (ack.getTransactionId() != null) {
936                        getTransactionContext().addSynchronization(new Synchronization() {
937
938                            final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
939
940                            @Override
941                            public void beforeEnd() throws Exception {
942                                // validate our consumer so we don't push stale acks that get ignored
943                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
944                                    LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
945                                    throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
946                                }
947                                LOG.trace("beforeEnd ack {}", ack);
948                                sendAck(ack);
949                            }
950
951                            @Override
952                            public void afterRollback() throws Exception {
953                                LOG.trace("rollback {}", ack, new Throwable("here"));
954                                // ensure we don't filter this as a duplicate
955                                connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
956
957                                // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
958                                if (clearRequestsCounter.get() > clearRequestCount) {
959                                    LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
960                                    return;
961                                }
962
963                                // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
964                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
965                                    LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
966                                    return;
967                                }
968
969                                RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
970                                int redeliveryCounter = md.getMessage().getRedeliveryCounter();
971                                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
972                                        && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
973                                    // We need to NACK the messages so that they get
974                                    // sent to the
975                                    // DLQ.
976                                    // Acknowledge the last message.
977                                    MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
978                                    ack.setFirstMessageId(md.getMessage().getMessageId());
979                                    ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
980                                    asyncSendPacket(ack);
981
982                                } else {
983
984                                    MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
985                                    ack.setFirstMessageId(md.getMessage().getMessageId());
986                                    asyncSendPacket(ack);
987
988                                    // Figure out how long we should wait to resend
989                                    // this message.
990                                    long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
991                                    for (int i = 0; i < redeliveryCounter; i++) {
992                                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
993                                    }
994
995                                    /*
996                                    * If we are a non blocking delivery then we need to stop the executor to avoid more
997                                    * messages being delivered, once the message is redelivered we can restart it.
998                                    * */
999                                    if (!connection.isNonBlockingRedelivery()) {
1000                                        LOG.debug("Blocking session until re-delivery...");
1001                                        executor.stop();
1002                                    }
1003
1004                                    connection.getScheduler().executeAfterDelay(new Runnable() {
1005
1006                                        @Override
1007                                        public void run() {
1008                                            /*
1009                                            * wait for the first delivery to be complete, i.e. after delivery has been called.
1010                                            * */
1011                                            synchronized (redeliveryGuard) {
1012                                                /*
1013                                                * If its non blocking then we can just dispatch in a new session.
1014                                                * */
1015                                                if (connection.isNonBlockingRedelivery()) {
1016                                                    ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
1017                                                } else {
1018                                                    /*
1019                                                    * If there has been an error thrown during afterDelivery then the
1020                                                    * endpoint will be marked as dead so redelivery will fail (and eventually
1021                                                    * the session marked as stale), in this case we can only call dispatch
1022                                                    * which will create a new session with a new endpoint.
1023                                                    * */
1024                                                    if (afterDeliveryError.get()) {
1025                                                        ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
1026                                                    } else {
1027                                                        executor.executeFirst(md);
1028                                                        executor.start();
1029                                                    }
1030                                                }
1031                                            }
1032                                        }
1033                                    }, redeliveryDelay);
1034                                }
1035                                md.getMessage().onMessageRolledBack();
1036                            }
1037                        });
1038                    }
1039
1040                    LOG.trace("{} onMessage({})", this, message.getMessageId());
1041                    messageListener.onMessage(message);
1042
1043                } catch (Throwable e) {
1044                    if (!isClosed()) {
1045                        LOG.error("{} error dispatching message: {} ", this, message.getMessageId(), e);
1046                    }
1047
1048                    if (getTransactionContext() != null && getTransactionContext().isInXATransaction()) {
1049                        LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
1050                        getTransactionContext().setRollbackOnly(true);
1051                    }
1052
1053                    // A problem while invoking the MessageListener does not
1054                    // in general indicate a problem with the connection to the broker, i.e.
1055                    // it will usually be sufficient to let the afterDelivery() method either
1056                    // commit or roll back in order to deal with the exception.
1057                    // However, we notify any registered client internal exception listener
1058                    // of the problem.
1059                    connection.onClientInternalException(e);
1060                } finally {
1061                    if (ack.getTransactionId() == null) {
1062                        try {
1063                            asyncSendPacket(ack);
1064                        } catch (Throwable e) {
1065                            connection.onClientInternalException(e);
1066                        }
1067                    }
1068                }
1069
1070                if (deliveryListener != null) {
1071                    try {
1072                        deliveryListener.afterDelivery(this, message);
1073                    } catch (Throwable t) {
1074                        LOG.debug("Unable to call after delivery", t);
1075                        afterDeliveryError.set(true);
1076                        throw new RuntimeException(t);
1077                    }
1078                }
1079            }
1080            /*
1081            * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
1082            * It also needs to be outside the redelivery guard.
1083            * */
1084            try {
1085                executor.waitForQueueRestart();
1086            } catch (InterruptedException ex) {
1087                connection.onClientInternalException(ex);
1088            }
1089        }
1090    }
1091
1092    /**
1093     * Creates a <CODE>MessageProducer</CODE> to send messages to the
1094     * specified destination.
1095     * <P>
1096     * A client uses a <CODE>MessageProducer</CODE> object to send messages to
1097     * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
1098     * inherit from <CODE>Destination</CODE>, they can be used in the
1099     * destination parameter to create a <CODE>MessageProducer</CODE> object.
1100     *
1101     * @param destination the <CODE>Destination</CODE> to send to, or null if
1102     *                this is a producer which does not have a specified
1103     *                destination.
1104     * @return the MessageProducer
1105     * @throws JMSException if the session fails to create a MessageProducer due
1106     *                 to some internal error.
1107     * @throws InvalidDestinationException if an invalid destination is
1108     *                 specified.
1109     * @since 1.1
1110     */
1111    @Override
1112    public MessageProducer createProducer(Destination destination) throws JMSException {
1113        checkClosed();
1114        if (destination instanceof CustomDestination) {
1115            CustomDestination customDestination = (CustomDestination)destination;
1116            return customDestination.createProducer(this);
1117        }
1118        int timeSendOut = connection.getSendTimeout();
1119        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
1120    }
1121
1122    /**
1123     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1124     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1125     * <CODE>Destination</CODE>, they can be used in the destination
1126     * parameter to create a <CODE>MessageConsumer</CODE>.
1127     *
1128     * @param destination the <CODE>Destination</CODE> to access.
1129     * @return the MessageConsumer
1130     * @throws JMSException if the session fails to create a consumer due to
1131     *                 some internal error.
1132     * @throws InvalidDestinationException if an invalid destination is
1133     *                 specified.
1134     * @since 1.1
1135     */
1136    @Override
1137    public MessageConsumer createConsumer(Destination destination) throws JMSException {
1138        return createConsumer(destination, (String) null);
1139    }
1140
1141    /**
1142     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1143     * using a message selector. Since <CODE> Queue</CODE> and
1144     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1145     * can be used in the destination parameter to create a
1146     * <CODE>MessageConsumer</CODE>.
1147     * <P>
1148     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1149     * that have been sent to a destination.
1150     *
1151     * @param destination the <CODE>Destination</CODE> to access
1152     * @param messageSelector only messages with properties matching the message
1153     *                selector expression are delivered. A value of null or an
1154     *                empty string indicates that there is no message selector
1155     *                for the message consumer.
1156     * @return the MessageConsumer
1157     * @throws JMSException if the session fails to create a MessageConsumer due
1158     *                 to some internal error.
1159     * @throws InvalidDestinationException if an invalid destination is
1160     *                 specified.
1161     * @throws InvalidSelectorException if the message selector is invalid.
1162     * @since 1.1
1163     */
1164    @Override
1165    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
1166        return createConsumer(destination, messageSelector, false);
1167    }
1168
1169    /**
1170     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1171     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1172     * <CODE>Destination</CODE>, they can be used in the destination
1173     * parameter to create a <CODE>MessageConsumer</CODE>.
1174     *
1175     * @param destination the <CODE>Destination</CODE> to access.
1176     * @param messageListener the listener to use for async consumption of messages
1177     * @return the MessageConsumer
1178     * @throws JMSException if the session fails to create a consumer due to
1179     *                 some internal error.
1180     * @throws InvalidDestinationException if an invalid destination is
1181     *                 specified.
1182     * @since 1.1
1183     */
1184    public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
1185        return createConsumer(destination, null, messageListener);
1186    }
1187
1188    /**
1189     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1190     * using a message selector. Since <CODE> Queue</CODE> and
1191     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1192     * can be used in the destination parameter to create a
1193     * <CODE>MessageConsumer</CODE>.
1194     * <P>
1195     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1196     * that have been sent to a destination.
1197     *
1198     * @param destination the <CODE>Destination</CODE> to access
1199     * @param messageSelector only messages with properties matching the message
1200     *                selector expression are delivered. A value of null or an
1201     *                empty string indicates that there is no message selector
1202     *                for the message consumer.
1203     * @param messageListener the listener to use for async consumption of messages
1204     * @return the MessageConsumer
1205     * @throws JMSException if the session fails to create a MessageConsumer due
1206     *                 to some internal error.
1207     * @throws InvalidDestinationException if an invalid destination is
1208     *                 specified.
1209     * @throws InvalidSelectorException if the message selector is invalid.
1210     * @since 1.1
1211     */
1212    public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1213        return createConsumer(destination, messageSelector, false, messageListener);
1214    }
1215
1216    /**
1217     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1218     * using a message selector. This method can specify whether messages
1219     * published by its own connection should be delivered to it, if the
1220     * destination is a topic.
1221     * <P>
1222     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1223     * <CODE>Destination</CODE>, they can be used in the destination
1224     * parameter to create a <CODE>MessageConsumer</CODE>.
1225     * <P>
1226     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1227     * that have been published to a destination.
1228     * <P>
1229     * In some cases, a connection may both publish and subscribe to a topic.
1230     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1231     * inhibit the delivery of messages published by its own connection. The
1232     * default value for this attribute is False. The <CODE>noLocal</CODE>
1233     * value must be supported by destinations that are topics.
1234     *
1235     * @param destination the <CODE>Destination</CODE> to access
1236     * @param messageSelector only messages with properties matching the message
1237     *                selector expression are delivered. A value of null or an
1238     *                empty string indicates that there is no message selector
1239     *                for the message consumer.
1240     * @param noLocal - if true, and the destination is a topic, inhibits the
1241     *                delivery of messages published by its own connection. The
1242     *                behavior for <CODE>NoLocal</CODE> is not specified if
1243     *                the destination is a queue.
1244     * @return the MessageConsumer
1245     * @throws JMSException if the session fails to create a MessageConsumer due
1246     *                 to some internal error.
1247     * @throws InvalidDestinationException if an invalid destination is
1248     *                 specified.
1249     * @throws InvalidSelectorException if the message selector is invalid.
1250     * @since 1.1
1251     */
1252    @Override
1253    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1254        return createConsumer(destination, messageSelector, noLocal, null);
1255    }
1256
1257    /**
1258     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1259     * using a message selector. This method can specify whether messages
1260     * published by its own connection should be delivered to it, if the
1261     * destination is a topic.
1262     * <P>
1263     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1264     * <CODE>Destination</CODE>, they can be used in the destination
1265     * parameter to create a <CODE>MessageConsumer</CODE>.
1266     * <P>
1267     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1268     * that have been published to a destination.
1269     * <P>
1270     * In some cases, a connection may both publish and subscribe to a topic.
1271     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1272     * inhibit the delivery of messages published by its own connection. The
1273     * default value for this attribute is False. The <CODE>noLocal</CODE>
1274     * value must be supported by destinations that are topics.
1275     *
1276     * @param destination the <CODE>Destination</CODE> to access
1277     * @param messageSelector only messages with properties matching the message
1278     *                selector expression are delivered. A value of null or an
1279     *                empty string indicates that there is no message selector
1280     *                for the message consumer.
1281     * @param noLocal - if true, and the destination is a topic, inhibits the
1282     *                delivery of messages published by its own connection. The
1283     *                behavior for <CODE>NoLocal</CODE> is not specified if
1284     *                the destination is a queue.
1285     * @param messageListener the listener to use for async consumption of messages
1286     * @return the MessageConsumer
1287     * @throws JMSException if the session fails to create a MessageConsumer due
1288     *                 to some internal error.
1289     * @throws InvalidDestinationException if an invalid destination is
1290     *                 specified.
1291     * @throws InvalidSelectorException if the message selector is invalid.
1292     * @since 1.1
1293     */
1294    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1295        checkClosed();
1296
1297        if (destination instanceof CustomDestination) {
1298            CustomDestination customDestination = (CustomDestination)destination;
1299            return customDestination.createConsumer(this, messageSelector, noLocal);
1300        }
1301
1302        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1303        int prefetch = 0;
1304        if (destination instanceof Topic) {
1305            prefetch = prefetchPolicy.getTopicPrefetch();
1306        } else {
1307            prefetch = prefetchPolicy.getQueuePrefetch();
1308        }
1309        ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1310        return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1311                prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1312    }
1313
1314    /**
1315     * Creates a queue identity given a <CODE>Queue</CODE> name.
1316     * <P>
1317     * This facility is provided for the rare cases where clients need to
1318     * dynamically manipulate queue identity. It allows the creation of a queue
1319     * identity with a provider-specific name. Clients that depend on this
1320     * ability are not portable.
1321     * <P>
1322     * Note that this method is not for creating the physical queue. The
1323     * physical creation of queues is an administrative task and is not to be
1324     * initiated by the JMS API. The one exception is the creation of temporary
1325     * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1326     * method.
1327     *
1328     * @param queueName the name of this <CODE>Queue</CODE>
1329     * @return a <CODE>Queue</CODE> with the given name
1330     * @throws JMSException if the session fails to create a queue due to some
1331     *                 internal error.
1332     * @since 1.1
1333     */
1334    @Override
1335    public Queue createQueue(String queueName) throws JMSException {
1336        checkClosed();
1337        if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1338            return new ActiveMQTempQueue(queueName);
1339        }
1340        return new ActiveMQQueue(queueName);
1341    }
1342
1343    /**
1344     * Creates a topic identity given a <CODE>Topic</CODE> name.
1345     * <P>
1346     * This facility is provided for the rare cases where clients need to
1347     * dynamically manipulate topic identity. This allows the creation of a
1348     * topic identity with a provider-specific name. Clients that depend on this
1349     * ability are not portable.
1350     * <P>
1351     * Note that this method is not for creating the physical topic. The
1352     * physical creation of topics is an administrative task and is not to be
1353     * initiated by the JMS API. The one exception is the creation of temporary
1354     * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1355     * method.
1356     *
1357     * @param topicName the name of this <CODE>Topic</CODE>
1358     * @return a <CODE>Topic</CODE> with the given name
1359     * @throws JMSException if the session fails to create a topic due to some
1360     *                 internal error.
1361     * @since 1.1
1362     */
1363    @Override
1364    public Topic createTopic(String topicName) throws JMSException {
1365        checkClosed();
1366        if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1367            return new ActiveMQTempTopic(topicName);
1368        }
1369        return new ActiveMQTopic(topicName);
1370    }
1371
1372    /**
1373     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1374     * the specified queue.
1375     *
1376     * @param queue the <CODE>queue</CODE> to access
1377     * @exception InvalidDestinationException if an invalid destination is
1378     *                    specified
1379     * @since 1.1
1380     */
1381    /**
1382     * Creates a durable subscriber to the specified topic.
1383     * <P>
1384     * If a client needs to receive all the messages published on a topic,
1385     * including the ones published while the subscriber is inactive, it uses a
1386     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1387     * record of this durable subscription and insures that all messages from
1388     * the topic's publishers are retained until they are acknowledged by this
1389     * durable subscriber or they have expired.
1390     * <P>
1391     * Sessions with durable subscribers must always provide the same client
1392     * identifier. In addition, each client must specify a name that uniquely
1393     * identifies (within client identifier) each durable subscription it
1394     * creates. Only one session at a time can have a
1395     * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1396     * <P>
1397     * A client can change an existing durable subscription by creating a
1398     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1399     * and/or message selector. Changing a durable subscriber is equivalent to
1400     * unsubscribing (deleting) the old one and creating a new one.
1401     * <P>
1402     * In some cases, a connection may both publish and subscribe to a topic.
1403     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1404     * inhibit the delivery of messages published by its own connection. The
1405     * default value for this attribute is false.
1406     *
1407     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1408     * @param name the name used to identify this subscription
1409     * @return the TopicSubscriber
1410     * @throws JMSException if the session fails to create a subscriber due to
1411     *                 some internal error.
1412     * @throws InvalidDestinationException if an invalid topic is specified.
1413     * @since 1.1
1414     */
1415    @Override
1416    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1417        checkClosed();
1418        return createDurableSubscriber(topic, name, null, false);
1419    }
1420
1421    /**
1422     * Creates a durable subscriber to the specified topic, using a message
1423     * selector and specifying whether messages published by its own connection
1424     * should be delivered to it.
1425     * <P>
1426     * If a client needs to receive all the messages published on a topic,
1427     * including the ones published while the subscriber is inactive, it uses a
1428     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1429     * record of this durable subscription and insures that all messages from
1430     * the topic's publishers are retained until they are acknowledged by this
1431     * durable subscriber or they have expired.
1432     * <P>
1433     * Sessions with durable subscribers must always provide the same client
1434     * identifier. In addition, each client must specify a name which uniquely
1435     * identifies (within client identifier) each durable subscription it
1436     * creates. Only one session at a time can have a
1437     * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1438     * inactive durable subscriber is one that exists but does not currently
1439     * have a message consumer associated with it.
1440     * <P>
1441     * A client can change an existing durable subscription by creating a
1442     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1443     * and/or message selector. Changing a durable subscriber is equivalent to
1444     * unsubscribing (deleting) the old one and creating a new one.
1445     *
1446     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1447     * @param name the name used to identify this subscription
1448     * @param messageSelector only messages with properties matching the message
1449     *                selector expression are delivered. A value of null or an
1450     *                empty string indicates that there is no message selector
1451     *                for the message consumer.
1452     * @param noLocal if set, inhibits the delivery of messages published by its
1453     *                own connection
1454     * @return the Queue Browser
1455     * @throws JMSException if the session fails to create a subscriber due to
1456     *                 some internal error.
1457     * @throws InvalidDestinationException if an invalid topic is specified.
1458     * @throws InvalidSelectorException if the message selector is invalid.
1459     * @since 1.1
1460     */
1461    @Override
1462    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1463        checkClosed();
1464
1465        if (topic == null) {
1466            throw new InvalidDestinationException("Topic cannot be null");
1467        }
1468
1469        if (topic instanceof CustomDestination) {
1470            CustomDestination customDestination = (CustomDestination)topic;
1471            return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1472        }
1473
1474        connection.checkClientIDWasManuallySpecified();
1475        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1476        int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1477        int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1478        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1479                                           noLocal, false, asyncDispatch);
1480    }
1481
1482    /**
1483     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1484     * the specified queue.
1485     *
1486     * @param queue the <CODE>queue</CODE> to access
1487     * @return the Queue Browser
1488     * @throws JMSException if the session fails to create a browser due to some
1489     *                 internal error.
1490     * @throws InvalidDestinationException if an invalid destination is
1491     *                 specified
1492     * @since 1.1
1493     */
1494    @Override
1495    public QueueBrowser createBrowser(Queue queue) throws JMSException {
1496        checkClosed();
1497        return createBrowser(queue, null);
1498    }
1499
1500    /**
1501     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1502     * the specified queue using a message selector.
1503     *
1504     * @param queue the <CODE>queue</CODE> to access
1505     * @param messageSelector only messages with properties matching the message
1506     *                selector expression are delivered. A value of null or an
1507     *                empty string indicates that there is no message selector
1508     *                for the message consumer.
1509     * @return the Queue Browser
1510     * @throws JMSException if the session fails to create a browser due to some
1511     *                 internal error.
1512     * @throws InvalidDestinationException if an invalid destination is
1513     *                 specified
1514     * @throws InvalidSelectorException if the message selector is invalid.
1515     * @since 1.1
1516     */
1517    @Override
1518    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1519        checkClosed();
1520        return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1521    }
1522
1523    /**
1524     * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1525     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1526     *
1527     * @return a temporary queue identity
1528     * @throws JMSException if the session fails to create a temporary queue due
1529     *                 to some internal error.
1530     * @since 1.1
1531     */
1532    @Override
1533    public TemporaryQueue createTemporaryQueue() throws JMSException {
1534        checkClosed();
1535        return (TemporaryQueue)connection.createTempDestination(false);
1536    }
1537
1538    /**
1539     * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1540     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1541     *
1542     * @return a temporary topic identity
1543     * @throws JMSException if the session fails to create a temporary topic due
1544     *                 to some internal error.
1545     * @since 1.1
1546     */
1547    @Override
1548    public TemporaryTopic createTemporaryTopic() throws JMSException {
1549        checkClosed();
1550        return (TemporaryTopic)connection.createTempDestination(true);
1551    }
1552
1553    /**
1554     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1555     * the specified queue.
1556     *
1557     * @param queue the <CODE>Queue</CODE> to access
1558     * @return
1559     * @throws JMSException if the session fails to create a receiver due to
1560     *                 some internal error.
1561     * @throws JMSException
1562     * @throws InvalidDestinationException if an invalid queue is specified.
1563     */
1564    @Override
1565    public QueueReceiver createReceiver(Queue queue) throws JMSException {
1566        checkClosed();
1567        return createReceiver(queue, null);
1568    }
1569
1570    /**
1571     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1572     * the specified queue using a message selector.
1573     *
1574     * @param queue the <CODE>Queue</CODE> to access
1575     * @param messageSelector only messages with properties matching the message
1576     *                selector expression are delivered. A value of null or an
1577     *                empty string indicates that there is no message selector
1578     *                for the message consumer.
1579     * @return QueueReceiver
1580     * @throws JMSException if the session fails to create a receiver due to
1581     *                 some internal error.
1582     * @throws InvalidDestinationException if an invalid queue is specified.
1583     * @throws InvalidSelectorException if the message selector is invalid.
1584     */
1585    @Override
1586    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1587        checkClosed();
1588
1589        if (queue instanceof CustomDestination) {
1590            CustomDestination customDestination = (CustomDestination)queue;
1591            return customDestination.createReceiver(this, messageSelector);
1592        }
1593
1594        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1595        return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1596                                         prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1597    }
1598
1599    /**
1600     * Creates a <CODE>QueueSender</CODE> object to send messages to the
1601     * specified queue.
1602     *
1603     * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1604     *                unidentified producer
1605     * @return QueueSender
1606     * @throws JMSException if the session fails to create a sender due to some
1607     *                 internal error.
1608     * @throws InvalidDestinationException if an invalid queue is specified.
1609     */
1610    @Override
1611    public QueueSender createSender(Queue queue) throws JMSException {
1612        checkClosed();
1613        if (queue instanceof CustomDestination) {
1614            CustomDestination customDestination = (CustomDestination)queue;
1615            return customDestination.createSender(this);
1616        }
1617        int timeSendOut = connection.getSendTimeout();
1618        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1619    }
1620
1621    /**
1622     * Creates a nondurable subscriber to the specified topic. <p/>
1623     * <P>
1624     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1625     * that have been published to a topic. <p/>
1626     * <P>
1627     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1628     * receive only messages that are published while they are active. <p/>
1629     * <P>
1630     * In some cases, a connection may both publish and subscribe to a topic.
1631     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1632     * inhibit the delivery of messages published by its own connection. The
1633     * default value for this attribute is false.
1634     *
1635     * @param topic the <CODE>Topic</CODE> to subscribe to
1636     * @return TopicSubscriber
1637     * @throws JMSException if the session fails to create a subscriber due to
1638     *                 some internal error.
1639     * @throws InvalidDestinationException if an invalid topic is specified.
1640     */
1641    @Override
1642    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1643        checkClosed();
1644        return createSubscriber(topic, null, false);
1645    }
1646
1647    /**
1648     * Creates a nondurable subscriber to the specified topic, using a message
1649     * selector or specifying whether messages published by its own connection
1650     * should be delivered to it. <p/>
1651     * <P>
1652     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1653     * that have been published to a topic. <p/>
1654     * <P>
1655     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1656     * receive only messages that are published while they are active. <p/>
1657     * <P>
1658     * Messages filtered out by a subscriber's message selector will never be
1659     * delivered to the subscriber. From the subscriber's perspective, they do
1660     * not exist. <p/>
1661     * <P>
1662     * In some cases, a connection may both publish and subscribe to a topic.
1663     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1664     * inhibit the delivery of messages published by its own connection. The
1665     * default value for this attribute is false.
1666     *
1667     * @param topic the <CODE>Topic</CODE> to subscribe to
1668     * @param messageSelector only messages with properties matching the message
1669     *                selector expression are delivered. A value of null or an
1670     *                empty string indicates that there is no message selector
1671     *                for the message consumer.
1672     * @param noLocal if set, inhibits the delivery of messages published by its
1673     *                own connection
1674     * @return TopicSubscriber
1675     * @throws JMSException if the session fails to create a subscriber due to
1676     *                 some internal error.
1677     * @throws InvalidDestinationException if an invalid topic is specified.
1678     * @throws InvalidSelectorException if the message selector is invalid.
1679     */
1680    @Override
1681    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1682        checkClosed();
1683
1684        if (topic instanceof CustomDestination) {
1685            CustomDestination customDestination = (CustomDestination)topic;
1686            return customDestination.createSubscriber(this, messageSelector, noLocal);
1687        }
1688
1689        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1690        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1691            .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1692    }
1693
1694    /**
1695     * Creates a publisher for the specified topic. <p/>
1696     * <P>
1697     * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1698     * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1699     * a topic, it defines a new sequence of messages that have no ordering
1700     * relationship with the messages it has previously sent.
1701     *
1702     * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1703     *                an unidentified producer
1704     * @return TopicPublisher
1705     * @throws JMSException if the session fails to create a publisher due to
1706     *                 some internal error.
1707     * @throws InvalidDestinationException if an invalid topic is specified.
1708     */
1709    @Override
1710    public TopicPublisher createPublisher(Topic topic) throws JMSException {
1711        checkClosed();
1712
1713        if (topic instanceof CustomDestination) {
1714            CustomDestination customDestination = (CustomDestination)topic;
1715            return customDestination.createPublisher(this);
1716        }
1717        int timeSendOut = connection.getSendTimeout();
1718        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1719    }
1720
1721    /**
1722     * Unsubscribes a durable subscription that has been created by a client.
1723     * <P>
1724     * This method deletes the state being maintained on behalf of the
1725     * subscriber by its provider.
1726     * <P>
1727     * It is erroneous for a client to delete a durable subscription while there
1728     * is an active <CODE>MessageConsumer </CODE> or
1729     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1730     * message is part of a pending transaction or has not been acknowledged in
1731     * the session.
1732     *
1733     * @param name the name used to identify this subscription
1734     * @throws JMSException if the session fails to unsubscribe to the durable
1735     *                 subscription due to some internal error.
1736     * @throws InvalidDestinationException if an invalid subscription name is
1737     *                 specified.
1738     * @since 1.1
1739     */
1740    @Override
1741    public void unsubscribe(String name) throws JMSException {
1742        checkClosed();
1743        connection.unsubscribe(name);
1744    }
1745
1746    @Override
1747    public void dispatch(MessageDispatch messageDispatch) {
1748        try {
1749            executor.execute(messageDispatch);
1750        } catch (InterruptedException e) {
1751            Thread.currentThread().interrupt();
1752            connection.onClientInternalException(e);
1753        }
1754    }
1755
1756    /**
1757     * Acknowledges all consumed messages of the session of this consumed
1758     * message.
1759     * <P>
1760     * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1761     * for use when a client has specified that its JMS session's consumed
1762     * messages are to be explicitly acknowledged. By invoking
1763     * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1764     * all messages consumed by the session that the message was delivered to.
1765     * <P>
1766     * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1767     * sessions and sessions specified to use implicit acknowledgement modes.
1768     * <P>
1769     * A client may individually acknowledge each message as it is consumed, or
1770     * it may choose to acknowledge messages as an application-defined group
1771     * (which is done by calling acknowledge on the last received message of the
1772     * group, thereby acknowledging all messages consumed by the session.)
1773     * <P>
1774     * Messages that have been received but not acknowledged may be redelivered.
1775     *
1776     * @throws JMSException if the JMS provider fails to acknowledge the
1777     *                 messages due to some internal error.
1778     * @throws javax.jms.IllegalStateException if this method is called on a
1779     *                 closed session.
1780     * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1781     */
1782    public void acknowledge() throws JMSException {
1783        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1784            ActiveMQMessageConsumer c = iter.next();
1785            c.acknowledge();
1786        }
1787    }
1788
1789    /**
1790     * Add a message consumer.
1791     *
1792     * @param consumer - message consumer.
1793     * @throws JMSException
1794     */
1795    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1796        this.consumers.add(consumer);
1797        if (consumer.isDurableSubscriber()) {
1798            stats.onCreateDurableSubscriber();
1799        }
1800        this.connection.addDispatcher(consumer.getConsumerId(), this);
1801    }
1802
1803    /**
1804     * Remove the message consumer.
1805     *
1806     * @param consumer - consumer to be removed.
1807     * @throws JMSException
1808     */
1809    protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1810        this.connection.removeDispatcher(consumer.getConsumerId());
1811        if (consumer.isDurableSubscriber()) {
1812            stats.onRemoveDurableSubscriber();
1813        }
1814        this.consumers.remove(consumer);
1815        this.connection.removeDispatcher(consumer);
1816    }
1817
1818    /**
1819     * Adds a message producer.
1820     *
1821     * @param producer - message producer to be added.
1822     * @throws JMSException
1823     */
1824    protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1825        this.producers.add(producer);
1826        this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1827    }
1828
1829    /**
1830     * Removes a message producer.
1831     *
1832     * @param producer - message producer to be removed.
1833     * @throws JMSException
1834     */
1835    protected void removeProducer(ActiveMQMessageProducer producer) {
1836        this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1837        this.producers.remove(producer);
1838    }
1839
1840    /**
1841     * Start this Session.
1842     *
1843     * @throws JMSException
1844     */
1845    protected void start() throws JMSException {
1846        started.set(true);
1847        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1848            ActiveMQMessageConsumer c = iter.next();
1849            c.start();
1850        }
1851        executor.start();
1852    }
1853
1854    /**
1855     * Stops this session.
1856     *
1857     * @throws JMSException
1858     */
1859    protected void stop() throws JMSException {
1860
1861        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1862            ActiveMQMessageConsumer c = iter.next();
1863            c.stop();
1864        }
1865
1866        started.set(false);
1867        executor.stop();
1868    }
1869
1870    /**
1871     * Returns the session id.
1872     *
1873     * @return value - session id.
1874     */
1875    protected SessionId getSessionId() {
1876        return info.getSessionId();
1877    }
1878
1879    /**
1880     * @return
1881     */
1882    protected ConsumerId getNextConsumerId() {
1883        return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1884    }
1885
1886    /**
1887     * @return
1888     */
1889    protected ProducerId getNextProducerId() {
1890        return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1891    }
1892
1893    /**
1894     * Sends the message for dispatch by the broker.
1895     *
1896     *
1897     * @param producer - message producer.
1898     * @param destination - message destination.
1899     * @param message - message to be sent.
1900     * @param deliveryMode - JMS messsage delivery mode.
1901     * @param priority - message priority.
1902     * @param timeToLive - message expiration.
1903     * @param producerWindow
1904     * @param onComplete
1905     * @throws JMSException
1906     */
1907    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1908                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
1909
1910        checkClosed();
1911        if (destination.isTemporary() && connection.isDeleted(destination)) {
1912            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1913        }
1914        synchronized (sendMutex) {
1915            // tell the Broker we are about to start a new transaction
1916            doStartTransaction();
1917            TransactionId txid = transactionContext.getTransactionId();
1918            long sequenceNumber = producer.getMessageSequence();
1919
1920            //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1921            message.setJMSDeliveryMode(deliveryMode);
1922            long expiration = 0L;
1923            if (!producer.getDisableMessageTimestamp()) {
1924                long timeStamp = System.currentTimeMillis();
1925                message.setJMSTimestamp(timeStamp);
1926                if (timeToLive > 0) {
1927                    expiration = timeToLive + timeStamp;
1928                }
1929            }
1930            message.setJMSExpiration(expiration);
1931            message.setJMSPriority(priority);
1932            message.setJMSRedelivered(false);
1933
1934            // transform to our own message format here
1935            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1936            msg.setDestination(destination);
1937            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1938
1939            // Set the message id.
1940            if (msg != message) {
1941                message.setJMSMessageID(msg.getMessageId().toString());
1942                // Make sure the JMS destination is set on the foreign messages too.
1943                message.setJMSDestination(destination);
1944            }
1945            //clear the brokerPath in case we are re-sending this message
1946            msg.setBrokerPath(null);
1947
1948            msg.setTransactionId(txid);
1949            if (connection.isCopyMessageOnSend()) {
1950                msg = (ActiveMQMessage)msg.copy();
1951            }
1952            msg.setConnection(connection);
1953            msg.onSend();
1954            msg.setProducerId(msg.getMessageId().getProducerId());
1955            if (LOG.isTraceEnabled()) {
1956                LOG.trace(getSessionId() + " sending message: " + msg);
1957            }
1958            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1959                this.connection.asyncSendPacket(msg);
1960                if (producerWindow != null) {
1961                    // Since we defer lots of the marshaling till we hit the
1962                    // wire, this might not
1963                    // provide and accurate size. We may change over to doing
1964                    // more aggressive marshaling,
1965                    // to get more accurate sizes.. this is more important once
1966                    // users start using producer window
1967                    // flow control.
1968                    int size = msg.getSize();
1969                    producerWindow.increaseUsage(size);
1970                }
1971            } else {
1972                if (sendTimeout > 0 && onComplete==null) {
1973                    this.connection.syncSendPacket(msg,sendTimeout);
1974                }else {
1975                    this.connection.syncSendPacket(msg, onComplete);
1976                }
1977            }
1978
1979        }
1980    }
1981
1982    /**
1983     * Send TransactionInfo to indicate transaction has started
1984     *
1985     * @throws JMSException if some internal error occurs
1986     */
1987    protected void doStartTransaction() throws JMSException {
1988        if (getTransacted() && !transactionContext.isInXATransaction()) {
1989            transactionContext.begin();
1990        }
1991    }
1992
1993    /**
1994     * Checks whether the session has unconsumed messages.
1995     *
1996     * @return true - if there are unconsumed messages.
1997     */
1998    public boolean hasUncomsumedMessages() {
1999        return executor.hasUncomsumedMessages();
2000    }
2001
2002    /**
2003     * Checks whether the session uses transactions.
2004     *
2005     * @return true - if the session uses transactions.
2006     */
2007    public boolean isTransacted() {
2008        return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
2009    }
2010
2011    /**
2012     * Checks whether the session used client acknowledgment.
2013     *
2014     * @return true - if the session uses client acknowledgment.
2015     */
2016    protected boolean isClientAcknowledge() {
2017        return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
2018    }
2019
2020    /**
2021     * Checks whether the session used auto acknowledgment.
2022     *
2023     * @return true - if the session uses client acknowledgment.
2024     */
2025    public boolean isAutoAcknowledge() {
2026        return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
2027    }
2028
2029    /**
2030     * Checks whether the session used dup ok acknowledgment.
2031     *
2032     * @return true - if the session uses client acknowledgment.
2033     */
2034    public boolean isDupsOkAcknowledge() {
2035        return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
2036    }
2037
2038    public boolean isIndividualAcknowledge(){
2039        return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
2040    }
2041
2042    /**
2043     * Returns the message delivery listener.
2044     *
2045     * @return deliveryListener - message delivery listener.
2046     */
2047    public DeliveryListener getDeliveryListener() {
2048        return deliveryListener;
2049    }
2050
2051    /**
2052     * Sets the message delivery listener.
2053     *
2054     * @param deliveryListener - message delivery listener.
2055     */
2056    public void setDeliveryListener(DeliveryListener deliveryListener) {
2057        this.deliveryListener = deliveryListener;
2058    }
2059
2060    /**
2061     * Returns the SessionInfo bean.
2062     *
2063     * @return info - SessionInfo bean.
2064     * @throws JMSException
2065     */
2066    protected SessionInfo getSessionInfo() throws JMSException {
2067        SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
2068        return info;
2069    }
2070
2071    /**
2072     * Send the asynchronus command.
2073     *
2074     * @param command - command to be executed.
2075     * @throws JMSException
2076     */
2077    public void asyncSendPacket(Command command) throws JMSException {
2078        connection.asyncSendPacket(command);
2079    }
2080
2081    /**
2082     * Send the synchronus command.
2083     *
2084     * @param command - command to be executed.
2085     * @return Response
2086     * @throws JMSException
2087     */
2088    public Response syncSendPacket(Command command) throws JMSException {
2089        return connection.syncSendPacket(command);
2090    }
2091
2092    public long getNextDeliveryId() {
2093        return deliveryIdGenerator.getNextSequenceId();
2094    }
2095
2096    public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
2097
2098        List<MessageDispatch> c = unconsumedMessages.removeAll();
2099        for (MessageDispatch md : c) {
2100            this.connection.rollbackDuplicate(dispatcher, md.getMessage());
2101        }
2102        Collections.reverse(c);
2103
2104        for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
2105            MessageDispatch md = iter.next();
2106            executor.executeFirst(md);
2107        }
2108
2109    }
2110
2111    public boolean isRunning() {
2112        return started.get();
2113    }
2114
2115    public boolean isAsyncDispatch() {
2116        return asyncDispatch;
2117    }
2118
2119    public void setAsyncDispatch(boolean asyncDispatch) {
2120        this.asyncDispatch = asyncDispatch;
2121    }
2122
2123    /**
2124     * @return Returns the sessionAsyncDispatch.
2125     */
2126    public boolean isSessionAsyncDispatch() {
2127        return sessionAsyncDispatch;
2128    }
2129
2130    /**
2131     * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
2132     */
2133    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
2134        this.sessionAsyncDispatch = sessionAsyncDispatch;
2135    }
2136
2137    public MessageTransformer getTransformer() {
2138        return transformer;
2139    }
2140
2141    public ActiveMQConnection getConnection() {
2142        return connection;
2143    }
2144
2145    /**
2146     * Sets the transformer used to transform messages before they are sent on
2147     * to the JMS bus or when they are received from the bus but before they are
2148     * delivered to the JMS client
2149     */
2150    public void setTransformer(MessageTransformer transformer) {
2151        this.transformer = transformer;
2152    }
2153
2154    public BlobTransferPolicy getBlobTransferPolicy() {
2155        return blobTransferPolicy;
2156    }
2157
2158    /**
2159     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
2160     * OBjects) are transferred from producers to brokers to consumers
2161     */
2162    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
2163        this.blobTransferPolicy = blobTransferPolicy;
2164    }
2165
2166    public List<MessageDispatch> getUnconsumedMessages() {
2167        return executor.getUnconsumedMessages();
2168    }
2169
2170    @Override
2171    public String toString() {
2172        return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + ",closed=" + closed + "} " + sendMutex;
2173    }
2174
2175    public void checkMessageListener() throws JMSException {
2176        if (messageListener != null) {
2177            throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2178        }
2179        for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
2180            ActiveMQMessageConsumer consumer = i.next();
2181            if (consumer.hasMessageListener()) {
2182                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2183            }
2184        }
2185    }
2186
2187    protected void setOptimizeAcknowledge(boolean value) {
2188        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2189            ActiveMQMessageConsumer c = iter.next();
2190            c.setOptimizeAcknowledge(value);
2191        }
2192    }
2193
2194    protected void setPrefetchSize(ConsumerId id, int prefetch) {
2195        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2196            ActiveMQMessageConsumer c = iter.next();
2197            if (c.getConsumerId().equals(id)) {
2198                c.setPrefetchSize(prefetch);
2199                break;
2200            }
2201        }
2202    }
2203
2204    protected void close(ConsumerId id) {
2205        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2206            ActiveMQMessageConsumer c = iter.next();
2207            if (c.getConsumerId().equals(id)) {
2208                try {
2209                    c.close();
2210                } catch (JMSException e) {
2211                    LOG.warn("Exception closing consumer", e);
2212                }
2213                LOG.warn("Closed consumer on Command, " + id);
2214                break;
2215            }
2216        }
2217    }
2218
2219    public boolean isInUse(ActiveMQTempDestination destination) {
2220        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2221            ActiveMQMessageConsumer c = iter.next();
2222            if (c.isInUse(destination)) {
2223                return true;
2224            }
2225        }
2226        return false;
2227    }
2228
2229    /**
2230     * highest sequence id of the last message delivered by this session.
2231     * Passed to the broker in the close command, maintained by dispose()
2232     * @return lastDeliveredSequenceId
2233     */
2234    public long getLastDeliveredSequenceId() {
2235        return lastDeliveredSequenceId;
2236    }
2237
2238    protected void sendAck(MessageAck ack) throws JMSException {
2239        sendAck(ack,false);
2240    }
2241
2242    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2243        if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2244            asyncSendPacket(ack);
2245        } else {
2246            syncSendPacket(ack);
2247        }
2248    }
2249
2250    protected Scheduler getScheduler() throws JMSException {
2251        return this.connection.getScheduler();
2252    }
2253
2254    protected ThreadPoolExecutor getConnectionExecutor() {
2255        return this.connectionExecutor;
2256    }
2257}