001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.EOFException; 020import java.io.IOException; 021import java.net.SocketException; 022import java.net.URI; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.Iterator; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Properties; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.CountDownLatch; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.atomic.AtomicBoolean; 035import java.util.concurrent.atomic.AtomicInteger; 036import java.util.concurrent.atomic.AtomicReference; 037import java.util.concurrent.locks.ReentrantReadWriteLock; 038 039import javax.transaction.xa.XAResource; 040 041import org.apache.activemq.advisory.AdvisorySupport; 042import org.apache.activemq.broker.region.ConnectionStatistics; 043import org.apache.activemq.broker.region.RegionBroker; 044import org.apache.activemq.command.ActiveMQDestination; 045import org.apache.activemq.command.BrokerInfo; 046import org.apache.activemq.command.Command; 047import org.apache.activemq.command.CommandTypes; 048import org.apache.activemq.command.ConnectionControl; 049import org.apache.activemq.command.ConnectionError; 050import org.apache.activemq.command.ConnectionId; 051import org.apache.activemq.command.ConnectionInfo; 052import org.apache.activemq.command.ConsumerControl; 053import org.apache.activemq.command.ConsumerId; 054import org.apache.activemq.command.ConsumerInfo; 055import org.apache.activemq.command.ControlCommand; 056import org.apache.activemq.command.DataArrayResponse; 057import org.apache.activemq.command.DestinationInfo; 058import org.apache.activemq.command.ExceptionResponse; 059import org.apache.activemq.command.FlushCommand; 060import org.apache.activemq.command.IntegerResponse; 061import org.apache.activemq.command.KeepAliveInfo; 062import org.apache.activemq.command.Message; 063import org.apache.activemq.command.MessageAck; 064import org.apache.activemq.command.MessageDispatch; 065import org.apache.activemq.command.MessageDispatchNotification; 066import org.apache.activemq.command.MessagePull; 067import org.apache.activemq.command.ProducerAck; 068import org.apache.activemq.command.ProducerId; 069import org.apache.activemq.command.ProducerInfo; 070import org.apache.activemq.command.RemoveInfo; 071import org.apache.activemq.command.RemoveSubscriptionInfo; 072import org.apache.activemq.command.Response; 073import org.apache.activemq.command.SessionId; 074import org.apache.activemq.command.SessionInfo; 075import org.apache.activemq.command.ShutdownInfo; 076import org.apache.activemq.command.TransactionId; 077import org.apache.activemq.command.TransactionInfo; 078import org.apache.activemq.command.WireFormatInfo; 079import org.apache.activemq.network.DemandForwardingBridge; 080import org.apache.activemq.network.MBeanNetworkListener; 081import org.apache.activemq.network.NetworkBridgeConfiguration; 082import org.apache.activemq.network.NetworkBridgeFactory; 083import org.apache.activemq.security.MessageAuthorizationPolicy; 084import org.apache.activemq.state.CommandVisitor; 085import org.apache.activemq.state.ConnectionState; 086import org.apache.activemq.state.ConsumerState; 087import org.apache.activemq.state.ProducerState; 088import org.apache.activemq.state.SessionState; 089import org.apache.activemq.state.TransactionState; 090import org.apache.activemq.thread.Task; 091import org.apache.activemq.thread.TaskRunner; 092import org.apache.activemq.thread.TaskRunnerFactory; 093import org.apache.activemq.transaction.Transaction; 094import org.apache.activemq.transport.DefaultTransportListener; 095import org.apache.activemq.transport.ResponseCorrelator; 096import org.apache.activemq.transport.TransmitCallback; 097import org.apache.activemq.transport.Transport; 098import org.apache.activemq.transport.TransportDisposedIOException; 099import org.apache.activemq.util.IntrospectionSupport; 100import org.apache.activemq.util.MarshallingSupport; 101import org.slf4j.Logger; 102import org.slf4j.LoggerFactory; 103import org.slf4j.MDC; 104 105public class TransportConnection implements Connection, Task, CommandVisitor { 106 private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class); 107 private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport"); 108 private static final Logger SERVICELOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Service"); 109 // Keeps track of the broker and connector that created this connection. 110 protected final Broker broker; 111 protected final BrokerService brokerService; 112 protected final TransportConnector connector; 113 // Keeps track of the state of the connections. 114 // protected final ConcurrentHashMap localConnectionStates=new 115 // ConcurrentHashMap(); 116 protected final Map<ConnectionId, ConnectionState> brokerConnectionStates; 117 // The broker and wireformat info that was exchanged. 118 protected BrokerInfo brokerInfo; 119 protected final List<Command> dispatchQueue = new LinkedList<Command>(); 120 protected TaskRunner taskRunner; 121 protected final AtomicReference<Throwable> transportException = new AtomicReference<Throwable>(); 122 protected AtomicBoolean dispatchStopped = new AtomicBoolean(false); 123 private final Transport transport; 124 private MessageAuthorizationPolicy messageAuthorizationPolicy; 125 private WireFormatInfo wireFormatInfo; 126 // Used to do async dispatch.. this should perhaps be pushed down into the 127 // transport layer.. 128 private boolean inServiceException; 129 private final ConnectionStatistics statistics = new ConnectionStatistics(); 130 private boolean manageable; 131 private boolean slow; 132 private boolean markedCandidate; 133 private boolean blockedCandidate; 134 private boolean blocked; 135 private boolean connected; 136 private boolean active; 137 private final AtomicBoolean starting = new AtomicBoolean(); 138 private final AtomicBoolean pendingStop = new AtomicBoolean(); 139 private long timeStamp; 140 private final AtomicBoolean stopping = new AtomicBoolean(false); 141 private final CountDownLatch stopped = new CountDownLatch(1); 142 private final AtomicBoolean asyncException = new AtomicBoolean(false); 143 private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>(); 144 private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>(); 145 private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1); 146 private ConnectionContext context; 147 private boolean networkConnection; 148 private boolean faultTolerantConnection; 149 private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION); 150 private DemandForwardingBridge duplexBridge; 151 private final TaskRunnerFactory taskRunnerFactory; 152 private final TaskRunnerFactory stopTaskRunnerFactory; 153 private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister(); 154 private final ReentrantReadWriteLock serviceLock = new ReentrantReadWriteLock(); 155 private String duplexNetworkConnectorId; 156 157 /** 158 * @param taskRunnerFactory - can be null if you want direct dispatch to the transport 159 * else commands are sent async. 160 * @param stopTaskRunnerFactory - can <b>not</b> be null, used for stopping this connection. 161 */ 162 public TransportConnection(TransportConnector connector, final Transport transport, Broker broker, 163 TaskRunnerFactory taskRunnerFactory, TaskRunnerFactory stopTaskRunnerFactory) { 164 this.connector = connector; 165 this.broker = broker; 166 this.brokerService = broker.getBrokerService(); 167 168 RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class); 169 brokerConnectionStates = rb.getConnectionStates(); 170 if (connector != null) { 171 this.statistics.setParent(connector.getStatistics()); 172 this.messageAuthorizationPolicy = connector.getMessageAuthorizationPolicy(); 173 } 174 this.taskRunnerFactory = taskRunnerFactory; 175 this.stopTaskRunnerFactory = stopTaskRunnerFactory; 176 this.transport = transport; 177 if( this.transport instanceof BrokerServiceAware ) { 178 ((BrokerServiceAware)this.transport).setBrokerService(brokerService); 179 } 180 this.transport.setTransportListener(new DefaultTransportListener() { 181 @Override 182 public void onCommand(Object o) { 183 serviceLock.readLock().lock(); 184 try { 185 if (!(o instanceof Command)) { 186 throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString()); 187 } 188 Command command = (Command) o; 189 if (!brokerService.isStopping()) { 190 Response response = service(command); 191 if (response != null && !brokerService.isStopping()) { 192 dispatchSync(response); 193 } 194 } else { 195 throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); 196 } 197 } finally { 198 serviceLock.readLock().unlock(); 199 } 200 } 201 202 @Override 203 public void onException(IOException exception) { 204 serviceLock.readLock().lock(); 205 try { 206 serviceTransportException(exception); 207 } finally { 208 serviceLock.readLock().unlock(); 209 } 210 } 211 }); 212 connected = true; 213 } 214 215 /** 216 * Returns the number of messages to be dispatched to this connection 217 * 218 * @return size of dispatch queue 219 */ 220 @Override 221 public int getDispatchQueueSize() { 222 synchronized (dispatchQueue) { 223 return dispatchQueue.size(); 224 } 225 } 226 227 public void serviceTransportException(IOException e) { 228 if (!stopping.get() && !pendingStop.get()) { 229 transportException.set(e); 230 if (TRANSPORTLOG.isDebugEnabled()) { 231 TRANSPORTLOG.debug(this + " failed: " + e, e); 232 } else if (TRANSPORTLOG.isWarnEnabled() && !suppressed(e)) { 233 TRANSPORTLOG.warn(this + " failed: " + e); 234 } 235 stopAsync(e); 236 } 237 } 238 239 private boolean suppressed(IOException e) { 240 return !connector.isWarnOnRemoteClose() && ((e instanceof SocketException && e.getMessage().indexOf("reset") != -1) || e instanceof EOFException); 241 } 242 243 /** 244 * Calls the serviceException method in an async thread. Since handling a 245 * service exception closes a socket, we should not tie up broker threads 246 * since client sockets may hang or cause deadlocks. 247 */ 248 @Override 249 public void serviceExceptionAsync(final IOException e) { 250 if (asyncException.compareAndSet(false, true)) { 251 new Thread("Async Exception Handler") { 252 @Override 253 public void run() { 254 serviceException(e); 255 } 256 }.start(); 257 } 258 } 259 260 /** 261 * Closes a clients connection due to a detected error. Errors are ignored 262 * if: the client is closing or broker is closing. Otherwise, the connection 263 * error transmitted to the client before stopping it's transport. 264 */ 265 @Override 266 public void serviceException(Throwable e) { 267 // are we a transport exception such as not being able to dispatch 268 // synchronously to a transport 269 if (e instanceof IOException) { 270 serviceTransportException((IOException) e); 271 } else if (e.getClass() == BrokerStoppedException.class) { 272 // Handle the case where the broker is stopped 273 // But the client is still connected. 274 if (!stopping.get()) { 275 SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection."); 276 ConnectionError ce = new ConnectionError(); 277 ce.setException(e); 278 dispatchSync(ce); 279 // Record the error that caused the transport to stop 280 transportException.set(e); 281 // Wait a little bit to try to get the output buffer to flush 282 // the exception notification to the client. 283 try { 284 Thread.sleep(500); 285 } catch (InterruptedException ie) { 286 Thread.currentThread().interrupt(); 287 } 288 // Worst case is we just kill the connection before the 289 // notification gets to him. 290 stopAsync(); 291 } 292 } else if (!stopping.get() && !inServiceException) { 293 inServiceException = true; 294 try { 295 if (SERVICELOG.isDebugEnabled()) { 296 SERVICELOG.debug("Async error occurred: " + e, e); 297 } else { 298 SERVICELOG.warn("Async error occurred: " + e); 299 } 300 ConnectionError ce = new ConnectionError(); 301 ce.setException(e); 302 if (pendingStop.get()) { 303 dispatchSync(ce); 304 } else { 305 dispatchAsync(ce); 306 } 307 } finally { 308 inServiceException = false; 309 } 310 } 311 } 312 313 @Override 314 public Response service(Command command) { 315 MDC.put("activemq.connector", connector.getUri().toString()); 316 Response response = null; 317 boolean responseRequired = command.isResponseRequired(); 318 int commandId = command.getCommandId(); 319 try { 320 if (!pendingStop.get()) { 321 response = command.visit(this); 322 } else { 323 response = new ExceptionResponse(transportException.get()); 324 } 325 } catch (Throwable e) { 326 if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) { 327 SERVICELOG.debug("Error occured while processing " + (responseRequired ? "sync" : "async") 328 + " command: " + command + ", exception: " + e, e); 329 } 330 331 if (e instanceof SuppressReplyException || (e.getCause() instanceof SuppressReplyException)) { 332 LOG.info("Suppressing reply to: " + command + " on: " + e + ", cause: " + e.getCause()); 333 responseRequired = false; 334 } 335 336 if (responseRequired) { 337 if (e instanceof SecurityException || e.getCause() instanceof SecurityException) { 338 SERVICELOG.warn("Security Error occurred on connection to: {}, {}", 339 transport.getRemoteAddress(), e.getMessage()); 340 } 341 response = new ExceptionResponse(e); 342 } else { 343 forceRollbackOnlyOnFailedAsyncTransactionOp(e, command); 344 serviceException(e); 345 } 346 } 347 if (responseRequired) { 348 if (response == null) { 349 response = new Response(); 350 } 351 response.setCorrelationId(commandId); 352 } 353 // The context may have been flagged so that the response is not 354 // sent. 355 if (context != null) { 356 if (context.isDontSendReponse()) { 357 context.setDontSendReponse(false); 358 response = null; 359 } 360 context = null; 361 } 362 MDC.remove("activemq.connector"); 363 return response; 364 } 365 366 private void forceRollbackOnlyOnFailedAsyncTransactionOp(Throwable e, Command command) { 367 if (brokerService.isRollbackOnlyOnAsyncException() && !(e instanceof IOException) && isInTransaction(command)) { 368 Transaction transaction = getActiveTransaction(command); 369 if (transaction != null && !transaction.isRollbackOnly()) { 370 LOG.debug("on async exception, force rollback of transaction for: " + command, e); 371 transaction.setRollbackOnly(e); 372 } 373 } 374 } 375 376 private Transaction getActiveTransaction(Command command) { 377 Transaction transaction = null; 378 try { 379 if (command instanceof Message) { 380 Message messageSend = (Message) command; 381 ProducerId producerId = messageSend.getProducerId(); 382 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 383 transaction = producerExchange.getConnectionContext().getTransactions().get(messageSend.getTransactionId()); 384 } else if (command instanceof MessageAck) { 385 MessageAck messageAck = (MessageAck) command; 386 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(messageAck.getConsumerId()); 387 if (consumerExchange != null) { 388 transaction = consumerExchange.getConnectionContext().getTransactions().get(messageAck.getTransactionId()); 389 } 390 } 391 } catch(Exception ignored){ 392 LOG.trace("failed to find active transaction for command: " + command, ignored); 393 } 394 return transaction; 395 } 396 397 private boolean isInTransaction(Command command) { 398 return command instanceof Message && ((Message)command).isInTransaction() 399 || command instanceof MessageAck && ((MessageAck)command).isInTransaction(); 400 } 401 402 @Override 403 public Response processKeepAlive(KeepAliveInfo info) throws Exception { 404 return null; 405 } 406 407 @Override 408 public Response processRemoveSubscription(RemoveSubscriptionInfo info) throws Exception { 409 broker.removeSubscription(lookupConnectionState(info.getConnectionId()).getContext(), info); 410 return null; 411 } 412 413 @Override 414 public Response processWireFormat(WireFormatInfo info) throws Exception { 415 wireFormatInfo = info; 416 protocolVersion.set(info.getVersion()); 417 return null; 418 } 419 420 @Override 421 public Response processShutdown(ShutdownInfo info) throws Exception { 422 stopAsync(); 423 return null; 424 } 425 426 @Override 427 public Response processFlush(FlushCommand command) throws Exception { 428 return null; 429 } 430 431 @Override 432 public Response processBeginTransaction(TransactionInfo info) throws Exception { 433 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 434 context = null; 435 if (cs != null) { 436 context = cs.getContext(); 437 } 438 if (cs == null) { 439 throw new NullPointerException("Context is null"); 440 } 441 // Avoid replaying dup commands 442 if (cs.getTransactionState(info.getTransactionId()) == null) { 443 cs.addTransactionState(info.getTransactionId()); 444 broker.beginTransaction(context, info.getTransactionId()); 445 } 446 return null; 447 } 448 449 @Override 450 public int getActiveTransactionCount() { 451 int rc = 0; 452 for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { 453 Collection<TransactionState> transactions = cs.getTransactionStates(); 454 for (TransactionState transaction : transactions) { 455 rc++; 456 } 457 } 458 return rc; 459 } 460 461 @Override 462 public Long getOldestActiveTransactionDuration() { 463 TransactionState oldestTX = null; 464 for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) { 465 Collection<TransactionState> transactions = cs.getTransactionStates(); 466 for (TransactionState transaction : transactions) { 467 if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) { 468 oldestTX = transaction; 469 } 470 } 471 } 472 if( oldestTX == null ) { 473 return null; 474 } 475 return System.currentTimeMillis() - oldestTX.getCreatedAt(); 476 } 477 478 @Override 479 public Response processEndTransaction(TransactionInfo info) throws Exception { 480 // No need to do anything. This packet is just sent by the client 481 // make sure he is synced with the server as commit command could 482 // come from a different connection. 483 return null; 484 } 485 486 @Override 487 public Response processPrepareTransaction(TransactionInfo info) throws Exception { 488 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 489 context = null; 490 if (cs != null) { 491 context = cs.getContext(); 492 } 493 if (cs == null) { 494 throw new NullPointerException("Context is null"); 495 } 496 TransactionState transactionState = cs.getTransactionState(info.getTransactionId()); 497 if (transactionState == null) { 498 throw new IllegalStateException("Cannot prepare a transaction that had not been started or previously returned XA_RDONLY: " 499 + info.getTransactionId()); 500 } 501 // Avoid dups. 502 if (!transactionState.isPrepared()) { 503 transactionState.setPrepared(true); 504 int result = broker.prepareTransaction(context, info.getTransactionId()); 505 transactionState.setPreparedResult(result); 506 if (result == XAResource.XA_RDONLY) { 507 // we are done, no further rollback or commit from TM 508 cs.removeTransactionState(info.getTransactionId()); 509 } 510 IntegerResponse response = new IntegerResponse(result); 511 return response; 512 } else { 513 IntegerResponse response = new IntegerResponse(transactionState.getPreparedResult()); 514 return response; 515 } 516 } 517 518 @Override 519 public Response processCommitTransactionOnePhase(TransactionInfo info) throws Exception { 520 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 521 context = cs.getContext(); 522 cs.removeTransactionState(info.getTransactionId()); 523 broker.commitTransaction(context, info.getTransactionId(), true); 524 return null; 525 } 526 527 @Override 528 public Response processCommitTransactionTwoPhase(TransactionInfo info) throws Exception { 529 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 530 context = cs.getContext(); 531 cs.removeTransactionState(info.getTransactionId()); 532 broker.commitTransaction(context, info.getTransactionId(), false); 533 return null; 534 } 535 536 @Override 537 public Response processRollbackTransaction(TransactionInfo info) throws Exception { 538 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 539 context = cs.getContext(); 540 cs.removeTransactionState(info.getTransactionId()); 541 broker.rollbackTransaction(context, info.getTransactionId()); 542 return null; 543 } 544 545 @Override 546 public Response processForgetTransaction(TransactionInfo info) throws Exception { 547 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 548 context = cs.getContext(); 549 broker.forgetTransaction(context, info.getTransactionId()); 550 return null; 551 } 552 553 @Override 554 public Response processRecoverTransactions(TransactionInfo info) throws Exception { 555 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 556 context = cs.getContext(); 557 TransactionId[] preparedTransactions = broker.getPreparedTransactions(context); 558 return new DataArrayResponse(preparedTransactions); 559 } 560 561 @Override 562 public Response processMessage(Message messageSend) throws Exception { 563 ProducerId producerId = messageSend.getProducerId(); 564 ProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId); 565 if (producerExchange.canDispatch(messageSend)) { 566 broker.send(producerExchange, messageSend); 567 } 568 return null; 569 } 570 571 @Override 572 public Response processMessageAck(MessageAck ack) throws Exception { 573 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(ack.getConsumerId()); 574 if (consumerExchange != null) { 575 broker.acknowledge(consumerExchange, ack); 576 } else if (ack.isInTransaction()) { 577 LOG.warn("no matching consumer, ignoring ack {}", consumerExchange, ack); 578 } 579 return null; 580 } 581 582 @Override 583 public Response processMessagePull(MessagePull pull) throws Exception { 584 return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull); 585 } 586 587 @Override 588 public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception { 589 broker.processDispatchNotification(notification); 590 return null; 591 } 592 593 @Override 594 public Response processAddDestination(DestinationInfo info) throws Exception { 595 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 596 broker.addDestinationInfo(cs.getContext(), info); 597 if (info.getDestination().isTemporary()) { 598 cs.addTempDestination(info); 599 } 600 return null; 601 } 602 603 @Override 604 public Response processRemoveDestination(DestinationInfo info) throws Exception { 605 TransportConnectionState cs = lookupConnectionState(info.getConnectionId()); 606 broker.removeDestinationInfo(cs.getContext(), info); 607 if (info.getDestination().isTemporary()) { 608 cs.removeTempDestination(info.getDestination()); 609 } 610 return null; 611 } 612 613 @Override 614 public Response processAddProducer(ProducerInfo info) throws Exception { 615 SessionId sessionId = info.getProducerId().getParentId(); 616 ConnectionId connectionId = sessionId.getParentId(); 617 TransportConnectionState cs = lookupConnectionState(connectionId); 618 if (cs == null) { 619 throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " 620 + connectionId); 621 } 622 SessionState ss = cs.getSessionState(sessionId); 623 if (ss == null) { 624 throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " 625 + sessionId); 626 } 627 // Avoid replaying dup commands 628 if (!ss.getProducerIds().contains(info.getProducerId())) { 629 ActiveMQDestination destination = info.getDestination(); 630 // Do not check for null here as it would cause the count of max producers to exclude 631 // anonymous producers. The isAdvisoryTopic method checks for null so it is safe to 632 // call it from here with a null Destination value. 633 if (!AdvisorySupport.isAdvisoryTopic(destination)) { 634 if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){ 635 throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection()); 636 } 637 } 638 broker.addProducer(cs.getContext(), info); 639 try { 640 ss.addProducer(info); 641 } catch (IllegalStateException e) { 642 broker.removeProducer(cs.getContext(), info); 643 } 644 645 } 646 return null; 647 } 648 649 @Override 650 public Response processRemoveProducer(ProducerId id) throws Exception { 651 SessionId sessionId = id.getParentId(); 652 ConnectionId connectionId = sessionId.getParentId(); 653 TransportConnectionState cs = lookupConnectionState(connectionId); 654 SessionState ss = cs.getSessionState(sessionId); 655 if (ss == null) { 656 throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " 657 + sessionId); 658 } 659 ProducerState ps = ss.removeProducer(id); 660 if (ps == null) { 661 throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id); 662 } 663 removeProducerBrokerExchange(id); 664 broker.removeProducer(cs.getContext(), ps.getInfo()); 665 return null; 666 } 667 668 @Override 669 public Response processAddConsumer(ConsumerInfo info) throws Exception { 670 SessionId sessionId = info.getConsumerId().getParentId(); 671 ConnectionId connectionId = sessionId.getParentId(); 672 TransportConnectionState cs = lookupConnectionState(connectionId); 673 if (cs == null) { 674 throw new IllegalStateException("Cannot add a consumer to a connection that had not been registered: " 675 + connectionId); 676 } 677 SessionState ss = cs.getSessionState(sessionId); 678 if (ss == null) { 679 throw new IllegalStateException(broker.getBrokerName() 680 + " Cannot add a consumer to a session that had not been registered: " + sessionId); 681 } 682 // Avoid replaying dup commands 683 if (!ss.getConsumerIds().contains(info.getConsumerId())) { 684 ActiveMQDestination destination = info.getDestination(); 685 if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) { 686 if (getConsumerCount(connectionId) >= connector.getMaximumConsumersAllowedPerConnection()){ 687 throw new IllegalStateException("Can't add consumer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumConsumersAllowedPerConnection()); 688 } 689 } 690 691 broker.addConsumer(cs.getContext(), info); 692 try { 693 ss.addConsumer(info); 694 addConsumerBrokerExchange(cs, info.getConsumerId()); 695 } catch (IllegalStateException e) { 696 broker.removeConsumer(cs.getContext(), info); 697 } 698 699 } 700 return null; 701 } 702 703 @Override 704 public Response processRemoveConsumer(ConsumerId id, long lastDeliveredSequenceId) throws Exception { 705 SessionId sessionId = id.getParentId(); 706 ConnectionId connectionId = sessionId.getParentId(); 707 TransportConnectionState cs = lookupConnectionState(connectionId); 708 if (cs == null) { 709 throw new IllegalStateException("Cannot remove a consumer from a connection that had not been registered: " 710 + connectionId); 711 } 712 SessionState ss = cs.getSessionState(sessionId); 713 if (ss == null) { 714 throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " 715 + sessionId); 716 } 717 ConsumerState consumerState = ss.removeConsumer(id); 718 if (consumerState == null) { 719 throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id); 720 } 721 ConsumerInfo info = consumerState.getInfo(); 722 info.setLastDeliveredSequenceId(lastDeliveredSequenceId); 723 broker.removeConsumer(cs.getContext(), consumerState.getInfo()); 724 removeConsumerBrokerExchange(id); 725 return null; 726 } 727 728 @Override 729 public Response processAddSession(SessionInfo info) throws Exception { 730 ConnectionId connectionId = info.getSessionId().getParentId(); 731 TransportConnectionState cs = lookupConnectionState(connectionId); 732 // Avoid replaying dup commands 733 if (cs != null && !cs.getSessionIds().contains(info.getSessionId())) { 734 broker.addSession(cs.getContext(), info); 735 try { 736 cs.addSession(info); 737 } catch (IllegalStateException e) { 738 e.printStackTrace(); 739 broker.removeSession(cs.getContext(), info); 740 } 741 } 742 return null; 743 } 744 745 @Override 746 public Response processRemoveSession(SessionId id, long lastDeliveredSequenceId) throws Exception { 747 ConnectionId connectionId = id.getParentId(); 748 TransportConnectionState cs = lookupConnectionState(connectionId); 749 if (cs == null) { 750 throw new IllegalStateException("Cannot remove session from connection that had not been registered: " + connectionId); 751 } 752 SessionState session = cs.getSessionState(id); 753 if (session == null) { 754 throw new IllegalStateException("Cannot remove session that had not been registered: " + id); 755 } 756 // Don't let new consumers or producers get added while we are closing 757 // this down. 758 session.shutdown(); 759 // Cascade the connection stop to the consumers and producers. 760 for (ConsumerId consumerId : session.getConsumerIds()) { 761 try { 762 processRemoveConsumer(consumerId, lastDeliveredSequenceId); 763 } catch (Throwable e) { 764 LOG.warn("Failed to remove consumer: {}", consumerId, e); 765 } 766 } 767 for (ProducerId producerId : session.getProducerIds()) { 768 try { 769 processRemoveProducer(producerId); 770 } catch (Throwable e) { 771 LOG.warn("Failed to remove producer: {}", producerId, e); 772 } 773 } 774 cs.removeSession(id); 775 broker.removeSession(cs.getContext(), session.getInfo()); 776 return null; 777 } 778 779 @Override 780 public Response processAddConnection(ConnectionInfo info) throws Exception { 781 // Older clients should have been defaulting this field to true.. but 782 // they were not. 783 if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) { 784 info.setClientMaster(true); 785 } 786 TransportConnectionState state; 787 // Make sure 2 concurrent connections by the same ID only generate 1 788 // TransportConnectionState object. 789 synchronized (brokerConnectionStates) { 790 state = (TransportConnectionState) brokerConnectionStates.get(info.getConnectionId()); 791 if (state == null) { 792 state = new TransportConnectionState(info, this); 793 brokerConnectionStates.put(info.getConnectionId(), state); 794 } 795 state.incrementReference(); 796 } 797 // If there are 2 concurrent connections for the same connection id, 798 // then last one in wins, we need to sync here 799 // to figure out the winner. 800 synchronized (state.getConnectionMutex()) { 801 if (state.getConnection() != this) { 802 LOG.debug("Killing previous stale connection: {}", state.getConnection().getRemoteAddress()); 803 state.getConnection().stop(); 804 LOG.debug("Connection {} taking over previous connection: {}", getRemoteAddress(), state.getConnection().getRemoteAddress()); 805 state.setConnection(this); 806 state.reset(info); 807 } 808 } 809 registerConnectionState(info.getConnectionId(), state); 810 LOG.debug("Setting up new connection id: {}, address: {}, info: {}", new Object[]{ info.getConnectionId(), getRemoteAddress(), info }); 811 this.faultTolerantConnection = info.isFaultTolerant(); 812 // Setup the context. 813 String clientId = info.getClientId(); 814 context = new ConnectionContext(); 815 context.setBroker(broker); 816 context.setClientId(clientId); 817 context.setClientMaster(info.isClientMaster()); 818 context.setConnection(this); 819 context.setConnectionId(info.getConnectionId()); 820 context.setConnector(connector); 821 context.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 822 context.setNetworkConnection(networkConnection); 823 context.setFaultTolerant(faultTolerantConnection); 824 context.setTransactions(new ConcurrentHashMap<TransactionId, Transaction>()); 825 context.setUserName(info.getUserName()); 826 context.setWireFormatInfo(wireFormatInfo); 827 context.setReconnect(info.isFailoverReconnect()); 828 this.manageable = info.isManageable(); 829 context.setConnectionState(state); 830 state.setContext(context); 831 state.setConnection(this); 832 if (info.getClientIp() == null) { 833 info.setClientIp(getRemoteAddress()); 834 } 835 836 try { 837 broker.addConnection(context, info); 838 } catch (Exception e) { 839 synchronized (brokerConnectionStates) { 840 brokerConnectionStates.remove(info.getConnectionId()); 841 } 842 unregisterConnectionState(info.getConnectionId()); 843 LOG.warn("Failed to add Connection {} due to {}", info.getConnectionId(), e); 844 if (e instanceof SecurityException) { 845 // close this down - in case the peer of this transport doesn't play nice 846 delayedStop(2000, "Failed with SecurityException: " + e.getLocalizedMessage(), e); 847 } 848 throw e; 849 } 850 if (info.isManageable()) { 851 // send ConnectionCommand 852 ConnectionControl command = this.connector.getConnectionControl(); 853 command.setFaultTolerant(broker.isFaultTolerantConfiguration()); 854 if (info.isFailoverReconnect()) { 855 command.setRebalanceConnection(false); 856 } 857 dispatchAsync(command); 858 } 859 return null; 860 } 861 862 @Override 863 public synchronized Response processRemoveConnection(ConnectionId id, long lastDeliveredSequenceId) 864 throws InterruptedException { 865 LOG.debug("remove connection id: {}", id); 866 TransportConnectionState cs = lookupConnectionState(id); 867 if (cs != null) { 868 // Don't allow things to be added to the connection state while we 869 // are shutting down. 870 cs.shutdown(); 871 // Cascade the connection stop to the sessions. 872 for (SessionId sessionId : cs.getSessionIds()) { 873 try { 874 processRemoveSession(sessionId, lastDeliveredSequenceId); 875 } catch (Throwable e) { 876 SERVICELOG.warn("Failed to remove session {}", sessionId, e); 877 } 878 } 879 // Cascade the connection stop to temp destinations. 880 for (Iterator<DestinationInfo> iter = cs.getTempDestinations().iterator(); iter.hasNext(); ) { 881 DestinationInfo di = iter.next(); 882 try { 883 broker.removeDestination(cs.getContext(), di.getDestination(), 0); 884 } catch (Throwable e) { 885 SERVICELOG.warn("Failed to remove tmp destination {}", di.getDestination(), e); 886 } 887 iter.remove(); 888 } 889 try { 890 broker.removeConnection(cs.getContext(), cs.getInfo(), transportException.get()); 891 } catch (Throwable e) { 892 SERVICELOG.warn("Failed to remove connection {}", cs.getInfo(), e); 893 } 894 TransportConnectionState state = unregisterConnectionState(id); 895 if (state != null) { 896 synchronized (brokerConnectionStates) { 897 // If we are the last reference, we should remove the state 898 // from the broker. 899 if (state.decrementReference() == 0) { 900 brokerConnectionStates.remove(id); 901 } 902 } 903 } 904 } 905 return null; 906 } 907 908 @Override 909 public Response processProducerAck(ProducerAck ack) throws Exception { 910 // A broker should not get ProducerAck messages. 911 return null; 912 } 913 914 @Override 915 public Connector getConnector() { 916 return connector; 917 } 918 919 @Override 920 public void dispatchSync(Command message) { 921 try { 922 processDispatch(message); 923 } catch (IOException e) { 924 serviceExceptionAsync(e); 925 } 926 } 927 928 @Override 929 public void dispatchAsync(Command message) { 930 if (!stopping.get()) { 931 if (taskRunner == null) { 932 dispatchSync(message); 933 } else { 934 synchronized (dispatchQueue) { 935 dispatchQueue.add(message); 936 } 937 try { 938 taskRunner.wakeup(); 939 } catch (InterruptedException e) { 940 Thread.currentThread().interrupt(); 941 } 942 } 943 } else { 944 if (message.isMessageDispatch()) { 945 MessageDispatch md = (MessageDispatch) message; 946 TransmitCallback sub = md.getTransmitCallback(); 947 broker.postProcessDispatch(md); 948 if (sub != null) { 949 sub.onFailure(); 950 } 951 } 952 } 953 } 954 955 protected void processDispatch(Command command) throws IOException { 956 MessageDispatch messageDispatch = (MessageDispatch) (command.isMessageDispatch() ? command : null); 957 try { 958 if (!stopping.get()) { 959 if (messageDispatch != null) { 960 try { 961 broker.preProcessDispatch(messageDispatch); 962 } catch (RuntimeException convertToIO) { 963 throw new IOException(convertToIO); 964 } 965 } 966 dispatch(command); 967 } 968 } catch (IOException e) { 969 if (messageDispatch != null) { 970 TransmitCallback sub = messageDispatch.getTransmitCallback(); 971 broker.postProcessDispatch(messageDispatch); 972 if (sub != null) { 973 sub.onFailure(); 974 } 975 messageDispatch = null; 976 throw e; 977 } else { 978 if (TRANSPORTLOG.isDebugEnabled()) { 979 TRANSPORTLOG.debug("Unexpected exception on asyncDispatch, command of type: " + command.getDataStructureType(), e); 980 } 981 } 982 } finally { 983 if (messageDispatch != null) { 984 TransmitCallback sub = messageDispatch.getTransmitCallback(); 985 broker.postProcessDispatch(messageDispatch); 986 if (sub != null) { 987 sub.onSuccess(); 988 } 989 } 990 } 991 } 992 993 @Override 994 public boolean iterate() { 995 try { 996 if (pendingStop.get() || stopping.get()) { 997 if (dispatchStopped.compareAndSet(false, true)) { 998 if (transportException.get() == null) { 999 try { 1000 dispatch(new ShutdownInfo()); 1001 } catch (Throwable ignore) { 1002 } 1003 } 1004 dispatchStoppedLatch.countDown(); 1005 } 1006 return false; 1007 } 1008 if (!dispatchStopped.get()) { 1009 Command command = null; 1010 synchronized (dispatchQueue) { 1011 if (dispatchQueue.isEmpty()) { 1012 return false; 1013 } 1014 command = dispatchQueue.remove(0); 1015 } 1016 processDispatch(command); 1017 return true; 1018 } 1019 return false; 1020 } catch (IOException e) { 1021 if (dispatchStopped.compareAndSet(false, true)) { 1022 dispatchStoppedLatch.countDown(); 1023 } 1024 serviceExceptionAsync(e); 1025 return false; 1026 } 1027 } 1028 1029 /** 1030 * Returns the statistics for this connection 1031 */ 1032 @Override 1033 public ConnectionStatistics getStatistics() { 1034 return statistics; 1035 } 1036 1037 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 1038 return messageAuthorizationPolicy; 1039 } 1040 1041 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 1042 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 1043 } 1044 1045 @Override 1046 public boolean isManageable() { 1047 return manageable; 1048 } 1049 1050 @Override 1051 public void start() throws Exception { 1052 try { 1053 synchronized (this) { 1054 starting.set(true); 1055 if (taskRunnerFactory != null) { 1056 taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: " 1057 + getRemoteAddress()); 1058 } else { 1059 taskRunner = null; 1060 } 1061 transport.start(); 1062 active = true; 1063 BrokerInfo info = connector.getBrokerInfo().copy(); 1064 if (connector.isUpdateClusterClients()) { 1065 info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos()); 1066 } else { 1067 info.setPeerBrokerInfos(null); 1068 } 1069 dispatchAsync(info); 1070 1071 connector.onStarted(this); 1072 } 1073 } catch (Exception e) { 1074 // Force clean up on an error starting up. 1075 pendingStop.set(true); 1076 throw e; 1077 } finally { 1078 // stop() can be called from within the above block, 1079 // but we want to be sure start() completes before 1080 // stop() runs, so queue the stop until right now: 1081 setStarting(false); 1082 if (isPendingStop()) { 1083 LOG.debug("Calling the delayed stop() after start() {}", this); 1084 stop(); 1085 } 1086 } 1087 } 1088 1089 @Override 1090 public void stop() throws Exception { 1091 // do not stop task the task runner factories (taskRunnerFactory, stopTaskRunnerFactory) 1092 // as their lifecycle is handled elsewhere 1093 1094 stopAsync(); 1095 while (!stopped.await(5, TimeUnit.SECONDS)) { 1096 LOG.info("The connection to '{}' is taking a long time to shutdown.", transport.getRemoteAddress()); 1097 } 1098 } 1099 1100 public void delayedStop(final int waitTime, final String reason, Throwable cause) { 1101 if (waitTime > 0) { 1102 synchronized (this) { 1103 pendingStop.set(true); 1104 transportException.set(cause); 1105 } 1106 try { 1107 stopTaskRunnerFactory.execute(new Runnable() { 1108 @Override 1109 public void run() { 1110 try { 1111 Thread.sleep(waitTime); 1112 stopAsync(); 1113 LOG.info("Stopping {} because {}", transport.getRemoteAddress(), reason); 1114 } catch (InterruptedException e) { 1115 } 1116 } 1117 }); 1118 } catch (Throwable t) { 1119 LOG.warn("Cannot create stopAsync. This exception will be ignored.", t); 1120 } 1121 } 1122 } 1123 1124 public void stopAsync(Throwable cause) { 1125 transportException.set(cause); 1126 stopAsync(); 1127 } 1128 1129 public void stopAsync() { 1130 // If we're in the middle of starting then go no further... for now. 1131 synchronized (this) { 1132 pendingStop.set(true); 1133 if (starting.get()) { 1134 LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes.."); 1135 return; 1136 } 1137 } 1138 if (stopping.compareAndSet(false, true)) { 1139 // Let all the connection contexts know we are shutting down 1140 // so that in progress operations can notice and unblock. 1141 List<TransportConnectionState> connectionStates = listConnectionStates(); 1142 for (TransportConnectionState cs : connectionStates) { 1143 ConnectionContext connectionContext = cs.getContext(); 1144 if (connectionContext != null) { 1145 connectionContext.getStopping().set(true); 1146 } 1147 } 1148 try { 1149 stopTaskRunnerFactory.execute(new Runnable() { 1150 @Override 1151 public void run() { 1152 serviceLock.writeLock().lock(); 1153 try { 1154 doStop(); 1155 } catch (Throwable e) { 1156 LOG.debug("Error occurred while shutting down a connection {}", this, e); 1157 } finally { 1158 stopped.countDown(); 1159 serviceLock.writeLock().unlock(); 1160 } 1161 } 1162 }); 1163 } catch (Throwable t) { 1164 LOG.warn("Cannot create async transport stopper thread. This exception is ignored. Not waiting for stop to complete", t); 1165 stopped.countDown(); 1166 } 1167 } 1168 } 1169 1170 @Override 1171 public String toString() { 1172 return "Transport Connection to: " + transport.getRemoteAddress(); 1173 } 1174 1175 protected void doStop() throws Exception { 1176 LOG.debug("Stopping connection: {}", transport.getRemoteAddress()); 1177 connector.onStopped(this); 1178 try { 1179 synchronized (this) { 1180 if (duplexBridge != null) { 1181 duplexBridge.stop(); 1182 } 1183 } 1184 } catch (Exception ignore) { 1185 LOG.trace("Exception caught stopping. This exception is ignored.", ignore); 1186 } 1187 try { 1188 transport.stop(); 1189 LOG.debug("Stopped transport: {}", transport.getRemoteAddress()); 1190 } catch (Exception e) { 1191 LOG.debug("Could not stop transport to {}. This exception is ignored.", transport.getRemoteAddress(), e); 1192 } 1193 if (taskRunner != null) { 1194 taskRunner.shutdown(1); 1195 taskRunner = null; 1196 } 1197 active = false; 1198 // Run the MessageDispatch callbacks so that message references get 1199 // cleaned up. 1200 synchronized (dispatchQueue) { 1201 for (Iterator<Command> iter = dispatchQueue.iterator(); iter.hasNext(); ) { 1202 Command command = iter.next(); 1203 if (command.isMessageDispatch()) { 1204 MessageDispatch md = (MessageDispatch) command; 1205 TransmitCallback sub = md.getTransmitCallback(); 1206 broker.postProcessDispatch(md); 1207 if (sub != null) { 1208 sub.onFailure(); 1209 } 1210 } 1211 } 1212 dispatchQueue.clear(); 1213 } 1214 // 1215 // Remove all logical connection associated with this connection 1216 // from the broker. 1217 if (!broker.isStopped()) { 1218 List<TransportConnectionState> connectionStates = listConnectionStates(); 1219 connectionStates = listConnectionStates(); 1220 for (TransportConnectionState cs : connectionStates) { 1221 cs.getContext().getStopping().set(true); 1222 try { 1223 LOG.debug("Cleaning up connection resources: {}", getRemoteAddress()); 1224 processRemoveConnection(cs.getInfo().getConnectionId(), RemoveInfo.LAST_DELIVERED_UNKNOWN); 1225 } catch (Throwable ignore) { 1226 ignore.printStackTrace(); 1227 } 1228 } 1229 } 1230 LOG.debug("Connection Stopped: {}", getRemoteAddress()); 1231 } 1232 1233 /** 1234 * @return Returns the blockedCandidate. 1235 */ 1236 public boolean isBlockedCandidate() { 1237 return blockedCandidate; 1238 } 1239 1240 /** 1241 * @param blockedCandidate The blockedCandidate to set. 1242 */ 1243 public void setBlockedCandidate(boolean blockedCandidate) { 1244 this.blockedCandidate = blockedCandidate; 1245 } 1246 1247 /** 1248 * @return Returns the markedCandidate. 1249 */ 1250 public boolean isMarkedCandidate() { 1251 return markedCandidate; 1252 } 1253 1254 /** 1255 * @param markedCandidate The markedCandidate to set. 1256 */ 1257 public void setMarkedCandidate(boolean markedCandidate) { 1258 this.markedCandidate = markedCandidate; 1259 if (!markedCandidate) { 1260 timeStamp = 0; 1261 blockedCandidate = false; 1262 } 1263 } 1264 1265 /** 1266 * @param slow The slow to set. 1267 */ 1268 public void setSlow(boolean slow) { 1269 this.slow = slow; 1270 } 1271 1272 /** 1273 * @return true if the Connection is slow 1274 */ 1275 @Override 1276 public boolean isSlow() { 1277 return slow; 1278 } 1279 1280 /** 1281 * @return true if the Connection is potentially blocked 1282 */ 1283 public boolean isMarkedBlockedCandidate() { 1284 return markedCandidate; 1285 } 1286 1287 /** 1288 * Mark the Connection, so we can deem if it's collectable on the next sweep 1289 */ 1290 public void doMark() { 1291 if (timeStamp == 0) { 1292 timeStamp = System.currentTimeMillis(); 1293 } 1294 } 1295 1296 /** 1297 * @return if after being marked, the Connection is still writing 1298 */ 1299 @Override 1300 public boolean isBlocked() { 1301 return blocked; 1302 } 1303 1304 /** 1305 * @return true if the Connection is connected 1306 */ 1307 @Override 1308 public boolean isConnected() { 1309 return connected; 1310 } 1311 1312 /** 1313 * @param blocked The blocked to set. 1314 */ 1315 public void setBlocked(boolean blocked) { 1316 this.blocked = blocked; 1317 } 1318 1319 /** 1320 * @param connected The connected to set. 1321 */ 1322 public void setConnected(boolean connected) { 1323 this.connected = connected; 1324 } 1325 1326 /** 1327 * @return true if the Connection is active 1328 */ 1329 @Override 1330 public boolean isActive() { 1331 return active; 1332 } 1333 1334 /** 1335 * @param active The active to set. 1336 */ 1337 public void setActive(boolean active) { 1338 this.active = active; 1339 } 1340 1341 /** 1342 * @return true if the Connection is starting 1343 */ 1344 public boolean isStarting() { 1345 return starting.get(); 1346 } 1347 1348 @Override 1349 public synchronized boolean isNetworkConnection() { 1350 return networkConnection; 1351 } 1352 1353 @Override 1354 public boolean isFaultTolerantConnection() { 1355 return this.faultTolerantConnection; 1356 } 1357 1358 protected void setStarting(boolean starting) { 1359 this.starting.set(starting); 1360 } 1361 1362 /** 1363 * @return true if the Connection needs to stop 1364 */ 1365 public boolean isPendingStop() { 1366 return pendingStop.get(); 1367 } 1368 1369 protected void setPendingStop(boolean pendingStop) { 1370 this.pendingStop.set(pendingStop); 1371 } 1372 1373 @Override 1374 public Response processBrokerInfo(BrokerInfo info) { 1375 if (info.isSlaveBroker()) { 1376 LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); 1377 } else if (info.isNetworkConnection() && info.isDuplexConnection()) { 1378 // so this TransportConnection is the rear end of a network bridge 1379 // We have been requested to create a two way pipe ... 1380 try { 1381 Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties()); 1382 Map<String, String> props = createMap(properties); 1383 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); 1384 IntrospectionSupport.setProperties(config, props, ""); 1385 config.setBrokerName(broker.getBrokerName()); 1386 1387 // check for existing duplex connection hanging about 1388 1389 // We first look if existing network connection already exists for the same broker Id and network connector name 1390 // It's possible in case of brief network fault to have this transport connector side of the connection always active 1391 // and the duplex network connector side wanting to open a new one 1392 // In this case, the old connection must be broken 1393 String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId(); 1394 CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections(); 1395 synchronized (connections) { 1396 for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext(); ) { 1397 TransportConnection c = iter.next(); 1398 if ((c != this) && (duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) { 1399 LOG.warn("Stopping an existing active duplex connection [{}] for network connector ({}).", c, duplexNetworkConnectorId); 1400 c.stopAsync(); 1401 // better to wait for a bit rather than get connection id already in use and failure to start new bridge 1402 c.getStopped().await(1, TimeUnit.SECONDS); 1403 } 1404 } 1405 setDuplexNetworkConnectorId(duplexNetworkConnectorId); 1406 } 1407 Transport localTransport = NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI()); 1408 Transport remoteBridgeTransport = transport; 1409 if (! (remoteBridgeTransport instanceof ResponseCorrelator)) { 1410 // the vm transport case is already wrapped 1411 remoteBridgeTransport = new ResponseCorrelator(remoteBridgeTransport); 1412 } 1413 String duplexName = localTransport.toString(); 1414 if (duplexName.contains("#")) { 1415 duplexName = duplexName.substring(duplexName.lastIndexOf("#")); 1416 } 1417 MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName)); 1418 listener.setCreatedByDuplex(true); 1419 duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener); 1420 duplexBridge.setBrokerService(brokerService); 1421 // now turn duplex off this side 1422 info.setDuplexConnection(false); 1423 duplexBridge.setCreatedByDuplex(true); 1424 duplexBridge.duplexStart(this, brokerInfo, info); 1425 LOG.info("Started responder end of duplex bridge {}", duplexNetworkConnectorId); 1426 return null; 1427 } catch (TransportDisposedIOException e) { 1428 LOG.warn("Duplex bridge {} was stopped before it was correctly started.", duplexNetworkConnectorId); 1429 return null; 1430 } catch (Exception e) { 1431 LOG.error("Failed to create responder end of duplex network bridge {}", duplexNetworkConnectorId, e); 1432 return null; 1433 } 1434 } 1435 // We only expect to get one broker info command per connection 1436 if (this.brokerInfo != null) { 1437 LOG.warn("Unexpected extra broker info command received: {}", info); 1438 } 1439 this.brokerInfo = info; 1440 networkConnection = true; 1441 List<TransportConnectionState> connectionStates = listConnectionStates(); 1442 for (TransportConnectionState cs : connectionStates) { 1443 cs.getContext().setNetworkConnection(true); 1444 } 1445 return null; 1446 } 1447 1448 @SuppressWarnings({"unchecked", "rawtypes"}) 1449 private HashMap<String, String> createMap(Properties properties) { 1450 return new HashMap(properties); 1451 } 1452 1453 protected void dispatch(Command command) throws IOException { 1454 try { 1455 setMarkedCandidate(true); 1456 transport.oneway(command); 1457 } finally { 1458 setMarkedCandidate(false); 1459 } 1460 } 1461 1462 @Override 1463 public String getRemoteAddress() { 1464 return transport.getRemoteAddress(); 1465 } 1466 1467 public Transport getTransport() { 1468 return transport; 1469 } 1470 1471 @Override 1472 public String getConnectionId() { 1473 List<TransportConnectionState> connectionStates = listConnectionStates(); 1474 for (TransportConnectionState cs : connectionStates) { 1475 if (cs.getInfo().getClientId() != null) { 1476 return cs.getInfo().getClientId(); 1477 } 1478 return cs.getInfo().getConnectionId().toString(); 1479 } 1480 return null; 1481 } 1482 1483 @Override 1484 public void updateClient(ConnectionControl control) { 1485 if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null 1486 && this.wireFormatInfo.getVersion() >= 6) { 1487 dispatchAsync(control); 1488 } 1489 } 1490 1491 public ProducerBrokerExchange getProducerBrokerExchangeIfExists(ProducerInfo producerInfo){ 1492 ProducerBrokerExchange result = null; 1493 if (producerInfo != null && producerInfo.getProducerId() != null){ 1494 synchronized (producerExchanges){ 1495 result = producerExchanges.get(producerInfo.getProducerId()); 1496 } 1497 } 1498 return result; 1499 } 1500 1501 private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException { 1502 ProducerBrokerExchange result = producerExchanges.get(id); 1503 if (result == null) { 1504 synchronized (producerExchanges) { 1505 result = new ProducerBrokerExchange(); 1506 TransportConnectionState state = lookupConnectionState(id); 1507 context = state.getContext(); 1508 result.setConnectionContext(context); 1509 if (context.isReconnect() || (context.isNetworkConnection() && connector.isAuditNetworkProducers())) { 1510 result.setLastStoredSequenceId(brokerService.getPersistenceAdapter().getLastProducerSequenceId(id)); 1511 } 1512 SessionState ss = state.getSessionState(id.getParentId()); 1513 if (ss != null) { 1514 result.setProducerState(ss.getProducerState(id)); 1515 ProducerState producerState = ss.getProducerState(id); 1516 if (producerState != null && producerState.getInfo() != null) { 1517 ProducerInfo info = producerState.getInfo(); 1518 result.setMutable(info.getDestination() == null || info.getDestination().isComposite()); 1519 } 1520 } 1521 producerExchanges.put(id, result); 1522 } 1523 } else { 1524 context = result.getConnectionContext(); 1525 } 1526 return result; 1527 } 1528 1529 private void removeProducerBrokerExchange(ProducerId id) { 1530 synchronized (producerExchanges) { 1531 producerExchanges.remove(id); 1532 } 1533 } 1534 1535 private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) { 1536 ConsumerBrokerExchange result = consumerExchanges.get(id); 1537 return result; 1538 } 1539 1540 private ConsumerBrokerExchange addConsumerBrokerExchange(TransportConnectionState connectionState, ConsumerId id) { 1541 ConsumerBrokerExchange result = consumerExchanges.get(id); 1542 if (result == null) { 1543 synchronized (consumerExchanges) { 1544 result = new ConsumerBrokerExchange(); 1545 context = connectionState.getContext(); 1546 result.setConnectionContext(context); 1547 SessionState ss = connectionState.getSessionState(id.getParentId()); 1548 if (ss != null) { 1549 ConsumerState cs = ss.getConsumerState(id); 1550 if (cs != null) { 1551 ConsumerInfo info = cs.getInfo(); 1552 if (info != null) { 1553 if (info.getDestination() != null && info.getDestination().isPattern()) { 1554 result.setWildcard(true); 1555 } 1556 } 1557 } 1558 } 1559 consumerExchanges.put(id, result); 1560 } 1561 } 1562 return result; 1563 } 1564 1565 private void removeConsumerBrokerExchange(ConsumerId id) { 1566 synchronized (consumerExchanges) { 1567 consumerExchanges.remove(id); 1568 } 1569 } 1570 1571 public int getProtocolVersion() { 1572 return protocolVersion.get(); 1573 } 1574 1575 @Override 1576 public Response processControlCommand(ControlCommand command) throws Exception { 1577 return null; 1578 } 1579 1580 @Override 1581 public Response processMessageDispatch(MessageDispatch dispatch) throws Exception { 1582 return null; 1583 } 1584 1585 @Override 1586 public Response processConnectionControl(ConnectionControl control) throws Exception { 1587 if (control != null) { 1588 faultTolerantConnection = control.isFaultTolerant(); 1589 } 1590 return null; 1591 } 1592 1593 @Override 1594 public Response processConnectionError(ConnectionError error) throws Exception { 1595 return null; 1596 } 1597 1598 @Override 1599 public Response processConsumerControl(ConsumerControl control) throws Exception { 1600 ConsumerBrokerExchange consumerExchange = getConsumerBrokerExchange(control.getConsumerId()); 1601 broker.processConsumerControl(consumerExchange, control); 1602 return null; 1603 } 1604 1605 protected synchronized TransportConnectionState registerConnectionState(ConnectionId connectionId, 1606 TransportConnectionState state) { 1607 TransportConnectionState cs = null; 1608 if (!connectionStateRegister.isEmpty() && !connectionStateRegister.doesHandleMultipleConnectionStates()) { 1609 // swap implementations 1610 TransportConnectionStateRegister newRegister = new MapTransportConnectionStateRegister(); 1611 newRegister.intialize(connectionStateRegister); 1612 connectionStateRegister = newRegister; 1613 } 1614 cs = connectionStateRegister.registerConnectionState(connectionId, state); 1615 return cs; 1616 } 1617 1618 protected synchronized TransportConnectionState unregisterConnectionState(ConnectionId connectionId) { 1619 return connectionStateRegister.unregisterConnectionState(connectionId); 1620 } 1621 1622 protected synchronized List<TransportConnectionState> listConnectionStates() { 1623 return connectionStateRegister.listConnectionStates(); 1624 } 1625 1626 protected synchronized TransportConnectionState lookupConnectionState(String connectionId) { 1627 return connectionStateRegister.lookupConnectionState(connectionId); 1628 } 1629 1630 protected synchronized TransportConnectionState lookupConnectionState(ConsumerId id) { 1631 return connectionStateRegister.lookupConnectionState(id); 1632 } 1633 1634 protected synchronized TransportConnectionState lookupConnectionState(ProducerId id) { 1635 return connectionStateRegister.lookupConnectionState(id); 1636 } 1637 1638 protected synchronized TransportConnectionState lookupConnectionState(SessionId id) { 1639 return connectionStateRegister.lookupConnectionState(id); 1640 } 1641 1642 // public only for testing 1643 public synchronized TransportConnectionState lookupConnectionState(ConnectionId connectionId) { 1644 return connectionStateRegister.lookupConnectionState(connectionId); 1645 } 1646 1647 protected synchronized void setDuplexNetworkConnectorId(String duplexNetworkConnectorId) { 1648 this.duplexNetworkConnectorId = duplexNetworkConnectorId; 1649 } 1650 1651 protected synchronized String getDuplexNetworkConnectorId() { 1652 return this.duplexNetworkConnectorId; 1653 } 1654 1655 public boolean isStopping() { 1656 return stopping.get(); 1657 } 1658 1659 protected CountDownLatch getStopped() { 1660 return stopped; 1661 } 1662 1663 private int getProducerCount(ConnectionId connectionId) { 1664 int result = 0; 1665 TransportConnectionState cs = lookupConnectionState(connectionId); 1666 if (cs != null) { 1667 for (SessionId sessionId : cs.getSessionIds()) { 1668 SessionState sessionState = cs.getSessionState(sessionId); 1669 if (sessionState != null) { 1670 result += sessionState.getProducerIds().size(); 1671 } 1672 } 1673 } 1674 return result; 1675 } 1676 1677 private int getConsumerCount(ConnectionId connectionId) { 1678 int result = 0; 1679 TransportConnectionState cs = lookupConnectionState(connectionId); 1680 if (cs != null) { 1681 for (SessionId sessionId : cs.getSessionIds()) { 1682 SessionState sessionState = cs.getSessionState(sessionId); 1683 if (sessionState != null) { 1684 result += sessionState.getConsumerIds().size(); 1685 } 1686 } 1687 } 1688 return result; 1689 } 1690 1691 public WireFormatInfo getRemoteWireFormatInfo() { 1692 return wireFormatInfo; 1693 } 1694}