001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.util.*; 023import java.util.concurrent.ConcurrentHashMap; 024import java.util.concurrent.ConcurrentMap; 025import java.util.concurrent.CopyOnWriteArrayList; 026import java.util.concurrent.CountDownLatch; 027import java.util.concurrent.LinkedBlockingQueue; 028import java.util.concurrent.RejectedExecutionHandler; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.ThreadPoolExecutor; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.atomic.AtomicBoolean; 033import java.util.concurrent.atomic.AtomicInteger; 034 035import javax.jms.Connection; 036import javax.jms.ConnectionConsumer; 037import javax.jms.ConnectionMetaData; 038import javax.jms.Destination; 039import javax.jms.ExceptionListener; 040import javax.jms.IllegalStateException; 041import javax.jms.InvalidDestinationException; 042import javax.jms.JMSException; 043import javax.jms.Queue; 044import javax.jms.QueueConnection; 045import javax.jms.QueueSession; 046import javax.jms.ServerSessionPool; 
047import javax.jms.Session; 048import javax.jms.Topic; 049import javax.jms.TopicConnection; 050import javax.jms.TopicSession; 051import javax.jms.XAConnection; 052 053import org.apache.activemq.advisory.DestinationSource; 054import org.apache.activemq.blob.BlobTransferPolicy; 055import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; 056import org.apache.activemq.command.ActiveMQDestination; 057import org.apache.activemq.command.ActiveMQMessage; 058import org.apache.activemq.command.ActiveMQTempDestination; 059import org.apache.activemq.command.ActiveMQTempQueue; 060import org.apache.activemq.command.ActiveMQTempTopic; 061import org.apache.activemq.command.BrokerInfo; 062import org.apache.activemq.command.Command; 063import org.apache.activemq.command.CommandTypes; 064import org.apache.activemq.command.ConnectionControl; 065import org.apache.activemq.command.ConnectionError; 066import org.apache.activemq.command.ConnectionId; 067import org.apache.activemq.command.ConnectionInfo; 068import org.apache.activemq.command.ConsumerControl; 069import org.apache.activemq.command.ConsumerId; 070import org.apache.activemq.command.ConsumerInfo; 071import org.apache.activemq.command.ControlCommand; 072import org.apache.activemq.command.DestinationInfo; 073import org.apache.activemq.command.ExceptionResponse; 074import org.apache.activemq.command.Message; 075import org.apache.activemq.command.MessageDispatch; 076import org.apache.activemq.command.MessageId; 077import org.apache.activemq.command.ProducerAck; 078import org.apache.activemq.command.ProducerId; 079import org.apache.activemq.command.RemoveInfo; 080import org.apache.activemq.command.RemoveSubscriptionInfo; 081import org.apache.activemq.command.Response; 082import org.apache.activemq.command.SessionId; 083import org.apache.activemq.command.ShutdownInfo; 084import org.apache.activemq.command.WireFormatInfo; 085import org.apache.activemq.management.JMSConnectionStatsImpl; 086import 
org.apache.activemq.management.JMSStatsImpl; 087import org.apache.activemq.management.StatsCapable; 088import org.apache.activemq.management.StatsImpl; 089import org.apache.activemq.state.CommandVisitorAdapter; 090import org.apache.activemq.thread.Scheduler; 091import org.apache.activemq.thread.TaskRunnerFactory; 092import org.apache.activemq.transport.FutureResponse; 093import org.apache.activemq.transport.RequestTimedOutIOException; 094import org.apache.activemq.transport.ResponseCallback; 095import org.apache.activemq.transport.Transport; 096import org.apache.activemq.transport.TransportListener; 097import org.apache.activemq.transport.failover.FailoverTransport; 098import org.apache.activemq.util.IdGenerator; 099import org.apache.activemq.util.IntrospectionSupport; 100import org.apache.activemq.util.JMSExceptionSupport; 101import org.apache.activemq.util.LongSequenceGenerator; 102import org.apache.activemq.util.ServiceSupport; 103import org.apache.activemq.util.ThreadPoolUtils; 104import org.slf4j.Logger; 105import org.slf4j.LoggerFactory; 106 107public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection { 108 109 public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER; 110 public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD; 111 public static final String DEFAULT_BROKER_URL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL; 112 public static int DEFAULT_THREAD_POOL_SIZE = 1000; 113 114 private static final Logger LOG = LoggerFactory.getLogger(ActiveMQConnection.class); 115 116 public final ConcurrentMap<ActiveMQTempDestination, ActiveMQTempDestination> activeTempDestinations = new ConcurrentHashMap<ActiveMQTempDestination, ActiveMQTempDestination>(); 117 118 protected boolean dispatchAsync=true; 119 protected boolean alwaysSessionAsync = true; 120 121 private TaskRunnerFactory sessionTaskRunner; 122 private 
final ThreadPoolExecutor executor; 123 124 // Connection state variables 125 private final ConnectionInfo info; 126 private ExceptionListener exceptionListener; 127 private ClientInternalExceptionListener clientInternalExceptionListener; 128 private boolean clientIDSet; 129 private boolean isConnectionInfoSentToBroker; 130 private boolean userSpecifiedClientID; 131 132 // Configuration options variables 133 private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); 134 private BlobTransferPolicy blobTransferPolicy; 135 private RedeliveryPolicyMap redeliveryPolicyMap; 136 private MessageTransformer transformer; 137 138 private boolean disableTimeStampsByDefault; 139 private boolean optimizedMessageDispatch = true; 140 private boolean copyMessageOnSend = true; 141 private boolean useCompression; 142 private boolean objectMessageSerializationDefered; 143 private boolean useAsyncSend; 144 private boolean optimizeAcknowledge; 145 private long optimizeAcknowledgeTimeOut = 0; 146 private long optimizedAckScheduledAckInterval = 0; 147 private boolean nestedMapAndListEnabled = true; 148 private boolean useRetroactiveConsumer; 149 private boolean exclusiveConsumer; 150 private boolean alwaysSyncSend; 151 private int closeTimeout = 15000; 152 private boolean watchTopicAdvisories = true; 153 private long warnAboutUnstartedConnectionTimeout = 500L; 154 private int sendTimeout =0; 155 private boolean sendAcksAsync=true; 156 private boolean checkForDuplicates = true; 157 private boolean queueOnlyConnection = false; 158 private boolean consumerExpiryCheckEnabled = true; 159 160 private final Transport transport; 161 private final IdGenerator clientIdGenerator; 162 private final JMSStatsImpl factoryStats; 163 private final JMSConnectionStatsImpl stats; 164 165 private final AtomicBoolean started = new AtomicBoolean(false); 166 private final AtomicBoolean closing = new AtomicBoolean(false); 167 private final AtomicBoolean closed = new AtomicBoolean(false); 168 
private final AtomicBoolean transportFailed = new AtomicBoolean(false); 169 private final CopyOnWriteArrayList<ActiveMQSession> sessions = new CopyOnWriteArrayList<ActiveMQSession>(); 170 private final CopyOnWriteArrayList<ActiveMQConnectionConsumer> connectionConsumers = new CopyOnWriteArrayList<ActiveMQConnectionConsumer>(); 171 private final CopyOnWriteArrayList<TransportListener> transportListeners = new CopyOnWriteArrayList<TransportListener>(); 172 173 // Maps ConsumerIds to ActiveMQConsumer objects 174 private final ConcurrentMap<ConsumerId, ActiveMQDispatcher> dispatchers = new ConcurrentHashMap<ConsumerId, ActiveMQDispatcher>(); 175 private final ConcurrentMap<ProducerId, ActiveMQMessageProducer> producers = new ConcurrentHashMap<ProducerId, ActiveMQMessageProducer>(); 176 private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); 177 private final SessionId connectionSessionId; 178 private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); 179 private final LongSequenceGenerator tempDestinationIdGenerator = new LongSequenceGenerator(); 180 private final LongSequenceGenerator localTransactionIdGenerator = new LongSequenceGenerator(); 181 182 private AdvisoryConsumer advisoryConsumer; 183 private final CountDownLatch brokerInfoReceived = new CountDownLatch(1); 184 private BrokerInfo brokerInfo; 185 private IOException firstFailureError; 186 private int producerWindowSize = ActiveMQConnectionFactory.DEFAULT_PRODUCER_WINDOW_SIZE; 187 188 // Assume that protocol is the latest. Change to the actual protocol 189 // version when a WireFormatInfo is received. 
190 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 191 private final long timeCreated; 192 private final ConnectionAudit connectionAudit = new ConnectionAudit(); 193 private DestinationSource destinationSource; 194 private final Object ensureConnectionInfoSentMutex = new Object(); 195 private boolean useDedicatedTaskRunner; 196 protected AtomicInteger transportInterruptionProcessingComplete = new AtomicInteger(0); 197 private long consumerFailoverRedeliveryWaitPeriod; 198 private Scheduler scheduler; 199 private boolean messagePrioritySupported = false; 200 private boolean transactedIndividualAck = false; 201 private boolean nonBlockingRedelivery = false; 202 private boolean rmIdFromConnectionId = false; 203 204 private int maxThreadPoolSize = DEFAULT_THREAD_POOL_SIZE; 205 private RejectedExecutionHandler rejectedTaskHandler = null; 206 207 private List<String> trustedPackages = new ArrayList<String>(); 208 private boolean trustAllPackages = false; 209 private int connectResponseTimeout; 210 211 /** 212 * Construct an <code>ActiveMQConnection</code> 213 * 214 * @param transport 215 * @param factoryStats 216 * @throws Exception 217 */ 218 protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception { 219 220 this.transport = transport; 221 this.clientIdGenerator = clientIdGenerator; 222 this.factoryStats = factoryStats; 223 224 // Configure a single threaded executor who's core thread can timeout if 225 // idle 226 executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { 227 @Override 228 public Thread newThread(Runnable r) { 229 Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport); 230 //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796 231 //thread.setDaemon(true); 232 return thread; 233 } 234 
}); 235 // asyncConnectionThread.allowCoreThreadTimeOut(true); 236 String uniqueId = connectionIdGenerator.generateId(); 237 this.info = new ConnectionInfo(new ConnectionId(uniqueId)); 238 this.info.setManageable(true); 239 this.info.setFaultTolerant(transport.isFaultTolerant()); 240 this.connectionSessionId = new SessionId(info.getConnectionId(), -1); 241 242 this.transport.setTransportListener(this); 243 244 this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection); 245 this.factoryStats.addConnection(this); 246 this.timeCreated = System.currentTimeMillis(); 247 this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); 248 } 249 250 protected void setUserName(String userName) { 251 this.info.setUserName(userName); 252 } 253 254 protected void setPassword(String password) { 255 this.info.setPassword(password); 256 } 257 258 /** 259 * A static helper method to create a new connection 260 * 261 * @return an ActiveMQConnection 262 * @throws JMSException 263 */ 264 public static ActiveMQConnection makeConnection() throws JMSException { 265 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); 266 return (ActiveMQConnection)factory.createConnection(); 267 } 268 269 /** 270 * A static helper method to create a new connection 271 * 272 * @param uri 273 * @return and ActiveMQConnection 274 * @throws JMSException 275 */ 276 public static ActiveMQConnection makeConnection(String uri) throws JMSException, URISyntaxException { 277 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri); 278 return (ActiveMQConnection)factory.createConnection(); 279 } 280 281 /** 282 * A static helper method to create a new connection 283 * 284 * @param user 285 * @param password 286 * @param uri 287 * @return an ActiveMQConnection 288 * @throws JMSException 289 */ 290 public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException, URISyntaxException { 291 ActiveMQConnectionFactory 
factory = new ActiveMQConnectionFactory(user, password, new URI(uri)); 292 return (ActiveMQConnection)factory.createConnection(); 293 } 294 295 /** 296 * @return a number unique for this connection 297 */ 298 public JMSConnectionStatsImpl getConnectionStats() { 299 return stats; 300 } 301 302 /** 303 * Creates a <CODE>Session</CODE> object. 304 * 305 * @param transacted indicates whether the session is transacted 306 * @param acknowledgeMode indicates whether the consumer or the client will 307 * acknowledge any messages it receives; ignored if the 308 * session is transacted. Legal values are 309 * <code>Session.AUTO_ACKNOWLEDGE</code>, 310 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 311 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 312 * @return a newly created session 313 * @throws JMSException if the <CODE>Connection</CODE> object fails to 314 * create a session due to some internal error or lack of 315 * support for the specific transaction and acknowledgement 316 * mode. 317 * @see Session#AUTO_ACKNOWLEDGE 318 * @see Session#CLIENT_ACKNOWLEDGE 319 * @see Session#DUPS_OK_ACKNOWLEDGE 320 * @since 1.1 321 */ 322 @Override 323 public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException { 324 checkClosedOrFailed(); 325 ensureConnectionInfoSent(); 326 if(!transacted) { 327 if (acknowledgeMode==Session.SESSION_TRANSACTED) { 328 throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session"); 329 } else if (acknowledgeMode < Session.SESSION_TRANSACTED || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT) { 330 throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " + 331 "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)"); 332 } 333 } 334 return new ActiveMQSession(this, getNextSessionId(), transacted ? 
Session.SESSION_TRANSACTED : (acknowledgeMode == Session.SESSION_TRANSACTED 335 ? Session.AUTO_ACKNOWLEDGE : acknowledgeMode), isDispatchAsync(), isAlwaysSessionAsync()); 336 } 337 338 /** 339 * @return sessionId 340 */ 341 protected SessionId getNextSessionId() { 342 return new SessionId(info.getConnectionId(), sessionIdGenerator.getNextSequenceId()); 343 } 344 345 /** 346 * Gets the client identifier for this connection. 347 * <P> 348 * This value is specific to the JMS provider. It is either preconfigured by 349 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned 350 * dynamically by the application by calling the <code>setClientID</code> 351 * method. 352 * 353 * @return the unique client identifier 354 * @throws JMSException if the JMS provider fails to return the client ID 355 * for this connection due to some internal error. 356 */ 357 @Override 358 public String getClientID() throws JMSException { 359 checkClosedOrFailed(); 360 return this.info.getClientId(); 361 } 362 363 /** 364 * Sets the client identifier for this connection. 365 * <P> 366 * The preferred way to assign a JMS client's client identifier is for it to 367 * be configured in a client-specific <CODE>ConnectionFactory</CODE> 368 * object and transparently assigned to the <CODE>Connection</CODE> object 369 * it creates. 370 * <P> 371 * Alternatively, a client can set a connection's client identifier using a 372 * provider-specific value. The facility to set a connection's client 373 * identifier explicitly is not a mechanism for overriding the identifier 374 * that has been administratively configured. It is provided for the case 375 * where no administratively specified identifier exists. If one does exist, 376 * an attempt to change it by setting it must throw an 377 * <CODE>IllegalStateException</CODE>. 
If a client sets the client 378 * identifier explicitly, it must do so immediately after it creates the 379 * connection and before any other action on the connection is taken. After 380 * this point, setting the client identifier is a programming error that 381 * should throw an <CODE>IllegalStateException</CODE>. 382 * <P> 383 * The purpose of the client identifier is to associate a connection and its 384 * objects with a state maintained on behalf of the client by a provider. 385 * The only such state identified by the JMS API is that required to support 386 * durable subscriptions. 387 * <P> 388 * If another connection with the same <code>clientID</code> is already 389 * running when this method is called, the JMS provider should detect the 390 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>. 391 * 392 * @param newClientID the unique client identifier 393 * @throws JMSException if the JMS provider fails to set the client ID for 394 * this connection due to some internal error. 395 * @throws javax.jms.InvalidClientIDException if the JMS client specifies an 396 * invalid or duplicate client ID. 397 * @throws javax.jms.IllegalStateException if the JMS client attempts to set 398 * a connection's client ID at the wrong time or when it has 399 * been administratively configured. 400 */ 401 @Override 402 public void setClientID(String newClientID) throws JMSException { 403 checkClosedOrFailed(); 404 405 if (this.clientIDSet) { 406 throw new IllegalStateException("The clientID has already been set"); 407 } 408 409 if (this.isConnectionInfoSentToBroker) { 410 throw new IllegalStateException("Setting clientID on a used Connection is not allowed"); 411 } 412 413 this.info.setClientId(newClientID); 414 this.userSpecifiedClientID = true; 415 ensureConnectionInfoSent(); 416 } 417 418 /** 419 * Sets the default client id that the connection will use if explicitly not 420 * set with the setClientId() call. 
421 */ 422 public void setDefaultClientID(String clientID) throws JMSException { 423 this.info.setClientId(clientID); 424 this.userSpecifiedClientID = true; 425 } 426 427 /** 428 * Gets the metadata for this connection. 429 * 430 * @return the connection metadata 431 * @throws JMSException if the JMS provider fails to get the connection 432 * metadata for this connection. 433 * @see javax.jms.ConnectionMetaData 434 */ 435 @Override 436 public ConnectionMetaData getMetaData() throws JMSException { 437 checkClosedOrFailed(); 438 return ActiveMQConnectionMetaData.INSTANCE; 439 } 440 441 /** 442 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not 443 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE> 444 * associated with it. 445 * 446 * @return the <CODE>ExceptionListener</CODE> for this connection, or 447 * null, if no <CODE>ExceptionListener</CODE> is associated with 448 * this connection. 449 * @throws JMSException if the JMS provider fails to get the 450 * <CODE>ExceptionListener</CODE> for this connection. 451 * @see javax.jms.Connection#setExceptionListener(ExceptionListener) 452 */ 453 @Override 454 public ExceptionListener getExceptionListener() throws JMSException { 455 checkClosedOrFailed(); 456 return this.exceptionListener; 457 } 458 459 /** 460 * Sets an exception listener for this connection. 461 * <P> 462 * If a JMS provider detects a serious problem with a connection, it informs 463 * the connection's <CODE> ExceptionListener</CODE>, if one has been 464 * registered. It does this by calling the listener's <CODE>onException 465 * </CODE> 466 * method, passing it a <CODE>JMSException</CODE> object describing the 467 * problem. 468 * <P> 469 * An exception listener allows a client to be notified of a problem 470 * asynchronously. Some connections only consume messages, so they would 471 * have no other way to learn their connection has failed. 
472 * <P> 473 * A connection serializes execution of its <CODE>ExceptionListener</CODE>. 474 * <P> 475 * A JMS provider should attempt to resolve connection problems itself 476 * before it notifies the client of them. 477 * 478 * @param listener the exception listener 479 * @throws JMSException if the JMS provider fails to set the exception 480 * listener for this connection. 481 */ 482 @Override 483 public void setExceptionListener(ExceptionListener listener) throws JMSException { 484 checkClosedOrFailed(); 485 this.exceptionListener = listener; 486 } 487 488 /** 489 * Gets the <code>ClientInternalExceptionListener</code> object for this connection. 490 * Not every <CODE>ActiveMQConnectionn</CODE> has a <CODE>ClientInternalExceptionListener</CODE> 491 * associated with it. 492 * 493 * @return the listener or <code>null</code> if no listener is registered with the connection. 494 */ 495 public ClientInternalExceptionListener getClientInternalExceptionListener() { 496 return clientInternalExceptionListener; 497 } 498 499 /** 500 * Sets a client internal exception listener for this connection. 501 * The connection will notify the listener, if one has been registered, of exceptions thrown by container components 502 * (e.g. an EJB container in case of Message Driven Beans) during asynchronous processing of a message. 503 * It does this by calling the listener's <code>onException()</code> method passing it a <code>Throwable</code> 504 * describing the problem. 505 * 506 * @param listener the exception listener 507 */ 508 public void setClientInternalExceptionListener(ClientInternalExceptionListener listener) { 509 this.clientInternalExceptionListener = listener; 510 } 511 512 /** 513 * Starts (or restarts) a connection's delivery of incoming messages. A call 514 * to <CODE>start</CODE> on a connection that has already been started is 515 * ignored. 516 * 517 * @throws JMSException if the JMS provider fails to start message delivery 518 * due to some internal error. 
519 * @see javax.jms.Connection#stop() 520 */ 521 @Override 522 public void start() throws JMSException { 523 checkClosedOrFailed(); 524 ensureConnectionInfoSent(); 525 if (started.compareAndSet(false, true)) { 526 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 527 ActiveMQSession session = i.next(); 528 session.start(); 529 } 530 } 531 } 532 533 /** 534 * Temporarily stops a connection's delivery of incoming messages. Delivery 535 * can be restarted using the connection's <CODE>start</CODE> method. When 536 * the connection is stopped, delivery to all the connection's message 537 * consumers is inhibited: synchronous receives block, and messages are not 538 * delivered to message listeners. 539 * <P> 540 * This call blocks until receives and/or message listeners in progress have 541 * completed. 542 * <P> 543 * Stopping a connection has no effect on its ability to send messages. A 544 * call to <CODE>stop</CODE> on a connection that has already been stopped 545 * is ignored. 546 * <P> 547 * A call to <CODE>stop</CODE> must not return until delivery of messages 548 * has paused. This means that a client can rely on the fact that none of 549 * its message listeners will be called and that all threads of control 550 * waiting for <CODE>receive</CODE> calls to return will not return with a 551 * message until the connection is restarted. The receive timers for a 552 * stopped connection continue to advance, so receives may time out while 553 * the connection is stopped. 554 * <P> 555 * If message listeners are running when <CODE>stop</CODE> is invoked, the 556 * <CODE>stop</CODE> call must wait until all of them have returned before 557 * it may return. While these message listeners are completing, they must 558 * have the full services of the connection available to them. 559 * 560 * @throws JMSException if the JMS provider fails to stop message delivery 561 * due to some internal error. 
562 * @see javax.jms.Connection#start() 563 */ 564 @Override 565 public void stop() throws JMSException { 566 doStop(true); 567 } 568 569 /** 570 * @see #stop() 571 * @param checkClosed <tt>true</tt> to check for already closed and throw {@link java.lang.IllegalStateException} if already closed, 572 * <tt>false</tt> to skip this check 573 * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error. 574 */ 575 void doStop(boolean checkClosed) throws JMSException { 576 if (checkClosed) { 577 checkClosedOrFailed(); 578 } 579 if (started.compareAndSet(true, false)) { 580 synchronized(sessions) { 581 for (Iterator<ActiveMQSession> i = sessions.iterator(); i.hasNext();) { 582 ActiveMQSession s = i.next(); 583 s.stop(); 584 } 585 } 586 } 587 } 588 589 /** 590 * Closes the connection. 591 * <P> 592 * Since a provider typically allocates significant resources outside the 593 * JVM on behalf of a connection, clients should close these resources when 594 * they are not needed. Relying on garbage collection to eventually reclaim 595 * these resources may not be timely enough. 596 * <P> 597 * There is no need to close the sessions, producers, and consumers of a 598 * closed connection. 599 * <P> 600 * Closing a connection causes all temporary destinations to be deleted. 601 * <P> 602 * When this method is invoked, it should not return until message 603 * processing has been shut down in an orderly fashion. This means that all 604 * message listeners that may have been running have returned, and that all 605 * pending receives have returned. A close terminates all pending message 606 * receives on the connection's sessions' consumers. The receives may return 607 * with a message or with null, depending on whether there was a message 608 * available at the time of the close. 
If one or more of the connection's 609 * sessions' message listeners is processing a message at the time when 610 * connection <CODE>close</CODE> is invoked, all the facilities of the 611 * connection and its sessions must remain available to those listeners 612 * until they return control to the JMS provider. 613 * <P> 614 * Closing a connection causes any of its sessions' transactions in progress 615 * to be rolled back. In the case where a session's work is coordinated by 616 * an external transaction manager, a session's <CODE>commit</CODE> and 617 * <CODE> rollback</CODE> methods are not used and the result of a closed 618 * session's work is determined later by the transaction manager. Closing a 619 * connection does NOT force an acknowledgment of client-acknowledged 620 * sessions. 621 * <P> 622 * Invoking the <CODE>acknowledge</CODE> method of a received message from 623 * a closed connection's session must throw an 624 * <CODE>IllegalStateException</CODE>. Closing a closed connection must 625 * NOT throw an exception. 626 * 627 * @throws JMSException if the JMS provider fails to close the connection 628 * due to some internal error. For example, a failure to 629 * release resources or to close a socket connection can 630 * cause this exception to be thrown. 631 */ 632 @Override 633 public void close() throws JMSException { 634 try { 635 // If we were running, lets stop first. 
636 if (!closed.get() && !transportFailed.get()) { 637 // do not fail if already closed as according to JMS spec we must not 638 // throw exception if already closed 639 doStop(false); 640 } 641 642 synchronized (this) { 643 if (!closed.get()) { 644 closing.set(true); 645 646 if (destinationSource != null) { 647 destinationSource.stop(); 648 destinationSource = null; 649 } 650 if (advisoryConsumer != null) { 651 advisoryConsumer.dispose(); 652 advisoryConsumer = null; 653 } 654 655 Scheduler scheduler = this.scheduler; 656 if (scheduler != null) { 657 try { 658 scheduler.stop(); 659 } catch (Exception e) { 660 JMSException ex = JMSExceptionSupport.create(e); 661 throw ex; 662 } 663 } 664 665 long lastDeliveredSequenceId = -1; 666 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 667 ActiveMQSession s = i.next(); 668 s.dispose(); 669 lastDeliveredSequenceId = Math.max(lastDeliveredSequenceId, s.getLastDeliveredSequenceId()); 670 } 671 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 672 ActiveMQConnectionConsumer c = i.next(); 673 c.dispose(); 674 } 675 676 this.activeTempDestinations.clear(); 677 678 try { 679 if (isConnectionInfoSentToBroker) { 680 // If we announced ourselves to the broker.. Try to let the broker 681 // know that the connection is being shutdown. 
682 RemoveInfo removeCommand = info.createRemoveCommand(); 683 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); 684 try { 685 syncSendPacket(removeCommand, closeTimeout); 686 } catch (JMSException e) { 687 if (e.getCause() instanceof RequestTimedOutIOException) { 688 // expected 689 } else { 690 throw e; 691 } 692 } 693 doAsyncSendPacket(new ShutdownInfo()); 694 } 695 } finally { // release anyway even if previous communication fails 696 started.set(false); 697 698 // TODO if we move the TaskRunnerFactory to the connection 699 // factory 700 // then we may need to call 701 // factory.onConnectionClose(this); 702 if (sessionTaskRunner != null) { 703 sessionTaskRunner.shutdown(); 704 } 705 closed.set(true); 706 closing.set(false); 707 } 708 } 709 } 710 } finally { 711 try { 712 if (executor != null) { 713 ThreadPoolUtils.shutdown(executor); 714 } 715 } catch (Throwable e) { 716 LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e); 717 } 718 719 ServiceSupport.dispose(this.transport); 720 721 factoryStats.removeConnection(this); 722 } 723 } 724 725 /** 726 * Tells the broker to terminate its VM. This can be used to cleanly 727 * terminate a broker running in a standalone java process. Server must have 728 * property enable.vm.shutdown=true defined to allow this to work. 729 */ 730 // TODO : org.apache.activemq.message.BrokerAdminCommand not yet 731 // implemented. 732 /* 733 * public void terminateBrokerVM() throws JMSException { BrokerAdminCommand 734 * command = new BrokerAdminCommand(); 735 * command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM); 736 * asyncSendPacket(command); } 737 */ 738 739 /** 740 * Create a durable connection consumer for this connection (optional 741 * operation). This is an expert facility not used by regular JMS clients. 
742 * 743 * @param topic topic to access 744 * @param subscriptionName durable subscription name 745 * @param messageSelector only messages with properties matching the message 746 * selector expression are delivered. A value of null or an 747 * empty string indicates that there is no message selector 748 * for the message consumer. 749 * @param sessionPool the server session pool to associate with this durable 750 * connection consumer 751 * @param maxMessages the maximum number of messages that can be assigned to 752 * a server session at one time 753 * @return the durable connection consumer 754 * @throws JMSException if the <CODE>Connection</CODE> object fails to 755 * create a connection consumer due to some internal error 756 * or invalid arguments for <CODE>sessionPool</CODE> and 757 * <CODE>messageSelector</CODE>. 758 * @throws javax.jms.InvalidDestinationException if an invalid destination 759 * is specified. 760 * @throws javax.jms.InvalidSelectorException if the message selector is 761 * invalid. 762 * @see javax.jms.ConnectionConsumer 763 * @since 1.1 764 */ 765 @Override 766 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) 767 throws JMSException { 768 return this.createDurableConnectionConsumer(topic, subscriptionName, messageSelector, sessionPool, maxMessages, false); 769 } 770 771 /** 772 * Create a durable connection consumer for this connection (optional 773 * operation). This is an expert facility not used by regular JMS clients. 774 * 775 * @param topic topic to access 776 * @param subscriptionName durable subscription name 777 * @param messageSelector only messages with properties matching the message 778 * selector expression are delivered. A value of null or an 779 * empty string indicates that there is no message selector 780 * for the message consumer. 
781 * @param sessionPool the server session pool to associate with this durable 782 * connection consumer 783 * @param maxMessages the maximum number of messages that can be assigned to 784 * a server session at one time 785 * @param noLocal set true if you want to filter out messages published 786 * locally 787 * @return the durable connection consumer 788 * @throws JMSException if the <CODE>Connection</CODE> object fails to 789 * create a connection consumer due to some internal error 790 * or invalid arguments for <CODE>sessionPool</CODE> and 791 * <CODE>messageSelector</CODE>. 792 * @throws javax.jms.InvalidDestinationException if an invalid destination 793 * is specified. 794 * @throws javax.jms.InvalidSelectorException if the message selector is 795 * invalid. 796 * @see javax.jms.ConnectionConsumer 797 * @since 1.1 798 */ 799 public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages, 800 boolean noLocal) throws JMSException { 801 checkClosedOrFailed(); 802 803 if (queueOnlyConnection) { 804 throw new IllegalStateException("QueueConnection cannot be used to create Pub/Sub based resources."); 805 } 806 807 ensureConnectionInfoSent(); 808 SessionId sessionId = new SessionId(info.getConnectionId(), -1); 809 ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId())); 810 info.setDestination(ActiveMQMessageTransformation.transformDestination(topic)); 811 info.setSubscriptionName(subscriptionName); 812 info.setSelector(messageSelector); 813 info.setPrefetchSize(maxMessages); 814 info.setDispatchAsync(isDispatchAsync()); 815 816 // Allows the options on the destination to configure the consumerInfo 817 if (info.getDestination().getOptions() != null) { 818 Map<String, String> options = new HashMap<String, String>(info.getDestination().getOptions()); 819 IntrospectionSupport.setProperties(this.info, options, 
"consumer."); 820 } 821 822 return new ActiveMQConnectionConsumer(this, sessionPool, info); 823 } 824 825 // Properties 826 // ------------------------------------------------------------------------- 827 828 /** 829 * Returns true if this connection has been started 830 * 831 * @return true if this Connection is started 832 */ 833 public boolean isStarted() { 834 return started.get(); 835 } 836 837 /** 838 * Returns true if the connection is closed 839 */ 840 public boolean isClosed() { 841 return closed.get(); 842 } 843 844 /** 845 * Returns true if the connection is in the process of being closed 846 */ 847 public boolean isClosing() { 848 return closing.get(); 849 } 850 851 /** 852 * Returns true if the underlying transport has failed 853 */ 854 public boolean isTransportFailed() { 855 return transportFailed.get(); 856 } 857 858 /** 859 * @return Returns the prefetchPolicy. 860 */ 861 public ActiveMQPrefetchPolicy getPrefetchPolicy() { 862 return prefetchPolicy; 863 } 864 865 /** 866 * Sets the <a 867 * href="http://activemq.apache.org/what-is-the-prefetch-limit-for.html">prefetch 868 * policy</a> for consumers created by this connection. 869 */ 870 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) { 871 this.prefetchPolicy = prefetchPolicy; 872 } 873 874 /** 875 */ 876 public Transport getTransportChannel() { 877 return transport; 878 } 879 880 /** 881 * @return Returns the clientID of the connection, forcing one to be 882 * generated if one has not yet been configured. 883 */ 884 public String getInitializedClientID() throws JMSException { 885 ensureConnectionInfoSent(); 886 return info.getClientId(); 887 } 888 889 /** 890 * @return Returns the timeStampsDisableByDefault. 891 */ 892 public boolean isDisableTimeStampsByDefault() { 893 return disableTimeStampsByDefault; 894 } 895 896 /** 897 * Sets whether or not timestamps on messages should be disabled or not. If 898 * you disable them it adds a small performance boost. 
899 */ 900 public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) { 901 this.disableTimeStampsByDefault = timeStampsDisableByDefault; 902 } 903 904 /** 905 * @return Returns the dispatchOptimizedMessage. 906 */ 907 public boolean isOptimizedMessageDispatch() { 908 return optimizedMessageDispatch; 909 } 910 911 /** 912 * If this flag is set then an larger prefetch limit is used - only 913 * applicable for durable topic subscribers. 914 */ 915 public void setOptimizedMessageDispatch(boolean dispatchOptimizedMessage) { 916 this.optimizedMessageDispatch = dispatchOptimizedMessage; 917 } 918 919 /** 920 * @return Returns the closeTimeout. 921 */ 922 public int getCloseTimeout() { 923 return closeTimeout; 924 } 925 926 /** 927 * Sets the timeout before a close is considered complete. Normally a 928 * close() on a connection waits for confirmation from the broker; this 929 * allows that operation to timeout to save the client hanging if there is 930 * no broker 931 */ 932 public void setCloseTimeout(int closeTimeout) { 933 this.closeTimeout = closeTimeout; 934 } 935 936 /** 937 * @return ConnectionInfo 938 */ 939 public ConnectionInfo getConnectionInfo() { 940 return this.info; 941 } 942 943 public boolean isUseRetroactiveConsumer() { 944 return useRetroactiveConsumer; 945 } 946 947 /** 948 * Sets whether or not retroactive consumers are enabled. Retroactive 949 * consumers allow non-durable topic subscribers to receive old messages 950 * that were published before the non-durable subscriber started. 
951 */ 952 public void setUseRetroactiveConsumer(boolean useRetroactiveConsumer) { 953 this.useRetroactiveConsumer = useRetroactiveConsumer; 954 } 955 956 public boolean isNestedMapAndListEnabled() { 957 return nestedMapAndListEnabled; 958 } 959 960 /** 961 * Enables/disables whether or not Message properties and MapMessage entries 962 * support <a 963 * href="http://activemq.apache.org/structured-message-properties-and-mapmessages.html">Nested 964 * Structures</a> of Map and List objects 965 */ 966 public void setNestedMapAndListEnabled(boolean structuredMapsEnabled) { 967 this.nestedMapAndListEnabled = structuredMapsEnabled; 968 } 969 970 public boolean isExclusiveConsumer() { 971 return exclusiveConsumer; 972 } 973 974 /** 975 * Enables or disables whether or not queue consumers should be exclusive or 976 * not for example to preserve ordering when not using <a 977 * href="http://activemq.apache.org/message-groups.html">Message Groups</a> 978 * 979 * @param exclusiveConsumer 980 */ 981 public void setExclusiveConsumer(boolean exclusiveConsumer) { 982 this.exclusiveConsumer = exclusiveConsumer; 983 } 984 985 /** 986 * Adds a transport listener so that a client can be notified of events in 987 * the underlying transport 988 */ 989 public void addTransportListener(TransportListener transportListener) { 990 transportListeners.add(transportListener); 991 } 992 993 public void removeTransportListener(TransportListener transportListener) { 994 transportListeners.remove(transportListener); 995 } 996 997 public boolean isUseDedicatedTaskRunner() { 998 return useDedicatedTaskRunner; 999 } 1000 1001 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 1002 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 1003 } 1004 1005 public TaskRunnerFactory getSessionTaskRunner() { 1006 synchronized (this) { 1007 if (sessionTaskRunner == null) { 1008 sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task", ThreadPriorities.INBOUND_CLIENT_SESSION, 
false, 1000, isUseDedicatedTaskRunner(), maxThreadPoolSize); 1009 sessionTaskRunner.setRejectedTaskHandler(rejectedTaskHandler); 1010 } 1011 } 1012 return sessionTaskRunner; 1013 } 1014 1015 public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { 1016 this.sessionTaskRunner = sessionTaskRunner; 1017 } 1018 1019 public MessageTransformer getTransformer() { 1020 return transformer; 1021 } 1022 1023 /** 1024 * Sets the transformer used to transform messages before they are sent on 1025 * to the JMS bus or when they are received from the bus but before they are 1026 * delivered to the JMS client 1027 */ 1028 public void setTransformer(MessageTransformer transformer) { 1029 this.transformer = transformer; 1030 } 1031 1032 /** 1033 * @return the statsEnabled 1034 */ 1035 public boolean isStatsEnabled() { 1036 return this.stats.isEnabled(); 1037 } 1038 1039 /** 1040 * @param statsEnabled the statsEnabled to set 1041 */ 1042 public void setStatsEnabled(boolean statsEnabled) { 1043 this.stats.setEnabled(statsEnabled); 1044 } 1045 1046 /** 1047 * Returns the {@link DestinationSource} object which can be used to listen to destinations 1048 * being created or destroyed or to enquire about the current destinations available on the broker 1049 * 1050 * @return a lazily created destination source 1051 * @throws JMSException 1052 */ 1053 @Override 1054 public DestinationSource getDestinationSource() throws JMSException { 1055 if (destinationSource == null) { 1056 destinationSource = new DestinationSource(this); 1057 destinationSource.start(); 1058 } 1059 return destinationSource; 1060 } 1061 1062 // Implementation methods 1063 // ------------------------------------------------------------------------- 1064 1065 /** 1066 * Used internally for adding Sessions to the Connection 1067 * 1068 * @param session 1069 * @throws JMSException 1070 * @throws JMSException 1071 */ 1072 protected void addSession(ActiveMQSession session) throws JMSException { 1073 
this.sessions.add(session); 1074 if (sessions.size() > 1 || session.isTransacted()) { 1075 optimizedMessageDispatch = false; 1076 } 1077 } 1078 1079 /** 1080 * Used interanlly for removing Sessions from a Connection 1081 * 1082 * @param session 1083 */ 1084 protected void removeSession(ActiveMQSession session) { 1085 this.sessions.remove(session); 1086 this.removeDispatcher(session); 1087 } 1088 1089 /** 1090 * Add a ConnectionConsumer 1091 * 1092 * @param connectionConsumer 1093 * @throws JMSException 1094 */ 1095 protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException { 1096 this.connectionConsumers.add(connectionConsumer); 1097 } 1098 1099 /** 1100 * Remove a ConnectionConsumer 1101 * 1102 * @param connectionConsumer 1103 */ 1104 protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) { 1105 this.connectionConsumers.remove(connectionConsumer); 1106 this.removeDispatcher(connectionConsumer); 1107 } 1108 1109 /** 1110 * Creates a <CODE>TopicSession</CODE> object. 1111 * 1112 * @param transacted indicates whether the session is transacted 1113 * @param acknowledgeMode indicates whether the consumer or the client will 1114 * acknowledge any messages it receives; ignored if the 1115 * session is transacted. Legal values are 1116 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1117 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1118 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1119 * @return a newly created topic session 1120 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1121 * to create a session due to some internal error or lack of 1122 * support for the specific transaction and acknowledgement 1123 * mode. 
1124 * @see Session#AUTO_ACKNOWLEDGE 1125 * @see Session#CLIENT_ACKNOWLEDGE 1126 * @see Session#DUPS_OK_ACKNOWLEDGE 1127 */ 1128 @Override 1129 public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException { 1130 return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1131 } 1132 1133 /** 1134 * Creates a connection consumer for this connection (optional operation). 1135 * This is an expert facility not used by regular JMS clients. 1136 * 1137 * @param topic the topic to access 1138 * @param messageSelector only messages with properties matching the message 1139 * selector expression are delivered. A value of null or an 1140 * empty string indicates that there is no message selector 1141 * for the message consumer. 1142 * @param sessionPool the server session pool to associate with this 1143 * connection consumer 1144 * @param maxMessages the maximum number of messages that can be assigned to 1145 * a server session at one time 1146 * @return the connection consumer 1147 * @throws JMSException if the <CODE>TopicConnection</CODE> object fails 1148 * to create a connection consumer due to some internal 1149 * error or invalid arguments for <CODE>sessionPool</CODE> 1150 * and <CODE>messageSelector</CODE>. 1151 * @throws javax.jms.InvalidDestinationException if an invalid topic is 1152 * specified. 1153 * @throws javax.jms.InvalidSelectorException if the message selector is 1154 * invalid. 1155 * @see javax.jms.ConnectionConsumer 1156 */ 1157 @Override 1158 public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1159 return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, false); 1160 } 1161 1162 /** 1163 * Creates a connection consumer for this connection (optional operation). 1164 * This is an expert facility not used by regular JMS clients. 
1165 * 1166 * @param queue the queue to access 1167 * @param messageSelector only messages with properties matching the message 1168 * selector expression are delivered. A value of null or an 1169 * empty string indicates that there is no message selector 1170 * for the message consumer. 1171 * @param sessionPool the server session pool to associate with this 1172 * connection consumer 1173 * @param maxMessages the maximum number of messages that can be assigned to 1174 * a server session at one time 1175 * @return the connection consumer 1176 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1177 * to create a connection consumer due to some internal 1178 * error or invalid arguments for <CODE>sessionPool</CODE> 1179 * and <CODE>messageSelector</CODE>. 1180 * @throws javax.jms.InvalidDestinationException if an invalid queue is 1181 * specified. 1182 * @throws javax.jms.InvalidSelectorException if the message selector is 1183 * invalid. 1184 * @see javax.jms.ConnectionConsumer 1185 */ 1186 @Override 1187 public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1188 return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, false); 1189 } 1190 1191 /** 1192 * Creates a connection consumer for this connection (optional operation). 1193 * This is an expert facility not used by regular JMS clients. 1194 * 1195 * @param destination the destination to access 1196 * @param messageSelector only messages with properties matching the message 1197 * selector expression are delivered. A value of null or an 1198 * empty string indicates that there is no message selector 1199 * for the message consumer. 
1200 * @param sessionPool the server session pool to associate with this 1201 * connection consumer 1202 * @param maxMessages the maximum number of messages that can be assigned to 1203 * a server session at one time 1204 * @return the connection consumer 1205 * @throws JMSException if the <CODE>Connection</CODE> object fails to 1206 * create a connection consumer due to some internal error 1207 * or invalid arguments for <CODE>sessionPool</CODE> and 1208 * <CODE>messageSelector</CODE>. 1209 * @throws javax.jms.InvalidDestinationException if an invalid destination 1210 * is specified. 1211 * @throws javax.jms.InvalidSelectorException if the message selector is 1212 * invalid. 1213 * @see javax.jms.ConnectionConsumer 1214 * @since 1.1 1215 */ 1216 @Override 1217 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException { 1218 return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, false); 1219 } 1220 1221 public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, boolean noLocal) 1222 throws JMSException { 1223 1224 checkClosedOrFailed(); 1225 ensureConnectionInfoSent(); 1226 1227 ConsumerId consumerId = createConsumerId(); 1228 ConsumerInfo consumerInfo = new ConsumerInfo(consumerId); 1229 consumerInfo.setDestination(ActiveMQMessageTransformation.transformDestination(destination)); 1230 consumerInfo.setSelector(messageSelector); 1231 consumerInfo.setPrefetchSize(maxMessages); 1232 consumerInfo.setNoLocal(noLocal); 1233 consumerInfo.setDispatchAsync(isDispatchAsync()); 1234 1235 // Allows the options on the destination to configure the consumerInfo 1236 if (consumerInfo.getDestination().getOptions() != null) { 1237 Map<String, String> options = new HashMap<String, String>(consumerInfo.getDestination().getOptions()); 1238 
IntrospectionSupport.setProperties(consumerInfo, options, "consumer."); 1239 } 1240 1241 return new ActiveMQConnectionConsumer(this, sessionPool, consumerInfo); 1242 } 1243 1244 /** 1245 * @return 1246 */ 1247 private ConsumerId createConsumerId() { 1248 return new ConsumerId(connectionSessionId, consumerIdGenerator.getNextSequenceId()); 1249 } 1250 1251 /** 1252 * Creates a <CODE>QueueSession</CODE> object. 1253 * 1254 * @param transacted indicates whether the session is transacted 1255 * @param acknowledgeMode indicates whether the consumer or the client will 1256 * acknowledge any messages it receives; ignored if the 1257 * session is transacted. Legal values are 1258 * <code>Session.AUTO_ACKNOWLEDGE</code>, 1259 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and 1260 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>. 1261 * @return a newly created queue session 1262 * @throws JMSException if the <CODE>QueueConnection</CODE> object fails 1263 * to create a session due to some internal error or lack of 1264 * support for the specific transaction and acknowledgement 1265 * mode. 1266 * @see Session#AUTO_ACKNOWLEDGE 1267 * @see Session#CLIENT_ACKNOWLEDGE 1268 * @see Session#DUPS_OK_ACKNOWLEDGE 1269 */ 1270 @Override 1271 public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException { 1272 return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode)); 1273 } 1274 1275 /** 1276 * Ensures that the clientID was manually specified and not auto-generated. 1277 * If the clientID was not specified this method will throw an exception. 1278 * This method is used to ensure that the clientID + durableSubscriber name 1279 * are used correctly. 
1280 * 1281 * @throws JMSException 1282 */ 1283 public void checkClientIDWasManuallySpecified() throws JMSException { 1284 if (!userSpecifiedClientID) { 1285 throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection"); 1286 } 1287 } 1288 1289 /** 1290 * send a Packet through the Connection - for internal use only 1291 * 1292 * @param command 1293 * @throws JMSException 1294 */ 1295 public void asyncSendPacket(Command command) throws JMSException { 1296 if (isClosed()) { 1297 throw new ConnectionClosedException(); 1298 } else { 1299 doAsyncSendPacket(command); 1300 } 1301 } 1302 1303 private void doAsyncSendPacket(Command command) throws JMSException { 1304 try { 1305 this.transport.oneway(command); 1306 } catch (IOException e) { 1307 throw JMSExceptionSupport.create(e); 1308 } 1309 } 1310 1311 /** 1312 * Send a packet through a Connection - for internal use only 1313 * 1314 * @param command 1315 * @return 1316 * @throws JMSException 1317 */ 1318 public void syncSendPacket(final Command command, final AsyncCallback onComplete) throws JMSException { 1319 if(onComplete==null) { 1320 syncSendPacket(command); 1321 } else { 1322 if (isClosed()) { 1323 throw new ConnectionClosedException(); 1324 } 1325 try { 1326 this.transport.asyncRequest(command, new ResponseCallback() { 1327 @Override 1328 public void onCompletion(FutureResponse resp) { 1329 Response response; 1330 Throwable exception = null; 1331 try { 1332 response = resp.getResult(); 1333 if (response.isException()) { 1334 ExceptionResponse er = (ExceptionResponse)response; 1335 exception = er.getException(); 1336 } 1337 } catch (Exception e) { 1338 exception = e; 1339 } 1340 if(exception!=null) { 1341 if ( exception instanceof JMSException) { 1342 onComplete.onException((JMSException) exception); 1343 } else { 1344 if (isClosed()||closing.get()) { 1345 LOG.debug("Received an exception but connection is closing"); 1346 } 1347 JMSException jmsEx = null; 1348 
try { 1349 jmsEx = JMSExceptionSupport.create(exception); 1350 } catch(Throwable e) { 1351 LOG.error("Caught an exception trying to create a JMSException for " +exception,e); 1352 } 1353 // dispose of transport for security exceptions on connection initiation 1354 if (exception instanceof SecurityException && command instanceof ConnectionInfo){ 1355 forceCloseOnSecurityException(exception); 1356 } 1357 if (jmsEx !=null) { 1358 onComplete.onException(jmsEx); 1359 } 1360 } 1361 } else { 1362 onComplete.onSuccess(); 1363 } 1364 } 1365 }); 1366 } catch (IOException e) { 1367 throw JMSExceptionSupport.create(e); 1368 } 1369 } 1370 } 1371 1372 private void forceCloseOnSecurityException(Throwable exception) { 1373 LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception); 1374 onException(new IOException("Force close due to SecurityException on connect", exception)); 1375 } 1376 1377 public Response syncSendPacket(Command command, int timeout) throws JMSException { 1378 if (isClosed()) { 1379 throw new ConnectionClosedException(); 1380 } else { 1381 1382 try { 1383 Response response = (Response)(timeout > 0 1384 ? 
this.transport.request(command, timeout) 1385 : this.transport.request(command)); 1386 if (response.isException()) { 1387 ExceptionResponse er = (ExceptionResponse)response; 1388 if (er.getException() instanceof JMSException) { 1389 throw (JMSException)er.getException(); 1390 } else { 1391 if (isClosed()||closing.get()) { 1392 LOG.debug("Received an exception but connection is closing"); 1393 } 1394 JMSException jmsEx = null; 1395 try { 1396 jmsEx = JMSExceptionSupport.create(er.getException()); 1397 } catch(Throwable e) { 1398 LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e); 1399 } 1400 if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){ 1401 forceCloseOnSecurityException(er.getException()); 1402 } 1403 if (jmsEx !=null) { 1404 throw jmsEx; 1405 } 1406 } 1407 } 1408 return response; 1409 } catch (IOException e) { 1410 throw JMSExceptionSupport.create(e); 1411 } 1412 } 1413 } 1414 1415 /** 1416 * Send a packet through a Connection - for internal use only 1417 * 1418 * @param command 1419 * @return 1420 * @throws JMSException 1421 */ 1422 public Response syncSendPacket(Command command) throws JMSException { 1423 return syncSendPacket(command, 0); 1424 } 1425 1426 /** 1427 * @return statistics for this Connection 1428 */ 1429 @Override 1430 public StatsImpl getStats() { 1431 return stats; 1432 } 1433 1434 /** 1435 * simply throws an exception if the Connection is already closed or the 1436 * Transport has failed 1437 * 1438 * @throws JMSException 1439 */ 1440 protected synchronized void checkClosedOrFailed() throws JMSException { 1441 checkClosed(); 1442 if (transportFailed.get()) { 1443 throw new ConnectionFailedException(firstFailureError); 1444 } 1445 } 1446 1447 /** 1448 * simply throws an exception if the Connection is already closed 1449 * 1450 * @throws JMSException 1451 */ 1452 protected synchronized void checkClosed() throws JMSException { 1453 if (closed.get()) { 1454 throw new 
ConnectionClosedException(); 1455 } 1456 } 1457 1458 /** 1459 * Send the ConnectionInfo to the Broker 1460 * 1461 * @throws JMSException 1462 */ 1463 protected void ensureConnectionInfoSent() throws JMSException { 1464 synchronized(this.ensureConnectionInfoSentMutex) { 1465 // Can we skip sending the ConnectionInfo packet?? 1466 if (isConnectionInfoSentToBroker || closed.get()) { 1467 return; 1468 } 1469 //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID? 1470 if (info.getClientId() == null || info.getClientId().trim().length() == 0) { 1471 info.setClientId(clientIdGenerator.generateId()); 1472 } 1473 syncSendPacket(info.copy(), getConnectResponseTimeout()); 1474 1475 this.isConnectionInfoSentToBroker = true; 1476 // Add a temp destination advisory consumer so that 1477 // We know what the valid temporary destinations are on the 1478 // broker without having to do an RPC to the broker. 1479 1480 ConsumerId consumerId = new ConsumerId(new SessionId(info.getConnectionId(), -1), consumerIdGenerator.getNextSequenceId()); 1481 if (watchTopicAdvisories) { 1482 advisoryConsumer = new AdvisoryConsumer(this, consumerId); 1483 } 1484 } 1485 } 1486 1487 public synchronized boolean isWatchTopicAdvisories() { 1488 return watchTopicAdvisories; 1489 } 1490 1491 public synchronized void setWatchTopicAdvisories(boolean watchTopicAdvisories) { 1492 this.watchTopicAdvisories = watchTopicAdvisories; 1493 } 1494 1495 /** 1496 * @return Returns the useAsyncSend. 1497 */ 1498 public boolean isUseAsyncSend() { 1499 return useAsyncSend; 1500 } 1501 1502 /** 1503 * Forces the use of <a 1504 * href="http://activemq.apache.org/async-sends.html">Async Sends</a> which 1505 * adds a massive performance boost; but means that the send() method will 1506 * return immediately whether the message has been sent or not which could 1507 * lead to message loss. 
1508 */ 1509 public void setUseAsyncSend(boolean useAsyncSend) { 1510 this.useAsyncSend = useAsyncSend; 1511 } 1512 1513 /** 1514 * @return true if always sync send messages 1515 */ 1516 public boolean isAlwaysSyncSend() { 1517 return this.alwaysSyncSend; 1518 } 1519 1520 /** 1521 * Set true if always require messages to be sync sent 1522 * 1523 * @param alwaysSyncSend 1524 */ 1525 public void setAlwaysSyncSend(boolean alwaysSyncSend) { 1526 this.alwaysSyncSend = alwaysSyncSend; 1527 } 1528 1529 /** 1530 * @return the messagePrioritySupported 1531 */ 1532 public boolean isMessagePrioritySupported() { 1533 return this.messagePrioritySupported; 1534 } 1535 1536 /** 1537 * @param messagePrioritySupported the messagePrioritySupported to set 1538 */ 1539 public void setMessagePrioritySupported(boolean messagePrioritySupported) { 1540 this.messagePrioritySupported = messagePrioritySupported; 1541 } 1542 1543 /** 1544 * Cleans up this connection so that it's state is as if the connection was 1545 * just created. This allows the Resource Adapter to clean up a connection 1546 * so that it can be reused without having to close and recreate the 1547 * connection. 
1548 */ 1549 public void cleanup() throws JMSException { 1550 doCleanup(false); 1551 } 1552 1553 public void doCleanup(boolean removeConnection) throws JMSException { 1554 if (advisoryConsumer != null && !isTransportFailed()) { 1555 advisoryConsumer.dispose(); 1556 advisoryConsumer = null; 1557 } 1558 1559 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1560 ActiveMQSession s = i.next(); 1561 s.dispose(); 1562 } 1563 for (Iterator<ActiveMQConnectionConsumer> i = this.connectionConsumers.iterator(); i.hasNext();) { 1564 ActiveMQConnectionConsumer c = i.next(); 1565 c.dispose(); 1566 } 1567 1568 if (removeConnection) { 1569 if (isConnectionInfoSentToBroker) { 1570 if (!transportFailed.get() && !closing.get()) { 1571 syncSendPacket(info.createRemoveCommand()); 1572 } 1573 isConnectionInfoSentToBroker = false; 1574 } 1575 if (userSpecifiedClientID) { 1576 info.setClientId(null); 1577 userSpecifiedClientID = false; 1578 } 1579 clientIDSet = false; 1580 } 1581 1582 started.set(false); 1583 } 1584 1585 /** 1586 * Changes the associated username/password that is associated with this 1587 * connection. If the connection has been used, you must called cleanup() 1588 * before calling this method. 1589 * 1590 * @throws IllegalStateException if the connection is in used. 1591 */ 1592 public void changeUserInfo(String userName, String password) throws JMSException { 1593 if (isConnectionInfoSentToBroker) { 1594 throw new IllegalStateException("changeUserInfo used Connection is not allowed"); 1595 } 1596 this.info.setUserName(userName); 1597 this.info.setPassword(password); 1598 } 1599 1600 /** 1601 * @return Returns the resourceManagerId. 
1602 * @throws JMSException 1603 */ 1604 public String getResourceManagerId() throws JMSException { 1605 if (isRmIdFromConnectionId()) { 1606 return info.getConnectionId().getValue(); 1607 } 1608 waitForBrokerInfo(); 1609 if (brokerInfo == null) { 1610 throw new JMSException("Connection failed before Broker info was received."); 1611 } 1612 return brokerInfo.getBrokerId().getValue(); 1613 } 1614 1615 /** 1616 * Returns the broker name if one is available or null if one is not 1617 * available yet. 1618 */ 1619 public String getBrokerName() { 1620 try { 1621 brokerInfoReceived.await(5, TimeUnit.SECONDS); 1622 if (brokerInfo == null) { 1623 return null; 1624 } 1625 return brokerInfo.getBrokerName(); 1626 } catch (InterruptedException e) { 1627 Thread.currentThread().interrupt(); 1628 return null; 1629 } 1630 } 1631 1632 /** 1633 * Returns the broker information if it is available or null if it is not 1634 * available yet. 1635 */ 1636 public BrokerInfo getBrokerInfo() { 1637 return brokerInfo; 1638 } 1639 1640 /** 1641 * @return Returns the RedeliveryPolicy. 
1642 * @throws JMSException 1643 */ 1644 public RedeliveryPolicy getRedeliveryPolicy() throws JMSException { 1645 return redeliveryPolicyMap.getDefaultEntry(); 1646 } 1647 1648 /** 1649 * Sets the redelivery policy to be used when messages are rolled back 1650 */ 1651 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { 1652 this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy); 1653 } 1654 1655 public BlobTransferPolicy getBlobTransferPolicy() { 1656 if (blobTransferPolicy == null) { 1657 blobTransferPolicy = createBlobTransferPolicy(); 1658 } 1659 return blobTransferPolicy; 1660 } 1661 1662 /** 1663 * Sets the policy used to describe how out-of-band BLOBs (Binary Large 1664 * OBjects) are transferred from producers to brokers to consumers 1665 */ 1666 public void setBlobTransferPolicy(BlobTransferPolicy blobTransferPolicy) { 1667 this.blobTransferPolicy = blobTransferPolicy; 1668 } 1669 1670 /** 1671 * @return Returns the alwaysSessionAsync. 1672 */ 1673 public boolean isAlwaysSessionAsync() { 1674 return alwaysSessionAsync; 1675 } 1676 1677 /** 1678 * If this flag is not set then a separate thread is not used for dispatching messages for each Session in 1679 * the Connection. However, a separate thread is always used if there is more than one session, or the session 1680 * isn't in auto acknowledge or duplicates ok mode. By default this value is set to true and session dispatch 1681 * happens asynchronously. 1682 */ 1683 public void setAlwaysSessionAsync(boolean alwaysSessionAsync) { 1684 this.alwaysSessionAsync = alwaysSessionAsync; 1685 } 1686 1687 /** 1688 * @return Returns the optimizeAcknowledge. 1689 */ 1690 public boolean isOptimizeAcknowledge() { 1691 return optimizeAcknowledge; 1692 } 1693 1694 /** 1695 * Enables an optimised acknowledgement mode where messages are acknowledged 1696 * in batches rather than individually 1697 * 1698 * @param optimizeAcknowledge The optimizeAcknowledge to set. 
1699 */ 1700 public void setOptimizeAcknowledge(boolean optimizeAcknowledge) { 1701 this.optimizeAcknowledge = optimizeAcknowledge; 1702 } 1703 1704 /** 1705 * The max time in milliseconds between optimized ack batches 1706 * @param optimizeAcknowledgeTimeOut 1707 */ 1708 public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut) { 1709 this.optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut; 1710 } 1711 1712 public long getOptimizeAcknowledgeTimeOut() { 1713 return optimizeAcknowledgeTimeOut; 1714 } 1715 1716 public long getWarnAboutUnstartedConnectionTimeout() { 1717 return warnAboutUnstartedConnectionTimeout; 1718 } 1719 1720 /** 1721 * Enables the timeout from a connection creation to when a warning is 1722 * generated if the connection is not properly started via {@link #start()} 1723 * and a message is received by a consumer. It is a very common gotcha to 1724 * forget to <a 1725 * href="http://activemq.apache.org/i-am-not-receiving-any-messages-what-is-wrong.html">start 1726 * the connection</a> so this option makes the default case to create a 1727 * warning if the user forgets. To disable the warning just set the value to < 1728 * 0 (say -1). 
1729 */ 1730 public void setWarnAboutUnstartedConnectionTimeout(long warnAboutUnstartedConnectionTimeout) { 1731 this.warnAboutUnstartedConnectionTimeout = warnAboutUnstartedConnectionTimeout; 1732 } 1733 1734 /** 1735 * @return the sendTimeout (in milliseconds) 1736 */ 1737 public int getSendTimeout() { 1738 return sendTimeout; 1739 } 1740 1741 /** 1742 * @param sendTimeout the sendTimeout to set (in milliseconds) 1743 */ 1744 public void setSendTimeout(int sendTimeout) { 1745 this.sendTimeout = sendTimeout; 1746 } 1747 1748 /** 1749 * @return the sendAcksAsync 1750 */ 1751 public boolean isSendAcksAsync() { 1752 return sendAcksAsync; 1753 } 1754 1755 /** 1756 * @param sendAcksAsync the sendAcksAsync to set 1757 */ 1758 public void setSendAcksAsync(boolean sendAcksAsync) { 1759 this.sendAcksAsync = sendAcksAsync; 1760 } 1761 1762 /** 1763 * Returns the time this connection was created 1764 */ 1765 public long getTimeCreated() { 1766 return timeCreated; 1767 } 1768 1769 private void waitForBrokerInfo() throws JMSException { 1770 try { 1771 brokerInfoReceived.await(); 1772 } catch (InterruptedException e) { 1773 Thread.currentThread().interrupt(); 1774 throw JMSExceptionSupport.create(e); 1775 } 1776 } 1777 1778 // Package protected so that it can be used in unit tests 1779 public Transport getTransport() { 1780 return transport; 1781 } 1782 1783 public void addProducer(ProducerId producerId, ActiveMQMessageProducer producer) { 1784 producers.put(producerId, producer); 1785 } 1786 1787 public void removeProducer(ProducerId producerId) { 1788 producers.remove(producerId); 1789 } 1790 1791 public void addDispatcher(ConsumerId consumerId, ActiveMQDispatcher dispatcher) { 1792 dispatchers.put(consumerId, dispatcher); 1793 } 1794 1795 public void removeDispatcher(ConsumerId consumerId) { 1796 dispatchers.remove(consumerId); 1797 } 1798 1799 public boolean hasDispatcher(ConsumerId consumerId) { 1800 return dispatchers.containsKey(consumerId); 1801 } 1802 1803 /** 1804 * 
@param o - the command to consume 1805 */ 1806 @Override 1807 public void onCommand(final Object o) { 1808 final Command command = (Command)o; 1809 if (!closed.get() && command != null) { 1810 try { 1811 command.visit(new CommandVisitorAdapter() { 1812 @Override 1813 public Response processMessageDispatch(MessageDispatch md) throws Exception { 1814 waitForTransportInterruptionProcessingToComplete(); 1815 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId()); 1816 if (dispatcher != null) { 1817 // Copy in case a embedded broker is dispatching via 1818 // vm:// 1819 // md.getMessage() == null to signal end of queue 1820 // browse. 1821 Message msg = md.getMessage(); 1822 if (msg != null) { 1823 msg = msg.copy(); 1824 msg.setReadOnlyBody(true); 1825 msg.setReadOnlyProperties(true); 1826 msg.setRedeliveryCounter(md.getRedeliveryCounter()); 1827 msg.setConnection(ActiveMQConnection.this); 1828 msg.setMemoryUsage(null); 1829 md.setMessage(msg); 1830 } 1831 dispatcher.dispatch(md); 1832 } else { 1833 LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers); 1834 } 1835 return null; 1836 } 1837 1838 @Override 1839 public Response processProducerAck(ProducerAck pa) throws Exception { 1840 if (pa != null && pa.getProducerId() != null) { 1841 ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); 1842 if (producer != null) { 1843 producer.onProducerAck(pa); 1844 } 1845 } 1846 return null; 1847 } 1848 1849 @Override 1850 public Response processBrokerInfo(BrokerInfo info) throws Exception { 1851 brokerInfo = info; 1852 brokerInfoReceived.countDown(); 1853 optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration(); 1854 getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl()); 1855 return null; 1856 } 1857 1858 @Override 1859 public Response processConnectionError(final ConnectionError error) throws Exception { 1860 executor.execute(new Runnable() { 1861 @Override 1862 public void run() { 1863 
onAsyncException(error.getException()); 1864 } 1865 }); 1866 return null; 1867 } 1868 1869 @Override 1870 public Response processControlCommand(ControlCommand command) throws Exception { 1871 onControlCommand(command); 1872 return null; 1873 } 1874 1875 @Override 1876 public Response processConnectionControl(ConnectionControl control) throws Exception { 1877 onConnectionControl((ConnectionControl)command); 1878 return null; 1879 } 1880 1881 @Override 1882 public Response processConsumerControl(ConsumerControl control) throws Exception { 1883 onConsumerControl((ConsumerControl)command); 1884 return null; 1885 } 1886 1887 @Override 1888 public Response processWireFormat(WireFormatInfo info) throws Exception { 1889 onWireFormatInfo((WireFormatInfo)command); 1890 return null; 1891 } 1892 }); 1893 } catch (Exception e) { 1894 onClientInternalException(e); 1895 } 1896 } 1897 1898 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1899 TransportListener listener = iter.next(); 1900 listener.onCommand(command); 1901 } 1902 } 1903 1904 protected void onWireFormatInfo(WireFormatInfo info) { 1905 protocolVersion.set(info.getVersion()); 1906 } 1907 1908 /** 1909 * Handles async client internal exceptions. 1910 * A client internal exception is usually one that has been thrown 1911 * by a container runtime component during asynchronous processing of a 1912 * message that does not affect the connection itself. 1913 * This method notifies the <code>ClientInternalExceptionListener</code> by invoking 1914 * its <code>onException</code> method, if one has been registered with this connection. 
1915 * 1916 * @param error the exception that the problem 1917 */ 1918 public void onClientInternalException(final Throwable error) { 1919 if ( !closed.get() && !closing.get() ) { 1920 if ( this.clientInternalExceptionListener != null ) { 1921 executor.execute(new Runnable() { 1922 @Override 1923 public void run() { 1924 ActiveMQConnection.this.clientInternalExceptionListener.onException(error); 1925 } 1926 }); 1927 } else { 1928 LOG.debug("Async client internal exception occurred with no exception listener registered: " 1929 + error, error); 1930 } 1931 } 1932 } 1933 1934 /** 1935 * Used for handling async exceptions 1936 * 1937 * @param error 1938 */ 1939 public void onAsyncException(Throwable error) { 1940 if (!closed.get() && !closing.get()) { 1941 if (this.exceptionListener != null) { 1942 1943 if (!(error instanceof JMSException)) { 1944 error = JMSExceptionSupport.create(error); 1945 } 1946 final JMSException e = (JMSException)error; 1947 1948 executor.execute(new Runnable() { 1949 @Override 1950 public void run() { 1951 ActiveMQConnection.this.exceptionListener.onException(e); 1952 } 1953 }); 1954 1955 } else { 1956 LOG.debug("Async exception with no exception listener: " + error, error); 1957 } 1958 } 1959 } 1960 1961 @Override 1962 public void onException(final IOException error) { 1963 onAsyncException(error); 1964 if (!closing.get() && !closed.get()) { 1965 executor.execute(new Runnable() { 1966 @Override 1967 public void run() { 1968 transportFailed(error); 1969 ServiceSupport.dispose(ActiveMQConnection.this.transport); 1970 brokerInfoReceived.countDown(); 1971 try { 1972 doCleanup(true); 1973 } catch (JMSException e) { 1974 LOG.warn("Exception during connection cleanup, " + e, e); 1975 } 1976 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 1977 TransportListener listener = iter.next(); 1978 listener.onException(error); 1979 } 1980 } 1981 }); 1982 } 1983 } 1984 1985 @Override 1986 public void 
transportInterupted() { 1987 transportInterruptionProcessingComplete.set(1); 1988 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 1989 ActiveMQSession s = i.next(); 1990 s.clearMessagesInProgress(transportInterruptionProcessingComplete); 1991 } 1992 1993 for (ActiveMQConnectionConsumer connectionConsumer : this.connectionConsumers) { 1994 connectionConsumer.clearMessagesInProgress(transportInterruptionProcessingComplete); 1995 } 1996 1997 if (transportInterruptionProcessingComplete.decrementAndGet() > 0) { 1998 if (LOG.isDebugEnabled()) { 1999 LOG.debug("transport interrupted - processing required, dispatchers: " + transportInterruptionProcessingComplete.get()); 2000 } 2001 signalInterruptionProcessingNeeded(); 2002 } 2003 2004 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 2005 TransportListener listener = iter.next(); 2006 listener.transportInterupted(); 2007 } 2008 } 2009 2010 @Override 2011 public void transportResumed() { 2012 for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) { 2013 TransportListener listener = iter.next(); 2014 listener.transportResumed(); 2015 } 2016 } 2017 2018 /** 2019 * Create the DestinationInfo object for the temporary destination. 2020 * 2021 * @param topic - if its true topic, else queue. 2022 * @return DestinationInfo 2023 * @throws JMSException 2024 */ 2025 protected ActiveMQTempDestination createTempDestination(boolean topic) throws JMSException { 2026 2027 // Check if Destination info is of temporary type. 
2028 ActiveMQTempDestination dest; 2029 if (topic) { 2030 dest = new ActiveMQTempTopic(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 2031 } else { 2032 dest = new ActiveMQTempQueue(info.getConnectionId(), tempDestinationIdGenerator.getNextSequenceId()); 2033 } 2034 2035 DestinationInfo info = new DestinationInfo(); 2036 info.setConnectionId(this.info.getConnectionId()); 2037 info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE); 2038 info.setDestination(dest); 2039 syncSendPacket(info); 2040 2041 dest.setConnection(this); 2042 activeTempDestinations.put(dest, dest); 2043 return dest; 2044 } 2045 2046 /** 2047 * @param destination 2048 * @throws JMSException 2049 */ 2050 public void deleteTempDestination(ActiveMQTempDestination destination) throws JMSException { 2051 2052 checkClosedOrFailed(); 2053 2054 for (ActiveMQSession session : this.sessions) { 2055 if (session.isInUse(destination)) { 2056 throw new JMSException("A consumer is consuming from the temporary destination"); 2057 } 2058 } 2059 2060 activeTempDestinations.remove(destination); 2061 2062 DestinationInfo destInfo = new DestinationInfo(); 2063 destInfo.setConnectionId(this.info.getConnectionId()); 2064 destInfo.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 2065 destInfo.setDestination(destination); 2066 destInfo.setTimeout(0); 2067 syncSendPacket(destInfo); 2068 } 2069 2070 public boolean isDeleted(ActiveMQDestination dest) { 2071 2072 // If we are not watching the advisories.. then 2073 // we will assume that the temp destination does exist. 
2074 if (advisoryConsumer == null) { 2075 return false; 2076 } 2077 2078 return !activeTempDestinations.containsValue(dest); 2079 } 2080 2081 public boolean isCopyMessageOnSend() { 2082 return copyMessageOnSend; 2083 } 2084 2085 public LongSequenceGenerator getLocalTransactionIdGenerator() { 2086 return localTransactionIdGenerator; 2087 } 2088 2089 public boolean isUseCompression() { 2090 return useCompression; 2091 } 2092 2093 /** 2094 * Enables the use of compression of the message bodies 2095 */ 2096 public void setUseCompression(boolean useCompression) { 2097 this.useCompression = useCompression; 2098 } 2099 2100 public void destroyDestination(ActiveMQDestination destination) throws JMSException { 2101 2102 checkClosedOrFailed(); 2103 ensureConnectionInfoSent(); 2104 2105 DestinationInfo info = new DestinationInfo(); 2106 info.setConnectionId(this.info.getConnectionId()); 2107 info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); 2108 info.setDestination(destination); 2109 info.setTimeout(0); 2110 syncSendPacket(info); 2111 } 2112 2113 public boolean isDispatchAsync() { 2114 return dispatchAsync; 2115 } 2116 2117 /** 2118 * Enables or disables the default setting of whether or not consumers have 2119 * their messages <a 2120 * href="http://activemq.apache.org/consumer-dispatch-async.html">dispatched 2121 * synchronously or asynchronously by the broker</a>. For non-durable 2122 * topics for example we typically dispatch synchronously by default to 2123 * minimize context switches which boost performance. However sometimes its 2124 * better to go slower to ensure that a single blocked consumer socket does 2125 * not block delivery to other consumers. 2126 * 2127 * @param asyncDispatch If true then consumers created on this connection 2128 * will default to having their messages dispatched 2129 * asynchronously. The default value is true. 
2130 */ 2131 public void setDispatchAsync(boolean asyncDispatch) { 2132 this.dispatchAsync = asyncDispatch; 2133 } 2134 2135 public boolean isObjectMessageSerializationDefered() { 2136 return objectMessageSerializationDefered; 2137 } 2138 2139 /** 2140 * When an object is set on an ObjectMessage, the JMS spec requires the 2141 * object to be serialized by that set method. Enabling this flag causes the 2142 * object to not get serialized. The object may subsequently get serialized 2143 * if the message needs to be sent over a socket or stored to disk. 2144 */ 2145 public void setObjectMessageSerializationDefered(boolean objectMessageSerializationDefered) { 2146 this.objectMessageSerializationDefered = objectMessageSerializationDefered; 2147 } 2148 2149 /** 2150 * Unsubscribes a durable subscription that has been created by a client. 2151 * <P> 2152 * This method deletes the state being maintained on behalf of the 2153 * subscriber by its provider. 2154 * <P> 2155 * It is erroneous for a client to delete a durable subscription while there 2156 * is an active <CODE>MessageConsumer </CODE> or 2157 * <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed 2158 * message is part of a pending transaction or has not been acknowledged in 2159 * the session. 2160 * 2161 * @param name the name used to identify this subscription 2162 * @throws JMSException if the session fails to unsubscribe to the durable 2163 * subscription due to some internal error. 2164 * @throws InvalidDestinationException if an invalid subscription name is 2165 * specified. 
2166 * @since 1.1 2167 */ 2168 public void unsubscribe(String name) throws InvalidDestinationException, JMSException { 2169 checkClosedOrFailed(); 2170 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); 2171 rsi.setConnectionId(getConnectionInfo().getConnectionId()); 2172 rsi.setSubscriptionName(name); 2173 rsi.setClientId(getConnectionInfo().getClientId()); 2174 syncSendPacket(rsi); 2175 } 2176 2177 /** 2178 * Internal send method optimized: - It does not copy the message - It can 2179 * only handle ActiveMQ messages. - You can specify if the send is async or 2180 * sync - Does not allow you to send /w a transaction. 2181 */ 2182 void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException { 2183 checkClosedOrFailed(); 2184 2185 if (destination.isTemporary() && isDeleted(destination)) { 2186 throw new JMSException("Cannot publish to a deleted Destination: " + destination); 2187 } 2188 2189 msg.setJMSDestination(destination); 2190 msg.setJMSDeliveryMode(deliveryMode); 2191 long expiration = 0L; 2192 2193 if (!isDisableTimeStampsByDefault()) { 2194 long timeStamp = System.currentTimeMillis(); 2195 msg.setJMSTimestamp(timeStamp); 2196 if (timeToLive > 0) { 2197 expiration = timeToLive + timeStamp; 2198 } 2199 } 2200 2201 msg.setJMSExpiration(expiration); 2202 msg.setJMSPriority(priority); 2203 msg.setJMSRedelivered(false); 2204 msg.setMessageId(messageId); 2205 msg.onSend(); 2206 msg.setProducerId(msg.getMessageId().getProducerId()); 2207 2208 if (LOG.isDebugEnabled()) { 2209 LOG.debug("Sending message: " + msg); 2210 } 2211 2212 if (async) { 2213 asyncSendPacket(msg); 2214 } else { 2215 syncSendPacket(msg); 2216 } 2217 } 2218 2219 protected void onControlCommand(ControlCommand command) { 2220 String text = command.getCommand(); 2221 if (text != null) { 2222 if ("shutdown".equals(text)) { 2223 LOG.info("JVM told to shutdown"); 2224 System.exit(0); 2225 
} 2226 2227 // TODO Should we handle the "close" case? 2228 // if (false && "close".equals(text)){ 2229 // LOG.error("Broker " + getBrokerInfo() + "shutdown connection"); 2230 // try { 2231 // close(); 2232 // } catch (JMSException e) { 2233 // } 2234 // } 2235 } 2236 } 2237 2238 protected void onConnectionControl(ConnectionControl command) { 2239 if (command.isFaultTolerant()) { 2240 this.optimizeAcknowledge = false; 2241 for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { 2242 ActiveMQSession s = i.next(); 2243 s.setOptimizeAcknowledge(false); 2244 } 2245 } 2246 } 2247 2248 protected void onConsumerControl(ConsumerControl command) { 2249 if (command.isClose()) { 2250 for (ActiveMQSession session : this.sessions) { 2251 session.close(command.getConsumerId()); 2252 } 2253 } else { 2254 for (ActiveMQSession session : this.sessions) { 2255 session.setPrefetchSize(command.getConsumerId(), command.getPrefetch()); 2256 } 2257 for (ActiveMQConnectionConsumer connectionConsumer: connectionConsumers) { 2258 ConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo(); 2259 if (consumerInfo.getConsumerId().equals(command.getConsumerId())) { 2260 consumerInfo.setPrefetchSize(command.getPrefetch()); 2261 } 2262 } 2263 } 2264 } 2265 2266 protected void transportFailed(IOException error) { 2267 transportFailed.set(true); 2268 if (firstFailureError == null) { 2269 firstFailureError = error; 2270 } 2271 } 2272 2273 /** 2274 * Should a JMS message be copied to a new JMS Message object as part of the 2275 * send() method in JMS. This is enabled by default to be compliant with the 2276 * JMS specification. 
You can disable it if you do not mutate JMS messages 2277 * after they are sent for a performance boost 2278 */ 2279 public void setCopyMessageOnSend(boolean copyMessageOnSend) { 2280 this.copyMessageOnSend = copyMessageOnSend; 2281 } 2282 2283 @Override 2284 public String toString() { 2285 return "ActiveMQConnection {id=" + info.getConnectionId() + ",clientId=" + info.getClientId() + ",started=" + started.get() + "}"; 2286 } 2287 2288 protected BlobTransferPolicy createBlobTransferPolicy() { 2289 return new BlobTransferPolicy(); 2290 } 2291 2292 public int getProtocolVersion() { 2293 return protocolVersion.get(); 2294 } 2295 2296 public int getProducerWindowSize() { 2297 return producerWindowSize; 2298 } 2299 2300 public void setProducerWindowSize(int producerWindowSize) { 2301 this.producerWindowSize = producerWindowSize; 2302 } 2303 2304 public void setAuditDepth(int auditDepth) { 2305 connectionAudit.setAuditDepth(auditDepth); 2306 } 2307 2308 public void setAuditMaximumProducerNumber(int auditMaximumProducerNumber) { 2309 connectionAudit.setAuditMaximumProducerNumber(auditMaximumProducerNumber); 2310 } 2311 2312 protected void removeDispatcher(ActiveMQDispatcher dispatcher) { 2313 connectionAudit.removeDispatcher(dispatcher); 2314 } 2315 2316 protected boolean isDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2317 return checkForDuplicates && connectionAudit.isDuplicate(dispatcher, message); 2318 } 2319 2320 protected void rollbackDuplicate(ActiveMQDispatcher dispatcher, Message message) { 2321 connectionAudit.rollbackDuplicate(dispatcher, message); 2322 } 2323 2324 public IOException getFirstFailureError() { 2325 return firstFailureError; 2326 } 2327 2328 protected void waitForTransportInterruptionProcessingToComplete() throws InterruptedException { 2329 if (!closed.get() && !transportFailed.get() && transportInterruptionProcessingComplete.get()>0) { 2330 LOG.warn("dispatch with outstanding dispatch interruption processing count " + 
transportInterruptionProcessingComplete.get()); 2331 signalInterruptionProcessingComplete(); 2332 } 2333 } 2334 2335 protected void transportInterruptionProcessingComplete() { 2336 if (transportInterruptionProcessingComplete.decrementAndGet() == 0) { 2337 signalInterruptionProcessingComplete(); 2338 } 2339 } 2340 2341 private void signalInterruptionProcessingComplete() { 2342 if (LOG.isDebugEnabled()) { 2343 LOG.debug("transportInterruptionProcessingComplete: " + transportInterruptionProcessingComplete.get() 2344 + " for:" + this.getConnectionInfo().getConnectionId()); 2345 } 2346 2347 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 2348 if (failoverTransport != null) { 2349 failoverTransport.connectionInterruptProcessingComplete(this.getConnectionInfo().getConnectionId()); 2350 if (LOG.isDebugEnabled()) { 2351 LOG.debug("notified failover transport (" + failoverTransport 2352 + ") of interruption completion for: " + this.getConnectionInfo().getConnectionId()); 2353 } 2354 } 2355 transportInterruptionProcessingComplete.set(0); 2356 } 2357 2358 private void signalInterruptionProcessingNeeded() { 2359 FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class); 2360 if (failoverTransport != null) { 2361 failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId()); 2362 if (LOG.isDebugEnabled()) { 2363 LOG.debug("notified failover transport (" + failoverTransport 2364 + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId()); 2365 } 2366 } 2367 } 2368 2369 /* 2370 * specify the amount of time in milliseconds that a consumer with a transaction pending recovery 2371 * will wait to receive re dispatched messages. 2372 * default value is 0 so there is no wait by default. 
2373 */ 2374 public void setConsumerFailoverRedeliveryWaitPeriod(long consumerFailoverRedeliveryWaitPeriod) { 2375 this.consumerFailoverRedeliveryWaitPeriod = consumerFailoverRedeliveryWaitPeriod; 2376 } 2377 2378 public long getConsumerFailoverRedeliveryWaitPeriod() { 2379 return consumerFailoverRedeliveryWaitPeriod; 2380 } 2381 2382 protected Scheduler getScheduler() throws JMSException { 2383 Scheduler result = scheduler; 2384 if (result == null) { 2385 if (isClosing() || isClosed()) { 2386 // without lock contention report the closing state 2387 throw new ConnectionClosedException(); 2388 } 2389 synchronized (this) { 2390 result = scheduler; 2391 if (result == null) { 2392 checkClosed(); 2393 try { 2394 result = new Scheduler("ActiveMQConnection["+info.getConnectionId().getValue()+"] Scheduler"); 2395 result.start(); 2396 scheduler = result; 2397 } catch(Exception e) { 2398 throw JMSExceptionSupport.create(e); 2399 } 2400 } 2401 } 2402 } 2403 return result; 2404 } 2405 2406 protected ThreadPoolExecutor getExecutor() { 2407 return this.executor; 2408 } 2409 2410 protected CopyOnWriteArrayList<ActiveMQSession> getSessions() { 2411 return sessions; 2412 } 2413 2414 /** 2415 * @return the checkForDuplicates 2416 */ 2417 public boolean isCheckForDuplicates() { 2418 return this.checkForDuplicates; 2419 } 2420 2421 /** 2422 * @param checkForDuplicates the checkForDuplicates to set 2423 */ 2424 public void setCheckForDuplicates(boolean checkForDuplicates) { 2425 this.checkForDuplicates = checkForDuplicates; 2426 } 2427 2428 public boolean isTransactedIndividualAck() { 2429 return transactedIndividualAck; 2430 } 2431 2432 public void setTransactedIndividualAck(boolean transactedIndividualAck) { 2433 this.transactedIndividualAck = transactedIndividualAck; 2434 } 2435 2436 public boolean isNonBlockingRedelivery() { 2437 return nonBlockingRedelivery; 2438 } 2439 2440 public void setNonBlockingRedelivery(boolean nonBlockingRedelivery) { 2441 this.nonBlockingRedelivery = 
nonBlockingRedelivery; 2442 } 2443 2444 public boolean isRmIdFromConnectionId() { 2445 return rmIdFromConnectionId; 2446 } 2447 2448 public void setRmIdFromConnectionId(boolean rmIdFromConnectionId) { 2449 this.rmIdFromConnectionId = rmIdFromConnectionId; 2450 } 2451 2452 /** 2453 * Removes any TempDestinations that this connection has cached, ignoring 2454 * any exceptions generated because the destination is in use as they should 2455 * not be removed. 2456 * Used from a pooled connection, b/c it will not be explicitly closed. 2457 */ 2458 public void cleanUpTempDestinations() { 2459 2460 if (this.activeTempDestinations == null || this.activeTempDestinations.isEmpty()) { 2461 return; 2462 } 2463 2464 Iterator<ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination>> entries 2465 = this.activeTempDestinations.entrySet().iterator(); 2466 while(entries.hasNext()) { 2467 ConcurrentMap.Entry<ActiveMQTempDestination, ActiveMQTempDestination> entry = entries.next(); 2468 try { 2469 // Only delete this temp destination if it was created from this connection. The connection used 2470 // for the advisory consumer may also have a reference to this temp destination. 2471 ActiveMQTempDestination dest = entry.getValue(); 2472 String thisConnectionId = (info.getConnectionId() == null) ? "" : info.getConnectionId().toString(); 2473 if (dest.getConnectionId() != null && dest.getConnectionId().equals(thisConnectionId)) { 2474 this.deleteTempDestination(entry.getValue()); 2475 } 2476 } catch (Exception ex) { 2477 // the temp dest is in use so it can not be deleted. 2478 // it is ok to leave it to connection tear down phase 2479 } 2480 } 2481 } 2482 2483 /** 2484 * Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back. 
2485 * @param redeliveryPolicyMap the redeliveryPolicyMap to set 2486 */ 2487 public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) { 2488 this.redeliveryPolicyMap = redeliveryPolicyMap; 2489 } 2490 2491 /** 2492 * Gets the Connection's configured RedeliveryPolicyMap which will be used by all the 2493 * Consumers when dealing with transaction messages that have been rolled back. 2494 * 2495 * @return the redeliveryPolicyMap 2496 */ 2497 public RedeliveryPolicyMap getRedeliveryPolicyMap() { 2498 return redeliveryPolicyMap; 2499 } 2500 2501 public int getMaxThreadPoolSize() { 2502 return maxThreadPoolSize; 2503 } 2504 2505 public void setMaxThreadPoolSize(int maxThreadPoolSize) { 2506 this.maxThreadPoolSize = maxThreadPoolSize; 2507 } 2508 2509 /** 2510 * Enable enforcement of QueueConnection semantics. 2511 * 2512 * @return this object, useful for chaining 2513 */ 2514 ActiveMQConnection enforceQueueOnlyConnection() { 2515 this.queueOnlyConnection = true; 2516 return this; 2517 } 2518 2519 public RejectedExecutionHandler getRejectedTaskHandler() { 2520 return rejectedTaskHandler; 2521 } 2522 2523 public void setRejectedTaskHandler(RejectedExecutionHandler rejectedTaskHandler) { 2524 this.rejectedTaskHandler = rejectedTaskHandler; 2525 } 2526 2527 /** 2528 * Gets the configured time interval that is used to force all MessageConsumers that have optimizedAcknowledge enabled 2529 * to send an ack for any outstanding Message Acks. By default this value is set to zero meaning that the consumers 2530 * will not do any background Message acknowledgment. 2531 * 2532 * @return the scheduledOptimizedAckInterval 2533 */ 2534 public long getOptimizedAckScheduledAckInterval() { 2535 return optimizedAckScheduledAckInterval; 2536 } 2537 2538 /** 2539 * Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that 2540 * have been configured with optimizeAcknowledge enabled. 
*
     * @param optimizedAckScheduledAckInterval the scheduledOptimizedAckInterval to set
     */
    public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
        this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
    }

    /**
     * @return true if MessageConsumer instance will check for expired messages before dispatch.
     */
    public boolean isConsumerExpiryCheckEnabled() {
        return this.consumerExpiryCheckEnabled;
    }

    /**
     * Controls whether message expiration checking is done in each MessageConsumer
     * prior to dispatching a message.  Disabling this check can lead to consumption
     * of expired messages.
     *
     * @param consumerExpiryCheckEnabled
     *        controls whether expiration checking is done prior to dispatch.
     */
    public void setConsumerExpiryCheckEnabled(boolean consumerExpiryCheckEnabled) {
        this.consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
    }

    /**
     * @return the trustedPackages list held by this connection (the field
     *         itself, not a copy)
     */
    public List<String> getTrustedPackages() {
        return this.trustedPackages;
    }

    /**
     * @param trustedPackages the list to store; the reference is kept as-is
     */
    public void setTrustedPackages(List<String> trustedPackages) {
        this.trustedPackages = trustedPackages;
    }

    /**
     * @return the current trustAllPackages flag
     */
    public boolean isTrustAllPackages() {
        return this.trustAllPackages;
    }

    /**
     * @param trustAllPackages the new flag value
     */
    public void setTrustAllPackages(boolean trustAllPackages) {
        this.trustAllPackages = trustAllPackages;
    }

    /**
     * @return the configured connectResponseTimeout
     */
    public int getConnectResponseTimeout() {
        return this.connectResponseTimeout;
    }

    /**
     * @param connectResponseTimeout the connectResponseTimeout to store
     */
    public void setConnectResponseTimeout(int connectResponseTimeout) {
        this.connectResponseTimeout = connectResponseTimeout;
    }
}