001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.*;
023import java.util.concurrent.ConcurrentHashMap;
024import java.util.concurrent.ConcurrentMap;
025import java.util.concurrent.CopyOnWriteArrayList;
026import java.util.concurrent.CountDownLatch;
027import java.util.concurrent.LinkedBlockingQueue;
028import java.util.concurrent.RejectedExecutionHandler;
029import java.util.concurrent.ThreadFactory;
030import java.util.concurrent.ThreadPoolExecutor;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034
035import javax.jms.Connection;
036import javax.jms.ConnectionConsumer;
037import javax.jms.ConnectionMetaData;
038import javax.jms.Destination;
039import javax.jms.ExceptionListener;
040import javax.jms.IllegalStateException;
041import javax.jms.InvalidDestinationException;
042import javax.jms.JMSException;
043import javax.jms.Queue;
044import javax.jms.QueueConnection;
045import javax.jms.QueueSession;
046import javax.jms.ServerSessionPool;
047import javax.jms.Session;
048import javax.jms.Topic;
049import javax.jms.TopicConnection;
050import javax.jms.TopicSession;
051import javax.jms.XAConnection;
052
053import org.apache.activemq.advisory.DestinationSource;
054import org.apache.activemq.blob.BlobTransferPolicy;
055import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
056import org.apache.activemq.command.ActiveMQDestination;
057import org.apache.activemq.command.ActiveMQMessage;
058import org.apache.activemq.command.ActiveMQTempDestination;
059import org.apache.activemq.command.ActiveMQTempQueue;
060import org.apache.activemq.command.ActiveMQTempTopic;
061import org.apache.activemq.command.BrokerInfo;
062import org.apache.activemq.command.Command;
063import org.apache.activemq.command.CommandTypes;
064import org.apache.activemq.command.ConnectionControl;
065import org.apache.activemq.command.ConnectionError;
066import org.apache.activemq.command.ConnectionId;
067import org.apache.activemq.command.ConnectionInfo;
068import org.apache.activemq.command.ConsumerControl;
069import org.apache.activemq.command.ConsumerId;
070import org.apache.activemq.command.ConsumerInfo;
071import org.apache.activemq.command.ControlCommand;
072import org.apache.activemq.command.DestinationInfo;
073import org.apache.activemq.command.ExceptionResponse;
074import org.apache.activemq.command.Message;
075import org.apache.activemq.command.MessageDispatch;
076import org.apache.activemq.command.MessageId;
077import org.apache.activemq.command.ProducerAck;
078import org.apache.activemq.command.ProducerId;
079import org.apache.activemq.command.RemoveInfo;
080import org.apache.activemq.command.RemoveSubscriptionInfo;
081import org.apache.activemq.command.Response;
082import org.apache.activemq.command.SessionId;
083import org.apache.activemq.command.ShutdownInfo;
084import org.apache.activemq.command.WireFormatInfo;
085import org.apache.activemq.management.JMSConnectionStatsImpl;
086import org.apache.activemq.management.JMSStatsImpl;
087import org.apache.activemq.management.StatsCapable;
088import org.apache.activemq.management.StatsImpl;
089import org.apache.activemq.state.CommandVisitorAdapter;
090import org.apache.activemq.thread.Scheduler;
091import org.apache.activemq.thread.TaskRunnerFactory;
092import org.apache.activemq.transport.FutureResponse;
093import org.apache.activemq.transport.RequestTimedOutIOException;
094import org.apache.activemq.transport.ResponseCallback;
095import org.apache.activemq.transport.Transport;
096import org.apache.activemq.transport.TransportListener;
097import org.apache.activemq.transport.failover.FailoverTransport;
098import org.apache.activemq.util.IdGenerator;
099import org.apache.activemq.util.IntrospectionSupport;
100import org.apache.activemq.util.JMSExceptionSupport;
101import org.apache.activemq.util.LongSequenceGenerator;
102import org.apache.activemq.util.ServiceSupport;
103import org.apache.activemq.util.ThreadPoolUtils;
104import org.slf4j.Logger;
105import org.slf4j.LoggerFactory;
106
107public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
108
109    public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
110    public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
111    public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
112    public static int DEFAULT_THREAD_POOL_SIZE = 1000;
113
114    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class);
115
116    public final ConcurrentMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>();
117
118    protected boolean dispatchAsync=true;
119    protected boolean alwaysSessionAsync = true;
120
121    private TaskRunnerFactory sessionTaskRunner;
122    private final ThreadPoolExecutor executor;
123
124    // Connection state variables
125    private final ConnectionInfo info;
126    private ExceptionListener exceptionListener;
127    private ClientInternalExceptionListener clientInternalExceptionListener;
128    private boolean clientIDSet;
129    private boolean isConnectionInfoSentToBroker;
130    private boolean userSpecifiedClientID;
131
132    // Configuration options variables
133    private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
134    private BlobTransferPolicy blobTransferPolicy;
135    private RedeliveryPolicyMap redeliveryPolicyMap;
136    private MessageTransformer transformer;
137
138    private boolean disableTimeStampsByDefault;
139    private boolean optimizedMessageDispatch = true;
140    private boolean copyMessageOnSend = true;
141    private boolean useCompression;
142    private boolean objectMessageSerializationDefered;
143    private boolean useAsyncSend;
144    private boolean optimizeAcknowledge;
145    private long optimizeAcknowledgeTimeOut = 0;
146    private long optimizedAckScheduledAckInterval = 0;
147    private boolean nestedMapAndListEnabled = true;
148    private boolean useRetroactiveConsumer;
149    private boolean exclusiveConsumer;
150    private boolean alwaysSyncSend;
151    private int closeTimeout = 15000;
152    private boolean watchTopicAdvisories = true;
153    private long warnAboutUnstartedConnectionTimeout = 500L;
154    private int sendTimeout =0;
155    private boolean sendAcksAsync=true;
156    private boolean checkForDuplicates = true;
157    private boolean queueOnlyConnection = false;
158    private boolean consumerExpiryCheckEnabled = true;
159
160    private final Transport transport;
161    private final IdGenerator clientIdGenerator;
162    private final JMSStatsImpl factoryStats;
163    private final JMSConnectionStatsImpl stats;
164
165    private final AtomicBoolean started = new AtomicBoolean(false);
166    private final AtomicBoolean closing = new AtomicBoolean(false);
167    private final AtomicBoolean closed = new AtomicBoolean(false);
168    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
169    private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>();
170    private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>();
171    private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>();
172
    // Maps ConsumerIds to the ActiveMQDispatcher registered for that consumer
