001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.File;
020import java.io.InputStream;
021import java.io.Serializable;
022import java.net.URL;
023import java.util.Collections;
024import java.util.Iterator;
025import java.util.List;
026import java.util.concurrent.CopyOnWriteArrayList;
027import java.util.concurrent.ThreadPoolExecutor;
028import java.util.concurrent.atomic.AtomicBoolean;
029import java.util.concurrent.atomic.AtomicInteger;
030
031import javax.jms.BytesMessage;
032import javax.jms.Destination;
033import javax.jms.IllegalStateException;
034import javax.jms.InvalidDestinationException;
035import javax.jms.InvalidSelectorException;
036import javax.jms.JMSException;
037import javax.jms.MapMessage;
038import javax.jms.Message;
039import javax.jms.MessageConsumer;
040import javax.jms.MessageListener;
041import javax.jms.MessageProducer;
042import javax.jms.ObjectMessage;
043import javax.jms.Queue;
044import javax.jms.QueueBrowser;
045import javax.jms.QueueReceiver;
046import javax.jms.QueueSender;
047import javax.jms.QueueSession;
048import javax.jms.Session;
049import javax.jms.StreamMessage;
050import javax.jms.TemporaryQueue;
051import javax.jms.TemporaryTopic;
052import javax.jms.TextMessage;
053import javax.jms.Topic;
054import javax.jms.TopicPublisher;
055import javax.jms.TopicSession;
056import javax.jms.TopicSubscriber;
057import javax.jms.TransactionRolledBackException;
058
059import org.apache.activemq.blob.BlobDownloader;
060import org.apache.activemq.blob.BlobTransferPolicy;
061import org.apache.activemq.blob.BlobUploader;
062import org.apache.activemq.command.ActiveMQBlobMessage;
063import org.apache.activemq.command.ActiveMQBytesMessage;
064import org.apache.activemq.command.ActiveMQDestination;
065import org.apache.activemq.command.ActiveMQMapMessage;
066import org.apache.activemq.command.ActiveMQMessage;
067import org.apache.activemq.command.ActiveMQObjectMessage;
068import org.apache.activemq.command.ActiveMQQueue;
069import org.apache.activemq.command.ActiveMQStreamMessage;
070import org.apache.activemq.command.ActiveMQTempDestination;
071import org.apache.activemq.command.ActiveMQTempQueue;
072import org.apache.activemq.command.ActiveMQTempTopic;
073import org.apache.activemq.command.ActiveMQTextMessage;
074import org.apache.activemq.command.ActiveMQTopic;
075import org.apache.activemq.command.Command;
076import org.apache.activemq.command.ConsumerId;
077import org.apache.activemq.command.MessageAck;
078import org.apache.activemq.command.MessageDispatch;
079import org.apache.activemq.command.MessageId;
080import org.apache.activemq.command.ProducerId;
081import org.apache.activemq.command.RemoveInfo;
082import org.apache.activemq.command.Response;
083import org.apache.activemq.command.SessionId;
084import org.apache.activemq.command.SessionInfo;
085import org.apache.activemq.command.TransactionId;
086import org.apache.activemq.management.JMSSessionStatsImpl;
087import org.apache.activemq.management.StatsCapable;
088import org.apache.activemq.management.StatsImpl;
089import org.apache.activemq.thread.Scheduler;
090import org.apache.activemq.transaction.Synchronization;
091import org.apache.activemq.usage.MemoryUsage;
092import org.apache.activemq.util.Callback;
093import org.apache.activemq.util.LongSequenceGenerator;
094import org.slf4j.Logger;
095import org.slf4j.LoggerFactory;
096
097/**
098 * <P>
099 * A <CODE>Session</CODE> object is a single-threaded context for producing
100 * and consuming messages. Although it may allocate provider resources outside
101 * the Java virtual machine (JVM), it is considered a lightweight JMS object.
102 * <P>
103 * A session serves several purposes:
104 * <UL>
105 * <LI>It is a factory for its message producers and consumers.
106 * <LI>It supplies provider-optimized message factories.
107 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and
108 * <CODE>TemporaryQueues</CODE>.
109 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE>
110 * objects for those clients that need to dynamically manipulate
111 * provider-specific destination names.
112 * <LI>It supports a single series of transactions that combine work spanning
113 * its producers and consumers into atomic units.
114 * <LI>It defines a serial order for the messages it consumes and the messages
115 * it produces.
116 * <LI>It retains messages it consumes until they have been acknowledged.
117 * <LI>It serializes execution of message listeners registered with its message
118 * consumers.
119 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
120 * </UL>
121 * <P>
122 * A session can create and service multiple message producers and consumers.
123 * <P>
124 * One typical use is to have a thread block on a synchronous
125 * <CODE>MessageConsumer</CODE> until a message arrives. The thread may then
 * use one or more of the <CODE>Session</CODE>'s <CODE>MessageProducer</CODE>s.
127 * <P>
128 * If a client desires to have one thread produce messages while others consume
129 * them, the client should use a separate session for its producing thread.
130 * <P>
131 * Once a connection has been started, any session with one or more registered
132 * message listeners is dedicated to the thread of control that delivers
133 * messages to it. It is erroneous for client code to use this session or any of
134 * its constituent objects from another thread of control. The only exception to
135 * this rule is the use of the session or connection <CODE>close</CODE>
136 * method.
137 * <P>
138 * It should be easy for most clients to partition their work naturally into
139 * sessions. This model allows clients to start simply and incrementally add
140 * message processing complexity as their need for concurrency grows.
141 * <P>
142 * The <CODE>close</CODE> method is the only session method that can be called
143 * while some other session method is being executed in another thread.
144 * <P>
145 * A session may be specified as transacted. Each transacted session supports a
146 * single series of transactions. Each transaction groups a set of message sends
147 * and a set of message receives into an atomic unit of work. In effect,
148 * transactions organize a session's input message stream and output message
149 * stream into series of atomic units. When a transaction commits, its atomic
150 * unit of input is acknowledged and its associated atomic unit of output is
151 * sent. If a transaction rollback is done, the transaction's sent messages are
152 * destroyed and the session's input is automatically recovered.
153 * <P>
154 * The content of a transaction's input and output units is simply those
155 * messages that have been produced and consumed within the session's current
156 * transaction.
157 * <P>
158 * A transaction is completed using either its session's <CODE>commit</CODE>
159 * method or its session's <CODE>rollback </CODE> method. The completion of a
160 * session's current transaction automatically begins the next. The result is
161 * that a transacted session always has a current transaction within which its
162 * work is done.
163 * <P>
164 * The Java Transaction Service (JTS) or some other transaction monitor may be
165 * used to combine a session's transaction with transactions on other resources
166 * (databases, other JMS sessions, etc.). Since Java distributed transactions
167 * are controlled via the Java Transaction API (JTA), use of the session's
168 * <CODE>commit</CODE> and <CODE>rollback</CODE> methods in this context is
169 * prohibited.
170 * <P>
171 * The JMS API does not require support for JTA; however, it does define how a
172 * provider supplies this support.
173 * <P>
174 * Although it is also possible for a JMS client to handle distributed
175 * transactions directly, it is unlikely that many JMS clients will do this.
176 * Support for JTA in the JMS API is targeted at systems vendors who will be
177 * integrating the JMS API into their application server products.
178 *
179 *
180 * @see javax.jms.Session
181 * @see javax.jms.QueueSession
182 * @see javax.jms.TopicSession
183 * @see javax.jms.XASession
184 */
185public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
186
    /**
     * Only acknowledge an individual message - using message.acknowledge() -
     * as opposed to CLIENT_ACKNOWLEDGE, which acknowledges all messages
     * consumed by a session at the time acknowledge() is called.
     */
    public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
    /**
     * Currently an alias of {@link #INDIVIDUAL_ACKNOWLEDGE}.
     * NOTE(review): presumably the upper bound of the valid acknowledgement
     * mode values - confirm against callers.
     */
    public static final int MAX_ACK_CONSTANT = INDIVIDUAL_ACKNOWLEDGE;
195
196    public static interface DeliveryListener {
197        void beforeDelivery(ActiveMQSession session, Message msg);
198
199        void afterDelivery(ActiveMQSession session, Message msg);
200    }
201
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
    // Executor obtained from the owning connection in the constructor.
    private final ThreadPoolExecutor connectionExecutor;

    // Acknowledgement mode given at construction; returned by getAcknowledgeMode().
    protected int acknowledgementMode;
    // The connection this session was created on and is registered with.
    protected final ActiveMQConnection connection;
    // Session descriptor built in the constructor and sent over the connection;
    // also the source of the remove command sent by doClose().
    protected final SessionInfo info;
    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
    protected final LongSequenceGenerator producerIdGenerator = new LongSequenceGenerator();
    protected final LongSequenceGenerator deliveryIdGenerator = new LongSequenceGenerator();
    // Created in the constructor; closed first thing in dispose().
    protected final ActiveMQSessionExecutor executor;
    protected final AtomicBoolean started = new AtomicBoolean(false);

    // Consumers and producers of this session; disposed and cleared in dispose()
    // and shared with the stats object created in the constructor.
    protected final CopyOnWriteArrayList<ActiveMQMessageConsumer> consumers = new CopyOnWriteArrayList<ActiveMQMessageConsumer>();
    protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();

    // Set to true at the end of dispose().
    protected boolean closed;
    // True while close() has a pending XA synchronization that will run doClose().
    private volatile boolean synchronizationRegistered;
    protected boolean asyncDispatch;
    protected boolean sessionAsyncDispatch;
    // Snapshot of LOG.isDebugEnabled() taken at construction time.
    protected final boolean debug;
    protected final Object sendMutex = new Object();
    protected final Object redeliveryGuard = new Object();

    // Prevents clearMessagesInProgress() from scheduling duplicate clear tasks.
    private final AtomicBoolean clearInProgress = new AtomicBoolean();

    private MessageListener messageListener;
    // Statistics object backing getStats() and getSessionStats().
    private final JMSSessionStatsImpl stats;
    // Transaction context; set to null by dispose().
    private TransactionContext transactionContext;
    private DeliveryListener deliveryListener;
    // Initialised from the connection's transformer in the constructor.
    private MessageTransformer transformer;
    // Initialised from the connection's blob transfer policy in the constructor.
    private BlobTransferPolicy blobTransferPolicy;
    // Raised in dispose() to the highest value reported by any consumer and
    // sent with the remove command in doClose().
    private long lastDeliveredSequenceId = -2;
