001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.util.LinkedList; 023import java.util.StringTokenizer; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.regex.Pattern; 026 027import javax.management.ObjectName; 028 029import org.apache.activemq.broker.jmx.ManagedTransportConnector; 030import org.apache.activemq.broker.jmx.ManagementContext; 031import org.apache.activemq.broker.region.ConnectorStatistics; 032import org.apache.activemq.command.BrokerInfo; 033import org.apache.activemq.command.ConnectionControl; 034import org.apache.activemq.security.MessageAuthorizationPolicy; 035import org.apache.activemq.thread.TaskRunnerFactory; 036import org.apache.activemq.transport.Transport; 037import org.apache.activemq.transport.TransportAcceptListener; 038import org.apache.activemq.transport.TransportFactorySupport; 039import org.apache.activemq.transport.TransportServer; 040import org.apache.activemq.transport.discovery.DiscoveryAgent; 041import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 042import org.apache.activemq.util.ServiceStopper; 043import org.apache.activemq.util.ServiceSupport; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * @org.apache.xbean.XBean 049 */ 050public class TransportConnector implements Connector, BrokerServiceAware { 051 052 final Logger LOG = LoggerFactory.getLogger(TransportConnector.class); 053 054 protected final CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>(); 055 protected TransportStatusDetector statusDector; 056 private BrokerService brokerService; 057 private TransportServer server; 058 private URI uri; 059 private BrokerInfo brokerInfo = new BrokerInfo(); 060 private TaskRunnerFactory taskRunnerFactory; 061 private MessageAuthorizationPolicy messageAuthorizationPolicy; 062 private DiscoveryAgent discoveryAgent; 063 private final ConnectorStatistics statistics = new ConnectorStatistics(); 064 private URI discoveryUri; 065 private String name; 066 private boolean disableAsyncDispatch; 067 private boolean enableStatusMonitor = false; 068 private Broker broker; 069 private boolean updateClusterClients = false; 070 private boolean rebalanceClusterClients; 071 private boolean updateClusterClientsOnRemove = false; 072 private String updateClusterFilter; 073 private boolean auditNetworkProducers = false; 074 private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE; 075 private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE; 076 private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy(); 077 private boolean warnOnRemoteClose = false; 078 079 LinkedList<String> peerBrokers = new LinkedList<String>(); 080 081 public TransportConnector() { 082 } 083 084 public TransportConnector(TransportServer server) { 085 this(); 086 setServer(server); 087 if (server != null && server.getConnectURI() != null) { 088 URI uri = server.getConnectURI(); 089 if (uri != null && uri.getScheme().equals("vm")) { 090 setEnableStatusMonitor(false); 091 } 092 } 093 } 094 095 /** 096 * @return Returns the connections. 097 */ 098 public CopyOnWriteArrayList<TransportConnection> getConnections() { 099 return connections; 100 } 101 102 /** 103 * Factory method to create a JMX managed version of this transport 104 * connector 105 */ 106 public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException { 107 ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer()); 108 rc.setBrokerInfo(getBrokerInfo()); 109 rc.setDisableAsyncDispatch(isDisableAsyncDispatch()); 110 rc.setDiscoveryAgent(getDiscoveryAgent()); 111 rc.setDiscoveryUri(getDiscoveryUri()); 112 rc.setEnableStatusMonitor(isEnableStatusMonitor()); 113 rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 114 rc.setName(getName()); 115 rc.setTaskRunnerFactory(getTaskRunnerFactory()); 116 rc.setUri(getUri()); 117 rc.setBrokerService(brokerService); 118 rc.setUpdateClusterClients(isUpdateClusterClients()); 119 rc.setRebalanceClusterClients(isRebalanceClusterClients()); 120 rc.setUpdateClusterFilter(getUpdateClusterFilter()); 121 rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove()); 122 rc.setAuditNetworkProducers(isAuditNetworkProducers()); 123 rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection()); 124 rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection()); 125 rc.setPublishedAddressPolicy(getPublishedAddressPolicy()); 126 rc.setWarnOnRemoteClose(isWarnOnRemoteClose()); 127 return rc; 128 } 129 130 @Override 131 public BrokerInfo getBrokerInfo() { 132 return brokerInfo; 133 } 134 135 public void setBrokerInfo(BrokerInfo brokerInfo) { 136 this.brokerInfo = brokerInfo; 137 } 138 139 public TransportServer getServer() throws IOException, URISyntaxException { 140 if (server == null) { 141 setServer(createTransportServer()); 142 } 143 return server; 144 } 145 146 public void setServer(TransportServer server) { 147 this.server = server; 148 } 149 150 public URI getUri() { 151 if (uri == null) { 152 try { 153 uri = getConnectUri(); 154 } catch (Throwable e) { 155 } 156 } 157 return uri; 158 } 159 160 /** 161 * Sets the server transport URI to use if there is not a 162 * {@link TransportServer} configured via the 163 * {@link #setServer(TransportServer)} method. This value is used to lazy 164 * create a {@link TransportServer} instance 165 * 166 * @param uri 167 */ 168 public void setUri(URI uri) { 169 this.uri = uri; 170 } 171 172 public TaskRunnerFactory getTaskRunnerFactory() { 173 return taskRunnerFactory; 174 } 175 176 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 177 this.taskRunnerFactory = taskRunnerFactory; 178 } 179 180 /** 181 * @return the statistics for this connector 182 */ 183 @Override 184 public ConnectorStatistics getStatistics() { 185 return statistics; 186 } 187 188 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 189 return messageAuthorizationPolicy; 190 } 191 192 /** 193 * Sets the policy used to decide if the current connection is authorized to 194 * consume a given message 195 */ 196 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 197 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 198 } 199 200 @Override 201 public void start() throws Exception { 202 broker = brokerService.getBroker(); 203 brokerInfo.setBrokerName(broker.getBrokerName()); 204 brokerInfo.setBrokerId(broker.getBrokerId()); 205 brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); 206 brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration()); 207 brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString()); 208 getServer().setAcceptListener(new TransportAcceptListener() { 209 @Override 210 public void onAccept(final Transport transport) { 211 final String remoteHost = transport.getRemoteAddress(); 212 try { 213 brokerService.getTaskRunnerFactory().execute(new Runnable() { 214 @Override 215 public void run() { 216 try { 217 if (!brokerService.isStopping()) { 218 Connection connection = createConnection(transport); 219 connection.start(); 220 } else { 221 throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); 222 } 223 } catch (Exception e) { 224 ServiceSupport.dispose(transport); 225 onAcceptError(e, remoteHost); 226 } 227 } 228 }); 229 } catch (Exception e) { 230 ServiceSupport.dispose(transport); 231 onAcceptError(e, remoteHost); 232 } 233 } 234 235 @Override 236 public void onAcceptError(Exception error) { 237 onAcceptError(error, null); 238 } 239 240 private void onAcceptError(Exception error, String remoteHost) { 241 LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " 242 + error); 243 LOG.debug("Reason: " + error, error); 244 } 245 }); 246 getServer().setBrokerInfo(brokerInfo); 247 getServer().start(); 248 249 DiscoveryAgent da = getDiscoveryAgent(); 250 if (da != null) { 251 da.registerService(getPublishableConnectString()); 252 da.start(); 253 } 254 if (enableStatusMonitor) { 255 this.statusDector = new TransportStatusDetector(this); 256 this.statusDector.start(); 257 } 258 259 LOG.info("Connector {} started", getName()); 260 } 261 262 public String getPublishableConnectString() throws Exception { 263 String publishableConnectString = publishedAddressPolicy.getPublishableConnectString(this); 264 LOG.debug("Publishing: {} for broker transport URI: {}", publishableConnectString, getConnectUri()); 265 return publishableConnectString; 266 } 267 268 public URI getPublishableConnectURI() throws Exception { 269 return publishedAddressPolicy.getPublishableConnectURI(this); 270 } 271 272 @Override 273 public void stop() throws Exception { 274 ServiceStopper ss = new ServiceStopper(); 275 if (discoveryAgent != null) { 276 ss.stop(discoveryAgent); 277 } 278 if (server != null) { 279 ss.stop(server); 280 } 281 if (this.statusDector != null) { 282 this.statusDector.stop(); 283 } 284 285 for (TransportConnection connection : connections) { 286 ss.stop(connection); 287 } 288 server = null; 289 ss.throwFirstException(); 290 LOG.info("Connector {} stopped", getName()); 291 } 292 293 // Implementation methods 294 // ------------------------------------------------------------------------- 295 protected Connection createConnection(Transport transport) throws IOException { 296 // prefer to use task runner from broker service as stop task runner, as we can then 297 // tie it to the lifecycle of the broker service 298 TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null 299 : taskRunnerFactory, brokerService.getTaskRunnerFactory()); 300 boolean statEnabled = this.getStatistics().isEnabled(); 301 answer.getStatistics().setEnabled(statEnabled); 302 answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy); 303 return answer; 304 } 305 306 protected TransportServer createTransportServer() throws IOException, URISyntaxException { 307 if (uri == null) { 308 throw new IllegalArgumentException("You must specify either a server or uri property"); 309 } 310 if (brokerService == null) { 311 throw new IllegalArgumentException( 312 "You must specify the brokerService property. Maybe this connector should be added to a broker?"); 313 } 314 return TransportFactorySupport.bind(brokerService, uri); 315 } 316 317 public DiscoveryAgent getDiscoveryAgent() throws IOException { 318 if (discoveryAgent == null) { 319 discoveryAgent = createDiscoveryAgent(); 320 } 321 return discoveryAgent; 322 } 323 324 protected DiscoveryAgent createDiscoveryAgent() throws IOException { 325 if (discoveryUri != null) { 326 DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri); 327 328 if (agent != null && agent instanceof BrokerServiceAware) { 329 ((BrokerServiceAware) agent).setBrokerService(brokerService); 330 } 331 332 return agent; 333 } 334 return null; 335 } 336 337 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 338 this.discoveryAgent = discoveryAgent; 339 } 340 341 public URI getDiscoveryUri() { 342 return discoveryUri; 343 } 344 345 public void setDiscoveryUri(URI discoveryUri) { 346 this.discoveryUri = discoveryUri; 347 } 348 349 public URI getConnectUri() throws IOException, URISyntaxException { 350 if (server != null) { 351 return server.getConnectURI(); 352 } else { 353 return uri; 354 } 355 } 356 357 public void onStarted(TransportConnection connection) { 358 connections.add(connection); 359 } 360 361 public void onStopped(TransportConnection connection) { 362 connections.remove(connection); 363 } 364 365 public String getName() { 366 if (name == null) { 367 uri = getUri(); 368 if (uri != null) { 369 name = uri.toString(); 370 } 371 } 372 return name; 373 } 374 375 public void setName(String name) { 376 this.name = name; 377 } 378 379 @Override 380 public String toString() { 381 String rc = getName(); 382 if (rc == null) { 383 rc = super.toString(); 384 } 385 return rc; 386 } 387 388 protected ConnectionControl getConnectionControl() { 389 boolean rebalance = isRebalanceClusterClients(); 390 String connectedBrokers = ""; 391 String separator = ""; 392 393 if (isUpdateClusterClients()) { 394 synchronized (peerBrokers) { 395 for (String uri : getPeerBrokers()) { 396 connectedBrokers += separator + uri; 397 separator = ","; 398 } 399 400 if (rebalance) { 401 String shuffle = peerBrokers.removeFirst(); 402 peerBrokers.addLast(shuffle); 403 } 404 } 405 } 406 ConnectionControl control = new ConnectionControl(); 407 control.setConnectedBrokers(connectedBrokers); 408 control.setRebalanceConnection(rebalance); 409 return control; 410 } 411 412 public void addPeerBroker(BrokerInfo info) { 413 if (isMatchesClusterFilter(info.getBrokerName())) { 414 synchronized (peerBrokers) { 415 getPeerBrokers().addLast(info.getBrokerURL()); 416 } 417 } 418 } 419 420 public void removePeerBroker(BrokerInfo info) { 421 synchronized (peerBrokers) { 422 getPeerBrokers().remove(info.getBrokerURL()); 423 } 424 } 425 426 public LinkedList<String> getPeerBrokers() { 427 synchronized (peerBrokers) { 428 if (peerBrokers.isEmpty()) { 429 peerBrokers.add(brokerService.getDefaultSocketURIString()); 430 } 431 return peerBrokers; 432 } 433 } 434 435 @Override 436 public void updateClientClusterInfo() { 437 if (isRebalanceClusterClients() || isUpdateClusterClients()) { 438 ConnectionControl control = getConnectionControl(); 439 for (Connection c : this.connections) { 440 c.updateClient(control); 441 if (isRebalanceClusterClients()) { 442 control = getConnectionControl(); 443 } 444 } 445 } 446 } 447 448 private boolean isMatchesClusterFilter(String brokerName) { 449 boolean result = true; 450 String filter = getUpdateClusterFilter(); 451 if (filter != null) { 452 filter = filter.trim(); 453 if (filter.length() > 0) { 454 result = false; 455 StringTokenizer tokenizer = new StringTokenizer(filter, ","); 456 while (!result && tokenizer.hasMoreTokens()) { 457 String token = tokenizer.nextToken(); 458 result = isMatchesClusterFilter(brokerName, token); 459 } 460 } 461 } 462 463 return result; 464 } 465 466 private boolean isMatchesClusterFilter(String brokerName, String match) { 467 boolean result = false; 468 if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) { 469 result = Pattern.matches(match, brokerName); 470 } 471 return result; 472 } 473 474 public boolean isDisableAsyncDispatch() { 475 return disableAsyncDispatch; 476 } 477 478 public void setDisableAsyncDispatch(boolean disableAsyncDispatch) { 479 this.disableAsyncDispatch = disableAsyncDispatch; 480 } 481 482 /** 483 * @return the enableStatusMonitor 484 */ 485 public boolean isEnableStatusMonitor() { 486 return enableStatusMonitor; 487 } 488 489 /** 490 * @param enableStatusMonitor 491 * the enableStatusMonitor to set 492 */ 493 public void setEnableStatusMonitor(boolean enableStatusMonitor) { 494 this.enableStatusMonitor = enableStatusMonitor; 495 } 496 497 /** 498 * This is called by the BrokerService right before it starts the transport. 499 */ 500 @Override 501 public void setBrokerService(BrokerService brokerService) { 502 this.brokerService = brokerService; 503 } 504 505 public Broker getBroker() { 506 return broker; 507 } 508 509 public BrokerService getBrokerService() { 510 return brokerService; 511 } 512 513 /** 514 * @return the updateClusterClients 515 */ 516 @Override 517 public boolean isUpdateClusterClients() { 518 return this.updateClusterClients; 519 } 520 521 /** 522 * @param updateClusterClients 523 * the updateClusterClients to set 524 */ 525 public void setUpdateClusterClients(boolean updateClusterClients) { 526 this.updateClusterClients = updateClusterClients; 527 } 528 529 /** 530 * @return the rebalanceClusterClients 531 */ 532 @Override 533 public boolean isRebalanceClusterClients() { 534 return this.rebalanceClusterClients; 535 } 536 537 /** 538 * @param rebalanceClusterClients 539 * the rebalanceClusterClients to set 540 */ 541 public void setRebalanceClusterClients(boolean rebalanceClusterClients) { 542 this.rebalanceClusterClients = rebalanceClusterClients; 543 } 544 545 /** 546 * @return the updateClusterClientsOnRemove 547 */ 548 @Override 549 public boolean isUpdateClusterClientsOnRemove() { 550 return this.updateClusterClientsOnRemove; 551 } 552 553 /** 554 * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set 555 */ 556 public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) { 557 this.updateClusterClientsOnRemove = updateClusterClientsOnRemove; 558 } 559 560 /** 561 * @return the updateClusterFilter 562 */ 563 public String getUpdateClusterFilter() { 564 return this.updateClusterFilter; 565 } 566 567 /** 568 * @param updateClusterFilter 569 * the updateClusterFilter to set 570 */ 571 public void setUpdateClusterFilter(String updateClusterFilter) { 572 this.updateClusterFilter = updateClusterFilter; 573 } 574 575 @Override 576 public int connectionCount() { 577 return connections.size(); 578 } 579 580 @Override 581 public boolean isAllowLinkStealing() { 582 return server.isAllowLinkStealing(); 583 } 584 585 public boolean isAuditNetworkProducers() { 586 return auditNetworkProducers; 587 } 588 589 /** 590 * Enable a producer audit on network connections, Traps the case of a missing send reply and resend. 591 * Note: does not work with conduit=false, networked composite destinations or networked virtual topics 592 * @param auditNetworkProducers 593 */ 594 public void setAuditNetworkProducers(boolean auditNetworkProducers) { 595 this.auditNetworkProducers = auditNetworkProducers; 596 } 597 598 public int getMaximumProducersAllowedPerConnection() { 599 return maximumProducersAllowedPerConnection; 600 } 601 602 public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) { 603 this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection; 604 } 605 606 public int getMaximumConsumersAllowedPerConnection() { 607 return maximumConsumersAllowedPerConnection; 608 } 609 610 public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) { 611 this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection; 612 } 613 614 /** 615 * Gets the currently configured policy for creating the published connection address of this 616 * TransportConnector. 617 * 618 * @return the publishedAddressPolicy 619 */ 620 public PublishedAddressPolicy getPublishedAddressPolicy() { 621 return publishedAddressPolicy; 622 } 623 624 /** 625 * Sets the configured policy for creating the published connection address of this 626 * TransportConnector. 627 * 628 * @return the publishedAddressPolicy 629 */ 630 public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) { 631 this.publishedAddressPolicy = publishedAddressPolicy; 632 } 633 634 public boolean isWarnOnRemoteClose() { 635 return warnOnRemoteClose; 636 } 637 638 public void setWarnOnRemoteClose(boolean warnOnRemoteClose) { 639 this.warnOnRemoteClose = warnOnRemoteClose; 640 } 641}