174    private final ConcurrentMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>();
175    private final ConcurrentMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>();
176    private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator();
177    private final SessionId connectionSessionId;
178    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
179    private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator();
180    private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator();
181
182    private AdvisoryConsumer advisoryConsumer;
183    private final CountDownLatch brokerInfoReceived = new CountDownLatch(1);
184    private BrokerInfo brokerInfo;
185    private IOException firstFailureError;
186    private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE;
187
188    // Assume that protocol is the latest. Change to the actual protocol
189    // version when a WireFormatInfo is received.
190    private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
191    private final long timeCreated;
192    private final ConnectionAudit connectionAudit = new ConnectionAudit();
193    private DestinationSource destinationSource;
194    private final Object ensureConnectionInfoSentMutex = new Object();
195    private boolean useDedicatedTaskRunner;
196    protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0);
197    private long consumerFailoverRedeliveryWaitPeriod;
198    private Scheduler scheduler;
199    private boolean messagePrioritySupported = false;
200    private boolean transactedIndividualAck = false;
201    private boolean nonBlockingRedelivery = false;
202    private boolean rmIdFromConnectionId = false;
203
204    private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE;
205    private RejectedExecutionHandler rejectedTaskHandler = null;
206
207    private List<String> trustedPackages = new ArrayList<String>();
208    private boolean trustAllPackages = false;
209        private int connectResponseTimeout;
210
211    /**
212     * Construct an <code>ActiveMQConnection</code>
213     *
214     * @param transport
215     * @param factoryStats
216     * @throws Exception
217     */
218    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
219
220        this.transport = transport;
221        this.clientIdGenerator = clientIdGenerator;
222        this.factoryStats = factoryStats;
223
224        // Configure a single threaded executor who's core thread can timeout if
225        // idle
226        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
227            @Override
228            public Thread newThread(Runnable r) {
229                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
230                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
231                //thread.setDaemon(true);
232                return thread;
233            }
234        });
235        // asyncConnectionThread.allowCoreThreadTimeOut(true);
236        String uniqueId = connectionIdGenerator.generateId();
237        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
238        this.info.setManageable(true);
239        this.info.setFaultTolerant(transport.isFaultTolerant());
240        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
241
242        this.transport.setTransportListener(this);
243
244        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
245        this.factoryStats.addConnection(this);
246        this.timeCreated = System.currentTimeMillis();
247        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
248    }
249
250    protected void setUserName(String userName) {
251        this.info.setUserName(userName);
252    }
253
254    protected void setPassword(String password) {
255        this.info.setPassword(password);
256    }
257
258    /**
259     * A static helper method to create a new connection
260     *
261     * @return an ActiveMQConnection
262     * @throws JMSException
263     */
264    public static ActiveMQConnection makeConnection() throws JMSException {
265        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
266        return (ActiveMQConnection)factory.createConnection();
267    }
268
269    /**
270     * A static helper method to create a new connection
271     *
272     * @param uri
273     * @return and ActiveMQConnection
274     * @throws JMSException
275     */
276    public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException {
277        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
278        return (ActiveMQConnection)factory.createConnection();
279    }
280
281    /**
282     * A static helper method to create a new connection
283     *
284     * @param user
285     * @param password
286     * @param uri
287     * @return an ActiveMQConnection
288     * @throws JMSException
289     */
290    public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException {
291        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, new URI(uri));
292        return (ActiveMQConnection)factory.createConnection();
293    }
294
295    /**
296     * @return a number unique for this connection
297     */
298    public JMSConnectionStatsImpl getConnectionStats() {
299        return stats;
300    }
301
302    /**
303     * Creates a <CODE>Session</CODE> object.
304     *
305     * @param transacted indicates whether the session is transacted
306     * @param acknowledgeMode indicates whether the consumer or the client will
307     *                acknowledge any messages it receives; ignored if the
308     *                session is transacted. Legal values are
309     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
310     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
311     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
312     * @return a newly created session
313     * @throws JMSException if the <CODE>Connection</CODE> object fails to
314     *                 create a session due to some internal error or lack of
315     *                 support for the specific transaction and acknowledgement
316     *                 mode.
317     * @see Session#AUTO_ACKNOWLEDGE
318     * @see Session#CLIENT_ACKNOWLEDGE
319     * @see Session#DUPS_OK_ACKNOWLEDGE
320     * @since 1.1
321     */
322    @Override
323    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
324        checkClosedOrFailed();
325        ensureConnectionInfoSent();
326        if(!transacted) {
327            if (acknowledgeMode==Session.SESSION_TRANSACTED) {
328                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
329            } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) {
330                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
331                        "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
332            }
333        }
334        return new ActiveMQSession(this, getNextSessionId(), transacted ? Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED
335            ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync());
336    }
337
338    /**
339     * @return sessionId
340     */
341    protected SessionId getNextSessionId() {
342        return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId());
343    }
344
345    /**
346     * Gets the client identifier for this connection.
347     * <P>
348     * This value is specific to the JMS provider. It is either preconfigured by
349     * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
350     * dynamically by the application by calling the <code>setClientID</code>
351     * method.
352     *
353     * @return the unique client identifier
354     * @throws JMSException if the JMS provider fails to return the client ID
355     *                 for this connection due to some internal error.
356     */
357    @Override
358    public String getClientID() throws JMSException {
359        checkClosedOrFailed();
360        return this.info.getClientId();
361    }
362
363    /**
364     * Sets the client identifier for this connection.
365     * <P>
366     * The preferred way to assign a JMS client's client identifier is for it to
367     * be configured in a client-specific <CODE>ConnectionFactory</CODE>
368     * object and transparently assigned to the <CODE>Connection</CODE> object
369     * it creates.
370     * <P>
371     * Alternatively, a client can set a connection's client identifier using a
372     * provider-specific value. The facility to set a connection's client
373     * identifier explicitly is not a mechanism for overriding the identifier
374     * that has been administratively configured. It is provided for the case
375     * where no administratively specified identifier exists. If one does exist,
376     * an attempt to change it by setting it must throw an
377     * <CODE>IllegalStateException</CODE>. If a client sets the client
378     * identifier explicitly, it must do so immediately after it creates the
379     * connection and before any other action on the connection is taken. After
380     * this point, setting the client identifier is a programming error that
381     * should throw an <CODE>IllegalStateException</CODE>.
382     * <P>
383     * The purpose of the client identifier is to associate a connection and its
384     * objects with a state maintained on behalf of the client by a provider.
385     * The only such state identified by the JMS API is that required to support
386     * durable subscriptions.
387     * <P>
388     * If another connection with the same <code>clientID</code> is already
389     * running when this method is called, the JMS provider should detect the
390     * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
391     *
392     * @param newClientID the unique client identifier
393     * @throws JMSException if the JMS provider fails to set the client ID for
394     *                 this connection due to some internal error.
395     * @throws javax.jms.InvalidClientIDException if the JMS client specifies an
396     *                 invalid or duplicate client ID.
397     * @throws javax.jms.IllegalStateException if the JMS client attempts to set
398     *                 a connection's client ID at the wrong time or when it has
399     *                 been administratively configured.
400     */
401    @Override
402    public void setClientID(String newClientID) throws JMSException {
403        checkClosedOrFailed();
404
405        if (this.clientIDSet) {
406            throw new IllegalStateException("The clientID has already been set");
407        }
408
409        if (this.isConnectionInfoSentToBroker) {
410            throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
411        }
412
413        this.info.setClientId(newClientID);
414        this.userSpecifiedClientID = true;
415        ensureConnectionInfoSent();
416    }
417
418    /**
419     * Sets the default client id that the connection will use if explicitly not
420     * set with the setClientId() call.
421     */
422    public void setDefaultClientID(String clientID) throws JMSException {
423        this.info.setClientId(clientID);
424        this.userSpecifiedClientID = true;
425    }
426
427    /**
428     * Gets the metadata for this connection.
429     *
430     * @return the connection metadata
431     * @throws JMSException if the JMS provider fails to get the connection
432     *                 metadata for this connection.
433     * @see javax.jms.ConnectionMetaData
434     */
435    @Override
436    public ConnectionMetaData getMetaData() throws JMSException {
437        checkClosedOrFailed();
438        return ActiveMQConnectionMetaData.INSTANCE;
439    }
440
441    /**
442     * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
443     * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
444     * associated with it.
445     *
446     * @return the <CODE>ExceptionListener</CODE> for this connection, or
447     *         null, if no <CODE>ExceptionListener</CODE> is associated with
448     *         this connection.
449     * @throws JMSException if the JMS provider fails to get the
450     *                 <CODE>ExceptionListener</CODE> for this connection.
451     * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
452     */
453    @Override
454    public ExceptionListener getExceptionListener() throws JMSException {
455        checkClosedOrFailed();
456        return this.exceptionListener;
457    }
458
459    /**
460     * Sets an exception listener for this connection.
461     * <P>
462     * If a JMS provider detects a serious problem with a connection, it informs
463     * the connection's <CODE> ExceptionListener</CODE>, if one has been
464     * registered. It does this by calling the listener's <CODE>onException
465     * </CODE>
466     * method, passing it a <CODE>JMSException</CODE> object describing the
467     * problem.
468     * <P>
469     * An exception listener allows a client to be notified of a problem
470     * asynchronously. Some connections only consume messages, so they would
471     * have no other way to learn their connection has failed.
472     * <P>
473     * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
474     * <P>
475     * A JMS provider should attempt to resolve connection problems itself
476     * before it notifies the client of them.
477     *
478     * @param listener the exception listener
479     * @throws JMSException if the JMS provider fails to set the exception
480     *                 listener for this connection.
481     */
482    @Override
483    public void setExceptionListener(ExceptionListener listener) throws JMSException {
484        checkClosedOrFailed();
485        this.exceptionListener = listener;
486    }
487
488    /**
489     * Gets the <code>ClientInternalExceptionListener</code> object for this connection.
490     * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE>
491     * associated with it.
492     *
493     * @return the listener or <code>null</code> if no listener is registered with the connection.
494     */
495    public ClientInternalExceptionListener getClientInternalExceptionListener() {
496        return clientInternalExceptionListener;
497    }
498
499    /**
500     * Sets a client internal exception listener for this connection.
501     * The connection will notify the listener, if one has been registered, of exceptions thrown by container components
502     * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message.
503     * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code>
504     * describing the problem.
505     *
506     * @param listener the exception listener
507     */
508    public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) {
509        this.clientInternalExceptionListener = listener;
510    }
511
512    /**
513     * Starts (or restarts) a connection's delivery of incoming messages. A call
514     * to <CODE>start</CODE> on a connection that has already been started is
515     * ignored.
516     *
517     * @throws JMSException if the JMS provider fails to start message delivery
518     *                 due to some internal error.
519     * @see javax.jms.Connection#stop()
520     */
521    @Override
522    public void start() throws JMSException {
523        checkClosedOrFailed();
524        ensureConnectionInfoSent();
525        if (started.compareAndSet(false, true)) {
526            for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
527                ActiveMQSession session = i.next();
528                session.start();
529            }
530        }
531    }
532
533    /**
534     * Temporarily stops a connection's delivery of incoming messages. Delivery
535     * can be restarted using the connection's <CODE>start</CODE> method. When
536     * the connection is stopped, delivery to all the connection's message
537     * consumers is inhibited: synchronous receives block, and messages are not
538     * delivered to message listeners.
539     * <P>
540     * This call blocks until receives and/or message listeners in progress have
541     * completed.
542     * <P>
543     * Stopping a connection has no effect on its ability to send messages. A
544     * call to <CODE>stop</CODE> on a connection that has already been stopped
545     * is ignored.
546     * <P>
547     * A call to <CODE>stop</CODE> must not return until delivery of messages
548     * has paused. This means that a client can rely on the fact that none of
549     * its message listeners will be called and that all threads of control
550     * waiting for <CODE>receive</CODE> calls to return will not return with a
551     * message until the connection is restarted. The receive timers for a
552     * stopped connection continue to advance, so receives may time out while
553     * the connection is stopped.
554     * <P>
555     * If message listeners are running when <CODE>stop</CODE> is invoked, the
556     * <CODE>stop</CODE> call must wait until all of them have returned before
557     * it may return. While these message listeners are completing, they must
558     * have the full services of the connection available to them.
559     *
560     * @throws JMSException if the JMS provider fails to stop message delivery
561     *                 due to some internal error.
562     * @see javax.jms.Connection#start()
563     */
564    @Override
565    public void stop() throws JMSException {
566        doStop(true);
567    }
568
569    /**
570     * @see #stop()
571     * @param checkClosed <tt>true</tt> to check for already closed and throw {@link java.lang.IllegalStateException} if already closed,
572     *                    <tt>false</tt> to skip this check
573     * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
574     */
575    void doStop(boolean checkClosed) throws JMSException {
576        if (checkClosed) {
577            checkClosedOrFailed();
578        }
579        if (started.compareAndSet(true, false)) {
580            synchronized(sessions) {
581                for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) {
582                    ActiveMQSession s = i.next();
583                    s.stop();
584                }
585            }
586        }
587    }
588
589    /**
590     * Closes the connection.
591     * <P>
592     * Since a provider typically allocates significant resources outside the
593     * JVM on behalf of a connection, clients should close these resources when
594     * they are not needed. Relying on garbage collection to eventually reclaim
595     * these resources may not be timely enough.
596     * <P>
597     * There is no need to close the sessions, producers, and consumers of a
598     * closed connection.
599     * <P>
600     * Closing a connection causes all temporary destinations to be deleted.
601     * <P>
602     * When this method is invoked, it should not return until message
603     * processing has been shut down in an orderly fashion. This means that all
604     * message listeners that may have been running have returned, and that all
605     * pending receives have returned. A close terminates all pending message
606     * receives on the connection's sessions' consumers. The receives may return
607     * with a message or with null, depending on whether there was a message
608     * available at the time of the close. If one or more of the connection's
609     * sessions' message listeners is processing a message at the time when
610     * connection <CODE>close</CODE> is invoked, all the facilities of the
611     * connection and its sessions must remain available to those listeners
612     * until they return control to the JMS provider.
613     * <P>
614     * Closing a connection causes any of its sessions' transactions in progress
615     * to be rolled back. In the case where a session's work is coordinated by
616     * an external transaction manager, a session's <CODE>commit</CODE> and
617     * <CODE> rollback</CODE> methods are not used and the result of a closed
618     * session's work is determined later by the transaction manager. Closing a
619     * connection does NOT force an acknowledgment of client-acknowledged
620     * sessions.
621     * <P>
622     * Invoking the <CODE>acknowledge</CODE> method of a received message from
623     * a closed connection's session must throw an
624     * <CODE>IllegalStateException</CODE>. Closing a closed connection must
625     * NOT throw an exception.
626     *
627     * @throws JMSException if the JMS provider fails to close the connection
628     *                 due to some internal error. For example, a failure to
629     *                 release resources or to close a socket connection can
630     *                 cause this exception to be thrown.
631     */
    @Override
    public void close() throws JMSException {
        // Overall shape: an optional doStop(), then the one-time teardown under
        // the connection monitor, and finally (on every call, even when the
        // connection was already closed) executor/transport/stats cleanup.
        try {
            // If we were running, lets stop first.
            if (!closed.get() && !transportFailed.get()) {
                // do not fail if already closed as according to JMS spec we must not
                // throw exception if already closed
                doStop(false);
            }

            synchronized (this) {
                // Re-check under the lock so the teardown below runs at most once.
                if (!closed.get()) {
                    closing.set(true);

                    // Release the helpers owned by this connection and drop the
                    // references to them.
                    if (destinationSource != null) {
                        destinationSource.stop();
                        destinationSource = null;
                    }
                    if (advisoryConsumer != null) {
                        advisoryConsumer.dispose();
                        advisoryConsumer = null;
                    }

                    // Work on a local copy of the scheduler reference.
                    Scheduler scheduler = this.scheduler;
                    if (scheduler != null) {
                        try {
                            scheduler.stop();
                        } catch (Exception e) {
                            // NOTE(review): this throw leaves the synchronized block
                            // before the try/finally further down is entered, so
                            // 'closed' stays false, 'closing' stays true and the
                            // session task runner is not shut down on this path.
                            JMSException ex =  JMSExceptionSupport.create(e);
                            throw ex;
                        }
                    }

                    // Dispose every session and remember the highest delivered
                    // sequence id; it is reported to the broker in the RemoveInfo.
                    long lastDeliveredSequenceId = -1;
                    for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
                        ActiveMQSession s = i.next();
                        s.dispose();
                        lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId());
                    }
                    for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
                        ActiveMQConnectionConsumer c = i.next();
                        c.dispose();
                    }

                    this.activeTempDestinations.clear();

                    try {
                        if (isConnectionInfoSentToBroker) {
                            // If we announced ourselves to the broker.. Try to let the broker
                            // know that the connection is being shutdown.
                            RemoveInfo removeCommand = info.createRemoveCommand();
                            removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
                            try {
                                // Synchronous request bounded by closeTimeout.
                                syncSendPacket(removeCommand, closeTimeout);
                            } catch (JMSException e) {
                                if (e.getCause() instanceof RequestTimedOutIOException) {
                                    // expected: the request timed out; carry on
                                    // with the shutdown instead of failing close()
                                } else {
                                    throw e;
                                }
                            }
                            // One-way send; bypasses the closed check made by
                            // asyncSendPacket().
                            doAsyncSendPacket(new ShutdownInfo());
                        }
                    } finally { // release anyway even if previous communication fails
                        started.set(false);

                        // TODO if we move the TaskRunnerFactory to the connection
                        // factory
                        // then we may need to call
                        // factory.onConnectionClose(this);
                        if (sessionTaskRunner != null) {
                            sessionTaskRunner.shutdown();
                        }
                        // From here on the connection reports closed and no longer closing.
                        closed.set(true);
                        closing.set(false);
                    }
                }
            }
        } finally {
            // Executor shutdown is best effort: any failure is logged and ignored
            // so that the transport is still disposed below.
            try {
                if (executor != null) {
                    ThreadPoolUtils.shutdown(executor);
                }
            } catch (Throwable e) {
                LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
            }

            ServiceSupport.dispose(this.transport);

            factoryStats.removeConnection(this);
        }
    }