234
235    /**
236     * Construct the Session
237     *
238     * @param connection
239     * @param sessionId
240     * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
241     *                Session.SESSION_TRANSACTED
242     * @param asyncDispatch
243     * @param sessionAsyncDispatch
244     * @throws JMSException on internal error
245     */
246    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
247        this.debug = LOG.isDebugEnabled();
248        this.connection = connection;
249        this.acknowledgementMode = acknowledgeMode;
250        this.asyncDispatch = asyncDispatch;
251        this.sessionAsyncDispatch = sessionAsyncDispatch;
252        this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
253        setTransactionContext(new TransactionContext(connection));
254        stats = new JMSSessionStatsImpl(producers, consumers);
255        this.connection.asyncSendPacket(info);
256        setTransformer(connection.getTransformer());
257        setBlobTransferPolicy(connection.getBlobTransferPolicy());
258        this.connectionExecutor=connection.getExecutor();
259        this.executor = new ActiveMQSessionExecutor(this);
260        connection.addSession(this);
261        if (connection.isStarted()) {
262            start();
263        }
264
265    }
266
267    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch) throws JMSException {
268        this(connection, sessionId, acknowledgeMode, asyncDispatch, true);
269    }
270
271    /**
272     * Sets the transaction context of the session.
273     *
274     * @param transactionContext - provides the means to control a JMS
275     *                transaction.
276     */
277    public void setTransactionContext(TransactionContext transactionContext) {
278        this.transactionContext = transactionContext;
279    }
280
281    /**
282     * Returns the transaction context of the session.
283     *
284     * @return transactionContext - session's transaction context.
285     */
286    public TransactionContext getTransactionContext() {
287        return transactionContext;
288    }
289
290    /*
291     * (non-Javadoc)
292     *
293     * @see org.apache.activemq.management.StatsCapable#getStats()
294     */
295    @Override
296    public StatsImpl getStats() {
297        return stats;
298    }
299
300    /**
301     * Returns the session's statistics.
302     *
303     * @return stats - session's statistics.
304     */
305    public JMSSessionStatsImpl getSessionStats() {
306        return stats;
307    }
308
309    /**
310     * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE>
311     * object is used to send a message containing a stream of uninterpreted
312     * bytes.
313     *
314     * @return the an ActiveMQBytesMessage
315     * @throws JMSException if the JMS provider fails to create this message due
316     *                 to some internal error.
317     */
318    @Override
319    public BytesMessage createBytesMessage() throws JMSException {
320        ActiveMQBytesMessage message = new ActiveMQBytesMessage();
321        configureMessage(message);
322        return message;
323    }
324
325    /**
326     * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE>
327     * object is used to send a self-defining set of name-value pairs, where
328     * names are <CODE>String</CODE> objects and values are primitive values
329     * in the Java programming language.
330     *
331     * @return an ActiveMQMapMessage
332     * @throws JMSException if the JMS provider fails to create this message due
333     *                 to some internal error.
334     */
335    @Override
336    public MapMessage createMapMessage() throws JMSException {
337        ActiveMQMapMessage message = new ActiveMQMapMessage();
338        configureMessage(message);
339        return message;
340    }
341
342    /**
343     * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE>
344     * interface is the root interface of all JMS messages. A
345     * <CODE>Message</CODE> object holds all the standard message header
346     * information. It can be sent when a message containing only header
347     * information is sufficient.
348     *
349     * @return an ActiveMQMessage
350     * @throws JMSException if the JMS provider fails to create this message due
351     *                 to some internal error.
352     */
353    @Override
354    public Message createMessage() throws JMSException {
355        ActiveMQMessage message = new ActiveMQMessage();
356        configureMessage(message);
357        return message;
358    }
359
360    /**
361     * Creates an <CODE>ObjectMessage</CODE> object. An
362     * <CODE>ObjectMessage</CODE> object is used to send a message that
363     * contains a serializable Java object.
364     *
365     * @return an ActiveMQObjectMessage
366     * @throws JMSException if the JMS provider fails to create this message due
367     *                 to some internal error.
368     */
369    @Override
370    public ObjectMessage createObjectMessage() throws JMSException {
371        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
372        configureMessage(message);
373        return message;
374    }
375
376    /**
377     * Creates an initialized <CODE>ObjectMessage</CODE> object. An
378     * <CODE>ObjectMessage</CODE> object is used to send a message that
379     * contains a serializable Java object.
380     *
381     * @param object the object to use to initialize this message
382     * @return an ActiveMQObjectMessage
383     * @throws JMSException if the JMS provider fails to create this message due
384     *                 to some internal error.
385     */
386    @Override
387    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
388        ActiveMQObjectMessage message = new ActiveMQObjectMessage();
389        configureMessage(message);
390        message.setObject(object);
391        return message;
392    }
393
394    /**
395     * Creates a <CODE>StreamMessage</CODE> object. A
396     * <CODE>StreamMessage</CODE> object is used to send a self-defining
397     * stream of primitive values in the Java programming language.
398     *
399     * @return an ActiveMQStreamMessage
400     * @throws JMSException if the JMS provider fails to create this message due
401     *                 to some internal error.
402     */
403    @Override
404    public StreamMessage createStreamMessage() throws JMSException {
405        ActiveMQStreamMessage message = new ActiveMQStreamMessage();
406        configureMessage(message);
407        return message;
408    }
409
410    /**
411     * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE>
412     * object is used to send a message containing a <CODE>String</CODE>
413     * object.
414     *
415     * @return an ActiveMQTextMessage
416     * @throws JMSException if the JMS provider fails to create this message due
417     *                 to some internal error.
418     */
419    @Override
420    public TextMessage createTextMessage() throws JMSException {
421        ActiveMQTextMessage message = new ActiveMQTextMessage();
422        configureMessage(message);
423        return message;
424    }
425
426    /**
427     * Creates an initialized <CODE>TextMessage</CODE> object. A
428     * <CODE>TextMessage</CODE> object is used to send a message containing a
429     * <CODE>String</CODE>.
430     *
431     * @param text the string used to initialize this message
432     * @return an ActiveMQTextMessage
433     * @throws JMSException if the JMS provider fails to create this message due
434     *                 to some internal error.
435     */
436    @Override
437    public TextMessage createTextMessage(String text) throws JMSException {
438        ActiveMQTextMessage message = new ActiveMQTextMessage();
439        message.setText(text);
440        configureMessage(message);
441        return message;
442    }
443
444    /**
445     * Creates an initialized <CODE>BlobMessage</CODE> object. A
446     * <CODE>BlobMessage</CODE> object is used to send a message containing a
447     * <CODE>URL</CODE> which points to some network addressible BLOB.
448     *
449     * @param url the network addressable URL used to pass directly to the
450     *                consumer
451     * @return a BlobMessage
452     * @throws JMSException if the JMS provider fails to create this message due
453     *                 to some internal error.
454     */
455    public BlobMessage createBlobMessage(URL url) throws JMSException {
456        return createBlobMessage(url, false);
457    }
458
459    /**
460     * Creates an initialized <CODE>BlobMessage</CODE> object. A
461     * <CODE>BlobMessage</CODE> object is used to send a message containing a
462     * <CODE>URL</CODE> which points to some network addressible BLOB.
463     *
464     * @param url the network addressable URL used to pass directly to the
465     *                consumer
466     * @param deletedByBroker indicates whether or not the resource is deleted
467     *                by the broker when the message is acknowledged
468     * @return a BlobMessage
469     * @throws JMSException if the JMS provider fails to create this message due
470     *                 to some internal error.
471     */
472    public BlobMessage createBlobMessage(URL url, boolean deletedByBroker) throws JMSException {
473        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
474        configureMessage(message);
475        message.setURL(url);
476        message.setDeletedByBroker(deletedByBroker);
477        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
478        return message;
479    }
480
481    /**
482     * Creates an initialized <CODE>BlobMessage</CODE> object. A
483     * <CODE>BlobMessage</CODE> object is used to send a message containing
484     * the <CODE>File</CODE> content. Before the message is sent the file
485     * conent will be uploaded to the broker or some other remote repository
486     * depending on the {@link #getBlobTransferPolicy()}.
487     *
488     * @param file the file to be uploaded to some remote repo (or the broker)
489     *                depending on the strategy
490     * @return a BlobMessage
491     * @throws JMSException if the JMS provider fails to create this message due
492     *                 to some internal error.
493     */
494    public BlobMessage createBlobMessage(File file) throws JMSException {
495        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
496        configureMessage(message);
497        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), file));
498        message.setBlobDownloader(new BlobDownloader((getBlobTransferPolicy())));
499        message.setDeletedByBroker(true);
500        message.setName(file.getName());
501        return message;
502    }
503
504    /**
505     * Creates an initialized <CODE>BlobMessage</CODE> object. A
506     * <CODE>BlobMessage</CODE> object is used to send a message containing
507     * the <CODE>File</CODE> content. Before the message is sent the file
508     * conent will be uploaded to the broker or some other remote repository
509     * depending on the {@link #getBlobTransferPolicy()}.
510     *
511     * @param in the stream to be uploaded to some remote repo (or the broker)
512     *                depending on the strategy
513     * @return a BlobMessage
514     * @throws JMSException if the JMS provider fails to create this message due
515     *                 to some internal error.
516     */
517    public BlobMessage createBlobMessage(InputStream in) throws JMSException {
518        ActiveMQBlobMessage message = new ActiveMQBlobMessage();
519        configureMessage(message);
520        message.setBlobUploader(new BlobUploader(getBlobTransferPolicy(), in));
521        message.setBlobDownloader(new BlobDownloader(getBlobTransferPolicy()));
522        message.setDeletedByBroker(true);
523        return message;
524    }
525
526    /**
527     * Indicates whether the session is in transacted mode.
528     *
529     * @return true if the session is in transacted mode
530     * @throws JMSException if there is some internal error.
531     */
532    @Override
533    public boolean getTransacted() throws JMSException {
534        checkClosed();
535        return isTransacted();
536    }
537
538    /**
539     * Returns the acknowledgement mode of the session. The acknowledgement mode
540     * is set at the time that the session is created. If the session is
541     * transacted, the acknowledgement mode is ignored.
542     *
543     * @return If the session is not transacted, returns the current
544     *         acknowledgement mode for the session. If the session is
545     *         transacted, returns SESSION_TRANSACTED.
546     * @throws JMSException
547     * @see javax.jms.Connection#createSession(boolean,int)
548     * @since 1.1 exception JMSException if there is some internal error.
549     */
550    @Override
551    public int getAcknowledgeMode() throws JMSException {
552        checkClosed();
553        return this.acknowledgementMode;
554    }
555
556    /**
557     * Commits all messages done in this transaction and releases any locks
558     * currently held.
559     *
560     * @throws JMSException if the JMS provider fails to commit the transaction
561     *                 due to some internal error.
562     * @throws TransactionRolledBackException if the transaction is rolled back
563     *                 due to some internal error during commit.
564     * @throws javax.jms.IllegalStateException if the method is not called by a
565     *                 transacted session.
566     */
567    @Override
568    public void commit() throws JMSException {
569        checkClosed();
570        if (!getTransacted()) {
571            throw new javax.jms.IllegalStateException("Not a transacted session");
572        }
573        if (LOG.isDebugEnabled()) {
574            LOG.debug(getSessionId() + " Transaction Commit :" + transactionContext.getTransactionId());
575        }
576        transactionContext.commit();
577    }
578
579    /**
580     * Rolls back any messages done in this transaction and releases any locks
581     * currently held.
582     *
583     * @throws JMSException if the JMS provider fails to roll back the
584     *                 transaction due to some internal error.
585     * @throws javax.jms.IllegalStateException if the method is not called by a
586     *                 transacted session.
587     */
588    @Override
589    public void rollback() throws JMSException {
590        checkClosed();
591        if (!getTransacted()) {
592            throw new javax.jms.IllegalStateException("Not a transacted session");
593        }
594        if (LOG.isDebugEnabled()) {
595            LOG.debug(getSessionId() + " Transaction Rollback, txid:"  + transactionContext.getTransactionId());
596        }
597        transactionContext.rollback();
598    }
599
600    /**
601     * Closes the session.
602     * <P>
603     * Since a provider may allocate some resources on behalf of a session
604     * outside the JVM, clients should close the resources when they are not
605     * needed. Relying on garbage collection to eventually reclaim these
606     * resources may not be timely enough.
607     * <P>
608     * There is no need to close the producers and consumers of a closed
609     * session.
610     * <P>
611     * This call will block until a <CODE>receive</CODE> call or message
612     * listener in progress has completed. A blocked message consumer
613     * <CODE>receive</CODE> call returns <CODE>null</CODE> when this session
614     * is closed.
615     * <P>
616     * Closing a transacted session must roll back the transaction in progress.
617     * <P>
618     * This method is the only <CODE>Session</CODE> method that can be called
619     * concurrently.
620     * <P>
621     * Invoking any other <CODE>Session</CODE> method on a closed session must
622     * throw a <CODE> JMSException.IllegalStateException</CODE>. Closing a
623     * closed session must <I>not </I> throw an exception.
624     *
625     * @throws JMSException if the JMS provider fails to close the session due
626     *                 to some internal error.
627     */
628    @Override
629    public void close() throws JMSException {
630        if (!closed) {
631            if (getTransactionContext().isInXATransaction()) {
632                if (!synchronizationRegistered) {
633                    synchronizationRegistered = true;
634                    getTransactionContext().addSynchronization(new Synchronization() {
635
636                                        @Override
637                                        public void afterCommit() throws Exception {
638                                            doClose();
639                                            synchronizationRegistered = false;
640                                        }
641
642                                        @Override
643                                        public void afterRollback() throws Exception {
644                                            doClose();
645                                            synchronizationRegistered = false;
646                                        }
647                                    });
648                }
649
650            } else {
651                doClose();
652            }
653        }
654    }
655
656    private void doClose() throws JMSException {
657        dispose();
658        RemoveInfo removeCommand = info.createRemoveCommand();
659        removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
660        connection.asyncSendPacket(removeCommand);
661    }
662
663    final AtomicInteger clearRequestsCounter = new AtomicInteger(0);
664    void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) {
665        clearRequestsCounter.incrementAndGet();
666        executor.clearMessagesInProgress();
667        // we are called from inside the transport reconnection logic which involves us
668        // clearing all the connections' consumers dispatch and delivered lists. So rather
669        // than trying to grab a mutex (which could be already owned by the message listener
670        // calling the send or an ack) we allow it to complete in a separate thread via the
671        // scheduler and notify us via connection.transportInterruptionProcessingComplete()
672        //
673        // We must be careful though not to allow multiple calls to this method from a
674        // connection that is having issue becoming fully established from causing a large
675        // build up of scheduled tasks to clear the same consumers over and over.
676        if (consumers.isEmpty()) {
677            return;
678        }
679
680        if (clearInProgress.compareAndSet(false, true)) {
681            for (final ActiveMQMessageConsumer consumer : consumers) {
682                consumer.inProgressClearRequired();
683                transportInterruptionProcessingComplete.incrementAndGet();
684                try {
685                    connection.getScheduler().executeAfterDelay(new Runnable() {
686                        @Override
687                        public void run() {
688                            consumer.clearMessagesInProgress();
689                        }}, 0l);
690                } catch (JMSException e) {
691                    connection.onClientInternalException(e);
692                }
693            }
694
695            try {
696                connection.getScheduler().executeAfterDelay(new Runnable() {
697                    @Override
698                    public void run() {
699                        clearInProgress.set(false);
700                    }}, 0l);
701            } catch (JMSException e) {
702                connection.onClientInternalException(e);
703            }
704        }
705    }
706
707    void deliverAcks() {
708        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
709            ActiveMQMessageConsumer consumer = iter.next();
710            consumer.deliverAcks();
711        }
712    }
713
714    public synchronized void dispose() throws JMSException {
715        if (!closed) {
716
717            try {
718                executor.close();
719
720                for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
721                    ActiveMQMessageConsumer consumer = iter.next();
722                    consumer.setFailureError(connection.getFirstFailureError());
723                    consumer.dispose();
724                    lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, consumer.getLastDeliveredSequenceId());
725                }
726                consumers.clear();
727
728                for (Iterator<ActiveMQMessageProducer> iter = producers.iterator(); iter.hasNext();) {
729                    ActiveMQMessageProducer producer = iter.next();
730                    producer.dispose();
731                }
732                producers.clear();
733
734                try {
735                    if (getTransactionContext().isInLocalTransaction()) {
736                        rollback();
737                    }
738                } catch (JMSException e) {
739                }
740
741            } finally {
742                connection.removeSession(this);
743                this.transactionContext = null;
744                closed = true;
745            }
746        }
747    }
748
749    /**
750     * Checks that the session is not closed then configures the message
751     */
752    protected void configureMessage(ActiveMQMessage message) throws IllegalStateException {
753        checkClosed();
754        message.setConnection(connection);
755    }
756
757    /**
758     * Check if the session is closed. It is used for ensuring that the session
759     * is open before performing various operations.
760     *
761     * @throws IllegalStateException if the Session is closed
762     */
763    protected void checkClosed() throws IllegalStateException {
764        if (closed) {
765            throw new IllegalStateException("The Session is closed");
766        }
767    }
768
769    /**
770     * Checks if the session is closed.
771     *
772     * @return true if the session is closed, false otherwise.
773     */
774    public boolean isClosed() {
775        return closed;
776    }
777
778    /**
779     * Stops message delivery in this session, and restarts message delivery
780     * with the oldest unacknowledged message.
781     * <P>
782     * All consumers deliver messages in a serial order. Acknowledging a
783     * received message automatically acknowledges all messages that have been
784     * delivered to the client.
785     * <P>
786     * Restarting a session causes it to take the following actions:
787     * <UL>
788     * <LI>Stop message delivery
789     * <LI>Mark all messages that might have been delivered but not
790     * acknowledged as "redelivered"
791     * <LI>Restart the delivery sequence including all unacknowledged messages
792     * that had been previously delivered. Redelivered messages do not have to
793     * be delivered in exactly their original delivery order.
794     * </UL>
795     *
796     * @throws JMSException if the JMS provider fails to stop and restart
797     *                 message delivery due to some internal error.
798     * @throws IllegalStateException if the method is called by a transacted
799     *                 session.
800     */
801    @Override
802    public void recover() throws JMSException {
803
804        checkClosed();
805        if (getTransacted()) {
806            throw new IllegalStateException("This session is transacted");
807        }
808
809        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
810            ActiveMQMessageConsumer c = iter.next();
811            c.rollback();
812        }
813
814    }
815
816    /**
817     * Returns the session's distinguished message listener (optional).
818     *
819     * @return the message listener associated with this session
820     * @throws JMSException if the JMS provider fails to get the message
821     *                 listener due to an internal error.
822     * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
823     * @see javax.jms.ServerSessionPool
824     * @see javax.jms.ServerSession
825     */
826    @Override
827    public MessageListener getMessageListener() throws JMSException {
828        checkClosed();
829        return this.messageListener;
830    }
831
832    /**
833     * Sets the session's distinguished message listener (optional).
834     * <P>
835     * When the distinguished message listener is set, no other form of message
836     * receipt in the session can be used; however, all forms of sending
837     * messages are still supported.
838     * <P>
839     * If this session has been closed, then an {@link IllegalStateException} is
840     * thrown, if trying to set a new listener. However setting the listener
841     * to <tt>null</tt> is allowed, to clear the listener, even if this session
842     * has been closed prior.
843     * <P>
844     * This is an expert facility not used by regular JMS clients.
845     *
846     * @param listener the message listener to associate with this session
847     * @throws JMSException if the JMS provider fails to set the message
848     *                 listener due to an internal error.
849     * @see javax.jms.Session#getMessageListener()
850     * @see javax.jms.ServerSessionPool
851     * @see javax.jms.ServerSession
852     */
853    @Override
854    public void setMessageListener(MessageListener listener) throws JMSException {
855        // only check for closed if we set a new listener, as we allow to clear
856        // the listener, such as when an application is shutting down, and is
857        // no longer using a message listener on this session
858        if (listener != null) {
859            checkClosed();
860        }
861        this.messageListener = listener;
862
863        if (listener != null) {
864            executor.setDispatchedBySessionPool(true);
865        }
866    }
867
868    /**
869     * Optional operation, intended to be used only by Application Servers, not
870     * by ordinary JMS clients.
871     *
872     * @see javax.jms.ServerSession
873     */
874    @Override
875    public void run() {
876        MessageDispatch messageDispatch;
877        while ((messageDispatch = executor.dequeueNoWait()) != null) {
878            final MessageDispatch md = messageDispatch;
879            final ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
880
881            MessageAck earlyAck = null;
882            if (message.isExpired()) {
883                earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
884                earlyAck.setFirstMessageId(message.getMessageId());
885            } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
886                LOG.debug("{} got duplicate: {}", this, message.getMessageId());
887                earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
888                earlyAck.setFirstMessageId(md.getMessage().getMessageId());
889                earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
890            }
891            if (earlyAck != null) {
892                try {
893                    asyncSendPacket(earlyAck);
894                } catch (Throwable t) {
895                    LOG.error("error dispatching ack: {} ", earlyAck, t);
896                    connection.onClientInternalException(t);
897                } finally {
898                    continue;
899                }
900            }
901
902            if (isClientAcknowledge()||isIndividualAcknowledge()) {
903                message.setAcknowledgeCallback(new Callback() {
904                    @Override
905                    public void execute() throws Exception {
906                    }
907                });
908            }
909
910            if (deliveryListener != null) {
911                deliveryListener.beforeDelivery(this, message);
912            }
913
914            md.setDeliverySequenceId(getNextDeliveryId());
915            lastDeliveredSequenceId = message.getMessageId().getBrokerSequenceId();
916
917            final MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
918
919            final AtomicBoolean afterDeliveryError = new AtomicBoolean(false);
920            /*
921            * The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched.
922            * We dont want the after deliver being called after the redeliver as it may cause some weird stuff.
923            * */
924            synchronized (redeliveryGuard) {
925                try {
926                    ack.setFirstMessageId(md.getMessage().getMessageId());
927                    doStartTransaction();
928                    ack.setTransactionId(getTransactionContext().getTransactionId());
929                    if (ack.getTransactionId() != null) {
930                        getTransactionContext().addSynchronization(new Synchronization() {
931
932                            final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get());
933
934                            @Override
935                            public void beforeEnd() throws Exception {
936                                // validate our consumer so we don't push stale acks that get ignored
937                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
938                                    LOG.debug("forcing rollback - {} consumer no longer active on {}", ack, connection);
939                                    throw new TransactionRolledBackException("consumer " + ack.getConsumerId() + " no longer active on " + connection);
940                                }
941                                LOG.trace("beforeEnd ack {}", ack);
942                                sendAck(ack);
943                            }
944
945                            @Override
946                            public void afterRollback() throws Exception {
947                                LOG.trace("rollback {}", ack, new Throwable("here"));
948                                // ensure we don't filter this as a duplicate
949                                connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
950
951                                // don't redeliver if we have been interrupted b/c the broker will redeliver on reconnect
952                                if (clearRequestsCounter.get() > clearRequestCount) {
953                                    LOG.debug("No redelivery of {} on rollback of {} due to failover of {}", md, ack.getTransactionId(), connection.getTransport());
954                                    return;
955                                }
956
957                                // validate our consumer so we don't push stale acks that get ignored or redeliver what will be redispatched
958                                if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) {
959                                    LOG.debug("No local redelivery of {} on rollback of {} because consumer is no longer active on {}", md, ack.getTransactionId(), connection.getTransport());
960                                    return;
961                                }
962
963                                RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
964                                int redeliveryCounter = md.getMessage().getRedeliveryCounter();
965                                if (redeliveryPolicy.getMaximumRedeliveries() != RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES
966                                        && redeliveryCounter >= redeliveryPolicy.getMaximumRedeliveries()) {
967                                    // We need to NACK the messages so that they get
968                                    // sent to the
969                                    // DLQ.
970                                    // Acknowledge the last message.
971                                    MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
972                                    ack.setFirstMessageId(md.getMessage().getMessageId());
973                                    ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
974                                    asyncSendPacket(ack);
975
976                                } else {
977
978                                    MessageAck ack = new MessageAck(md, MessageAck.REDELIVERED_ACK_TYPE, 1);
979                                    ack.setFirstMessageId(md.getMessage().getMessageId());
980                                    asyncSendPacket(ack);
981
982                                    // Figure out how long we should wait to resend
983                                    // this message.
984                                    long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
985                                    for (int i = 0; i < redeliveryCounter; i++) {
986                                        redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
987                                    }
988
989                                    /*
990                                    * If we are a non blocking delivery then we need to stop the executor to avoid more
991                                    * messages being delivered, once the message is redelivered we can restart it.
992                                    * */
993                                    if (!connection.isNonBlockingRedelivery()) {
994                                        LOG.debug("Blocking session until re-delivery...");
995                                        executor.stop();
996                                    }
997
998                                    connection.getScheduler().executeAfterDelay(new Runnable() {
999
1000                                        @Override
1001                                        public void run() {
1002                                            /*
1003                                            * wait for the first delivery to be complete, i.e. after delivery has been called.
1004                                            * */
1005                                            synchronized (redeliveryGuard) {
1006                                                /*
1007                                                * If its non blocking then we can just dispatch in a new session.
1008                                                * */
1009                                                if (connection.isNonBlockingRedelivery()) {
1010                                                    ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
1011                                                } else {
1012                                                    /*
1013                                                    * If there has been an error thrown during afterDelivery then the
1014                                                    * endpoint will be marked as dead so redelivery will fail (and eventually
1015                                                    * the session marked as stale), in this case we can only call dispatch
1016                                                    * which will create a new session with a new endpoint.
1017                                                    * */
1018                                                    if (afterDeliveryError.get()) {
1019                                                        ((ActiveMQDispatcher) md.getConsumer()).dispatch(md);
1020                                                    } else {
1021                                                        executor.executeFirst(md);
1022                                                        executor.start();
1023                                                    }
1024                                                }
1025                                            }
1026                                        }
1027                                    }, redeliveryDelay);
1028                                }
1029                                md.getMessage().onMessageRolledBack();
1030                            }
1031                        });
1032                    }
1033
1034                    LOG.trace("{} onMessage({})", this, message.getMessageId());
1035                    messageListener.onMessage(message);
1036
1037                } catch (Throwable e) {
1038                    LOG.error("error dispatching message: ", e);
1039
1040                    if (getTransactionContext().isInXATransaction()) {
1041                        LOG.debug("Marking transaction: {} rollbackOnly", getTransactionContext());
1042                        getTransactionContext().setRollbackOnly(true);
1043                    }
1044
1045                    // A problem while invoking the MessageListener does not
1046                    // in general indicate a problem with the connection to the broker, i.e.
1047                    // it will usually be sufficient to let the afterDelivery() method either
1048                    // commit or roll back in order to deal with the exception.
1049                    // However, we notify any registered client internal exception listener
1050                    // of the problem.
1051                    connection.onClientInternalException(e);
1052                } finally {
1053                    if (ack.getTransactionId() == null) {
1054                        try {
1055                            asyncSendPacket(ack);
1056                        } catch (Throwable e) {
1057                            connection.onClientInternalException(e);
1058                        }
1059                    }
1060                }
1061
1062                if (deliveryListener != null) {
1063                    try {
1064                        deliveryListener.afterDelivery(this, message);
1065                    } catch (Throwable t) {
1066                        LOG.debug("Unable to call after delivery", t);
1067                        afterDeliveryError.set(true);
1068                        throw new RuntimeException(t);
1069                    }
1070                }
1071            }
1072            /*
1073            * this can be outside the try/catch as if an exception is thrown then this session will be marked as stale anyway.
1074            * It also needs to be outside the redelivery guard.
1075            * */
1076            try {
1077                executor.waitForQueueRestart();
1078            } catch (InterruptedException ex) {
1079                connection.onClientInternalException(ex);
1080            }
1081        }
1082    }
1083
1084    /**
1085     * Creates a <CODE>MessageProducer</CODE> to send messages to the
1086     * specified destination.
1087     * <P>
1088     * A client uses a <CODE>MessageProducer</CODE> object to send messages to
1089     * a destination. Since <CODE>Queue </CODE> and <CODE>Topic</CODE> both
1090     * inherit from <CODE>Destination</CODE>, they can be used in the
1091     * destination parameter to create a <CODE>MessageProducer</CODE> object.
1092     *
1093     * @param destination the <CODE>Destination</CODE> to send to, or null if
1094     *                this is a producer which does not have a specified
1095     *                destination.
1096     * @return the MessageProducer
1097     * @throws JMSException if the session fails to create a MessageProducer due
1098     *                 to some internal error.
1099     * @throws InvalidDestinationException if an invalid destination is
1100     *                 specified.
1101     * @since 1.1
1102     */
1103    @Override
1104    public MessageProducer createProducer(Destination destination) throws JMSException {
1105        checkClosed();
1106        if (destination instanceof CustomDestination) {
1107            CustomDestination customDestination = (CustomDestination)destination;
1108            return customDestination.createProducer(this);
1109        }
1110        int timeSendOut = connection.getSendTimeout();
1111        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
1112    }
1113
1114    /**
1115     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1116     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1117     * <CODE>Destination</CODE>, they can be used in the destination
1118     * parameter to create a <CODE>MessageConsumer</CODE>.
1119     *
1120     * @param destination the <CODE>Destination</CODE> to access.
1121     * @return the MessageConsumer
1122     * @throws JMSException if the session fails to create a consumer due to
1123     *                 some internal error.
1124     * @throws InvalidDestinationException if an invalid destination is
1125     *                 specified.
1126     * @since 1.1
1127     */
1128    @Override
1129    public MessageConsumer createConsumer(Destination destination) throws JMSException {
1130        return createConsumer(destination, (String) null);
1131    }
1132
1133    /**
1134     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1135     * using a message selector. Since <CODE> Queue</CODE> and
1136     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1137     * can be used in the destination parameter to create a
1138     * <CODE>MessageConsumer</CODE>.
1139     * <P>
1140     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1141     * that have been sent to a destination.
1142     *
1143     * @param destination the <CODE>Destination</CODE> to access
1144     * @param messageSelector only messages with properties matching the message
1145     *                selector expression are delivered. A value of null or an
1146     *                empty string indicates that there is no message selector
1147     *                for the message consumer.
1148     * @return the MessageConsumer
1149     * @throws JMSException if the session fails to create a MessageConsumer due
1150     *                 to some internal error.
1151     * @throws InvalidDestinationException if an invalid destination is
1152     *                 specified.
1153     * @throws InvalidSelectorException if the message selector is invalid.
1154     * @since 1.1
1155     */
1156    @Override
1157    public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
1158        return createConsumer(destination, messageSelector, false);
1159    }
1160
1161    /**
1162     * Creates a <CODE>MessageConsumer</CODE> for the specified destination.
1163     * Since <CODE>Queue</CODE> and <CODE> Topic</CODE> both inherit from
1164     * <CODE>Destination</CODE>, they can be used in the destination
1165     * parameter to create a <CODE>MessageConsumer</CODE>.
1166     *
1167     * @param destination the <CODE>Destination</CODE> to access.
1168     * @param messageListener the listener to use for async consumption of messages
1169     * @return the MessageConsumer
1170     * @throws JMSException if the session fails to create a consumer due to
1171     *                 some internal error.
1172     * @throws InvalidDestinationException if an invalid destination is
1173     *                 specified.
1174     * @since 1.1
1175     */
1176    public MessageConsumer createConsumer(Destination destination, MessageListener messageListener) throws JMSException {
1177        return createConsumer(destination, null, messageListener);
1178    }
1179
1180    /**
1181     * Creates a <CODE>MessageConsumer</CODE> for the specified destination,
1182     * using a message selector. Since <CODE> Queue</CODE> and
1183     * <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they
1184     * can be used in the destination parameter to create a
1185     * <CODE>MessageConsumer</CODE>.
1186     * <P>
1187     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1188     * that have been sent to a destination.
1189     *
1190     * @param destination the <CODE>Destination</CODE> to access
1191     * @param messageSelector only messages with properties matching the message
1192     *                selector expression are delivered. A value of null or an
1193     *                empty string indicates that there is no message selector
1194     *                for the message consumer.
1195     * @param messageListener the listener to use for async consumption of messages
1196     * @return the MessageConsumer
1197     * @throws JMSException if the session fails to create a MessageConsumer due
1198     *                 to some internal error.
1199     * @throws InvalidDestinationException if an invalid destination is
1200     *                 specified.
1201     * @throws InvalidSelectorException if the message selector is invalid.
1202     * @since 1.1
1203     */
1204    public MessageConsumer createConsumer(Destination destination, String messageSelector, MessageListener messageListener) throws JMSException {
1205        return createConsumer(destination, messageSelector, false, messageListener);
1206    }
1207
1208    /**
1209     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1210     * using a message selector. This method can specify whether messages
1211     * published by its own connection should be delivered to it, if the
1212     * destination is a topic.
1213     * <P>
1214     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1215     * <CODE>Destination</CODE>, they can be used in the destination
1216     * parameter to create a <CODE>MessageConsumer</CODE>.
1217     * <P>
1218     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1219     * that have been published to a destination.
1220     * <P>
1221     * In some cases, a connection may both publish and subscribe to a topic.
1222     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1223     * inhibit the delivery of messages published by its own connection. The
1224     * default value for this attribute is False. The <CODE>noLocal</CODE>
1225     * value must be supported by destinations that are topics.
1226     *
1227     * @param destination the <CODE>Destination</CODE> to access
1228     * @param messageSelector only messages with properties matching the message
1229     *                selector expression are delivered. A value of null or an
1230     *                empty string indicates that there is no message selector
1231     *                for the message consumer.
1232     * @param noLocal - if true, and the destination is a topic, inhibits the
1233     *                delivery of messages published by its own connection. The
1234     *                behavior for <CODE>NoLocal</CODE> is not specified if
1235     *                the destination is a queue.
1236     * @return the MessageConsumer
1237     * @throws JMSException if the session fails to create a MessageConsumer due
1238     *                 to some internal error.
1239     * @throws InvalidDestinationException if an invalid destination is
1240     *                 specified.
1241     * @throws InvalidSelectorException if the message selector is invalid.
1242     * @since 1.1
1243     */
1244    @Override
1245    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException {
1246        return createConsumer(destination, messageSelector, noLocal, null);
1247    }
1248
1249    /**
1250     * Creates <CODE>MessageConsumer</CODE> for the specified destination,
1251     * using a message selector. This method can specify whether messages
1252     * published by its own connection should be delivered to it, if the
1253     * destination is a topic.
1254     * <P>
1255     * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from
1256     * <CODE>Destination</CODE>, they can be used in the destination
1257     * parameter to create a <CODE>MessageConsumer</CODE>.
1258     * <P>
1259     * A client uses a <CODE>MessageConsumer</CODE> object to receive messages
1260     * that have been published to a destination.
1261     * <P>
1262     * In some cases, a connection may both publish and subscribe to a topic.
1263     * The consumer <CODE>NoLocal</CODE> attribute allows a consumer to
1264     * inhibit the delivery of messages published by its own connection. The
1265     * default value for this attribute is False. The <CODE>noLocal</CODE>
1266     * value must be supported by destinations that are topics.
1267     *
1268     * @param destination the <CODE>Destination</CODE> to access
1269     * @param messageSelector only messages with properties matching the message
1270     *                selector expression are delivered. A value of null or an
1271     *                empty string indicates that there is no message selector
1272     *                for the message consumer.
1273     * @param noLocal - if true, and the destination is a topic, inhibits the
1274     *                delivery of messages published by its own connection. The
1275     *                behavior for <CODE>NoLocal</CODE> is not specified if
1276     *                the destination is a queue.
1277     * @param messageListener the listener to use for async consumption of messages
1278     * @return the MessageConsumer
1279     * @throws JMSException if the session fails to create a MessageConsumer due
1280     *                 to some internal error.
1281     * @throws InvalidDestinationException if an invalid destination is
1282     *                 specified.
1283     * @throws InvalidSelectorException if the message selector is invalid.
1284     * @since 1.1
1285     */
1286    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal, MessageListener messageListener) throws JMSException {
1287        checkClosed();
1288
1289        if (destination instanceof CustomDestination) {
1290            CustomDestination customDestination = (CustomDestination)destination;
1291            return customDestination.createConsumer(this, messageSelector, noLocal);
1292        }
1293
1294        ActiveMQPrefetchPolicy prefetchPolicy = connection.getPrefetchPolicy();
1295        int prefetch = 0;
1296        if (destination instanceof Topic) {
1297            prefetch = prefetchPolicy.getTopicPrefetch();
1298        } else {
1299            prefetch = prefetchPolicy.getQueuePrefetch();
1300        }
1301        ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
1302        return new ActiveMQMessageConsumer(this, getNextConsumerId(), activemqDestination, null, messageSelector,
1303                prefetch, prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, isAsyncDispatch(), messageListener);
1304    }
1305
1306    /**
1307     * Creates a queue identity given a <CODE>Queue</CODE> name.
1308     * <P>
1309     * This facility is provided for the rare cases where clients need to
1310     * dynamically manipulate queue identity. It allows the creation of a queue
1311     * identity with a provider-specific name. Clients that depend on this
1312     * ability are not portable.
1313     * <P>
1314     * Note that this method is not for creating the physical queue. The
1315     * physical creation of queues is an administrative task and is not to be
1316     * initiated by the JMS API. The one exception is the creation of temporary
1317     * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE>
1318     * method.
1319     *
1320     * @param queueName the name of this <CODE>Queue</CODE>
1321     * @return a <CODE>Queue</CODE> with the given name
1322     * @throws JMSException if the session fails to create a queue due to some
1323     *                 internal error.
1324     * @since 1.1
1325     */
1326    @Override
1327    public Queue createQueue(String queueName) throws JMSException {
1328        checkClosed();
1329        if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1330            return new ActiveMQTempQueue(queueName);
1331        }
1332        return new ActiveMQQueue(queueName);
1333    }
1334
1335    /**
1336     * Creates a topic identity given a <CODE>Topic</CODE> name.
1337     * <P>
1338     * This facility is provided for the rare cases where clients need to
1339     * dynamically manipulate topic identity. This allows the creation of a
1340     * topic identity with a provider-specific name. Clients that depend on this
1341     * ability are not portable.
1342     * <P>
1343     * Note that this method is not for creating the physical topic. The
1344     * physical creation of topics is an administrative task and is not to be
1345     * initiated by the JMS API. The one exception is the creation of temporary
1346     * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE>
1347     * method.
1348     *
1349     * @param topicName the name of this <CODE>Topic</CODE>
1350     * @return a <CODE>Topic</CODE> with the given name
1351     * @throws JMSException if the session fails to create a topic due to some
1352     *                 internal error.
1353     * @since 1.1
1354     */
1355    @Override
1356    public Topic createTopic(String topicName) throws JMSException {
1357        checkClosed();
1358        if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) {
1359            return new ActiveMQTempTopic(topicName);
1360        }
1361        return new ActiveMQTopic(topicName);
1362    }
1363
1364    /**
1365     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1366     * the specified queue.
1367     *
1368     * @param queue the <CODE>queue</CODE> to access
1369     * @exception InvalidDestinationException if an invalid destination is
1370     *                    specified
1371     * @since 1.1
1372     */
1373    /**
1374     * Creates a durable subscriber to the specified topic.
1375     * <P>
1376     * If a client needs to receive all the messages published on a topic,
1377     * including the ones published while the subscriber is inactive, it uses a
1378     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1379     * record of this durable subscription and insures that all messages from
1380     * the topic's publishers are retained until they are acknowledged by this
1381     * durable subscriber or they have expired.
1382     * <P>
1383     * Sessions with durable subscribers must always provide the same client
1384     * identifier. In addition, each client must specify a name that uniquely
1385     * identifies (within client identifier) each durable subscription it
1386     * creates. Only one session at a time can have a
1387     * <CODE>TopicSubscriber</CODE> for a particular durable subscription.
1388     * <P>
1389     * A client can change an existing durable subscription by creating a
1390     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1391     * and/or message selector. Changing a durable subscriber is equivalent to
1392     * unsubscribing (deleting) the old one and creating a new one.
1393     * <P>
1394     * In some cases, a connection may both publish and subscribe to a topic.
1395     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1396     * inhibit the delivery of messages published by its own connection. The
1397     * default value for this attribute is false.
1398     *
1399     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1400     * @param name the name used to identify this subscription
1401     * @return the TopicSubscriber
1402     * @throws JMSException if the session fails to create a subscriber due to
1403     *                 some internal error.
1404     * @throws InvalidDestinationException if an invalid topic is specified.
1405     * @since 1.1
1406     */
1407    @Override
1408    public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
1409        checkClosed();
1410        return createDurableSubscriber(topic, name, null, false);
1411    }
1412
1413    /**
1414     * Creates a durable subscriber to the specified topic, using a message
1415     * selector and specifying whether messages published by its own connection
1416     * should be delivered to it.
1417     * <P>
1418     * If a client needs to receive all the messages published on a topic,
1419     * including the ones published while the subscriber is inactive, it uses a
1420     * durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a
1421     * record of this durable subscription and insures that all messages from
1422     * the topic's publishers are retained until they are acknowledged by this
1423     * durable subscriber or they have expired.
1424     * <P>
1425     * Sessions with durable subscribers must always provide the same client
1426     * identifier. In addition, each client must specify a name which uniquely
1427     * identifies (within client identifier) each durable subscription it
1428     * creates. Only one session at a time can have a
1429     * <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
1430     * inactive durable subscriber is one that exists but does not currently
1431     * have a message consumer associated with it.
1432     * <P>
1433     * A client can change an existing durable subscription by creating a
1434     * durable <CODE>TopicSubscriber</CODE> with the same name and a new topic
1435     * and/or message selector. Changing a durable subscriber is equivalent to
1436     * unsubscribing (deleting) the old one and creating a new one.
1437     *
1438     * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
1439     * @param name the name used to identify this subscription
1440     * @param messageSelector only messages with properties matching the message
1441     *                selector expression are delivered. A value of null or an
1442     *                empty string indicates that there is no message selector
1443     *                for the message consumer.
1444     * @param noLocal if set, inhibits the delivery of messages published by its
1445     *                own connection
1446     * @return the Queue Browser
1447     * @throws JMSException if the session fails to create a subscriber due to
1448     *                 some internal error.
1449     * @throws InvalidDestinationException if an invalid topic is specified.
1450     * @throws InvalidSelectorException if the message selector is invalid.
1451     * @since 1.1
1452     */
1453    @Override
1454    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
1455        checkClosed();
1456
1457        if (topic == null) {
1458            throw new InvalidDestinationException("Topic cannot be null");
1459        }
1460
1461        if (topic instanceof CustomDestination) {
1462            CustomDestination customDestination = (CustomDestination)topic;
1463            return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal);
1464        }
1465
1466        connection.checkClientIDWasManuallySpecified();
1467        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1468        int prefetch = isAutoAcknowledge() && connection.isOptimizedMessageDispatch() ? prefetchPolicy.getOptimizeDurableTopicPrefetch() : prefetchPolicy.getDurableTopicPrefetch();
1469        int maxPrendingLimit = prefetchPolicy.getMaximumPendingMessageLimit();
1470        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), name, messageSelector, prefetch, maxPrendingLimit,
1471                                           noLocal, false, asyncDispatch);
1472    }
1473
1474    /**
1475     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1476     * the specified queue.
1477     *
1478     * @param queue the <CODE>queue</CODE> to access
1479     * @return the Queue Browser
1480     * @throws JMSException if the session fails to create a browser due to some
1481     *                 internal error.
1482     * @throws InvalidDestinationException if an invalid destination is
1483     *                 specified
1484     * @since 1.1
1485     */
1486    @Override
1487    public QueueBrowser createBrowser(Queue queue) throws JMSException {
1488        checkClosed();
1489        return createBrowser(queue, null);
1490    }
1491
1492    /**
1493     * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on
1494     * the specified queue using a message selector.
1495     *
1496     * @param queue the <CODE>queue</CODE> to access
1497     * @param messageSelector only messages with properties matching the message
1498     *                selector expression are delivered. A value of null or an
1499     *                empty string indicates that there is no message selector
1500     *                for the message consumer.
1501     * @return the Queue Browser
1502     * @throws JMSException if the session fails to create a browser due to some
1503     *                 internal error.
1504     * @throws InvalidDestinationException if an invalid destination is
1505     *                 specified
1506     * @throws InvalidSelectorException if the message selector is invalid.
1507     * @since 1.1
1508     */
1509    @Override
1510    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
1511        checkClosed();
1512        return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch);
1513    }
1514
1515    /**
1516     * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that
1517     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1518     *
1519     * @return a temporary queue identity
1520     * @throws JMSException if the session fails to create a temporary queue due
1521     *                 to some internal error.
1522     * @since 1.1
1523     */
1524    @Override
1525    public TemporaryQueue createTemporaryQueue() throws JMSException {
1526        checkClosed();
1527        return (TemporaryQueue)connection.createTempDestination(false);
1528    }
1529
1530    /**
1531     * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that
1532     * of the <CODE>Connection</CODE> unless it is deleted earlier.
1533     *
1534     * @return a temporary topic identity
1535     * @throws JMSException if the session fails to create a temporary topic due
1536     *                 to some internal error.
1537     * @since 1.1
1538     */
1539    @Override
1540    public TemporaryTopic createTemporaryTopic() throws JMSException {
1541        checkClosed();
1542        return (TemporaryTopic)connection.createTempDestination(true);
1543    }
1544
1545    /**
1546     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1547     * the specified queue.
1548     *
1549     * @param queue the <CODE>Queue</CODE> to access
1550     * @return
1551     * @throws JMSException if the session fails to create a receiver due to
1552     *                 some internal error.
1553     * @throws JMSException
1554     * @throws InvalidDestinationException if an invalid queue is specified.
1555     */
1556    @Override
1557    public QueueReceiver createReceiver(Queue queue) throws JMSException {
1558        checkClosed();
1559        return createReceiver(queue, null);
1560    }
1561
1562    /**
1563     * Creates a <CODE>QueueReceiver</CODE> object to receive messages from
1564     * the specified queue using a message selector.
1565     *
1566     * @param queue the <CODE>Queue</CODE> to access
1567     * @param messageSelector only messages with properties matching the message
1568     *                selector expression are delivered. A value of null or an
1569     *                empty string indicates that there is no message selector
1570     *                for the message consumer.
1571     * @return QueueReceiver
1572     * @throws JMSException if the session fails to create a receiver due to
1573     *                 some internal error.
1574     * @throws InvalidDestinationException if an invalid queue is specified.
1575     * @throws InvalidSelectorException if the message selector is invalid.
1576     */
1577    @Override
1578    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
1579        checkClosed();
1580
1581        if (queue instanceof CustomDestination) {
1582            CustomDestination customDestination = (CustomDestination)queue;
1583            return customDestination.createReceiver(this, messageSelector);
1584        }
1585
1586        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1587        return new ActiveMQQueueReceiver(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, prefetchPolicy.getQueuePrefetch(),
1588                                         prefetchPolicy.getMaximumPendingMessageLimit(), asyncDispatch);
1589    }
1590
1591    /**
1592     * Creates a <CODE>QueueSender</CODE> object to send messages to the
1593     * specified queue.
1594     *
1595     * @param queue the <CODE>Queue</CODE> to access, or null if this is an
1596     *                unidentified producer
1597     * @return QueueSender
1598     * @throws JMSException if the session fails to create a sender due to some
1599     *                 internal error.
1600     * @throws InvalidDestinationException if an invalid queue is specified.
1601     */
1602    @Override
1603    public QueueSender createSender(Queue queue) throws JMSException {
1604        checkClosed();
1605        if (queue instanceof CustomDestination) {
1606            CustomDestination customDestination = (CustomDestination)queue;
1607            return customDestination.createSender(this);
1608        }
1609        int timeSendOut = connection.getSendTimeout();
1610        return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue),timeSendOut);
1611    }
1612
1613    /**
1614     * Creates a nondurable subscriber to the specified topic. <p/>
1615     * <P>
1616     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1617     * that have been published to a topic. <p/>
1618     * <P>
1619     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1620     * receive only messages that are published while they are active. <p/>
1621     * <P>
1622     * In some cases, a connection may both publish and subscribe to a topic.
1623     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1624     * inhibit the delivery of messages published by its own connection. The
1625     * default value for this attribute is false.
1626     *
1627     * @param topic the <CODE>Topic</CODE> to subscribe to
1628     * @return TopicSubscriber
1629     * @throws JMSException if the session fails to create a subscriber due to
1630     *                 some internal error.
1631     * @throws InvalidDestinationException if an invalid topic is specified.
1632     */
1633    @Override
1634    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
1635        checkClosed();
1636        return createSubscriber(topic, null, false);
1637    }
1638
1639    /**
1640     * Creates a nondurable subscriber to the specified topic, using a message
1641     * selector or specifying whether messages published by its own connection
1642     * should be delivered to it. <p/>
1643     * <P>
1644     * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages
1645     * that have been published to a topic. <p/>
1646     * <P>
1647     * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They
1648     * receive only messages that are published while they are active. <p/>
1649     * <P>
1650     * Messages filtered out by a subscriber's message selector will never be
1651     * delivered to the subscriber. From the subscriber's perspective, they do
1652     * not exist. <p/>
1653     * <P>
1654     * In some cases, a connection may both publish and subscribe to a topic.
1655     * The subscriber <CODE>NoLocal</CODE> attribute allows a subscriber to
1656     * inhibit the delivery of messages published by its own connection. The
1657     * default value for this attribute is false.
1658     *
1659     * @param topic the <CODE>Topic</CODE> to subscribe to
1660     * @param messageSelector only messages with properties matching the message
1661     *                selector expression are delivered. A value of null or an
1662     *                empty string indicates that there is no message selector
1663     *                for the message consumer.
1664     * @param noLocal if set, inhibits the delivery of messages published by its
1665     *                own connection
1666     * @return TopicSubscriber
1667     * @throws JMSException if the session fails to create a subscriber due to
1668     *                 some internal error.
1669     * @throws InvalidDestinationException if an invalid topic is specified.
1670     * @throws InvalidSelectorException if the message selector is invalid.
1671     */
1672    @Override
1673    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
1674        checkClosed();
1675
1676        if (topic instanceof CustomDestination) {
1677            CustomDestination customDestination = (CustomDestination)topic;
1678            return customDestination.createSubscriber(this, messageSelector, noLocal);
1679        }
1680
1681        ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
1682        return new ActiveMQTopicSubscriber(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(topic), null, messageSelector, prefetchPolicy
1683            .getTopicPrefetch(), prefetchPolicy.getMaximumPendingMessageLimit(), noLocal, false, asyncDispatch);
1684    }
1685
1686    /**
1687     * Creates a publisher for the specified topic. <p/>
1688     * <P>
1689     * A client uses a <CODE>TopicPublisher</CODE> object to publish messages
1690     * on a topic. Each time a client creates a <CODE>TopicPublisher</CODE> on
1691     * a topic, it defines a new sequence of messages that have no ordering
1692     * relationship with the messages it has previously sent.
1693     *
1694     * @param topic the <CODE>Topic</CODE> to publish to, or null if this is
1695     *                an unidentified producer
1696     * @return TopicPublisher
1697     * @throws JMSException if the session fails to create a publisher due to
1698     *                 some internal error.
1699     * @throws InvalidDestinationException if an invalid topic is specified.
1700     */
1701    @Override
1702    public TopicPublisher createPublisher(Topic topic) throws JMSException {
1703        checkClosed();
1704
1705        if (topic instanceof CustomDestination) {
1706            CustomDestination customDestination = (CustomDestination)topic;
1707            return customDestination.createPublisher(this);
1708        }
1709        int timeSendOut = connection.getSendTimeout();
1710        return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic),timeSendOut);
1711    }
1712
1713    /**
1714     * Unsubscribes a durable subscription that has been created by a client.
1715     * <P>
1716     * This method deletes the state being maintained on behalf of the
1717     * subscriber by its provider.
1718     * <P>
1719     * It is erroneous for a client to delete a durable subscription while there
1720     * is an active <CODE>MessageConsumer </CODE> or
1721     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
1722     * message is part of a pending transaction or has not been acknowledged in
1723     * the session.
1724     *
1725     * @param name the name used to identify this subscription
1726     * @throws JMSException if the session fails to unsubscribe to the durable
1727     *                 subscription due to some internal error.
1728     * @throws InvalidDestinationException if an invalid subscription name is
1729     *                 specified.
1730     * @since 1.1
1731     */
1732    @Override
1733    public void unsubscribe(String name) throws JMSException {
1734        checkClosed();
1735        connection.unsubscribe(name);
1736    }
1737
1738    @Override
1739    public void dispatch(MessageDispatch messageDispatch) {
1740        try {
1741            executor.execute(messageDispatch);
1742        } catch (InterruptedException e) {
1743            Thread.currentThread().interrupt();
1744            connection.onClientInternalException(e);
1745        }
1746    }
1747
1748    /**
1749     * Acknowledges all consumed messages of the session of this consumed
1750     * message.
1751     * <P>
1752     * All consumed JMS messages support the <CODE>acknowledge</CODE> method
1753     * for use when a client has specified that its JMS session's consumed
1754     * messages are to be explicitly acknowledged. By invoking
1755     * <CODE>acknowledge</CODE> on a consumed message, a client acknowledges
1756     * all messages consumed by the session that the message was delivered to.
1757     * <P>
1758     * Calls to <CODE>acknowledge</CODE> are ignored for both transacted
1759     * sessions and sessions specified to use implicit acknowledgement modes.
1760     * <P>
1761     * A client may individually acknowledge each message as it is consumed, or
1762     * it may choose to acknowledge messages as an application-defined group
1763     * (which is done by calling acknowledge on the last received message of the
1764     * group, thereby acknowledging all messages consumed by the session.)
1765     * <P>
1766     * Messages that have been received but not acknowledged may be redelivered.
1767     *
1768     * @throws JMSException if the JMS provider fails to acknowledge the
1769     *                 messages due to some internal error.
1770     * @throws javax.jms.IllegalStateException if this method is called on a
1771     *                 closed session.
1772     * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1773     */
1774    public void acknowledge() throws JMSException {
1775        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1776            ActiveMQMessageConsumer c = iter.next();
1777            c.acknowledge();
1778        }
1779    }
1780
1781    /**
1782     * Add a message consumer.
1783     *
1784     * @param consumer - message consumer.
1785     * @throws JMSException
1786     */
1787    protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1788        this.consumers.add(consumer);
1789        if (consumer.isDurableSubscriber()) {
1790            stats.onCreateDurableSubscriber();
1791        }
1792        this.connection.addDispatcher(consumer.getConsumerId(), this);
1793    }
1794
1795    /**
1796     * Remove the message consumer.
1797     *
1798     * @param consumer - consumer to be removed.
1799     * @throws JMSException
1800     */
1801    protected void removeConsumer(ActiveMQMessageConsumer consumer) {
1802        this.connection.removeDispatcher(consumer.getConsumerId());
1803        if (consumer.isDurableSubscriber()) {
1804            stats.onRemoveDurableSubscriber();
1805        }
1806        this.consumers.remove(consumer);
1807        this.connection.removeDispatcher(consumer);
1808    }
1809
1810    /**
1811     * Adds a message producer.
1812     *
1813     * @param producer - message producer to be added.
1814     * @throws JMSException
1815     */
1816    protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1817        this.producers.add(producer);
1818        this.connection.addProducer(producer.getProducerInfo().getProducerId(), producer);
1819    }
1820
1821    /**
1822     * Removes a message producer.
1823     *
1824     * @param producer - message producer to be removed.
1825     * @throws JMSException
1826     */
1827    protected void removeProducer(ActiveMQMessageProducer producer) {
1828        this.connection.removeProducer(producer.getProducerInfo().getProducerId());
1829        this.producers.remove(producer);
1830    }
1831
1832    /**
1833     * Start this Session.
1834     *
1835     * @throws JMSException
1836     */
1837    protected void start() throws JMSException {
1838        started.set(true);
1839        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1840            ActiveMQMessageConsumer c = iter.next();
1841            c.start();
1842        }
1843        executor.start();
1844    }
1845
1846    /**
1847     * Stops this session.
1848     *
1849     * @throws JMSException
1850     */
1851    protected void stop() throws JMSException {
1852
1853        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
1854            ActiveMQMessageConsumer c = iter.next();
1855            c.stop();
1856        }
1857
1858        started.set(false);
1859        executor.stop();
1860    }
1861
1862    /**
1863     * Returns the session id.
1864     *
1865     * @return value - session id.
1866     */
1867    protected SessionId getSessionId() {
1868        return info.getSessionId();
1869    }
1870
1871    /**
1872     * @return
1873     */
1874    protected ConsumerId getNextConsumerId() {
1875        return new ConsumerId(info.getSessionId(), consumerIdGenerator.getNextSequenceId());
1876    }
1877
1878    /**
1879     * @return
1880     */
1881    protected ProducerId getNextProducerId() {
1882        return new ProducerId(info.getSessionId(), producerIdGenerator.getNextSequenceId());
1883    }
1884
1885    /**
1886     * Sends the message for dispatch by the broker.
1887     *
1888     *
1889     * @param producer - message producer.
1890     * @param destination - message destination.
1891     * @param message - message to be sent.
1892     * @param deliveryMode - JMS messsage delivery mode.
1893     * @param priority - message priority.
1894     * @param timeToLive - message expiration.
1895     * @param producerWindow
1896     * @param onComplete
1897     * @throws JMSException
1898     */
1899    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
1900                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
1901
1902        checkClosed();
1903        if (destination.isTemporary() && connection.isDeleted(destination)) {
1904            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
1905        }
1906        synchronized (sendMutex) {
1907            // tell the Broker we are about to start a new transaction
1908            doStartTransaction();
1909            TransactionId txid = transactionContext.getTransactionId();
1910            long sequenceNumber = producer.getMessageSequence();
1911
1912            //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
1913            message.setJMSDeliveryMode(deliveryMode);
1914            long expiration = 0L;
1915            if (!producer.getDisableMessageTimestamp()) {
1916                long timeStamp = System.currentTimeMillis();
1917                message.setJMSTimestamp(timeStamp);
1918                if (timeToLive > 0) {
1919                    expiration = timeToLive + timeStamp;
1920                }
1921            }
1922            message.setJMSExpiration(expiration);
1923            message.setJMSPriority(priority);
1924            message.setJMSRedelivered(false);
1925
1926            // transform to our own message format here
1927            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
1928            msg.setDestination(destination);
1929            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
1930
1931            // Set the message id.
1932            if (msg != message) {
1933                message.setJMSMessageID(msg.getMessageId().toString());
1934                // Make sure the JMS destination is set on the foreign messages too.
1935                message.setJMSDestination(destination);
1936            }
1937            //clear the brokerPath in case we are re-sending this message
1938            msg.setBrokerPath(null);
1939
1940            msg.setTransactionId(txid);
1941            if (connection.isCopyMessageOnSend()) {
1942                msg = (ActiveMQMessage)msg.copy();
1943            }
1944            msg.setConnection(connection);
1945            msg.onSend();
1946            msg.setProducerId(msg.getMessageId().getProducerId());
1947            if (LOG.isTraceEnabled()) {
1948                LOG.trace(getSessionId() + " sending message: " + msg);
1949            }
1950            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
1951                this.connection.asyncSendPacket(msg);
1952                if (producerWindow != null) {
1953                    // Since we defer lots of the marshaling till we hit the
1954                    // wire, this might not
1955                    // provide and accurate size. We may change over to doing
1956                    // more aggressive marshaling,
1957                    // to get more accurate sizes.. this is more important once
1958                    // users start using producer window
1959                    // flow control.
1960                    int size = msg.getSize();
1961                    producerWindow.increaseUsage(size);
1962                }
1963            } else {
1964                if (sendTimeout > 0 && onComplete==null) {
1965                    this.connection.syncSendPacket(msg,sendTimeout);
1966                }else {
1967                    this.connection.syncSendPacket(msg, onComplete);
1968                }
1969            }
1970
1971        }
1972    }
1973
1974    /**
1975     * Send TransactionInfo to indicate transaction has started
1976     *
1977     * @throws JMSException if some internal error occurs
1978     */
1979    protected void doStartTransaction() throws JMSException {
1980        if (getTransacted() && !transactionContext.isInXATransaction()) {
1981            transactionContext.begin();
1982        }
1983    }
1984
    /**
     * Checks whether the session has unconsumed messages by asking the
     * session's executor.
     *
     * @return true - if the executor reports unconsumed messages.
     */
    public boolean hasUncomsumedMessages() {
        return executor.hasUncomsumedMessages();
    }
