001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.File;
020import java.io.InputStream;
021import java.io.Serializable;
022import java.net.URL;
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.List;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030
031import javax.jms.BytesMessage;
032import javax.jms.Destination;
033import javax.jms.IllegalStateException;
034import javax.jms.InvalidDestinationException;
035import javax.jms.InvalidSelectorException;
036import javax.jms.JMSException;
037import javax.jms.MapMessage;
038import javax.jms.Message;
039import javax.jms.MessageConsumer;
040import javax.jms.MessageListener;
041import javax.jms.MessageProducer;
042import javax.jms.ObjectMessage;
043import javax.jms.Queue;
044import javax.jms.QueueBrowser;
045import javax.jms.QueueReceiver;
046import javax.jms.QueueSender;
047import javax.jms.QueueSession;
048import javax.jms.Session;
049import javax.jms.StreamMessage;
050import javax.jms.TemporaryQueue;
051import javax.jms.TemporaryTopic;
052import javax.jms.TextMessage;
053import javax.jms.Topic;
054import javax.jms.TopicPublisher;
055import javax.jms.TopicSession;
056import javax.jms.TopicSubscriber;
057import javax.jms.TransactionRolledBackException;
058
059import org.apache.activemq.blob.BlobDownloader;
060import org.apache.activemq.blob.BlobTransferPolicy;
061import org.apache.activemq.blob.BlobUploader;
062import org.apache.activemq.command.ActiveMQBlobMessage;
063import org.apache.activemq.command.ActiveMQBytesMessage;
064import org.apache.activemq.command.ActiveMQDestination;
065import org.apache.activemq.command.ActiveMQMapMessage;
066import org.apache.activemq.command.ActiveMQMessage;
067import org.apache.activemq.command.ActiveMQObjectMessage;
068import org.apache.activemq.command.ActiveMQQueue;
069import org.apache.activemq.command.ActiveMQStreamMessage;
070import org.apache.activemq.command.ActiveMQTempDestination;
071import org.apache.activemq.command.ActiveMQTempQueue;
072import org.apache.activemq.command.ActiveMQTempTopic;
073import org.apache.activemq.command.ActiveMQTextMessage;
074import org.apache.activemq.command.ActiveMQTopic;
075import org.apache.activemq.command.Command;
076import org.apache.activemq.command.CommandTypes;
077import org.apache.activemq.command.ConsumerId;
078import org.apache.activemq.command.MessageAck;
079import org.apache.activemq.command.MessageDispatch;
080import org.apache.activemq.command.MessageId;
081import org.apache.activemq.command.ProducerId;
082import org.apache.activemq.command.RemoveInfo;
083import org.apache.activemq.command.Response;
084import org.apache.activemq.command.SessionId;
085import org.apache.activemq.command.SessionInfo;
086import org.apache.activemq.command.TransactionId;
087import org.apache.activemq.management.JMSSessionStatsImpl;
088import org.apache.activemq.management.StatsCapable;
089import org.apache.activemq.management.StatsImpl;
090import org.apache.activemq.thread.Scheduler;
091import org.apache.activemq.transaction.Synchronization;
092import org.apache.activemq.usage.MemoryUsage;
093import org.apache.activemq.util.Callback;
094import org.apache.activemq.util.LongSequenceGenerator;
095import org.slf4j.Logger;
096import org.slf4j.LoggerFactory;
097
098/**
099 * <P>
100 * A <CODE>Session</CODE> object is a single-threaded context for producing
101 * and consuming messages. Although it may allocate provider resources outside
102 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
103 * <P>
104 * A session serves several purposes:
105 * <UL>
106 * <LI>It is a factory for its message producers and consumers.
107 * <LI>It supplies provider-optimized message factories.
108 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
109 * <CODE>TemporaryQueues</CODE>.
110 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
111 * objects for those clients that need to dynamically manipulate
112 * provider-specific destination names.
113 * <LI>It supports a single series of transactions that combine work spanning
114 * its producers and consumers into atomic units.
115 * <LI>It defines a serial order for the messages it consumes and the messages
116 * it produces.
117 * <LI>It retains messages it consumes until they have been acknowledged.
118 * <LI>It serializes execution of message listeners registered with its message
119 * consumers.
120 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
121 * </UL>
122 * <P>
123 * A session can create and service multiple message producers and consumers.
124 * <P>
125 * One typical use is to have a thread block on a synchronous
126 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
127 * use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
128 * <P>
129 * If a client desires to have one thread produce messages while others consume
130 * them, the client should use a separate session for its producing thread.
131 * <P>
132 * Once a connection has been started, any session with one or more registered
133 * message listeners is dedicated to the thread of control that delivers
134 * messages to it. It is erroneous for client code to use this session or any of
135 * its constituent objects from another thread of control. The only exception to
136 * this rule is the use of the session or connection <CODE>close</CODE>
137 * method.
138 * <P>
139 * It should be easy for most clients to partition their work naturally into
140 * sessions. This model allows clients to start simply and incrementally add
141 * message processing complexity as their need for concurrency grows.
142 * <P>
143 * The <CODE>close</CODE> method is the only session method that can be called
144 * while some other session method is being executed in another thread.
145 * <P>
146 * A session may be specified as transacted. Each transacted session supports a
147 * single series of transactions. Each transaction groups a set of message sends
148 * and a set of message receives into an atomic unit of work. In effect,
149 * transactions organize a session's input message stream and output message
150 * stream into series of atomic units. When a transaction commits, its atomic
151 * unit of input is acknowledged and its associated atomic unit of output is
152 * sent. If a transaction rollback is done, the transaction's sent messages are
153 * destroyed and the session's input is automatically recovered.
154 * <P>
155 * The content of a transaction's input and output units is simply those
156 * messages that have been produced and consumed within the session's current
157 * transaction.
158 * <P>
159 * A transaction is completed using either its session's <CODE>commit</CODE>
160 * method or its session's <CODE>rollback </CODE> method. The completion of a
161 * session's current transaction automatically begins the next. The result is
162 * that a transacted session always has a current transaction within which its
163 * work is done.
164 * <P>
165 * The Java Transaction Service (JTS) or some other transaction monitor may be
166 * used to combine a session's transaction with transactions on other resources
167 * (databases, other JMS sessions, etc.). Since Java distributed transactions
168 * are controlled via the Java Transaction API (JTA), use of the session's
169 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
170 * prohibited.
171 * <P>
172 * The JMS API does not require support for JTA; however, it does define how a
173 * provider supplies this support.
174 * <P>
175 * Although it is also possible for a JMS client to handle distributed
176 * transactions directly, it is unlikely that many JMS clients will do this.
177 * Support for JTA in the JMS API is targeted at systems vendors who will be
178 * integrating the JMS API into their application server products.
179 *
180 *
181 * @see javax.jms.Session
182 * @see javax.jms.QueueSession
183 * @see javax.jms.TopicSession
184 * @see javax.jms.XASession
185 */
186public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
187
188    /**
189     * Only acknowledge an individual message - using message.acknowledge()
190     * as opposed to CLIENT_ACKNOWLEDGE which
191     * acknowledges all messages consumed by a session at when acknowledge()
192     * is called
193     */
194    public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
195    public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
196
197    public static interface DeliveryListener {
198        void beforeDelivery(ActiveMQSession session, Message msg);
199
200        void afterDelivery(ActiveMQSession session, Message msg);
201    }
202
203    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
204    private final ThreadPoolExecutor connectionExecutor;
205
206    protected int acknowledgementMode;
207    protected final ActiveMQConnection connection;
208    protected final SessionInfo info;
209    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
210    protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
211    protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
212    protected final ActiveMQSessionExecutor executor;
213    protected final AtomicBoolean started = new AtomicBoolean(false);
214
215    protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
216    protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
217
218    protected boolean closed;
219    private volatile boolean synchronizationRegistered;
220    protected boolean asyncDispatch;
221    protected boolean sessionAsyncDispatch;
222    protected final boolean debug;
223    protected final Object sendMutex = new Object();
224    protected final Object redeliveryGuard = new Object();
225
226    private final AtomicBoolean clearInProgress = new AtomicBoolean();
227
228    private MessageListener messageListener;
229    private final JMSSessionStatsImpl stats;
230    private TransactionContext transactionContext;
231    private DeliveryListener deliveryListener;
232    private MessageTransformer transformer;
233    private BlobTransferPolicy blobTransferPolicy;
234    private long lastDeliveredSequenceId = -2;
235
236    /**
237     * Construct the Session
238     *
239     * @param connection
240     * @param sessionId
241     * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
242     *                Session.SESSION_TRANSACTED
243     * @param asyncDispatch
244     * @param sessionAsyncDispatch
245     * @throws JMSException on internal error
246     */
247    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
248        this.debug = LOG.isDebugEnabled();
249        this.connection = connection;
250        this.acknowledgementMode = acknowledgeMode;
251        this.asyncDispatch = asyncDispatch;
252        this.sessionAsyncDispatch = sessionAsyncDispatch;
253        this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
254        setTransactionContext(new TransactionContext(connection));
255        stats = new JMSSessionStatsImpl(producers, consumers);
256        this.connection.asyncSendPacket(info);
257        setTransformer(connection.getTransformer());
258        setBlobTransferPolicy(connection.getBlobTransferPolicy());
259        this.connectionExecutor=connection.getExecutor();
260        this.executor = new ActiveMQSessionExecutor(this);
261        connection.addSession(this);
262        if (connection.isStarted()) {
263            start();
264        }
265
266    }
267
268    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
269        this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
270    }
271
272    /**
273     * Sets the transaction context of the session.
274     *
275     * @param transactionContext - provides the means to control a JMS
276     *                transaction.
277     */
278    public void setTransactionContext(TransactionContext transactionContext) {
279        this.transactionContext = transactionContext;
280    }
281
282    /**
283     * Returns the transaction context of the session.
284     *
285     * @return transactionContext - session's transaction context.
286     */
287    public TransactionContext getTransactionContext() {
288        return transactionContext;
289    }
290
291    /*
292     * (non-Javadoc)
293     *
294     * @see org.apache.activemq.management.StatsCapable#getStats()
295     */
296    @Override
297    public StatsImpl getStats() {
298        return stats;
299    }
300
301    /**
302     * Returns the session's statistics.
303     *
304     * @return stats - session's statistics.
305     */
306    public JMSSessionStatsImpl getSessionStats() {
307        return stats;
308    }
309
310    /**
311     * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
312     * object is used to send a message containing a stream of uninterpreted
313     * bytes.
314     *
315     * @return the an ActiveMQBytesMessage
316     * @throws JMSException if the JMS provider fails to create this message due
317     *                 to some internal error.
318     */
319    @Override
320    public BytesMessage createBytesMessage() throws JMSException {
321        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
322        configureMessage(message);
323        return message;
324    }
325
326    /**
327     * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
328     * object is used to send a self-defining set of name-value pairs, where
329     * names are <CODE>String</CODE> objects and values are primitive values
330     * in the Java programming language.
331     *
332     * @return an ActiveMQMapMessage
333     * @throws JMSException if the JMS provider fails to create this message due
334     *                 to some internal error.
335     */
336    @Override
337    public MapMessage createMapMessage() throws JMSException {
338        ActiveMQMapMessage message = new ActiveMQMapMessage();
339        configureMessage(message);
340        return message;
341    }
342
343    /**
344     * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
345     * interface is the root interface of all JMS messages. A
346     * <CODE>Message</CODE> object holds all the standard message header
347     * information. It can be sent when a message containing only header
348     * information is sufficient.
349     *
350     * @return an ActiveMQMessage
351     * @throws JMSException if the JMS provider fails to create this message due
352     *                 to some internal error.
353     */
354    @Override
355    public Message createMessage() throws JMSException {
356        ActiveMQMessage message = new ActiveMQMessage();
357        configureMessage(message);
358        return message;
359    }
360
361    /**
362     * Creates an <CODE>ObjectMessage</CODE> object. An
363     * <CODE>ObjectMessage</CODE> object is used to send a message that
364     * contains a serializable Java object.
365     *
366     * @return an ActiveMQObjectMessage
367     * @throws JMSException if the JMS provider fails to create this message due
368     *                 to some internal error.
369     */
370    @Override
371    public ObjectMessage createObjectMessage() throws JMSException {
372        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
373        configureMessage(message);
374        return message;
375    }
376
377    /**
378     * Creates an initialized <CODE>ObjectMessage</CODE> object. An
379     * <CODE>ObjectMessage</CODE> object is used to send a message that
380     * contains a serializable Java object.
381     *
382     * @param object the object to use to initialize this message
383     * @return an ActiveMQObjectMessage
384     * @throws JMSException if the JMS provider fails to create this message due
385     *                 to some internal error.
386     */
387    @Override
388    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
389        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
390        configureMessage(message);
391        message.setObject(object);
392        return message;
393    }
394
395    /**
396     * Creates a <CODE>StreamMessage</CODE> object. A
397     * <CODE>StreamMessage</CODE> object is used to send a self-defining
398     * stream of primitive values in the Java programming language.
399     *
400     * @return an ActiveMQStreamMessage
401     * @throws JMSException if the JMS provider fails to create this message due
402     *                 to some internal error.
403     */
404    @Override
405    public StreamMessage createStreamMessage() throws JMSException {
406        ActiveMQStreamMessage message = new ActiveMQStreamMessage();
407        configureMessage(message);
408        return message;
409    }
410
411    /**
412     * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
413     * object is used to send a message containing a <CODE>String</CODE>
414     * object.
415     *
416     * @return an ActiveMQTextMessage
417     * @throws JMSException if the JMS provider fails to create this message due
418     *                 to some internal error.
419     */
420    @Override
421    public TextMessage createTextMessage() throws JMSException {
422        ActiveMQTextMessage message = new ActiveMQTextMessage();
423        configureMessage(message);
424        return message;
425    }
426
427    /**
428     * Creates an initialized <CODE>TextMessage</CODE> object. A
429     * <CODE>TextMessage</CODE> object is used to send a message containing a
430     * <CODE>String</CODE>.
431     *
432     * @param text the string used to initialize this message
433     * @return an ActiveMQTextMessage
434     * @throws JMSException if the JMS provider fails to create this message due
435     *                 to some internal error.
436     */
437    @Override
438    public TextMessage createTextMessage(String text) throws JMSException {
439        ActiveMQTextMessage message = new ActiveMQTextMessage();
440        message.setText(text);
441        configureMessage(message);
442        return message;
443    }
444
445    /**
446     * Creates an initialized <CODE>BlobMessage</CODE> object. A
447     * <CODE>BlobMessage</CODE> object is used to send a message containing a
448     * <CODE>URL</CODE> which points to some network addressible BLOB.
449     *
450     * @param url the network addressable URL used to pass directly to the
451     *                consumer
452     * @return a BlobMessage
453     * @throws JMSException if the JMS provider fails to create this message due
454     *                 to some internal error.
455     */
456    public BlobMessage createBlobMessage(URL url) throws JMSException {
457        return createBlobMessage(url, false);
458    }
459
460    /**
461     * Creates an initialized <CODE>BlobMessage</CODE> object. A
462     * <CODE>BlobMessage</CODE> object is used to send a message containing a
463     * <CODE>URL</CODE> which points to some network addressible BLOB.
464     *
465     * @param url the network addressable URL used to pass directly to the
466     *                consumer
467     * @param deletedByBroker indicates whether or not the resource is deleted
468     *                by the broker when the message is acknowledged
469     * @return a BlobMessage
470     * @throws JMSException if the JMS provider fails to create this message due
471     *                 to some internal error.
472     */
473    public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
474        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
475        configureMessage(message);
476        message.setURL(url);
477        message.setDeletedByBroker(deletedByBroker);
478        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
479        return message;
480    }
481
482    /**
483     * Creates an initialized <CODE>BlobMessage</CODE> object. A
484     * <CODE>BlobMessage</CODE> object is used to send a message containing
485     * the <CODE>File</CODE> content. Before the message is sent the file
486     * conent will be uploaded to the broker or some other remote repository
487     * depending on the {@link #getBlobTransferPolicy()}.
488     *
489     * @param file the file to be uploaded to some remote repo (or the broker)
490     *                depending on the strategy
491     * @return a BlobMessage
492     * @throws JMSException if the JMS provider fails to create this message due
493     *                 to some internal error.
494     */
495    public BlobMessage createBlobMessage(File file) throws JMSException {
496        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
497        configureMessage(message);
498        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
499        message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
500        message.setDeletedByBroker(true);
501        message.setName(file.getName());
502        return message;
503    }
504
505    /**
506     * Creates an initialized <CODE>BlobMessage</CODE> object. A
507     * <CODE>BlobMessage</CODE> object is used to send a message containing
508     * the <CODE>File</CODE> content. Before the message is sent the file
509     * conent will be uploaded to the broker or some other remote repository
510     * depending on the {@link #getBlobTransferPolicy()}. <br/>
511     * <p>
512     * The caller of this method is responsible for closing the
513     * input stream that is used, however the stream can not be closed
514     * until <b>after</b> the message has been sent.  To have this class
515     * manage the stream and close it automatically, use the method
516     * {@link ActiveMQSession#createBlobMessage(File)}
517     *
518     * @param in the stream to be uploaded to some remote repo (or the broker)
519     *                depending on the strategy
520     * @return a BlobMessage
521     * @throws JMSException if the JMS provider fails to create this message due
522     *                 to some internal error.
523     */
524    public BlobMessage createBlobMessage(InputStream in) throws JMSException {
525        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
526        configureMessage(message);
527        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
528        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
529        message.setDeletedByBroker(true);
530        return message;
531    }
532
533    /**
534     * Indicates whether the session is in transacted mode.
535     *
536     * @return true if the session is in transacted mode
537     * @throws JMSException if there is some internal error.
538     */
539    @Override
540    public boolean getTransacted() throws JMSException {
541        checkClosed();
542        return isTransacted();
543    }
544
545    /**
546     * Returns the acknowledgement mode of the session. The acknowledgement mode
547     * is set at the time that the session is created. If the session is
548     * transacted, the acknowledgement mode is ignored.
549     *
550     * @return If the session is not transacted, returns the current
551     *         acknowledgement mode for the session. If the session is
552     *         transacted, returns SESSION_TRANSACTED.
553     * @throws JMSException
554     * @see javax.jms.Connection#createSession(boolean,int)
555     * @since 1.1 exception JMSException if there is some internal error.
556     */
557    @Override
558    public int getAcknowledgeMode() throws JMSException {
559        checkClosed();
560        return this.acknowledgementMode;
561    }
562
563    /**
564     * Commits all messages done in this transaction and releases any locks
565     * currently held.
566     *
567     * @throws JMSException if the JMS provider fails to commit the transaction
568     *                 due to some internal error.
569     * @throws TransactionRolledBackException if the transaction is rolled back
570     *                 due to some internal error during commit.
571     * @throws javax.jms.IllegalStateException if the method is not called by a
572     *                 transacted session.
573     */
574    @Override
575    public void commit() throws JMSException {
576        checkClosed();
577        if (!getTransacted()) {
578            throw new javax.jms.IllegalStateException("Not a transacted session");
579        }
580        if (LOG.isDebugEnabled()) {
581            LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
582        }
583        transactionContext.commit();
584    }
585
586    /**
587     * Rolls back any messages done in this transaction and releases any locks
588     * currently held.
589     *
590     * @throws JMSException if the JMS provider fails to roll back the
591     *                 transaction due to some internal error.
592     * @throws javax.jms.IllegalStateException if the method is not called by a
593     *                 transacted session.
594     */
595    @Override
596    public void rollback() throws JMSException {
597        checkClosed();
598        if (!getTransacted()) {
599            throw new javax.jms.IllegalStateException("Not a transacted session");
600        }
601        if (LOG.isDebugEnabled()) {
602            LOG.debug(getSessionId() + " Transaction Rollback, txid:"  + transactionContext.getTransactionId());
603        }
604        transactionContext.rollback();
605    }
606
607    /**
608     * Closes the session.
609     * <P>
610     * Since a provider may allocate some resources on behalf of a session
611     * outside the JVM, clients should close the resources when they are not
612     * needed. Relying on garbage collection to eventually reclaim these
613     * resources may not be timely enough.
614     * <P>
615     * There is no need to close the producers and consumers of a closed
616     * session.
617     * <P>
618     * This call will block until a <CODE>receive</CODE> call or message
619     * listener in progress has completed. A blocked message consumer
620     * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
621     * is closed.
622     * <P>
623     * Closing a transacted session must roll back the transaction in progress.
624     * <P>
625     * This method is the only <CODE>Session</CODE> method that can be called
626     * concurrently.
627     * <P>
628     * Invoking any other <CODE>Session</CODE> method on a closed session must
629     * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
630     * closed session must <I>not </I> throw an exception.
631     *
632     * @throws JMSException if the JMS provider fails to close the session due
633     *                 to some internal error.
634     */
635    @Override
636    public void close() throws JMSException {
637        if (!closed) {
638            if (getTransactionContext().isInXATransaction()) {
639                if (!synchronizationRegistered) {
640                    synchronizationRegistered = true;
641                    getTransactionContext().addSynchronization(new Synchronization() {
642
643                                        @Override
644                                        public void afterCommit() throws Exception {
645                                            doClose();
646                                            synchronizationRegistered = false;
647                                        }
648
649                                        @Override
650                                        public void afterRollback() throws Exception {
651                                            doClose();
652                                            synchronizationRegistered = false;
653                                        }
654                                    });
655                }
656
657            } else {
658                doClose();
659            }
660        }
661    }
662
663    private void doClose() throws JMSException {
664        dispose();
665        RemoveInfo removeCommand = info.createRemoveCommand();
666        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
667        connection.asyncSendPacket(removeCommand);
668    }
669
670    final AtomicInteger clearRequestsCounter = new AtomicInteger(0);
671    void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
672        clearRequestsCounter.incrementAndGet();
673        executor.clearMessagesInProgress();
674        // we are called from inside the transport reconnection logic which involves us
675        // clearing all the connections' consumers dispatch and delivered lists. So rather
676        // than trying to grab a mutex (which could be already owned by the message listener
677        // calling the send or an ack) we allow it to complete in a separate thread via the
678        // scheduler and notify us via connection.transportInterruptionProcessingComplete()
679        //
680        // We must be careful though not to allow multiple calls to this method from a
681        // connection that is having issue becoming fully established from causing a large
682        // build up of scheduled tasks to clear the same consumers over and over.
683        if (consumers.isEmpty()) {
684            return;
685        }
686
687        if (clearInProgress.compareAndSet(false, true)) {
688            for (final ActiveMQMessageConsumer consumer : consumers) {
689                consumer.inProgressClearRequired();
690                transportInterruptionProcessingComplete.incrementAndGet();
691                try {
692                    connection.getScheduler().executeAfterDelay(new Runnable() {
693                        @Override
694                        public void run() {
695                            consumer.clearMessagesInProgress();
696                        }}, 0l);
697                } catch (JMSException e) {
698                    connection.onClientInternalException(e);
699                }
700            }
701
702            try {
703                connection.getScheduler().executeAfterDelay(new Runnable() {
704                    @Override
705                    public void run() {
706                        clearInProgress.set(false);
707                    }}, 0l);
708            } catch (JMSException e) {
709                connection.onClientInternalException(e);
710            }
711        }
712    }
713
714    void deliverAcks() {
715        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
716            ActiveMQMessageConsumer consumer = iter.next();
717            consumer.deliverAcks();
718        }
719    }
720
721    public synchronized void dispose() throws JMSException {
722        if (!closed) {
723
724            try {
725                executor.close();
726
727                for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
728                    ActiveMQMessageConsumer consumer = iter.next();
729                    consumer.setFailureError(connection.getFirstFailureError());
730                    consumer.dispose();
731                    lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
732                }
733                consumers.clear();
734
735                for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
736                    ActiveMQMessageProducer producer = iter.next();
737                    producer.dispose();
738                }
739                producers.clear();
740
741                try {
742                    if (getTransactionContext().isInLocalTransaction()) {
743                        rollback();
744                    }
745                } catch (JMSException e) {
746                }
747
748            } finally {
749                connection.removeSession(this);
750                closed = true;
751                this.transactionContext = null;
752            }
753        }
754    }
755
756    /**
757     * Checks that the session is not closed then configures the message
758     */
759    protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
760        checkClosed();
761        message.setConnection(connection);
762    }
763
764    /**
765     * Check if the session is closed. It is used for ensuring that the session
766     * is open before performing various operations.
767     *
768     * @throws IllegalStateException if the Session is closed
769     */
770    protected void checkClosed() throws IllegalStateException {
771        if (closed) {
772            throw new IllegalStateException("The Session is closed");
773        }
774    }
775
776    /**
777     * Checks if the session is closed.
778     *
779     * @return true if the session is closed, false otherwise.
780     */
781    public boolean isClosed() {
782        return closed;
783    }
784
785    /**
786     * Stops message delivery in this session, and restarts message delivery
787     * with the oldest unacknowledged message.
788     * <P>
789     * All consumers deliver messages in a serial order. Acknowledging a
790     * received message automatically acknowledges all messages that have been
791     * delivered to the client.
792     * <P>
793     * Restarting a session causes it to take the following actions:
794     * <UL>
795     * <LI>Stop message delivery
796     * <LI>Mark all messages that might have been delivered but not
797     * acknowledged as "redelivered"
798     * <LI>Restart the delivery sequence including all unacknowledged messages
799     * that had been previously delivered. Redelivered messages do not have to
800     * be delivered in exactly their original delivery order.
801     * </UL>
802     *
803     * @throws JMSException if the JMS provider fails to stop and restart
804     *                 message delivery due to some internal error.
805     * @throws IllegalStateException if the method is called by a transacted
806     *                 session.
807     */
808    @Override
809    public void recover() throws JMSException {
810
811        checkClosed();
812        if (getTransacted()) {
813            throw new IllegalStateException("This session is transacted");
814        }
815
816        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
817            ActiveMQMessageConsumer c = iter.next();
818            c.rollback();
819        }
820
821    }
822
823    /**
824     * Returns the session's distinguished message listener (optional).
825     *
826     * @return the message listener associated with this session
827     * @throws JMSException if the JMS provider fails to get the message
828     *                 listener due to an internal error.
829     * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
830     * @see javax.jms.ServerSessionPool
831     * @see javax.jms.ServerSession
832     */
833    @Override
834    public MessageListener getMessageListener() throws JMSException {
835        checkClosed();
836        return this.messageListener;
837    }
838
839    /**
840     * Sets the session's distinguished message listener (optional).
841     * <P>
842     * When the distinguished message listener is set, no other form of message
843     * receipt in the session can be used; however, all forms of sending
844     * messages are still supported.
845     * <P>
846     * If this session has been closed, then an {@link IllegalStateException} is
847     * thrown, if trying to set a new listener. However setting the listener
848     * to <tt>null</tt> is allowed, to clear the listener, even if this session
849     * has been closed prior.
850     * <P>
851     * This is an expert facility not used by regular JMS clients.
852     *
853     * @param listener the message listener to associate with this session
854     * @throws JMSException if the JMS provider fails to set the message
855     *                 listener due to an internal error.
856     * @see javax.jms.Session#getMessageListener()
857     * @see javax.jms.ServerSessionPool
858     * @see javax.jms.ServerSession
859     */
860    @Override
861    public void setMessageListener(MessageListener listener) throws JMSException {
862        // only check for closed if we set a new listener, as we allow to clear
863        // the listener, such as when an application is shutting down, and is
864        // no longer using a message listener on this session
865        if (listener != null) {
866            checkClosed();
867        }
868        this.messageListener = listener;
869
870        if (listener != null) {
871            executor.setDispatchedBySessionPool(true);
872        }
873    }
874
875    /**
876     * Optional operation, intended to be used only by Application Servers, not
877     * by ordinary JMS clients.
878     *
879     * @see javax.jms.ServerSession
880     */
881    @Override
882    public void run() {
883        MessageDispatch messageDispatch;
884        while ((messageDispatch = executor.dequeueNoWait()) != null) {
885            final MessageDispatch md = messageDispatch;
886
887            // subset of org.apache.activemq.ActiveMQMessageConsumer.createActiveMQMessage
888            final ActiveMQMessage message = (ActiveMQMessage)md.getMessage().copy();
889            if (message.getDataStructureType()==CommandTypes.ACTIVEMQ_BLOB_MESSAGE) {
890                ((ActiveMQBlobMessage)message).setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
891            }
892            if (message.getDataStructureType() == CommandTypes.ACTIVEMQ_OBJECT_MESSAGE) {
893                ((ActiveMQObjectMessage)message).setTrustAllPackages(getConnection().isTrustAllPackages());
894                ((ActiveMQObjectMessage)message).setTrustedPackages(getConnection().getTrustedPackages());
895            }
896
897            MessageAck earlyAck = null;
898            if (message.isExpired()) {
899                earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
900                earlyAck.setFirstMessageId(message.getMessageId());
901            } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
902                LOG.debug("{} got duplicate: {}", this, message.getMessageId());
903                earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
904                earlyAck.setFirstMessageId(md.getMessage().getMessageId());
905                earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
906            }
907            if (earlyAck != null) {
908                try {
909                    asyncSendPacket(earlyAck);
910                } catch (Throwable t) {
911                    LOG.error("error dispatching ack: {} ", earlyAck, t);
912                    connection.onClientInternalException(t);
913                } finally {
914                    continue;
915                }
916            }
917
918            if (isClientAcknowledge()||isIndividualAcknowledge()) {
919                message.setAcknowledgeCallback(new Callback() {
920                    @Override
921                    public void execute() throws Exception {
922                    }
923                });
924            }
925
926            if (deliveryListener != null) {
927                deliveryListener.beforeDelivery(this, message);
928            }
929
930            md.setDeliverySequenceId(getNextDeliveryId());
931            lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
932
933            final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
934
935            final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
936            /*
937            * The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched.
938            * We dont want the after deliver being called after the redeliver as it may cause some weird stuff.
939            * */
940            synchronized (redeliveryGuard) {
941                try {
942                    ack.setFirstMessageId(md.getMessage().getMessageId());
943                    doStartTransaction();
944                    ack.setTransactionId(getTransactionContext().getTransactionId());
945                    if (ack.getTransactionId() != null) {
946                        getTransactionContext().addSynchronization(new Synchronization() {
947
948                            final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
949
950                            @Override
951                            public void beforeEnd() throws Exception {
952                                // validate our consumer so we don't push stale acks that get ignored
953                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
954                                    LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
955                                    throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
956                                }
957                                LOG.trace("beforeEnd ack {}", ack);
958                                sendAck(ack);
959                            }
960
961                            @Override
962                            public void afterRollback() throws Exception {
963                                if (LOG.isTraceEnabled()) {
964                                    LOG.trace("afterRollback {}", ack, new Throwable("here"));
965                                }
966                                // ensure we don't filter this as a duplicate
967                                connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
968
969                                // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
970                                if (clearRequestsCounter.get() > clearRequestCount) {
971                                    LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
972                                    return;
973                                }
974
975                                // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
976                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
977                                    LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
978                                    return;
979                                }
980
981                                RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
982                                int redeliveryCounter = md.getMessage().getRedeliveryCounter();
983                                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
984                                        && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
985                                    // We need to NACK the messages so that they get
986                                    // sent to the
987                                    // DLQ.
988                                    // Acknowledge the last message.
989                                    MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
990                                    ack.setFirstMessageId(md.getMessage().getMessageId());
991                                    ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
992                                    LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack);
993                                    asyncSendPacket(ack);
994
995                                } else {
996
997                                    MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
998                                    ack.setFirstMessageId(md.getMessage().getMessageId());
999                                    asyncSendPacket(ack);
1000
1001                                    // Figure out how long we should wait to resend
1002                                    // this message.
1003                                    long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
1004                                    for (int i = 0; i < redeliveryCounter; i++) {
1005                                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
1006                                    }
1007
1008                                    /*
1009                                    * If we are a non blocking delivery then we need to stop the executor to avoid more
1010                                    * messages being delivered, once the message is redelivered we can restart it.
1011                                    * */
1012                                    if (!connection.isNonBlockingRedelivery()) {
1013                                        LOG.debug("Blocking session until re-delivery...");
1014                                        executor.stop();
1015                                    }
1016
1017                                    connection.getScheduler().executeAfterDelay(new Runnable() {
1018
1019                                        @Override
1020                                        public void run() {
1021                                            /*
1022                                            * wait for the first delivery to be complete, i.e. after delivery has been called.
1023                                            * */
1024                                            synchronized (redeliveryGuard) {
1025                                                /*
1026                                                * If its non blocking then we can just dispatch in a new session.
1027                                                * */
1028                                                if (connection.isNonBlockingRedelivery()) {
1029                                                    ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
1030                                                } else {
1031                                                    /*
1032                                                    * If there has been an error thrown during afterDelivery then the
1033                                                    * endpoint will be marked as dead so redelivery will fail (and eventually
1034                                                    * the session marked as stale), in this case we can only call dispatch
1035                                                    * which will create a new session with a new endpoint.
1036                                                    * */
1037                                                    if (afterDeliveryError.get()) {
1038                                                        ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
1039                                                    } else {
1040                                                        executor.executeFirst(md);
1041                                                        executor.start();
1042                                                    }
1043                                                }
1044                                            }
1045                                        }
1046                                    }, redeliveryDelay);
1047                                }
1048                                md.getMessage().onMessageRolledBack();
1049                            }
1050                        });
1051                    }
1052
1053                    LOG.trace("{} onMessage({})", this, message.getMessageId());
1054                    messageListener.onMessage(message);
1055
1056                } catch (Throwable e) {
1057                    if (!isClosed()) {
1058                        LOG.error("{} error dispatching message: {} ", this, message.getMessageId(), e);
1059                    }
1060
1061                    if (getTransactionContext() != null && getTransactionContext().isInXATransaction()) {
1062                        LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
1063                        getTransactionContext().setRollbackOnly(true);
1064                    }
1065
1066                    // A problem while invoking the MessageListener does not
1067                    // in general indicate a problem with the connection to the broker, i.e.
1068                    // it will usually be sufficient to let the afterDelivery() method either
1069                    // commit or roll back in order to deal with the exception.
1070                    // However, we notify any registered client internal exception listener
1071                    // of the problem.
1072                    connection.onClientInternalException(e);
1073                } finally {
1074                    if (ack.getTransactionId() == null) {
1075                        try {
1076                            asyncSendPacket(ack);
1077                        } catch (Throwable e) {
1078                            connection.onClientInternalException(e);
1079                        }
1080                    }
1081                }
1082
1083                if (deliveryListener != null) {
1084                    try {
1085                        deliveryListener.afterDelivery(this, message);
1086                    } catch (Throwable t) {
1087                        LOG.debug("Unable to call after delivery", t);
1088                        afterDeliveryError.set(true);
1089                        throw new RuntimeException(t);
1090                    }
1091                }
1092            }
1093            /*
1094            * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
1095            * It also needs to be outside the redelivery guard.
1096            * */
1097            try {
1098                executor.waitForQueueRestart();
1099            } catch (InterruptedException ex) {
1100                connection.onClientInternalException(ex);
1101            }
1102        }
1103    }
1104
1105    /**
1106     * Creates a <CODE>MessageProducer</CODE> to send messages to the
1107     * specified destination.
1108     * <P>
1109     * A client uses a <CODE>MessageProducer</CODE> object to send messages to
1110     * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
1111     * inherit from <CODE>Destination</CODE>, they can be used in the
1112     * destination parameter to create a <CODE>MessageProducer</CODE> object.
1113     *
1114     * @param destination the <CODE>Destination</CODE> to send to, or null if
1115     *                this is a producer which does not have a specified
1116     *                destination.
1117     * @return the MessageProducer
1118     * @throws JMSException if the session fails to create a MessageProducer due
1119     *                 to some internal error.
1120     * @throws InvalidDestinationException if an invalid destination is
1121     *                 specified.
1122     * @since 1.1
1123     */
1124    @Override
1125    public MessageProducer createProducer(Destination destination) throws JMSException {
1126        checkClosed();
1127        if (destination instanceof CustomDestination) {
1128            CustomDestination customDestination = (CustomDestination)destination;
1129            return customDestination.createProducer(this);
1130        }
1131        int timeSendOut = connection.getSendTimeout();
1132        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
1133    }
1134
1135    /**
1136     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1137     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1138     * <CODE>Destination</CODE>, they can be used in the destination
1139     * parameter to create a <CODE>MessageConsumer</CODE>.
1140     *
1141     * @param destination the <CODE>Destination</CODE> to access.
1142     * @return the MessageConsumer
1143     * @throws JMSException if the session fails to create a consumer due to
1144     *                 some internal error.
1145     * @throws InvalidDestinationException if an invalid destination is
1146     *                 specified.
1147     * @since 1.1
1148     */
1149    @Override
1150    public MessageConsumer createConsumer(Destination destination) throws JMSException {
1151        return createConsumer(destination, (String) null);
1152    }
1153
1154    /**
1155     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1156     * using a message selector. Since <CODE> Queue</CODE> and
1157     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1158     * can be used in the destination parameter to create a
1159     * <CODE>MessageConsumer</CODE>.
1160     * <P>
1161     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1162     * that have been sent to a destination.
1163     *
1164     * @param destination the <CODE>Destination</CODE> to access
1165     * @param messageSelector only messages with properties matching the message
1166     *                selector expression are delivered. A value of null or an
1167     *                empty string indicates that there is no message selector
1168     *                for the message consumer.
1169     * @return the MessageConsumer
1170     * @throws JMSException if the session fails to create a MessageConsumer due
1171     *                 to some internal error.
1172     * @throws InvalidDestinationException if an invalid destination is
1173     *                 specified.
1174     * @throws InvalidSelectorException if the message selector is invalid.
1175     * @since 1.1
1176     */
1177    @Override
1178    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
1179        return createConsumer(destination, messageSelector, false);
1180    }
1181
1182    /**
1183     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1184     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1185     * <CODE>Destination</CODE>, they can be used in the destination
1186     * parameter to create a <CODE>MessageConsumer</CODE>.
1187     *
1188     * @param destination the <CODE>Destination</CODE> to access.
1189     * @param messageListener the listener to use for async consumption of messages
1190     * @return the MessageConsumer
1191     * @throws JMSException if the session fails to create a consumer due to
1192     *                 some internal error.
1193     * @throws InvalidDestinationException if an invalid destination is
1194     *                 specified.
1195     * @since 1.1
1196     */
1197    public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
1198        return createConsumer(destination, null, messageListener);
1199    }
1200
1201    /**
1202     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1203     * using a message selector. Since <CODE> Queue</CODE> and
1204     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1205     * can be used in the destination parameter to create a
1206     * <CODE>MessageConsumer</CODE>.
1207     * <P>
1208     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1209     * that have been sent to a destination.
1210     *
1211     * @param destination the <CODE>Destination</CODE> to access
1212     * @param messageSelector only messages with properties matching the message
1213     *                selector expression are delivered. A value of null or an
1214     *                empty string indicates that there is no message selector
1215     *                for the message consumer.
1216     * @param messageListener the listener to use for async consumption of messages
1217     * @return the MessageConsumer
1218     * @throws JMSException if the session fails to create a MessageConsumer due
1219     *                 to some internal error.
1220     * @throws InvalidDestinationException if an invalid destination is
1221     *                 specified.
1222     * @throws InvalidSelectorException if the message selector is invalid.
1223     * @since 1.1
1224     */
1225    public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1226        return createConsumer(destination, messageSelector, false, messageListener);
1227    }
1228
1229    /**
1230     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1231     * using a message selector. This method can specify whether messages
1232     * published by its own connection should be delivered to it, if the
1233     * destination is a topic.
1234     * <P>
1235     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1236     * <CODE>Destination</CODE>, they can be used in the destination
1237     * parameter to create a <CODE>MessageConsumer</CODE>.
1238     * <P>
1239     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1240     * that have been published to a destination.
1241     * <P>
1242     * In some cases, a connection may both publish and subscribe to a topic.
1243     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1244     * inhibit the delivery of messages published by its own connection. The
1245     * default value for this attribute is False. The <CODE>noLocal</CODE>
1246     * value must be supported by destinations that are topics.
1247     *
1248     * @param destination the <CODE>Destination</CODE> to access
1249     * @param messageSelector only messages with properties matching the message
1250     *                selector expression are delivered. A value of null or an
1251     *                empty string indicates that there is no message selector
1252     *                for the message consumer.
1253     * @param noLocal - if true, and the destination is a topic, inhibits the
1254     *                delivery of messages published by its own connection. The
1255     *                behavior for <CODE>NoLocal</CODE> is not specified if
1256     *                the destination is a queue.
1257     * @return the MessageConsumer
1258     * @throws JMSException if the session fails to create a MessageConsumer due
1259     *                 to some internal error.
1260     * @throws InvalidDestinationException if an invalid destination is
1261     *                 specified.
1262     * @throws InvalidSelectorException if the message selector is invalid.
1263     * @since 1.1
1264     */
1265    @Override
1266    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1267        return createConsumer(destination, messageSelector, noLocal, null);
1268    }
1269
1270    /**
1271     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1272     * using a message selector. This method can specify whether messages
1273     * published by its own connection should be delivered to it, if the
1274     * destination is a topic.
1275     * <P>
1276     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1277     * <CODE>Destination</CODE>, they can be used in the destination
1278     * parameter to create a <CODE>MessageConsumer</CODE>.
1279     * <P>
1280     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1281     * that have been published to a destination.
1282     * <P>
1283     * In some cases, a connection may both publish and subscribe to a topic.
1284     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1285     * inhibit the delivery of messages published by its own connection. The
1286     * default value for this attribute is False. The <CODE>noLocal</CODE>
1287     * value must be supported by destinations that are topics.
1288     *
1289     * @param destination the <CODE>Destination</CODE> to access
1290     * @param messageSelector only messages with properties matching the message
1291     *                selector expression are delivered. A value of null or an
1292     *                empty string indicates that there is no message selector
1293     *                for the message consumer.
1294     * @param noLocal - if true, and the destination is a topic, inhibits the
1295     *                delivery of messages published by its own connection. The
1296     *                behavior for <CODE>NoLocal</CODE> is not specified if
1297     *                the destination is a queue.
1298     * @param messageListener the listener to use for async consumption of messages
1299     * @return the MessageConsumer
1300     * @throws JMSException if the session fails to create a MessageConsumer due
1301     *                 to some internal error.
1302     * @throws InvalidDestinationException if an invalid destination is
1303     *                 specified.
1304     * @throws InvalidSelectorException if the message selector is invalid.
1305     * @since 1.1
1306     */
1307    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1308        checkClosed();
1309
1310        if (destination instanceof CustomDestination) {
1311            CustomDestination customDestination = (CustomDestination)destination;
1312            return customDestination.createConsumer(this, messageSelector, noLocal);
1313        }
1314
1315        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1316        int prefetch = 0;
1317        if (destination instanceof Topic) {
1318            prefetch = prefetchPolicy.getTopicPrefetch();
1319        } else {
1320            prefetch = prefetchPolicy.getQueuePrefetch();
1321        }
1322        ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1323        return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1324                prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1325    }
1326
1327    /**
1328     * Creates a queue identity given a <CODE>Queue</CODE> name.
1329     * <P>
1330     * This facility is provided for the rare cases where clients need to
1331     * dynamically manipulate queue identity. It allows the creation of a queue
1332     * identity with a provider-specific name. Clients that depend on this
1333     * ability are not portable.
1334     * <P>
1335     * Note that this method is not for creating the physical queue. The
1336     * physical creation of queues is an administrative task and is not to be
1337     * initiated by the JMS API. The one exception is the creation of temporary
1338     * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1339     * method.
1340     *
1341     * @param queueName the name of this <CODE>Queue</CODE>
1342     * @return a <CODE>Queue</CODE> with the given name
1343     * @throws JMSException if the session fails to create a queue due to some
1344     *                 internal error.
1345     * @since 1.1
1346     */
1347    @Override
1348    public Queue createQueue(String queueName) throws JMSException {
1349        checkClosed();
1350        if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1351            return new ActiveMQTempQueue(queueName);
1352        }
1353        return new ActiveMQQueue(queueName);
1354    }
1355
1356    /**
1357     * Creates a topic identity given a <CODE>Topic</CODE> name.
1358     * <P>
1359     * This facility is provided for the rare cases where clients need to
1360     * dynamically manipulate topic identity. This allows the creation of a
1361     * topic identity with a provider-specific name. Clients that depend on this
1362     * ability are not portable.
1363     * <P>
1364     * Note that this method is not for creating the physical topic. The
1365     * physical creation of topics is an administrative task and is not to be
1366     * initiated by the JMS API. The one exception is the creation of temporary
1367     * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1368     * method.
1369     *
1370     * @param topicName the name of this <CODE>Topic</CODE>
1371     * @return a <CODE>Topic</CODE> with the given name
1372     * @throws JMSException if the session fails to create a topic due to some
1373     *                 internal error.
1374     * @since 1.1
1375     */
1376    @Override
1377    public Topic createTopic(String topicName) throws JMSException {
1378        checkClosed();
1379        if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1380            return new ActiveMQTempTopic(topicName);
1381        }
1382        return new ActiveMQTopic(topicName);
1383    }
1384
1385    /**
1386     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1387     * the specified queue.
1388     *
1389     * @param queue the <CODE>queue</CODE> to access
1390     * @exception InvalidDestinationException if an invalid destination is
1391     *                    specified
1392     * @since 1.1
1393     */
1394    /**
1395     * Creates a durable subscriber to the specified topic.
1396     * <P>
1397     * If a client needs to receive all the messages published on a topic,
1398     * including the ones published while the subscriber is inactive, it uses a
1399     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1400     * record of this durable subscription and insures that all messages from
1401     * the topic's publishers are retained until they are acknowledged by this
1402     * durable subscriber or they have expired.
1403     * <P>
1404     * Sessions with durable subscribers must always provide the same client
1405     * identifier. In addition, each client must specify a name that uniquely
1406     * identifies (within client identifier) each durable subscription it
1407     * creates. Only one session at a time can have a
1408     * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1409     * <P>
1410     * A client can change an existing durable subscription by creating a
1411     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1412     * and/or message selector. Changing a durable subscriber is equivalent to
1413     * unsubscribing (deleting) the old one and creating a new one.
1414     * <P>
1415     * In some cases, a connection may both publish and subscribe to a topic.
1416     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1417     * inhibit the delivery of messages published by its own connection. The
1418     * default value for this attribute is false.
1419     *
1420     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1421     * @param name the name used to identify this subscription
1422     * @return the TopicSubscriber
1423     * @throws JMSException if the session fails to create a subscriber due to
1424     *                 some internal error.
1425     * @throws InvalidDestinationException if an invalid topic is specified.
1426     * @since 1.1
1427     */
1428    @Override
1429    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1430        checkClosed();
1431        return createDurableSubscriber(topic, name, null, false);
1432    }
1433
1434    /**
1435     * Creates a durable subscriber to the specified topic, using a message
1436     * selector and specifying whether messages published by its own connection
1437     * should be delivered to it.
1438     * <P>
1439     * If a client needs to receive all the messages published on a topic,
1440     * including the ones published while the subscriber is inactive, it uses a
1441     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1442     * record of this durable subscription and insures that all messages from
1443     * the topic's publishers are retained until they are acknowledged by this
1444     * durable subscriber or they have expired.
1445     * <P>
1446     * Sessions with durable subscribers must always provide the same client
1447     * identifier. In addition, each client must specify a name which uniquely
1448     * identifies (within client identifier) each durable subscription it
1449     * creates. Only one session at a time can have a
1450     * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1451     * inactive durable subscriber is one that exists but does not currently
1452     * have a message consumer associated with it.
1453     * <P>
1454     * A client can change an existing durable subscription by creating a
1455     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1456     * and/or message selector. Changing a durable subscriber is equivalent to
1457     * unsubscribing (deleting) the old one and creating a new one.
1458     *
1459     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1460     * @param name the name used to identify this subscription
1461     * @param messageSelector only messages with properties matching the message
1462     *                selector expression are delivered. A value of null or an
1463     *                empty string indicates that there is no message selector
1464     *                for the message consumer.
1465     * @param noLocal if set, inhibits the delivery of messages published by its
1466     *                own connection
1467     * @return the Queue Browser
1468     * @throws JMSException if the session fails to create a subscriber due to
1469     *                 some internal error.
1470     * @throws InvalidDestinationException if an invalid topic is specified.
1471     * @throws InvalidSelectorException if the message selector is invalid.
1472     * @since 1.1
1473     */
1474    @Override
1475    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1476        checkClosed();
1477
1478        if (topic == null) {
1479            throw new InvalidDestinationException("Topic cannot be null");
1480        }
1481
1482        if (topic instanceof CustomDestination) {
1483            CustomDestination customDestination = (CustomDestination)topic;
1484            return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1485        }
1486
1487        connection.checkClientIDWasManuallySpecified();
1488        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1489        int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1490        int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1491        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1492                                           noLocal, false, asyncDispatch);
1493    }
1494
1495    /**
1496     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1497     * the specified queue.
1498     *
1499     * @param queue the <CODE>queue</CODE> to access
1500     * @return the Queue Browser
1501     * @throws JMSException if the session fails to create a browser due to some
1502     *                 internal error.
1503     * @throws InvalidDestinationException if an invalid destination is
1504     *                 specified
1505     * @since 1.1
1506     */
1507    @Override
1508    public QueueBrowser createBrowser(Queue queue) throws JMSException {
1509        checkClosed();
1510        return createBrowser(queue, null);
1511    }
1512
1513    /**
1514     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1515     * the specified queue using a message selector.
1516     *
1517     * @param queue the <CODE>queue</CODE> to access
1518     * @param messageSelector only messages with properties matching the message
1519     *                selector expression are delivered. A value of null or an
1520     *                empty string indicates that there is no message selector
1521     *                for the message consumer.
1522     * @return the Queue Browser
1523     * @throws JMSException if the session fails to create a browser due to some
1524     *                 internal error.
1525     * @throws InvalidDestinationException if an invalid destination is
1526     *                 specified
1527     * @throws InvalidSelectorException if the message selector is invalid.
1528     * @since 1.1
1529     */
1530    @Override
1531    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1532        checkClosed();
1533        return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1534    }
1535
1536    /**
1537     * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1538     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1539     *
1540     * @return a temporary queue identity
1541     * @throws JMSException if the session fails to create a temporary queue due
1542     *                 to some internal error.
1543     * @since 1.1
1544     */
1545    @Override
1546    public TemporaryQueue createTemporaryQueue() throws JMSException {
1547        checkClosed();
1548        return (TemporaryQueue)connection.createTempDestination(false);
1549    }
1550
1551    /**
1552     * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1553     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1554     *
1555     * @return a temporary topic identity
1556     * @throws JMSException if the session fails to create a temporary topic due
1557     *                 to some internal error.
1558     * @since 1.1
1559     */
1560    @Override
1561    public TemporaryTopic createTemporaryTopic() throws JMSException {
1562        checkClosed();
1563        return (TemporaryTopic)connection.createTempDestination(true);
1564    }
1565
1566    /**
1567     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1568     * the specified queue.
1569     *
1570     * @param queue the <CODE>Queue</CODE> to access
1571     * @return
1572     * @throws JMSException if the session fails to create a receiver due to
1573     *                 some internal error.
1574     * @throws JMSException
1575     * @throws InvalidDestinationException if an invalid queue is specified.
1576     */
1577    @Override
1578    public QueueReceiver createReceiver(Queue queue) throws JMSException {
1579        checkClosed();
1580        return createReceiver(queue, null);
1581    }
1582
1583    /**
1584     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1585     * the specified queue using a message selector.
1586     *
1587     * @param queue the <CODE>Queue</CODE> to access
1588     * @param messageSelector only messages with properties matching the message
1589     *                selector expression are delivered. A value of null or an
1590     *                empty string indicates that there is no message selector
1591     *                for the message consumer.
1592     * @return QueueReceiver
1593     * @throws JMSException if the session fails to create a receiver due to
1594     *                 some internal error.
1595     * @throws InvalidDestinationException if an invalid queue is specified.
1596     * @throws InvalidSelectorException if the message selector is invalid.
1597     */
1598    @Override
1599    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1600        checkClosed();
1601
1602        if (queue instanceof CustomDestination) {
1603            CustomDestination customDestination = (CustomDestination)queue;
1604            return customDestination.createReceiver(this, messageSelector);
1605        }
1606
1607        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1608        return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1609                                         prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1610    }
1611
1612    /**
1613     * Creates a <CODE>QueueSender</CODE> object to send messages to the
1614     * specified queue.
1615     *
1616     * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1617     *                unidentified producer
1618     * @return QueueSender
1619     * @throws JMSException if the session fails to create a sender due to some
1620     *                 internal error.
1621     * @throws InvalidDestinationException if an invalid queue is specified.
1622     */
1623    @Override
1624    public QueueSender createSender(Queue queue) throws JMSException {
1625        checkClosed();
1626        if (queue instanceof CustomDestination) {
1627            CustomDestination customDestination = (CustomDestination)queue;
1628            return customDestination.createSender(this);
1629        }
1630        int timeSendOut = connection.getSendTimeout();
1631        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1632    }
1633
1634    /**
1635     * Creates a nondurable subscriber to the specified topic. <p/>
1636     * <P>
1637     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1638     * that have been published to a topic. <p/>
1639     * <P>
1640     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1641     * receive only messages that are published while they are active. <p/>
1642     * <P>
1643     * In some cases, a connection may both publish and subscribe to a topic.
1644     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1645     * inhibit the delivery of messages published by its own connection. The
1646     * default value for this attribute is false.
1647     *
1648     * @param topic the <CODE>Topic</CODE> to subscribe to
1649     * @return TopicSubscriber
1650     * @throws JMSException if the session fails to create a subscriber due to
1651     *                 some internal error.
1652     * @throws InvalidDestinationException if an invalid topic is specified.
1653     */
1654    @Override
1655    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1656        checkClosed();
1657        return createSubscriber(topic, null, false);
1658    }
1659
1660    /**
1661     * Creates a nondurable subscriber to the specified topic, using a message
1662     * selector or specifying whether messages published by its own connection
1663     * should be delivered to it. <p/>
1664     * <P>
1665     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1666     * that have been published to a topic. <p/>
1667     * <P>
1668     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1669     * receive only messages that are published while they are active. <p/>
1670     * <P>
1671     * Messages filtered out by a subscriber's message selector will never be
1672     * delivered to the subscriber. From the subscriber's perspective, they do
1673     * not exist. <p/>
1674     * <P>
1675     * In some cases, a connection may both publish and subscribe to a topic.
1676     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1677     * inhibit the delivery of messages published by its own connection. The
1678     * default value for this attribute is false.
1679     *
1680     * @param topic the <CODE>Topic</CODE> to subscribe to
1681     * @param messageSelector only messages with properties matching the message
1682     *                selector expression are delivered. A value of null or an
1683     *                empty string indicates that there is no message selector
1684     *                for the message consumer.
1685     * @param noLocal if set, inhibits the delivery of messages published by its
1686     *                own connection
1687     * @return TopicSubscriber
1688     * @throws JMSException if the session fails to create a subscriber due to
1689     *                 some internal error.
1690     * @throws InvalidDestinationException if an invalid topic is specified.
1691     * @throws InvalidSelectorException if the message selector is invalid.
1692     */
1693    @Override
1694    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1695        checkClosed();
1696
1697        if (topic instanceof CustomDestination) {
1698            CustomDestination customDestination = (CustomDestination)topic;
1699            return customDestination.createSubscriber(this, messageSelector, noLocal);
1700        }
1701
1702        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1703        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1704            .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1705    }
1706
1707    /**
1708     * Creates a publisher for the specified topic. <p/>
1709     * <P>
1710     * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1711     * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1712     * a topic, it defines a new sequence of messages that have no ordering
1713     * relationship with the messages it has previously sent.
1714     *
1715     * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1716     *                an unidentified producer
1717     * @return TopicPublisher
1718     * @throws JMSException if the session fails to create a publisher due to
1719     *                 some internal error.
1720     * @throws InvalidDestinationException if an invalid topic is specified.
1721     */
1722    @Override
1723    public TopicPublisher createPublisher(Topic topic) throws JMSException {
1724        checkClosed();
1725
1726        if (topic instanceof CustomDestination) {
1727            CustomDestination customDestination = (CustomDestination)topic;
1728            return customDestination.createPublisher(this);
1729        }
1730        int timeSendOut = connection.getSendTimeout();
1731        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1732    }
1733
1734    /**
1735     * Unsubscribes a durable subscription that has been created by a client.
1736     * <P>
1737     * This method deletes the state being maintained on behalf of the
1738     * subscriber by its provider.
1739     * <P>
1740     * It is erroneous for a client to delete a durable subscription while there
1741     * is an active <CODE>MessageConsumer </CODE> or
1742     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1743     * message is part of a pending transaction or has not been acknowledged in
1744     * the session.
1745     *
1746     * @param name the name used to identify this subscription
1747     * @throws JMSException if the session fails to unsubscribe to the durable
1748     *                 subscription due to some internal error.
1749     * @throws InvalidDestinationException if an invalid subscription name is
1750     *                 specified.
1751     * @since 1.1
1752     */
1753    @Override
1754    public void unsubscribe(String name) throws JMSException {
1755        checkClosed();
1756        connection.unsubscribe(name);
1757    }
1758
1759    @Override
1760    public void dispatch(MessageDispatch messageDispatch) {
1761        try {
1762            executor.execute(messageDispatch);
1763        } catch (InterruptedException e) {
1764            Thread.currentThread().interrupt();
1765            connection.onClientInternalException(e);
1766        }
1767    }
1768
1769    /**
1770     * Acknowledges all consumed messages of the session of this consumed
1771     * message.
1772     * <P>
1773     * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1774     * for use when a client has specified that its JMS session's consumed
1775     * messages are to be explicitly acknowledged. By invoking
1776     * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1777     * all messages consumed by the session that the message was delivered to.
1778     * <P>
1779     * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1780     * sessions and sessions specified to use implicit acknowledgement modes.
1781     * <P>
1782     * A client may individually acknowledge each message as it is consumed, or
1783     * it may choose to acknowledge messages as an application-defined group
1784     * (which is done by calling acknowledge on the last received message of the
1785     * group, thereby acknowledging all messages consumed by the session.)
1786     * <P>
1787     * Messages that have been received but not acknowledged may be redelivered.
1788     *
1789     * @throws JMSException if the JMS provider fails to acknowledge the
1790     *                 messages due to some internal error.
1791     * @throws javax.jms.IllegalStateException if this method is called on a
1792     *                 closed session.
1793     * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1794     */
1795    public void acknowledge() throws JMSException {
1796        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1797            ActiveMQMessageConsumer c = iter.next();
1798            c.acknowledge();
1799        }
1800    }
1801
1802    /**
1803     * Add a message consumer.
1804     *
1805     * @param consumer - message consumer.
1806     * @throws JMSException
1807     */
1808    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1809        this.consumers.add(consumer);
1810        if (consumer.isDurableSubscriber()) {
1811            stats.onCreateDurableSubscriber();
1812        }
1813        this.connection.addDispatcher(consumer.getConsumerId(), this);
1814    }
1815
1816    /**
1817     * Remove the message consumer.
1818     *
1819     * @param consumer - consumer to be removed.
1820     * @throws JMSException
1821     */
1822    protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1823        this.connection.removeDispatcher(consumer.getConsumerId());
1824        if (consumer.isDurableSubscriber()) {
1825            stats.onRemoveDurableSubscriber();
1826        }
1827        this.consumers.remove(consumer);
1828        this.connection.removeDispatcher(consumer);
1829    }
1830
1831    /**
1832     * Adds a message producer.
1833     *
1834     * @param producer - message producer to be added.
1835     * @throws JMSException
1836     */
1837    protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1838        this.producers.add(producer);
1839        this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1840    }
1841
1842    /**
1843     * Removes a message producer.
1844     *
1845     * @param producer - message producer to be removed.
1846     * @throws JMSException
1847     */
1848    protected void removeProducer(ActiveMQMessageProducer producer) {
1849        this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1850        this.producers.remove(producer);
1851    }
1852
1853    /**
1854     * Start this Session.
1855     *
1856     * @throws JMSException
1857     */
1858    protected void start() throws JMSException {
1859        started.set(true);
1860        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1861            ActiveMQMessageConsumer c = iter.next();
1862            c.start();
1863        }
1864        executor.start();
1865    }
1866
1867    /**
1868     * Stops this session.
1869     *
1870     * @throws JMSException
1871     */
1872    protected void stop() throws JMSException {
1873
1874        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1875            ActiveMQMessageConsumer c = iter.next();
1876            c.stop();
1877        }
1878
1879        started.set(false);
1880        executor.stop();
1881    }
1882
1883    /**
1884     * Returns the session id.
1885     *
1886     * @return value - session id.
1887     */
1888    protected SessionId getSessionId() {
1889        return info.getSessionId();
1890    }
1891
1892    /**
1893     * @return
1894     */
1895    protected ConsumerId getNextConsumerId() {
1896        return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1897    }
1898
1899    /**
1900     * @return
1901     */
1902    protected ProducerId getNextProducerId() {
1903        return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1904    }
1905
1906    /**
1907     * Sends the message for dispatch by the broker.
1908     *
1909     *
1910     * @param producer - message producer.
1911     * @param destination - message destination.
1912     * @param message - message to be sent.
1913     * @param deliveryMode - JMS messsage delivery mode.
1914     * @param priority - message priority.
1915     * @param timeToLive - message expiration.
1916     * @param producerWindow
1917     * @param onComplete
1918     * @throws JMSException
1919     */
1920    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1921                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
1922
1923        checkClosed();
1924        if (destination.isTemporary() && connection.isDeleted(destination)) {
1925            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1926        }
1927        synchronized (sendMutex) {
1928            // tell the Broker we are about to start a new transaction
1929            doStartTransaction();
1930            if (transactionContext.isRollbackOnly()) {
1931                throw new IllegalStateException("transaction marked rollback only");
1932            }
1933            TransactionId txid = transactionContext.getTransactionId();
1934            long sequenceNumber = producer.getMessageSequence();
1935
1936            //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1937            message.setJMSDeliveryMode(deliveryMode);
1938            long expiration = 0L;
1939            if (!producer.getDisableMessageTimestamp()) {
1940                long timeStamp = System.currentTimeMillis();
1941                message.setJMSTimestamp(timeStamp);
1942                if (timeToLive > 0) {
1943                    expiration = timeToLive + timeStamp;
1944                }
1945            }
1946            message.setJMSExpiration(expiration);
1947            message.setJMSPriority(priority);
1948            message.setJMSRedelivered(false);
1949
1950            // transform to our own message format here
1951            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1952            msg.setDestination(destination);
1953            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1954
1955            // Set the message id.
1956            if (msg != message) {
1957                message.setJMSMessageID(msg.getMessageId().toString());
1958                // Make sure the JMS destination is set on the foreign messages too.
1959                message.setJMSDestination(destination);
1960            }
1961            //clear the brokerPath in case we are re-sending this message
1962            msg.setBrokerPath(null);
1963
1964            msg.setTransactionId(txid);
1965            if (connection.isCopyMessageOnSend()) {
1966                msg = (ActiveMQMessage)msg.copy();
1967            }
1968            msg.setConnection(connection);
1969            msg.onSend();
1970            msg.setProducerId(msg.getMessageId().getProducerId());
1971            if (LOG.isTraceEnabled()) {
1972                LOG.trace(getSessionId() + " sending message: " + msg);
1973            }
1974            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1975                this.connection.asyncSendPacket(msg);
1976                if (producerWindow != null) {
1977                    // Since we defer lots of the marshaling till we hit the
1978                    // wire, this might not
1979                    // provide and accurate size. We may change over to doing
1980                    // more aggressive marshaling,
1981                    // to get more accurate sizes.. this is more important once
1982                    // users start using producer window
1983                    // flow control.
1984                    int size = msg.getSize();
1985                    producerWindow.increaseUsage(size);
1986                }
1987            } else {
1988                if (sendTimeout > 0 && onComplete==null) {
1989                    this.connection.syncSendPacket(msg,sendTimeout);
1990                }else {
1991                    this.connection.syncSendPacket(msg, onComplete);
1992                }
1993            }
1994
1995        }
1996    }
1997
1998    /**
1999     * Send TransactionInfo to indicate transaction has started
2000     *
2001     * @throws JMSException if some internal error occurs
2002     */
2003    protected void doStartTransaction() throws JMSException {
2004        if (getTransacted() && !transactionContext.isInXATransaction()) {
2005            transactionContext.begin();
2006        }
2007    }
2008
2009    /**
2010     * Checks whether the session has unconsumed messages.
2011     *
2012     * @return true - if there are unconsumed messages.
2013     */
2014    public boolean hasUncomsumedMessages() {
2015        return executor.hasUncomsumedMessages();
2016    }
2017
2018    /**
2019     * Checks whether the session uses transactions.
2020     *
2021     * @return true - if the session uses transactions.
2022     */
2023    public boolean isTransacted() {
2024        return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
2025    }
2026
2027    /**
2028     * Checks whether the session used client acknowledgment.
2029     *
2030     * @return true - if the session uses client acknowledgment.
2031     */
2032    protected boolean isClientAcknowledge() {
2033        return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
2034    }
2035
2036    /**
2037     * Checks whether the session used auto acknowledgment.
2038     *
2039     * @return true - if the session uses client acknowledgment.
2040     */
2041    public boolean isAutoAcknowledge() {
2042        return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
2043    }
2044
2045    /**
2046     * Checks whether the session used dup ok acknowledgment.
2047     *
2048     * @return true - if the session uses client acknowledgment.
2049     */
2050    public boolean isDupsOkAcknowledge() {
2051        return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
2052    }
2053
2054    public boolean isIndividualAcknowledge(){
2055        return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
2056    }
2057
2058    /**
2059     * Returns the message delivery listener.
2060     *
2061     * @return deliveryListener - message delivery listener.
2062     */
2063    public DeliveryListener getDeliveryListener() {
2064        return deliveryListener;
2065    }
2066
2067    /**
2068     * Sets the message delivery listener.
2069     *
2070     * @param deliveryListener - message delivery listener.
2071     */
2072    public void setDeliveryListener(DeliveryListener deliveryListener) {
2073        this.deliveryListener = deliveryListener;
2074    }
2075
2076    /**
2077     * Returns the SessionInfo bean.
2078     *
2079     * @return info - SessionInfo bean.
2080     * @throws JMSException
2081     */
2082    protected SessionInfo getSessionInfo() throws JMSException {
2083        SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
2084        return info;
2085    }
2086
2087    /**
2088     * Send the asynchronus command.
2089     *
2090     * @param command - command to be executed.
2091     * @throws JMSException
2092     */
2093    public void asyncSendPacket(Command command) throws JMSException {
2094        connection.asyncSendPacket(command);
2095    }
2096
2097    /**
2098     * Send the synchronus command.
2099     *
2100     * @param command - command to be executed.
2101     * @return Response
2102     * @throws JMSException
2103     */
2104    public Response syncSendPacket(Command command) throws JMSException {
2105        return connection.syncSendPacket(command);
2106    }
2107
2108    public long getNextDeliveryId() {
2109        return deliveryIdGenerator.getNextSequenceId();
2110    }
2111
2112    public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
2113
2114        List<MessageDispatch> c = unconsumedMessages.removeAll();
2115        Collections.reverse(c);
2116
2117        for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
2118            MessageDispatch md = iter.next();
2119            executor.executeFirst(md);
2120        }
2121
2122    }
2123
2124    public boolean isRunning() {
2125        return started.get();
2126    }
2127
2128    public boolean isAsyncDispatch() {
2129        return asyncDispatch;
2130    }
2131
2132    public void setAsyncDispatch(boolean asyncDispatch) {
2133        this.asyncDispatch = asyncDispatch;
2134    }
2135
2136    /**
2137     * @return Returns the sessionAsyncDispatch.
2138     */
2139    public boolean isSessionAsyncDispatch() {
2140        return sessionAsyncDispatch;
2141    }
2142
2143    /**
2144     * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
2145     */
2146    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
2147        this.sessionAsyncDispatch = sessionAsyncDispatch;
2148    }
2149
2150    public MessageTransformer getTransformer() {
2151        return transformer;
2152    }
2153
2154    public ActiveMQConnection getConnection() {
2155        return connection;
2156    }
2157
2158    /**
2159     * Sets the transformer used to transform messages before they are sent on
2160     * to the JMS bus or when they are received from the bus but before they are
2161     * delivered to the JMS client
2162     */
2163    public void setTransformer(MessageTransformer transformer) {
2164        this.transformer = transformer;
2165    }
2166
2167    public BlobTransferPolicy getBlobTransferPolicy() {
2168        return blobTransferPolicy;
2169    }
2170
2171    /**
2172     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
2173     * OBjects) are transferred from producers to brokers to consumers
2174     */
2175    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
2176        this.blobTransferPolicy = blobTransferPolicy;
2177    }
2178
2179    public List<MessageDispatch> getUnconsumedMessages() {
2180        return executor.getUnconsumedMessages();
2181    }
2182
2183    @Override
2184    public String toString() {
2185        return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + ",closed=" + closed + "} " + sendMutex;
2186    }
2187
2188    public void checkMessageListener() throws JMSException {
2189        if (messageListener != null) {
2190            throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2191        }
2192        for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
2193            ActiveMQMessageConsumer consumer = i.next();
2194            if (consumer.hasMessageListener()) {
2195                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2196            }
2197        }
2198    }
2199
2200    protected void setOptimizeAcknowledge(boolean value) {
2201        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2202            ActiveMQMessageConsumer c = iter.next();
2203            c.setOptimizeAcknowledge(value);
2204        }
2205    }
2206
2207    protected void setPrefetchSize(ConsumerId id, int prefetch) {
2208        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2209            ActiveMQMessageConsumer c = iter.next();
2210            if (c.getConsumerId().equals(id)) {
2211                c.setPrefetchSize(prefetch);
2212                break;
2213            }
2214        }
2215    }
2216
2217    protected void close(ConsumerId id) {
2218        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2219            ActiveMQMessageConsumer c = iter.next();
2220            if (c.getConsumerId().equals(id)) {
2221                try {
2222                    c.close();
2223                } catch (JMSException e) {
2224                    LOG.warn("Exception closing consumer", e);
2225                }
2226                LOG.warn("Closed consumer on Command, " + id);
2227                break;
2228            }
2229        }
2230    }
2231
2232    public boolean isInUse(ActiveMQTempDestination destination) {
2233        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2234            ActiveMQMessageConsumer c = iter.next();
2235            if (c.isInUse(destination)) {
2236                return true;
2237            }
2238        }
2239        return false;
2240    }
2241
2242    /**
2243     * highest sequence id of the last message delivered by this session.
2244     * Passed to the broker in the close command, maintained by dispose()
2245     * @return lastDeliveredSequenceId
2246     */
2247    public long getLastDeliveredSequenceId() {
2248        return lastDeliveredSequenceId;
2249    }
2250
2251    protected void sendAck(MessageAck ack) throws JMSException {
2252        sendAck(ack,false);
2253    }
2254
2255    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2256        if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2257            asyncSendPacket(ack);
2258        } else {
2259            syncSendPacket(ack);
2260        }
2261    }
2262
2263    protected Scheduler getScheduler() throws JMSException {
2264        return this.connection.getScheduler();
2265    }
2266
2267    protected ThreadPoolExecutor getConnectionExecutor() {
2268        return this.connectionExecutor;
2269    }
2270}