724
725    /**
726     * Tells the broker to terminate its VM. This can be used to cleanly
727     * terminate a broker running in a standalone java process. Server must have
728     * property enable.vm.shutdown=true defined to allow this to work.
729     */
730    // TODO : org.apache.activemq.message.BrokerAdminCommand not yet
731    // implemented.
732    /*
733     * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand
734     * command = new BrokerAdminCommand();
735     * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
736     * asyncSendPacket(command); }
737     */
738
739    /**
740     * Create a durable connection consumer for this connection (optional
741     * operation). This is an expert facility not used by regular JMS clients.
742     *
743     * @param topic topic to access
744     * @param subscriptionName durable subscription name
745     * @param messageSelector only messages with properties matching the message
746     *                selector expression are delivered. A value of null or an
747     *                empty string indicates that there is no message selector
748     *                for the message consumer.
749     * @param sessionPool the server session pool to associate with this durable
750     *                connection consumer
751     * @param maxMessages the maximum number of messages that can be assigned to
752     *                a server session at one time
753     * @return the durable connection consumer
754     * @throws JMSException if the <CODE>Connection</CODE> object fails to
755     *                 create a connection consumer due to some internal error
756     *                 or invalid arguments for <CODE>sessionPool</CODE> and
757     *                 <CODE>messageSelector</CODE>.
758     * @throws javax.jms.InvalidDestinationException if an invalid destination
759     *                 is specified.
760     * @throws javax.jms.InvalidSelectorException if the message selector is
761     *                 invalid.
762     * @see javax.jms.ConnectionConsumer
763     * @since 1.1
764     */
765    @Override
766    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages)
767        throws JMSException {
768        return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false);
769    }
770
771    /**
772     * Create a durable connection consumer for this connection (optional
773     * operation). This is an expert facility not used by regular JMS clients.
774     *
775     * @param topic topic to access
776     * @param subscriptionName durable subscription name
777     * @param messageSelector only messages with properties matching the message
778     *                selector expression are delivered. A value of null or an
779     *                empty string indicates that there is no message selector
780     *                for the message consumer.
781     * @param sessionPool the server session pool to associate with this durable
782     *                connection consumer
783     * @param maxMessages the maximum number of messages that can be assigned to
784     *                a server session at one time
785     * @param noLocal set true if you want to filter out messages published
786     *                locally
787     * @return the durable connection consumer
788     * @throws JMSException if the <CODE>Connection</CODE> object fails to
789     *                 create a connection consumer due to some internal error
790     *                 or invalid arguments for <CODE>sessionPool</CODE> and
791     *                 <CODE>messageSelector</CODE>.
792     * @throws javax.jms.InvalidDestinationException if an invalid destination
793     *                 is specified.
794     * @throws javax.jms.InvalidSelectorException if the message selector is
795     *                 invalid.
796     * @see javax.jms.ConnectionConsumer
797     * @since 1.1
798     */
799    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages,
800                                                              boolean noLocal) throws JMSException {
801        checkClosedOrFailed();
802
803        if (queueOnlyConnection) {
804            throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources.");
805        }
806
807        ensureConnectionInfoSent();
808        SessionId sessionId = new SessionId(info.getConnectionId(), -1);
809        ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()));
810        info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
811        info.setSubscriptionName(subscriptionName);
812        info.setSelector(messageSelector);
813        info.setPrefetchSize(maxMessages);
814        info.setDispatchAsync(isDispatchAsync());
815
816        // Allows the options on the destination to configure the consumerInfo
817        if (info.getDestination().getOptions() != null) {
818            Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions());
819            IntrospectionSupport.setProperties(this.info, options, "consumer.");
820        }
821
822        return new ActiveMQConnectionConsumer(this, sessionPool, info);
823    }
824
825    // Properties
826    // -------------------------------------------------------------------------
827
828    /**
829     * Returns true if this connection has been started
830     *
831     * @return true if this Connection is started
832     */
833    public boolean isStarted() {
834        return started.get();
835    }
836
837    /**
838     * Returns true if the connection is closed
839     */
840    public boolean isClosed() {
841        return closed.get();
842    }
843
844    /**
845     * Returns true if the connection is in the process of being closed
846     */
847    public boolean isClosing() {
848        return closing.get();
849    }
850
851    /**
852     * Returns true if the underlying transport has failed
853     */
854    public boolean isTransportFailed() {
855        return transportFailed.get();
856    }
857
858    /**
859     * @return Returns the prefetchPolicy.
860     */
861    public ActiveMQPrefetchPolicy getPrefetchPolicy() {
862        return prefetchPolicy;
863    }
864
865    /**
866     * Sets the <a
867     * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch
868     * policy</a> for consumers created by this connection.
869     */
870    public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
871        this.prefetchPolicy = prefetchPolicy;
872    }
873
874    /**
875     */
876    public Transport getTransportChannel() {
877        return transport;
878    }
879
880    /**
881     * @return Returns the clientID of the connection, forcing one to be
882     *         generated if one has not yet been configured.
883     */
884    public String getInitializedClientID() throws JMSException {
885        ensureConnectionInfoSent();
886        return info.getClientId();
887    }
888
889    /**
890     * @return Returns the timeStampsDisableByDefault.
891     */
892    public boolean isDisableTimeStampsByDefault() {
893        return disableTimeStampsByDefault;
894    }
895
896    /**
897     * Sets whether or not timestamps on messages should be disabled or not. If
898     * you disable them it adds a small performance boost.
899     */
900    public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) {
901        this.disableTimeStampsByDefault = timeStampsDisableByDefault;
902    }
903
904    /**
905     * @return Returns the dispatchOptimizedMessage.
906     */
907    public boolean isOptimizedMessageDispatch() {
908        return optimizedMessageDispatch;
909    }
910
911    /**
912     * If this flag is set then an larger prefetch limit is used - only
913     * applicable for durable topic subscribers.
914     */
915    public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) {
916        this.optimizedMessageDispatch = dispatchOptimizedMessage;
917    }
918
919    /**
920     * @return Returns the closeTimeout.
921     */
922    public int getCloseTimeout() {
923        return closeTimeout;
924    }
925
926    /**
927     * Sets the timeout before a close is considered complete. Normally a
928     * close() on a connection waits for confirmation from the broker; this
929     * allows that operation to timeout to save the client hanging if there is
930     * no broker
931     */
932    public void setCloseTimeout(int closeTimeout) {
933        this.closeTimeout = closeTimeout;
934    }
935
936    /**
937     * @return ConnectionInfo
938     */
939    public ConnectionInfo getConnectionInfo() {
940        return this.info;
941    }
942
943    public boolean isUseRetroactiveConsumer() {
944        return useRetroactiveConsumer;
945    }
946
947    /**
948     * Sets whether or not retroactive consumers are enabled. Retroactive
949     * consumers allow non-durable topic subscribers to receive old messages
950     * that were published before the non-durable subscriber started.
951     */
952    public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) {
953        this.useRetroactiveConsumer = useRetroactiveConsumer;
954    }
955
956    public boolean isNestedMapAndListEnabled() {
957        return nestedMapAndListEnabled;
958    }
959
960    /**
961     * Enables/disables whether or not Message properties and MapMessage entries
962     * support <a
963     * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested
964     * Structures</a> of Map and List objects
965     */
966    public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) {
967        this.nestedMapAndListEnabled = structuredMapsEnabled;
968    }
969
970    public boolean isExclusiveConsumer() {
971        return exclusiveConsumer;
972    }
973
974    /**
975     * Enables or disables whether or not queue consumers should be exclusive or
976     * not for example to preserve ordering when not using <a
977     * href="http://activemq.apache.org/message-groups.html">Message Groups</a>
978     *
979     * @param exclusiveConsumer
980     */
981    public void setExclusiveConsumer(boolean exclusiveConsumer) {
982        this.exclusiveConsumer = exclusiveConsumer;
983    }
984
985    /**
986     * Adds a transport listener so that a client can be notified of events in
987     * the underlying transport
988     */
989    public void addTransportListener(TransportListener transportListener) {
990        transportListeners.add(transportListener);
991    }
992
993    public void removeTransportListener(TransportListener transportListener) {
994        transportListeners.remove(transportListener);
995    }
996
997    public boolean isUseDedicatedTaskRunner() {
998        return useDedicatedTaskRunner;
999    }
1000
1001    public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) {
1002        this.useDedicatedTaskRunner = useDedicatedTaskRunner;
1003    }
1004
1005    public TaskRunnerFactory getSessionTaskRunner() {
1006        synchronized (this) {
1007            if (sessionTaskRunner == null) {
1008                sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize);
1009                sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler);
1010            }
1011        }
1012        return sessionTaskRunner;
1013    }
1014
1015    public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
1016        this.sessionTaskRunner = sessionTaskRunner;
1017    }
1018
1019    public MessageTransformer getTransformer() {
1020        return transformer;
1021    }
1022
1023    /**
1024     * Sets the transformer used to transform messages before they are sent on
1025     * to the JMS bus or when they are received from the bus but before they are
1026     * delivered to the JMS client
1027     */
1028    public void setTransformer(MessageTransformer transformer) {
1029        this.transformer = transformer;
1030    }
1031
1032    /**
1033     * @return the statsEnabled
1034     */
1035    public boolean isStatsEnabled() {
1036        return this.stats.isEnabled();
1037    }
1038
1039    /**
1040     * @param statsEnabled the statsEnabled to set
1041     */
1042    public void setStatsEnabled(boolean statsEnabled) {
1043        this.stats.setEnabled(statsEnabled);
1044    }
1045
1046    /**
1047     * Returns the {@link DestinationSource} object which can be used to listen to destinations
1048     * being created or destroyed or to enquire about the current destinations available on the broker
1049     *
1050     * @return a lazily created destination source
1051     * @throws JMSException
1052     */
1053    @Override
1054    public DestinationSource getDestinationSource() throws JMSException {
1055        if (destinationSource == null) {
1056            destinationSource = new DestinationSource(this);
1057            destinationSource.start();
1058        }
1059        return destinationSource;
1060    }
1061
1062    // Implementation methods
1063    // -------------------------------------------------------------------------
1064
1065    /**
1066     * Used internally for adding Sessions to the Connection
1067     *
1068     * @param session
1069     * @throws JMSException
1070     * @throws JMSException
1071     */
1072    protected void addSession(ActiveMQSession session) throws JMSException {
1073        this.sessions.add(session);
1074        if (sessions.size() > 1 || session.isTransacted()) {
1075            optimizedMessageDispatch = false;
1076        }
1077    }
1078
1079    /**
1080     * Used interanlly for removing Sessions from a Connection
1081     *
1082     * @param session
1083     */
1084    protected void removeSession(ActiveMQSession session) {
1085        this.sessions.remove(session);
1086        this.removeDispatcher(session);
1087    }
1088
1089    /**
1090     * Add a ConnectionConsumer
1091     *
1092     * @param connectionConsumer
1093     * @throws JMSException
1094     */
1095    protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1096        this.connectionConsumers.add(connectionConsumer);
1097    }
1098
1099    /**
1100     * Remove a ConnectionConsumer
1101     *
1102     * @param connectionConsumer
1103     */
1104    protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
1105        this.connectionConsumers.remove(connectionConsumer);
1106        this.removeDispatcher(connectionConsumer);
1107    }
1108
1109    /**
1110     * Creates a <CODE>TopicSession</CODE> object.
1111     *
1112     * @param transacted indicates whether the session is transacted
1113     * @param acknowledgeMode indicates whether the consumer or the client will
1114     *                acknowledge any messages it receives; ignored if the
1115     *                session is transacted. Legal values are
1116     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1117     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1118     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1119     * @return a newly created topic session
1120     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1121     *                 to create a session due to some internal error or lack of
1122     *                 support for the specific transaction and acknowledgement
1123     *                 mode.
1124     * @see Session#AUTO_ACKNOWLEDGE
1125     * @see Session#CLIENT_ACKNOWLEDGE
1126     * @see Session#DUPS_OK_ACKNOWLEDGE
1127     */
1128    @Override
1129    public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
1130        return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1131    }
1132
1133    /**
1134     * Creates a connection consumer for this connection (optional operation).
1135     * This is an expert facility not used by regular JMS clients.
1136     *
1137     * @param topic the topic to access
1138     * @param messageSelector only messages with properties matching the message
1139     *                selector expression are delivered. A value of null or an
1140     *                empty string indicates that there is no message selector
1141     *                for the message consumer.
1142     * @param sessionPool the server session pool to associate with this
1143     *                connection consumer
1144     * @param maxMessages the maximum number of messages that can be assigned to
1145     *                a server session at one time
1146     * @return the connection consumer
1147     * @throws JMSException if the <CODE>TopicConnection</CODE> object fails
1148     *                 to create a connection consumer due to some internal
1149     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1150     *                 and <CODE>messageSelector</CODE>.
1151     * @throws javax.jms.InvalidDestinationException if an invalid topic is
1152     *                 specified.
1153     * @throws javax.jms.InvalidSelectorException if the message selector is
1154     *                 invalid.
1155     * @see javax.jms.ConnectionConsumer
1156     */
1157    @Override
1158    public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1159        return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false);
1160    }
1161
1162    /**
1163     * Creates a connection consumer for this connection (optional operation).
1164     * This is an expert facility not used by regular JMS clients.
1165     *
1166     * @param queue the queue to access
1167     * @param messageSelector only messages with properties matching the message
1168     *                selector expression are delivered. A value of null or an
1169     *                empty string indicates that there is no message selector
1170     *                for the message consumer.
1171     * @param sessionPool the server session pool to associate with this
1172     *                connection consumer
1173     * @param maxMessages the maximum number of messages that can be assigned to
1174     *                a server session at one time
1175     * @return the connection consumer
1176     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1177     *                 to create a connection consumer due to some internal
1178     *                 error or invalid arguments for <CODE>sessionPool</CODE>
1179     *                 and <CODE>messageSelector</CODE>.
1180     * @throws javax.jms.InvalidDestinationException if an invalid queue is
1181     *                 specified.
1182     * @throws javax.jms.InvalidSelectorException if the message selector is
1183     *                 invalid.
1184     * @see javax.jms.ConnectionConsumer
1185     */
1186    @Override
1187    public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1188        return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false);
1189    }
1190
1191    /**
1192     * Creates a connection consumer for this connection (optional operation).
1193     * This is an expert facility not used by regular JMS clients.
1194     *
1195     * @param destination the destination to access
1196     * @param messageSelector only messages with properties matching the message
1197     *                selector expression are delivered. A value of null or an
1198     *                empty string indicates that there is no message selector
1199     *                for the message consumer.
1200     * @param sessionPool the server session pool to associate with this
1201     *                connection consumer
1202     * @param maxMessages the maximum number of messages that can be assigned to
1203     *                a server session at one time
1204     * @return the connection consumer
1205     * @throws JMSException if the <CODE>Connection</CODE> object fails to
1206     *                 create a connection consumer due to some internal error
1207     *                 or invalid arguments for <CODE>sessionPool</CODE> and
1208     *                 <CODE>messageSelector</CODE>.
1209     * @throws javax.jms.InvalidDestinationException if an invalid destination
1210     *                 is specified.
1211     * @throws javax.jms.InvalidSelectorException if the message selector is
1212     *                 invalid.
1213     * @see javax.jms.ConnectionConsumer
1214     * @since 1.1
1215     */
1216    @Override
1217    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
1218        return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false);
1219    }
1220
1221    public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
1222        throws JMSException {
1223
1224        checkClosedOrFailed();
1225        ensureConnectionInfoSent();
1226
1227        ConsumerId consumerId = createConsumerId();
1228        ConsumerInfo consumerInfo = new ConsumerInfo(consumerId);
1229        consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
1230        consumerInfo.setSelector(messageSelector);
1231        consumerInfo.setPrefetchSize(maxMessages);
1232        consumerInfo.setNoLocal(noLocal);
1233        consumerInfo.setDispatchAsync(isDispatchAsync());
1234
1235        // Allows the options on the destination to configure the consumerInfo
1236        if (consumerInfo.getDestination().getOptions() != null) {
1237            Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions());
1238            IntrospectionSupport.setProperties(consumerInfo, options, "consumer.");
1239        }
1240
1241        return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo);
1242    }
1243
1244    /**
1245     * @return
1246     */
1247    private ConsumerId createConsumerId() {
1248        return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId());
1249    }
1250
1251    /**
1252     * Creates a <CODE>QueueSession</CODE> object.
1253     *
1254     * @param transacted indicates whether the session is transacted
1255     * @param acknowledgeMode indicates whether the consumer or the client will
1256     *                acknowledge any messages it receives; ignored if the
1257     *                session is transacted. Legal values are
1258     *                <code>Session.AUTO_ACKNOWLEDGE</code>,
1259     *                <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1260     *                <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1261     * @return a newly created queue session
1262     * @throws JMSException if the <CODE>QueueConnection</CODE> object fails
1263     *                 to create a session due to some internal error or lack of
1264     *                 support for the specific transaction and acknowledgement
1265     *                 mode.
1266     * @see Session#AUTO_ACKNOWLEDGE
1267     * @see Session#CLIENT_ACKNOWLEDGE
1268     * @see Session#DUPS_OK_ACKNOWLEDGE
1269     */
1270    @Override
1271    public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
1272        return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
1273    }
1274
1275    /**
1276     * Ensures that the clientID was manually specified and not auto-generated.
1277     * If the clientID was not specified this method will throw an exception.
1278     * This method is used to ensure that the clientID + durableSubscriber name
1279     * are used correctly.
1280     *
1281     * @throws JMSException
1282     */
1283    public void checkClientIDWasManuallySpecified() throws JMSException {
1284        if (!userSpecifiedClientID) {
1285            throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1286        }
1287    }
1288
1289    /**
1290     * send a Packet through the Connection - for internal use only
1291     *
1292     * @param command
1293     * @throws JMSException
1294     */
1295    public void asyncSendPacket(Command command) throws JMSException {
1296        if (isClosed()) {
1297            throw new ConnectionClosedException();
1298        } else {
1299            doAsyncSendPacket(command);
1300        }
1301    }
1302
1303    private void doAsyncSendPacket(Command command) throws JMSException {
1304        try {
1305            this.transport.oneway(command);
1306        } catch (IOException e) {
1307            throw JMSExceptionSupport.create(e);
1308        }
1309    }
1310
1311    /**
1312     * Send a packet through a Connection - for internal use only
1313     *
1314     * @param command
1315     * @return
1316     * @throws JMSException
1317     */
1318    public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException {
1319        if(onComplete==null) {
1320            syncSendPacket(command);
1321        } else {
1322            if (isClosed()) {
1323                throw new ConnectionClosedException();
1324            }
1325            try {
1326                this.transport.asyncRequest(command, new ResponseCallback() {
1327                    @Override
1328                    public void onCompletion(FutureResponse resp) {
1329                        Response response;
1330                        Throwable exception = null;
1331                        try {
1332                            response = resp.getResult();
1333                            if (response.isException()) {
1334                                ExceptionResponse er = (ExceptionResponse)response;
1335                                exception = er.getException();
1336                            }
1337                        } catch (Exception e) {
1338                            exception = e;
1339                        }
1340                        if(exception!=null) {
1341                            if ( exception instanceof JMSException) {
1342                                onComplete.onException((JMSException) exception);
1343                            } else {
1344                                if (isClosed()||closing.get()) {
1345                                    LOG.debug("Received an exception but connection is closing");
1346                                }
1347                                JMSException jmsEx = null;
1348                                try {
1349                                    jmsEx = JMSExceptionSupport.create(exception);
1350                                } catch(Throwable e) {
1351                                    LOG.error("Caught an exception trying to create a JMSException for " +exception,e);
1352                                }
1353                                // dispose of transport for security exceptions on connection initiation
1354                                if (exception instanceof SecurityException && command instanceof ConnectionInfo){
1355                                    forceCloseOnSecurityException(exception);
1356                                }
1357                                if (jmsEx !=null) {
1358                                    onComplete.onException(jmsEx);
1359                                }
1360                            }
1361                        } else {
1362                            onComplete.onSuccess();
1363                        }
1364                    }
1365                });
1366            } catch (IOException e) {
1367                throw JMSExceptionSupport.create(e);
1368            }
1369        }
1370    }
1371
1372    private void forceCloseOnSecurityException(Throwable exception) {
1373        LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception);
1374        onException(new IOException("Force close due to SecurityException on connect", exception));
1375    }
1376
1377    public Response syncSendPacket(Command command, int timeout) throws JMSException {
1378        if (isClosed()) {
1379            throw new ConnectionClosedException();
1380        } else {
1381
1382            try {
1383                Response response = (Response)(timeout > 0
1384                        ? this.transport.request(command, timeout)
1385                        : this.transport.request(command));
1386                if (response.isException()) {
1387                    ExceptionResponse er = (ExceptionResponse)response;
1388                    if (er.getException() instanceof JMSException) {
1389                        throw (JMSException)er.getException();
1390                    } else {
1391                        if (isClosed()||closing.get()) {
1392                            LOG.debug("Received an exception but connection is closing");
1393                        }
1394                        JMSException jmsEx = null;
1395                        try {
1396                            jmsEx = JMSExceptionSupport.create(er.getException());
1397                        } catch(Throwable e) {
1398                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
1399                        }
1400                        if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
1401                            forceCloseOnSecurityException(er.getException());
1402                        }
1403                        if (jmsEx !=null) {
1404                            throw jmsEx;
1405                        }
1406                    }
1407                }
1408                return response;
1409            } catch (IOException e) {
1410                throw JMSExceptionSupport.create(e);
1411            }
1412        }
1413    }
1414
1415    /**
1416     * Send a packet through a Connection - for internal use only
1417     *
1418     * @param command
1419     * @return
1420     * @throws JMSException
1421     */
1422    public Response syncSendPacket(Command command) throws JMSException {
1423        return syncSendPacket(command, 0);
1424    }
1425
1426    /**
1427     * @return statistics for this Connection
1428     */
1429    @Override
1430    public StatsImpl getStats() {
1431        return stats;
1432    }
1433
1434    /**
1435     * simply throws an exception if the Connection is already closed or the
1436     * Transport has failed
1437     *
1438     * @throws JMSException
1439     */
1440    protected synchronized void checkClosedOrFailed() throws JMSException {
1441        checkClosed();
1442        if (transportFailed.get()) {
1443            throw new ConnectionFailedException(firstFailureError);
1444        }
1445    }
1446
1447    /**
1448     * simply throws an exception if the Connection is already closed
1449     *
1450     * @throws JMSException
1451     */
1452    protected synchronized void checkClosed() throws JMSException {
1453        if (closed.get()) {
1454            throw new ConnectionClosedException();
1455        }
1456    }
1457
1458    /**
1459     * Send the ConnectionInfo to the Broker
1460     *
1461     * @throws JMSException
1462     */
1463    protected void ensureConnectionInfoSent() throws JMSException {
1464        synchronized(this.ensureConnectionInfoSentMutex) {
1465            // Can we skip sending the ConnectionInfo packet??
1466            if (isConnectionInfoSentToBroker || closed.get()) {
1467                return;
1468            }
1469            //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID?
1470            if (info.getClientId() == null || info.getClientId().trim().length() == 0) {
1471                info.setClientId(clientIdGenerator.generateId());
1472            }
1473            syncSendPacket(info.copy(), getConnectResponseTimeout());
1474
1475            this.isConnectionInfoSentToBroker = true;
1476            // Add a temp destination advisory consumer so that
1477            // We know what the valid temporary destinations are on the
1478            // broker without having to do an RPC to the broker.
1479
1480            ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId());
1481            if (watchTopicAdvisories) {
1482                advisoryConsumer = new AdvisoryConsumer(this, consumerId);
1483            }
1484        }
1485    }
1486
1487    public synchronized boolean isWatchTopicAdvisories() {
1488        return watchTopicAdvisories;
1489    }
1490
1491    public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) {
1492        this.watchTopicAdvisories = watchTopicAdvisories;
1493    }
1494
1495    /**
1496     * @return Returns the useAsyncSend.
1497     */
1498    public boolean isUseAsyncSend() {
1499        return useAsyncSend;
1500    }
1501
1502    /**
1503     * Forces the use of <a
1504     * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which
1505     * adds a massive performance boost; but means that the send() method will
1506     * return immediately whether the message has been sent or not which could
1507     * lead to message loss.
1508     */
1509    public void setUseAsyncSend(boolean useAsyncSend) {
1510        this.useAsyncSend = useAsyncSend;
1511    }
1512
1513    /**
1514     * @return true if always sync send messages
1515     */
1516    public boolean isAlwaysSyncSend() {
1517        return this.alwaysSyncSend;
1518    }
1519
1520    /**
1521     * Set true if always require messages to be sync sent
1522     *
1523     * @param alwaysSyncSend
1524     */
1525    public void setAlwaysSyncSend(boolean alwaysSyncSend) {
1526        this.alwaysSyncSend = alwaysSyncSend;
1527    }
1528
1529    /**
1530     * @return the messagePrioritySupported
1531     */
1532    public boolean isMessagePrioritySupported() {
1533        return this.messagePrioritySupported;
1534    }
1535
1536    /**
1537     * @param messagePrioritySupported the messagePrioritySupported to set
1538     */
1539    public void setMessagePrioritySupported(boolean messagePrioritySupported) {
1540        this.messagePrioritySupported = messagePrioritySupported;
1541    }
1542
1543    /**
1544     * Cleans up this connection so that it's state is as if the connection was
1545     * just created. This allows the Resource Adapter to clean up a connection
1546     * so that it can be reused without having to close and recreate the
1547     * connection.
1548     */
1549    public void cleanup() throws JMSException {
1550        doCleanup(false);
1551    }
1552
1553    public void doCleanup(boolean removeConnection) throws JMSException {
1554        if (advisoryConsumer != null && !isTransportFailed()) {
1555            advisoryConsumer.dispose();
1556            advisoryConsumer = null;
1557        }
1558
1559        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1560            ActiveMQSession s = i.next();
1561            s.dispose();
1562        }
1563        for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) {
1564            ActiveMQConnectionConsumer c = i.next();
1565            c.dispose();
1566        }
1567
1568        if (removeConnection) {
1569            if (isConnectionInfoSentToBroker) {
1570                if (!transportFailed.get() && !closing.get()) {
1571                    syncSendPacket(info.createRemoveCommand());
1572                }
1573                isConnectionInfoSentToBroker = false;
1574            }
1575            if (userSpecifiedClientID) {
1576                info.setClientId(null);
1577                userSpecifiedClientID = false;
1578            }
1579            clientIDSet = false;
1580        }
1581
1582        started.set(false);
1583    }
1584
1585    /**
1586     * Changes the associated username/password that is associated with this
1587     * connection. If the connection has been used, you must called cleanup()
1588     * before calling this method.
1589     *
1590     * @throws IllegalStateException if the connection is in used.
1591     */
1592    public void changeUserInfo(String userName, String password) throws JMSException {
1593        if (isConnectionInfoSentToBroker) {
1594            throw new IllegalStateException("changeUserInfo used Connection is not allowed");
1595        }
1596        this.info.setUserName(userName);
1597        this.info.setPassword(password);
1598    }
1599
1600    /**
1601     * @return Returns the resourceManagerId.
1602     * @throws JMSException
1603     */
1604    public String getResourceManagerId() throws JMSException {
1605        if (isRmIdFromConnectionId()) {
1606            return info.getConnectionId().getValue();
1607        }
1608        waitForBrokerInfo();
1609        if (brokerInfo == null) {
1610            throw new JMSException("Connection failed before Broker info was received.");
1611        }
1612        return brokerInfo.getBrokerId().getValue();
1613    }
1614
1615    /**
1616     * Returns the broker name if one is available or null if one is not
1617     * available yet.
1618     */
1619    public String getBrokerName() {
1620        try {
1621            brokerInfoReceived.await(5, TimeUnit.SECONDS);
1622            if (brokerInfo == null) {
1623                return null;
1624            }
1625            return brokerInfo.getBrokerName();
1626        } catch (InterruptedException e) {
1627            Thread.currentThread().interrupt();
1628            return null;
1629        }
1630    }
1631
1632    /**
1633     * Returns the broker information if it is available or null if it is not
1634     * available yet.
1635     */
1636    public BrokerInfo getBrokerInfo() {
1637        return brokerInfo;
1638    }
1639
1640    /**
1641     * @return Returns the RedeliveryPolicy.
1642     * @throws JMSException
1643     */
1644    public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
1645        return redeliveryPolicyMap.getDefaultEntry();
1646    }
1647
1648    /**
1649     * Sets the redelivery policy to be used when messages are rolled back
1650     */
1651    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
1652        this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
1653    }
1654
1655    public BlobTransferPolicy getBlobTransferPolicy() {
1656        if (blobTransferPolicy == null) {
1657            blobTransferPolicy = createBlobTransferPolicy();
1658        }
1659        return blobTransferPolicy;
1660    }
1661
1662    /**
1663     * Sets the policy used to describe how out-of-band BLOBs (Binary Large
1664     * OBjects) are transferred from producers to brokers to consumers
1665     */
1666    public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) {
1667        this.blobTransferPolicy = blobTransferPolicy;
1668    }
1669
1670    /**
1671     * @return Returns the alwaysSessionAsync.
1672     */
1673    public boolean isAlwaysSessionAsync() {
1674        return alwaysSessionAsync;
1675    }
1676
1677    /**
1678     * If this flag is not set then a separate thread is not used for dispatching messages for each Session in
1679     * the Connection. However, a separate thread is always used if there is more than one session, or the session
1680     * isn't in auto acknowledge or duplicates ok mode.  By default this value is set to true and session dispatch
1681     * happens asynchronously.
1682     */
1683    public void setAlwaysSessionAsync(boolean alwaysSessionAsync) {
1684        this.alwaysSessionAsync = alwaysSessionAsync;
1685    }
1686
1687    /**
1688     * @return Returns the optimizeAcknowledge.
1689     */
1690    public boolean isOptimizeAcknowledge() {
1691        return optimizeAcknowledge;
1692    }
1693
1694    /**
1695     * Enables an optimised acknowledgement mode where messages are acknowledged
1696     * in batches rather than individually
1697     *
1698     * @param optimizeAcknowledge The optimizeAcknowledge to set.
1699     */
1700    public void setOptimizeAcknowledge(boolean optimizeAcknowledge) {
1701        this.optimizeAcknowledge = optimizeAcknowledge;
1702    }
1703
1704    /**
1705     * The max time in milliseconds between optimized ack batches
1706     * @param optimizeAcknowledgeTimeOut
1707     */
1708    public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) {
1709        this.optimizeAcknowledgeTimeOut =  optimizeAcknowledgeTimeOut;
1710    }
1711
1712    public long getOptimizeAcknowledgeTimeOut() {
1713        return optimizeAcknowledgeTimeOut;
1714    }
1715
1716    public long getWarnAboutUnstartedConnectionTimeout() {
1717        return warnAboutUnstartedConnectionTimeout;
1718    }
1719
1720    /**
1721     * Enables the timeout from a connection creation to when a warning is
1722     * generated if the connection is not properly started via {@link #start()}
1723     * and a message is received by a consumer. It is a very common gotcha to
1724     * forget to <a
1725     * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start
1726     * the connection</a> so this option makes the default case to create a
1727     * warning if the user forgets. To disable the warning just set the value to <
1728     * 0 (say -1).
1729     */
1730    public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) {
1731        this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout;
1732    }
1733
1734    /**
1735     * @return the sendTimeout (in milliseconds)
1736     */
1737    public int getSendTimeout() {
1738        return sendTimeout;
1739    }
1740
1741    /**
1742     * @param sendTimeout the sendTimeout to set (in milliseconds)
1743     */
1744    public void setSendTimeout(int sendTimeout) {
1745        this.sendTimeout = sendTimeout;
1746    }
1747
1748    /**
1749     * @return the sendAcksAsync
1750     */
1751    public boolean isSendAcksAsync() {
1752        return sendAcksAsync;
1753    }
1754
1755    /**
1756     * @param sendAcksAsync the sendAcksAsync to set
1757     */
1758    public void setSendAcksAsync(boolean sendAcksAsync) {
1759        this.sendAcksAsync = sendAcksAsync;
1760    }
1761
1762    /**
1763     * Returns the time this connection was created
1764     */
1765    public long getTimeCreated() {
1766        return timeCreated;
1767    }
1768
1769    private void waitForBrokerInfo() throws JMSException {
1770        try {
1771            brokerInfoReceived.await();
1772        } catch (InterruptedException e) {
1773            Thread.currentThread().interrupt();
1774            throw JMSExceptionSupport.create(e);
1775        }
1776    }
1777
1778    // Package protected so that it can be used in unit tests
1779    public Transport getTransport() {
1780        return transport;
1781    }
1782
1783    public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) {
1784        producers.put(producerId, producer);
1785    }
1786
1787    public void removeProducer(ProducerId producerId) {
1788        producers.remove(producerId);
1789    }
1790
1791    public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) {
1792        dispatchers.put(consumerId, dispatcher);
1793    }
1794
1795    public void removeDispatcher(ConsumerId consumerId) {
1796        dispatchers.remove(consumerId);
1797    }
1798
1799    public boolean hasDispatcher(ConsumerId consumerId) {
1800        return dispatchers.containsKey(consumerId);
1801    }
1802
1803    /**
1804     * @param o - the command to consume
1805     */
1806    @Override
1807    public void onCommand(final Object o) {
1808        final Command command = (Command)o;
1809        if (!closed.get() && command != null) {
1810            try {
1811                command.visit(new CommandVisitorAdapter() {
1812                    @Override
1813                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
1814                        waitForTransportInterruptionProcessingToComplete();
1815                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
1816                        if (dispatcher != null) {
1817                            // Copy in case a embedded broker is dispatching via
1818                            // vm://
1819                            // md.getMessage() == null to signal end of queue
1820                            // browse.
1821                            Message msg = md.getMessage();
1822                            if (msg != null) {
1823                                msg = msg.copy();
1824                                msg.setReadOnlyBody(true);
1825                                msg.setReadOnlyProperties(true);
1826                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
1827                                msg.setConnection(ActiveMQConnection.this);
1828                                msg.setMemoryUsage(null);
1829                                md.setMessage(msg);
1830                            }
1831                            dispatcher.dispatch(md);
1832                        } else {
1833                            LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
1834                        }
1835                        return null;
1836                    }
1837
1838                    @Override
1839                    public Response processProducerAck(ProducerAck pa) throws Exception {
1840                        if (pa != null && pa.getProducerId() != null) {
1841                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
1842                            if (producer != null) {
1843                                producer.onProducerAck(pa);
1844                            }
1845                        }
1846                        return null;
1847                    }
1848
1849                    @Override
1850                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
1851                        brokerInfo = info;
1852                        brokerInfoReceived.countDown();
1853                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
1854                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
1855                        return null;
1856                    }
1857
1858                    @Override
1859                    public Response processConnectionError(final ConnectionError error) throws Exception {
1860                        executor.execute(new Runnable() {
1861                            @Override
1862                            public void run() {
1863                                onAsyncException(error.getException());
1864                            }
1865                        });
1866                        return null;
1867                    }
1868
1869                    @Override
1870                    public Response processControlCommand(ControlCommand command) throws Exception {
1871                        onControlCommand(command);
1872                        return null;
1873                    }
1874
1875                    @Override
1876                    public Response processConnectionControl(ConnectionControl control) throws Exception {
1877                        onConnectionControl((ConnectionControl)command);
1878                        return null;
1879                    }
1880
1881                    @Override
1882                    public Response processConsumerControl(ConsumerControl control) throws Exception {
1883                        onConsumerControl((ConsumerControl)command);
1884                        return null;
1885                    }
1886
1887                    @Override
1888                    public Response processWireFormat(WireFormatInfo info) throws Exception {
1889                        onWireFormatInfo((WireFormatInfo)command);
1890                        return null;
1891                    }
1892                });
1893            } catch (Exception e) {
1894                onClientInternalException(e);
1895            }
1896        }
1897
1898        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1899            TransportListener listener = iter.next();
1900            listener.onCommand(command);
1901        }
1902    }
1903
1904    protected void onWireFormatInfo(WireFormatInfo info) {
1905        protocolVersion.set(info.getVersion());
1906    }
1907
1908    /**
1909     * Handles async client internal exceptions.
1910     * A client internal exception is usually one that has been thrown
1911     * by a container runtime component during asynchronous processing of a
1912     * message that does not affect the connection itself.
1913     * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
1914     * its <code>onException</code> method, if one has been registered with this connection.
1915     *
1916     * @param error the exception that the problem
1917     */
1918    public void onClientInternalException(final Throwable error) {
1919        if ( !closed.get() && !closing.get() ) {
1920            if ( this.clientInternalExceptionListener != null ) {
1921                executor.execute(new Runnable() {
1922                    @Override
1923                    public void run() {
1924                        ActiveMQConnection.this.clientInternalExceptionListener.onException(error);
1925                    }
1926                });
1927            } else {
1928                LOG.debug("Async client internal exception occurred with no exception listener registered: "
1929                        + error, error);
1930            }
1931        }
1932    }
1933
1934    /**
1935     * Used for handling async exceptions
1936     *
1937     * @param error
1938     */
1939    public void onAsyncException(Throwable error) {
1940        if (!closed.get() && !closing.get()) {
1941            if (this.exceptionListener != null) {
1942
1943                if (!(error instanceof JMSException)) {
1944                    error = JMSExceptionSupport.create(error);
1945                }
1946                final JMSException e = (JMSException)error;
1947
1948                executor.execute(new Runnable() {
1949                    @Override
1950                    public void run() {
1951                        ActiveMQConnection.this.exceptionListener.onException(e);
1952                    }
1953                });
1954
1955            } else {
1956                LOG.debug("Async exception with no exception listener: " + error, error);
1957            }
1958        }
1959    }
1960
1961    @Override
1962    public void onException(final IOException error) {
1963        onAsyncException(error);
1964        if (!closing.get() && !closed.get()) {
1965            executor.execute(new Runnable() {
1966                @Override
1967                public void run() {
1968                    transportFailed(error);
1969                    ServiceSupport.dispose(ActiveMQConnection.this.transport);
1970                    brokerInfoReceived.countDown();
1971                    try {
1972                        doCleanup(true);
1973                    } catch (JMSException e) {
1974                        LOG.warn("Exception during connection cleanup, " + e, e);
1975                    }
1976                    for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
1977                        TransportListener listener = iter.next();
1978                        listener.onException(error);
1979                    }
1980                }
1981            });
1982        }
1983    }
1984
1985    @Override
1986    public void transportInterupted() {
1987        transportInterruptionProcessingComplete.set(1);
1988        for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
1989            ActiveMQSession s = i.next();
1990            s.clearMessagesInProgress(transportInterruptionProcessingComplete);
1991        }
1992
1993        for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) {
1994            connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete);
1995        }
1996
1997        if (transportInterruptionProcessingComplete.decrementAndGet() > 0) {
1998            if (LOG.isDebugEnabled()) {
1999                LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get());
2000            }
2001            signalInterruptionProcessingNeeded();
2002        }
2003
2004        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2005            TransportListener listener = iter.next();
2006            listener.transportInterupted();
2007        }
2008    }
2009
2010    @Override
2011    public void transportResumed() {
2012        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
2013            TransportListener listener = iter.next();
2014            listener.transportResumed();
2015        }
2016    }
2017
2018    /**
2019     * Create the DestinationInfo object for the temporary destination.
2020     *
2021     * @param topic - if its true topic, else queue.
2022     * @return DestinationInfo
2023     * @throws JMSException
2024     */
2025    protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException {
2026
2027        // Check if Destination info is of temporary type.
2028        ActiveMQTempDestination dest;
2029        if (topic) {
2030            dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2031        } else {
2032            dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId());
2033        }
2034
2035        DestinationInfo info = new DestinationInfo();
2036        info.setConnectionId(this.info.getConnectionId());
2037        info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
2038        info.setDestination(dest);
2039        syncSendPacket(info);
2040
2041        dest.setConnection(this);
2042        activeTempDestinations.put(dest, dest);
2043        return dest;
2044    }
2045
2046    /**
2047     * @param destination
2048     * @throws JMSException
2049     */
2050    public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException {
2051
2052        checkClosedOrFailed();
2053
2054        for (ActiveMQSession session : this.sessions) {
2055            if (session.isInUse(destination)) {
2056                throw new JMSException("A consumer is consuming from the temporary destination");
2057            }
2058        }
2059
2060        activeTempDestinations.remove(destination);
2061
2062        DestinationInfo destInfo = new DestinationInfo();
2063        destInfo.setConnectionId(this.info.getConnectionId());
2064        destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2065        destInfo.setDestination(destination);
2066        destInfo.setTimeout(0);
2067        syncSendPacket(destInfo);
2068    }
2069
2070    public boolean isDeleted(ActiveMQDestination dest) {
2071
2072        // If we are not watching the advisories.. then
2073        // we will assume that the temp destination does exist.
2074        if (advisoryConsumer == null) {
2075            return false;
2076        }
2077
2078        return !activeTempDestinations.containsValue(dest);
2079    }
2080
2081    public boolean isCopyMessageOnSend() {
2082        return copyMessageOnSend;
2083    }
2084
2085    public LongSequenceGenerator getLocalTransactionIdGenerator() {
2086        return localTransactionIdGenerator;
2087    }
2088
2089    public boolean isUseCompression() {
2090        return useCompression;
2091    }
2092
2093    /**
2094     * Enables the use of compression of the message bodies
2095     */
2096    public void setUseCompression(boolean useCompression) {
2097        this.useCompression = useCompression;
2098    }
2099
2100    public void destroyDestination(ActiveMQDestination destination) throws JMSException {
2101
2102        checkClosedOrFailed();
2103        ensureConnectionInfoSent();
2104
2105        DestinationInfo info = new DestinationInfo();
2106        info.setConnectionId(this.info.getConnectionId());
2107        info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
2108        info.setDestination(destination);
2109        info.setTimeout(0);
2110        syncSendPacket(info);
2111    }
2112
2113    public boolean isDispatchAsync() {
2114        return dispatchAsync;
2115    }
2116
2117    /**
2118     * Enables or disables the default setting of whether or not consumers have
2119     * their messages <a
2120     * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched
2121     * synchronously or asynchronously by the broker</a>. For non-durable
2122     * topics for example we typically dispatch synchronously by default to
2123     * minimize context switches which boost performance. However sometimes its
2124     * better to go slower to ensure that a single blocked consumer socket does
2125     * not block delivery to other consumers.
2126     *
2127     * @param asyncDispatch If true then consumers created on this connection
2128     *                will default to having their messages dispatched
2129     *                asynchronously. The default value is true.
2130     */
2131    public void setDispatchAsync(boolean asyncDispatch) {
2132        this.dispatchAsync = asyncDispatch;
2133    }
2134
2135    public boolean isObjectMessageSerializationDefered() {
2136        return objectMessageSerializationDefered;
2137    }
2138
2139    /**
2140     * When an object is set on an ObjectMessage, the JMS spec requires the
2141     * object to be serialized by that set method. Enabling this flag causes the
2142     * object to not get serialized. The object may subsequently get serialized
2143     * if the message needs to be sent over a socket or stored to disk.
2144     */
2145    public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) {
2146        this.objectMessageSerializationDefered = objectMessageSerializationDefered;
2147    }
2148
2149    /**
2150     * Unsubscribes a durable subscription that has been created by a client.
2151     * <P>
2152     * This method deletes the state being maintained on behalf of the
2153     * subscriber by its provider.
2154     * <P>
2155     * It is erroneous for a client to delete a durable subscription while there
2156     * is an active <CODE>MessageConsumer </CODE> or
2157     * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed
2158     * message is part of a pending transaction or has not been acknowledged in
2159     * the session.
2160     *
2161     * @param name the name used to identify this subscription
2162     * @throws JMSException if the session fails to unsubscribe to the durable
2163     *                 subscription due to some internal error.
2164     * @throws InvalidDestinationException if an invalid subscription name is
2165     *                 specified.
2166     * @since 1.1
2167     */
2168    public void unsubscribe(String name) throws InvalidDestinationException, JMSException {
2169        checkClosedOrFailed();
2170        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
2171        rsi.setConnectionId(getConnectionInfo().getConnectionId());
2172        rsi.setSubscriptionName(name);
2173        rsi.setClientId(getConnectionInfo().getClientId());
2174        syncSendPacket(rsi);
2175    }
2176
2177    /**
2178     * Internal send method optimized: - It does not copy the message - It can
2179     * only handle ActiveMQ messages. - You can specify if the send is async or
2180     * sync - Does not allow you to send /w a transaction.
2181     */
2182    void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException {
2183        checkClosedOrFailed();
2184
2185        if (destination.isTemporary() && isDeleted(destination)) {
2186            throw new JMSException("Cannot publish to a deleted Destination: " + destination);
2187        }
2188
2189        msg.setJMSDestination(destination);
2190        msg.setJMSDeliveryMode(deliveryMode);
2191        long expiration = 0L;
2192
2193        if (!isDisableTimeStampsByDefault()) {
2194            long timeStamp = System.currentTimeMillis();
2195            msg.setJMSTimestamp(timeStamp);
2196            if (timeToLive > 0) {
2197                expiration = timeToLive + timeStamp;
2198            }
2199        }
2200
2201        msg.setJMSExpiration(expiration);
2202        msg.setJMSPriority(priority);
2203        msg.setJMSRedelivered(false);
2204        msg.setMessageId(messageId);
2205        msg.onSend();
2206        msg.setProducerId(msg.getMessageId().getProducerId());
2207
2208        if (LOG.isDebugEnabled()) {
2209            LOG.debug("Sending message: " + msg);
2210        }
2211
2212        if (async) {
2213            asyncSendPacket(msg);
2214        } else {
2215            syncSendPacket(msg);
2216        }
2217    }
2218
2219    protected void onControlCommand(ControlCommand command) {
2220        String text = command.getCommand();
2221        if (text != null) {
2222            if ("shutdown".equals(text)) {
2223                LOG.info("JVM told to shutdown");
2224                System.exit(0);
2225            }
2226
2227            // TODO Should we handle the "close" case?
2228            // if (false && "close".equals(text)){
2229            //     LOG.error("Broker " + getBrokerInfo() + "shutdown connection");
2230            //     try {
2231            //         close();
2232            //     } catch (JMSException e) {
2233            //     }
2234            // }
2235        }
2236    }
2237
2238    protected void onConnectionControl(ConnectionControl command) {
2239        if (command.isFaultTolerant()) {
2240            this.optimizeAcknowledge = false;
2241            for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
2242                ActiveMQSession s = i.next();
2243                s.setOptimizeAcknowledge(false);
2244            }
2245        }
2246    }
2247
2248    protected void onConsumerControl(ConsumerControl command) {
2249        if (command.isClose()) {
2250            for (ActiveMQSession session : this.sessions) {
2251                session.close(command.getConsumerId());
2252            }
2253        } else {
2254            for (ActiveMQSession session : this.sessions) {
2255                session.setPrefetchSize(command.getConsumerId(), command.getPrefetch());
2256            }
2257            for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) {
2258                ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
2259                if (consumerInfo.getConsumerId().equals(command.getConsumerId())) {
2260                    consumerInfo.setPrefetchSize(command.getPrefetch());
2261                }
2262            }
2263        }
2264    }
2265
2266    protected void transportFailed(IOException error) {
2267        transportFailed.set(true);
2268        if (firstFailureError == null) {
2269            firstFailureError = error;
2270        }
2271    }
2272
2273    /**
2274     * Should a JMS message be copied to a new JMS Message object as part of the
2275     * send() method in JMS. This is enabled by default to be compliant with the
2276     * JMS specification. You can disable it if you do not mutate JMS messages
2277     * after they are sent for a performance boost
2278     */
2279    public void setCopyMessageOnSend(boolean copyMessageOnSend) {
2280        this.copyMessageOnSend = copyMessageOnSend;
2281    }
2282
2283    @Override
2284    public String toString() {
2285        return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}";
2286    }
2287
2288    protected BlobTransferPolicy createBlobTransferPolicy() {
2289        return new BlobTransferPolicy();
2290    }
2291
2292    public int getProtocolVersion() {
2293        return protocolVersion.get();
2294    }
2295
2296    public int getProducerWindowSize() {
2297        return producerWindowSize;
2298    }
2299
    /**
     * Stores the producer window size for this connection.
     *
     * @param producerWindowSize the producer window size to store
     */
    public void setProducerWindowSize(int producerWindowSize) {
        this.producerWindowSize = producerWindowSize;
    }