1993
1994    /**
1995     * Checks whether the session uses transactions.
1996     *
1997     * @return true - if the session uses transactions.
1998     */
1999    public boolean isTransacted() {
2000        return this.acknowledgementMode == Session.SESSION_TRANSACTED || (transactionContext.isInXATransaction());
2001    }
2002
    /**
     * Checks whether the session uses client acknowledgment.
     *
     * @return true - if the acknowledgement mode is
     *         {@link Session#CLIENT_ACKNOWLEDGE}.
     */
    protected boolean isClientAcknowledge() {
        return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
    }
2011
    /**
     * Checks whether the session uses auto acknowledgment.
     *
     * @return true - if the acknowledgement mode is
     *         {@link Session#AUTO_ACKNOWLEDGE}.
     */
    public boolean isAutoAcknowledge() {
        return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
    }
2020
    /**
     * Checks whether the session uses dups-ok acknowledgment.
     *
     * @return true - if the acknowledgement mode is
     *         {@link Session#DUPS_OK_ACKNOWLEDGE}.
     */
    public boolean isDupsOkAcknowledge() {
        return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
    }
2029
    /**
     * Checks whether the session uses individual acknowledgment.
     *
     * @return true - if the acknowledgement mode is
     *         {@link ActiveMQSession#INDIVIDUAL_ACKNOWLEDGE}.
     */
    public boolean isIndividualAcknowledge(){
        return acknowledgementMode == ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE;
    }
2033
    /**
     * Returns the message delivery listener currently set on this session.
     *
     * @return deliveryListener - the message delivery listener; whatever was
     *         last stored via {@link #setDeliveryListener(DeliveryListener)}.
     */
    public DeliveryListener getDeliveryListener() {
        return deliveryListener;
    }
2042
    /**
     * Sets the message delivery listener for this session, replacing any
     * listener stored previously.
     *
     * @param deliveryListener - message delivery listener.
     */
    public void setDeliveryListener(DeliveryListener deliveryListener) {
        this.deliveryListener = deliveryListener;
    }
2051
2052    /**
2053     * Returns the SessionInfo bean.
2054     *
2055     * @return info - SessionInfo bean.
2056     * @throws JMSException
2057     */
2058    protected SessionInfo getSessionInfo() throws JMSException {
2059        SessionInfo info = new SessionInfo(connection.getConnectionInfo(), getSessionId().getValue());
2060        return info;
2061    }
2062
    /**
     * Sends a command asynchronously by delegating to the connection's
     * {@code asyncSendPacket}.
     *
     * @param command - command to be sent.
     * @throws JMSException if the connection fails to send the command
     */
    public void asyncSendPacket(Command command) throws JMSException {
        connection.asyncSendPacket(command);
    }
2072
    /**
     * Sends a command synchronously by delegating to the connection's
     * {@code syncSendPacket}.
     *
     * @param command - command to be sent.
     * @return the Response handed back by the connection.
     * @throws JMSException if the connection fails to send the command
     */
    public Response syncSendPacket(Command command) throws JMSException {
        return connection.syncSendPacket(command);
    }