2303
2304    public void setAuditDepth(int auditDepth) {
2305        connectionAudit.setAuditDepth(auditDepth);
2306    }
2307
2308    public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
2309        connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber);
2310    }
2311
2312    protected void removeDispatcher(ActiveMQDispatcher dispatcher) {
2313        connectionAudit.removeDispatcher(dispatcher);
2314    }
2315
2316    protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2317        return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message);
2318    }
2319
2320    protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) {
2321        connectionAudit.rollbackDuplicate(dispatcher, message);
2322    }
2323
2324    public IOException getFirstFailureError() {
2325        return firstFailureError;
2326    }
2327
2328    protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException {
2329        if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) {
2330            LOG.warn("dispatch with outstanding dispatch interruption processing count " + transportInterruptionProcessingComplete.get());
2331            signalInterruptionProcessingComplete();
2332        }
2333    }
2334
2335    protected void transportInterruptionProcessingComplete() {
2336        if (transportInterruptionProcessingComplete.decrementAndGet() == 0) {
2337            signalInterruptionProcessingComplete();
2338        }
2339    }
2340
2341    private void signalInterruptionProcessingComplete() {
2342            if (LOG.isDebugEnabled()) {
2343                LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get()
2344                        + " for:" + this.getConnectionInfo().getConnectionId());
2345            }
2346
2347            FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2348            if (failoverTransport != null) {
2349                failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId());
2350                if (LOG.isDebugEnabled()) {
2351                    LOG.debug("notified failover transport (" + failoverTransport
2352                            + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId());
2353                }
2354            }
2355            transportInterruptionProcessingComplete.set(0);
2356    }
2357
2358    private void signalInterruptionProcessingNeeded() {
2359        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
2360        if (failoverTransport != null) {
2361            failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
2362            if (LOG.isDebugEnabled()) {
2363                LOG.debug("notified failover transport (" + failoverTransport
2364                        + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
2365            }
2366        }
2367    }
2368
2369    /*
2370     * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
2371     * will wait to receive re dispatched messages.
2372     * default value is 0 so there is no wait by default.
2373     */
2374    public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) {
2375        this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod;
2376    }
2377
2378    public long getConsumerFailoverRedeliveryWaitPeriod() {
2379        return consumerFailoverRedeliveryWaitPeriod;
2380    }
2381
    /**
     * Returns this connection's Scheduler, creating and starting it on first
     * use.
     * <p>
     * When no scheduler exists yet and the connection reports closing or
     * closed, a ConnectionClosedException is thrown before any lock is taken.
     * Otherwise the scheduler is created while holding this connection's
     * monitor, after re-reading the field and calling checkClosed().
     *
     * @return the existing or newly started scheduler
     * @throws JMSException if the connection is closing or closed, if
     *                 checkClosed() rejects the call, or if creating or
     *                 starting the scheduler fails (the cause is wrapped via
     *                 JMSExceptionSupport.create)
     */
    protected Scheduler getScheduler() throws JMSException {
        // Read the field once; a non-null value is returned without locking.
        Scheduler result = scheduler;
        if (result == null) {
            if (isClosing() || isClosed()) {
                // without lock contention report the closing state
                throw new ConnectionClosedException();
            }
            synchronized (this) {
                // Re-read under the lock; presumably another thread may have
                // created the scheduler in the meantime.
                result = scheduler;
                if (result == null) {
                    checkClosed();
                    try {
                        result = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler");
                        result.start();
                        // Assigned only after start() returned, so a failed
                        // start leaves the field unset.
                        scheduler = result;
                    } catch(Exception e) {
                        throw JMSExceptionSupport.create(e);
                    }
                }
            }
        }
        return result;
    }