2083
    /**
     * Returns the next sequence id produced by this session's delivery id
     * generator.
     *
     * @return the next delivery id.
     */
    public long getNextDeliveryId() {
        return deliveryIdGenerator.getNextSequenceId();
    }
2087
2088    public void redispatch(ActiveMQDispatcher dispatcher, MessageDispatchChannel unconsumedMessages) throws JMSException {
2089
2090        List<MessageDispatch> c = unconsumedMessages.removeAll();
2091        for (MessageDispatch md : c) {
2092            this.connection.rollbackDuplicate(dispatcher, md.getMessage());
2093        }
2094        Collections.reverse(c);
2095
2096        for (Iterator<MessageDispatch> iter = c.iterator(); iter.hasNext();) {
2097            MessageDispatch md = iter.next();
2098            executor.executeFirst(md);
2099        }
2100
2101    }
2102
    /**
     * Reports the current value of this session's started flag.
     *
     * @return true - if the started flag is set.
     */
    public boolean isRunning() {
        return started.get();
    }
2106
    /**
     * @return Returns the asyncDispatch setting of this session.
     */
    public boolean isAsyncDispatch() {
        return asyncDispatch;
    }
2110
    /**
     * @param asyncDispatch The asyncDispatch to set.
     */
    public void setAsyncDispatch(boolean asyncDispatch) {
        this.asyncDispatch = asyncDispatch;
    }