2405
2406    protected ThreadPoolExecutor getExecutor() {
2407        return this.executor;
2408    }
2409
2410    protected CopyOnWriteArrayList<ActiveMQSession> getSessions() {
2411        return sessions;
2412    }
2413
2414    /**
2415     * @return the checkForDuplicates
2416     */
2417    public boolean isCheckForDuplicates() {
2418        return this.checkForDuplicates;
2419    }
2420
2421    /**
2422     * @param checkForDuplicates the checkForDuplicates to set
2423     */
2424    public void setCheckForDuplicates(boolean checkForDuplicates) {
2425        this.checkForDuplicates = checkForDuplicates;
2426    }
2427
2428    public boolean isTransactedIndividualAck() {
2429        return transactedIndividualAck;
2430    }
2431
    /**
     * Stores the transactedIndividualAck flag.
     *
     * @param transactedIndividualAck the new flag value
     */
    public void setTransactedIndividualAck(boolean transactedIndividualAck) {
        this.transactedIndividualAck = transactedIndividualAck;
    }
2435
2436    public boolean isNonBlockingRedelivery() {
2437        return nonBlockingRedelivery;
2438    }
2439
    /**
     * Stores the nonBlockingRedelivery flag.
     *
     * @param nonBlockingRedelivery the new flag value
     */
    public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) {
        this.nonBlockingRedelivery = nonBlockingRedelivery;
    }
2443
2444    public boolean isRmIdFromConnectionId() {
2445        return rmIdFromConnectionId;
2446    }
2447
    /**
     * Stores the rmIdFromConnectionId flag.
     *
     * @param rmIdFromConnectionId the new flag value
     */
    public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) {
        this.rmIdFromConnectionId = rmIdFromConnectionId;
    }
2451
2452    /**
2453     * Removes any TempDestinations that this connection has cached, ignoring
2454     * any exceptions generated because the destination is in use as they should
2455     * not be removed.
2456     * Used from a pooled connection, b/c it will not be explicitly closed.
2457     */
2458    public void cleanUpTempDestinations() {
2459
2460        if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) {
2461            return;
2462        }
2463
2464        Iterator<ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries
2465            = this.activeTempDestinations.entrySet().iterator();
2466        while(entries.hasNext()) {
2467            ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next();
2468            try {
2469                // Only delete this temp destination if it was created from this connection. The connection used
2470                // for the advisory consumer may also have a reference to this temp destination.
2471                ActiveMQTempDestination dest = entry.getValue();
2472                String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString();
2473                if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) {
2474                    this.deleteTempDestination(entry.getValue());
2475                }
2476            } catch (Exception ex) {
2477                // the temp dest is in use so it can not be deleted.
2478                // it is ok to leave it to connection tear down phase
2479            }
2480        }
2481    }
2482
2483    /**
2484     * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
2485     * @param redeliveryPolicyMap the redeliveryPolicyMap to set
2486     */
2487    public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
2488        this.redeliveryPolicyMap = redeliveryPolicyMap;
2489    }
2490
2491    /**
2492     * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
2493     * Consumers when dealing with transaction messages that have been rolled back.
2494     *
2495     * @return the redeliveryPolicyMap
2496     */
2497    public RedeliveryPolicyMap getRedeliveryPolicyMap() {
2498        return redeliveryPolicyMap;
2499    }
2500
2501    public int getMaxThreadPoolSize() {
2502        return maxThreadPoolSize;
2503    }
2504
    /**
     * Stores the maximum thread pool size. This method only records the
     * value.
     *
     * @param maxThreadPoolSize the maximum thread pool size to store
     */
    public void setMaxThreadPoolSize(int maxThreadPoolSize) {
        this.maxThreadPoolSize = maxThreadPoolSize;
    }