2114
    /**
     * Returns the sessionAsyncDispatch setting, which is stored separately
     * from the asyncDispatch setting.
     *
     * @return Returns the sessionAsyncDispatch.
     */
    public boolean isSessionAsyncDispatch() {
        return sessionAsyncDispatch;
    }
2121
    /**
     * Stores the sessionAsyncDispatch setting; the asyncDispatch setting is
     * not touched.
     *
     * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
     */
    public void setSessionAsyncDispatch(boolean sessionAsyncDispatch) {
        this.sessionAsyncDispatch = sessionAsyncDispatch;
    }
2128
    /**
     * Returns the message transformer currently set on this session.
     *
     * @return the transformer last stored via
     *         {@link #setTransformer(MessageTransformer)}.
     */
    public MessageTransformer getTransformer() {
        return transformer;
    }
2132
    /**
     * Returns the connection this session sends its commands through.
     *
     * @return the session's ActiveMQConnection.
     */
    public ActiveMQConnection getConnection() {
        return connection;
    }
2136
    /**
     * Sets the transformer used to transform messages before they are sent on
     * to the JMS bus or when they are received from the bus but before they are
     * delivered to the JMS client.
     *
     * @param transformer the message transformer to store on this session.
     */
    public void setTransformer(MessageTransformer transformer) {
        this.transformer = transformer;
    }