2508
2509    /**
2510     * Enable enforcement of QueueConnection semantics.
2511     *
2512     * @return this object, useful for chaining
2513     */
2514    ActiveMQConnection enforceQueueOnlyConnection() {
2515        this.queueOnlyConnection = true;
2516        return this;
2517    }
2518
2519    public RejectedExecutionHandler getRejectedTaskHandler() {
2520        return rejectedTaskHandler;
2521    }
2522
    /**
     * Stores the handler for rejected executor tasks. This method only
     * records the reference.
     *
     * @param rejectedTaskHandler the handler to store
     */
    public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) {
        this.rejectedTaskHandler = rejectedTaskHandler;
    }
2526
2527    /**
2528     * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled
2529     * to send an ack for any outstanding Message Acks.  By default this value is set to zero meaning that the consumers
2530     * will not do any background Message acknowledgment.
2531     *
2532     * @return the scheduledOptimizedAckInterval
2533     */
2534    public long getOptimizedAckScheduledAckInterval() {
2535        return optimizedAckScheduledAckInterval;
2536    }
2537
2538    /**
2539     * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
2540     * have been configured with optimizeAcknowledge enabled.
2541     *
2542     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
2543     */
2544    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
2545        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
2546    }
2547
2548    /**
2549     * @return true if MessageConsumer instance will check for expired messages before dispatch.
2550     */
2551    public boolean isConsumerExpiryCheckEnabled() {
2552        return consumerExpiryCheckEnabled;
2553    }
2554
2555    /**
2556     * Controls whether message expiration checking is done in each MessageConsumer
2557     * prior to dispatching a message.  Disabling this check can lead to consumption
2558     * of expired messages.
2559     *
2560     * @param consumerExpiryCheckEnabled
2561     *        controls whether expiration checking is done prior to dispatch.
2562     */
2563    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
2564        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
2565    }
2566
2567    public List<String> getTrustedPackages() {
2568        return trustedPackages;
2569    }
2570
    /**
     * Replaces the list of trusted package names. The given list is stored by
     * reference; no copy is made.
     *
     * @param trustedPackages the list of trusted package names to store
     */
    public void setTrustedPackages(List<String> trustedPackages) {
        this.trustedPackages = trustedPackages;
    }
2574
2575    public boolean isTrustAllPackages() {
2576        return trustAllPackages;
2577    }
2578
    /**
     * Stores the trustAllPackages flag.
     *
     * @param trustAllPackages the new flag value
     */
    public void setTrustAllPackages(boolean trustAllPackages) {
        this.trustAllPackages = trustAllPackages;
    }
2582
2583    public int getConnectResponseTimeout() {
2584        return connectResponseTimeout;
2585    }
2586
        /**
         * Stores the connect response timeout.
         *
         * @param connectResponseTimeout the timeout value to store
         */
        public void setConnectResponseTimeout(int connectResponseTimeout) {
                this.connectResponseTimeout = connectResponseTimeout;
        }
2590}