2145
    /**
     * Returns the BLOB transfer policy currently set on this session.
     *
     * @return the policy last stored via
     *         {@link #setBlobTransferPolicy(BlobTransferPolicy)}.
     */
    public BlobTransferPolicy getBlobTransferPolicy() {
        return blobTransferPolicy;
    }
2149
    /**
     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
     * OBjects) are transferred from producers to brokers to consumers.
     *
     * @param blobTransferPolicy the policy to store on this session.
     */
    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
        this.blobTransferPolicy = blobTransferPolicy;
    }
2157
    /**
     * Returns the unconsumed messages as reported by this session's executor.
     *
     * @return the list obtained from the executor.
     */
    public List<MessageDispatch> getUnconsumedMessages() {
        return executor.getUnconsumedMessages();
    }
2161
2162    @Override
2163    public String toString() {
2164        return "ActiveMQSession {id=" + info.getSessionId() + ",started=" + started.get() + "} " + sendMutex;
2165    }
2166
2167    public void checkMessageListener() throws JMSException {
2168        if (messageListener != null) {
2169            throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2170        }
2171        for (Iterator<ActiveMQMessageConsumer> i = consumers.iterator(); i.hasNext();) {
2172            ActiveMQMessageConsumer consumer = i.next();
2173            if (consumer.hasMessageListener()) {
2174                throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
2175            }
2176        }
2177    }
2178
2179    protected void setOptimizeAcknowledge(boolean value) {
2180        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2181            ActiveMQMessageConsumer c = iter.next();
2182            c.setOptimizeAcknowledge(value);
2183        }
2184    }
2185
2186    protected void setPrefetchSize(ConsumerId id, int prefetch) {
2187        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2188            ActiveMQMessageConsumer c = iter.next();
2189            if (c.getConsumerId().equals(id)) {
2190                c.setPrefetchSize(prefetch);
2191                break;
2192            }
2193        }
2194    }
2195
2196    protected void close(ConsumerId id) {
2197        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2198            ActiveMQMessageConsumer c = iter.next();
2199            if (c.getConsumerId().equals(id)) {
2200                try {
2201                    c.close();
2202                } catch (JMSException e) {
2203                    LOG.warn("Exception closing consumer", e);
2204                }
2205                LOG.warn("Closed consumer on Command, " + id);
2206                break;
2207            }
2208        }
2209    }
2210
2211    public boolean isInUse(ActiveMQTempDestination destination) {
2212        for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) {
2213            ActiveMQMessageConsumer c = iter.next();
2214            if (c.isInUse(destination)) {
2215                return true;
2216            }
2217        }
2218        return false;
2219    }
2220
    /**
     * Returns the highest sequence id of the last message delivered by this
     * session. Passed to the broker in the close command, maintained by
     * dispose().
     *
     * @return lastDeliveredSequenceId
     */
    public long getLastDeliveredSequenceId() {
        return lastDeliveredSequenceId;
    }
2229
    /**
     * Sends the acknowledgement with the lazy flag set to false; shorthand
     * for {@code sendAck(ack, false)}.
     *
     * @param ack the acknowledgement to send.
     * @throws JMSException if sending the acknowledgement fails
     */
    protected void sendAck(MessageAck ack) throws JMSException {
        sendAck(ack,false);
    }
2233
2234    protected void sendAck(MessageAck ack, boolean lazy) throws JMSException {
2235        if (lazy || connection.isSendAcksAsync() || getTransacted()) {
2236            asyncSendPacket(ack);
2237        } else {
2238            syncSendPacket(ack);
2239        }
2240    }
2241
    /**
     * Returns the scheduler obtained from this session's connection.
     *
     * @return the connection's Scheduler.
     * @throws JMSException if the connection fails to provide the scheduler
     */
    protected Scheduler getScheduler() throws JMSException {
        return this.connection.getScheduler();
    }
2245
    /**
     * Returns the thread pool executor held in this session's
     * {@code connectionExecutor} field.
     *
     * @return the connectionExecutor.
     */
    protected ThreadPoolExecutor getConnectionExecutor() {
        return this.connectionExecutor;
    }
2249}