001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.util.LinkedList; 023import java.util.StringTokenizer; 024import java.util.concurrent.CopyOnWriteArrayList; 025import java.util.regex.Pattern; 026 027import javax.management.ObjectName; 028 029import org.apache.activemq.broker.jmx.ManagedTransportConnector; 030import org.apache.activemq.broker.jmx.ManagementContext; 031import org.apache.activemq.broker.region.ConnectorStatistics; 032import org.apache.activemq.command.BrokerInfo; 033import org.apache.activemq.command.ConnectionControl; 034import org.apache.activemq.security.MessageAuthorizationPolicy; 035import org.apache.activemq.thread.TaskRunnerFactory; 036import org.apache.activemq.transport.Transport; 037import org.apache.activemq.transport.TransportAcceptListener; 038import org.apache.activemq.transport.TransportFactorySupport; 039import org.apache.activemq.transport.TransportServer; 040import org.apache.activemq.transport.discovery.DiscoveryAgent; 041import org.apache.activemq.transport.discovery.DiscoveryAgentFactory; 042import org.apache.activemq.util.ServiceStopper; 043import org.apache.activemq.util.ServiceSupport; 044import org.slf4j.Logger; 045import org.slf4j.LoggerFactory; 046 047/** 048 * @org.apache.xbean.XBean 049 */ 050public class TransportConnector implements Connector, BrokerServiceAware { 051 052 final Logger LOG = LoggerFactory.getLogger(TransportConnector.class); 053 054 protected final CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>(); 055 protected TransportStatusDetector statusDector; 056 private BrokerService brokerService; 057 private TransportServer server; 058 private URI uri; 059 private BrokerInfo brokerInfo = new BrokerInfo(); 060 private TaskRunnerFactory taskRunnerFactory; 061 private MessageAuthorizationPolicy messageAuthorizationPolicy; 062 private DiscoveryAgent discoveryAgent; 063 private final ConnectorStatistics statistics = new ConnectorStatistics(); 064 private URI discoveryUri; 065 private String name; 066 private boolean disableAsyncDispatch; 067 private boolean enableStatusMonitor = false; 068 private Broker broker; 069 private boolean updateClusterClients = false; 070 private boolean rebalanceClusterClients; 071 private boolean updateClusterClientsOnRemove = false; 072 private String updateClusterFilter; 073 private boolean auditNetworkProducers = false; 074 private int maximumProducersAllowedPerConnection = Integer.MAX_VALUE; 075 private int maximumConsumersAllowedPerConnection = Integer.MAX_VALUE; 076 private PublishedAddressPolicy publishedAddressPolicy = new PublishedAddressPolicy(); 077 private boolean allowLinkStealing; 078 079 LinkedList<String> peerBrokers = new LinkedList<String>(); 080 081 public TransportConnector() { 082 } 083 084 public TransportConnector(TransportServer server) { 085 this(); 086 setServer(server); 087 if (server != null && server.getConnectURI() != null) { 088 URI uri = server.getConnectURI(); 089 if (uri != null && uri.getScheme().equals("vm")) { 090 setEnableStatusMonitor(false); 091 } 092 } 093 } 094 095 /** 096 * @return Returns the connections. 097 */ 098 public CopyOnWriteArrayList<TransportConnection> getConnections() { 099 return connections; 100 } 101 102 /** 103 * Factory method to create a JMX managed version of this transport 104 * connector 105 */ 106 public ManagedTransportConnector asManagedConnector(ManagementContext context, ObjectName connectorName) throws IOException, URISyntaxException { 107 ManagedTransportConnector rc = new ManagedTransportConnector(context, connectorName, getServer()); 108 rc.setBrokerInfo(getBrokerInfo()); 109 rc.setDisableAsyncDispatch(isDisableAsyncDispatch()); 110 rc.setDiscoveryAgent(getDiscoveryAgent()); 111 rc.setDiscoveryUri(getDiscoveryUri()); 112 rc.setEnableStatusMonitor(isEnableStatusMonitor()); 113 rc.setMessageAuthorizationPolicy(getMessageAuthorizationPolicy()); 114 rc.setName(getName()); 115 rc.setTaskRunnerFactory(getTaskRunnerFactory()); 116 rc.setUri(getUri()); 117 rc.setBrokerService(brokerService); 118 rc.setUpdateClusterClients(isUpdateClusterClients()); 119 rc.setRebalanceClusterClients(isRebalanceClusterClients()); 120 rc.setUpdateClusterFilter(getUpdateClusterFilter()); 121 rc.setUpdateClusterClientsOnRemove(isUpdateClusterClientsOnRemove()); 122 rc.setAuditNetworkProducers(isAuditNetworkProducers()); 123 rc.setMaximumConsumersAllowedPerConnection(getMaximumConsumersAllowedPerConnection()); 124 rc.setMaximumProducersAllowedPerConnection(getMaximumProducersAllowedPerConnection()); 125 rc.setPublishedAddressPolicy(getPublishedAddressPolicy()); 126 rc.setAllowLinkStealing(isAllowLinkStealing()); 127 return rc; 128 } 129 130 @Override 131 public BrokerInfo getBrokerInfo() { 132 return brokerInfo; 133 } 134 135 public void setBrokerInfo(BrokerInfo brokerInfo) { 136 this.brokerInfo = brokerInfo; 137 } 138 139 public TransportServer getServer() throws IOException, URISyntaxException { 140 if (server == null) { 141 setServer(createTransportServer()); 142 } 143 return server; 144 } 145 146 public void setServer(TransportServer server) { 147 this.server = server; 148 } 149 150 public URI getUri() { 151 if (uri == null) { 152 try { 153 uri = getConnectUri(); 154 } catch (Throwable e) { 155 } 156 } 157 return uri; 158 } 159 160 /** 161 * Sets the server transport URI to use if there is not a 162 * {@link TransportServer} configured via the 163 * {@link #setServer(TransportServer)} method. This value is used to lazy 164 * create a {@link TransportServer} instance 165 * 166 * @param uri 167 */ 168 public void setUri(URI uri) { 169 this.uri = uri; 170 } 171 172 public TaskRunnerFactory getTaskRunnerFactory() { 173 return taskRunnerFactory; 174 } 175 176 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 177 this.taskRunnerFactory = taskRunnerFactory; 178 } 179 180 /** 181 * @return the statistics for this connector 182 */ 183 @Override 184 public ConnectorStatistics getStatistics() { 185 return statistics; 186 } 187 188 public MessageAuthorizationPolicy getMessageAuthorizationPolicy() { 189 return messageAuthorizationPolicy; 190 } 191 192 /** 193 * Sets the policy used to decide if the current connection is authorized to 194 * consume a given message 195 */ 196 public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) { 197 this.messageAuthorizationPolicy = messageAuthorizationPolicy; 198 } 199 200 @Override 201 public void start() throws Exception { 202 broker = brokerService.getBroker(); 203 brokerInfo.setBrokerName(broker.getBrokerName()); 204 brokerInfo.setBrokerId(broker.getBrokerId()); 205 brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos()); 206 brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration()); 207 brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString()); 208 getServer().setAcceptListener(new TransportAcceptListener() { 209 @Override 210 public void onAccept(final Transport transport) { 211 try { 212 brokerService.getTaskRunnerFactory().execute(new Runnable() { 213 @Override 214 public void run() { 215 try { 216 if (!brokerService.isStopping()) { 217 Connection connection = createConnection(transport); 218 connection.start(); 219 } else { 220 throw new BrokerStoppedException("Broker " + brokerService + " is being stopped"); 221 } 222 } catch (Exception e) { 223 String remoteHost = transport.getRemoteAddress(); 224 ServiceSupport.dispose(transport); 225 onAcceptError(e, remoteHost); 226 } 227 } 228 }); 229 } catch (Exception e) { 230 String remoteHost = transport.getRemoteAddress(); 231 ServiceSupport.dispose(transport); 232 onAcceptError(e, remoteHost); 233 } 234 } 235 236 @Override 237 public void onAcceptError(Exception error) { 238 onAcceptError(error, null); 239 } 240 241 private void onAcceptError(Exception error, String remoteHost) { 242 LOG.error("Could not accept connection " + (remoteHost == null ? "" : "from " + remoteHost) + ": " 243 + error); 244 LOG.debug("Reason: " + error, error); 245 } 246 }); 247 getServer().setBrokerInfo(brokerInfo); 248 getServer().start(); 249 250 DiscoveryAgent da = getDiscoveryAgent(); 251 if (da != null) { 252 da.registerService(getPublishableConnectString()); 253 da.start(); 254 } 255 if (enableStatusMonitor) { 256 this.statusDector = new TransportStatusDetector(this); 257 this.statusDector.start(); 258 } 259 260 LOG.info("Connector {} started", getName()); 261 } 262 263 public String getPublishableConnectString() throws Exception { 264 String publishableConnectString = publishedAddressPolicy.getPublishableConnectString(this); 265 LOG.debug("Publishing: {} for broker transport URI: {}", publishableConnectString, getConnectUri()); 266 return publishableConnectString; 267 } 268 269 public URI getPublishableConnectURI() throws Exception { 270 return publishedAddressPolicy.getPublishableConnectURI(this); 271 } 272 273 @Override 274 public void stop() throws Exception { 275 ServiceStopper ss = new ServiceStopper(); 276 if (discoveryAgent != null) { 277 ss.stop(discoveryAgent); 278 } 279 if (server != null) { 280 ss.stop(server); 281 } 282 if (this.statusDector != null) { 283 this.statusDector.stop(); 284 } 285 286 for (TransportConnection connection : connections) { 287 ss.stop(connection); 288 } 289 server = null; 290 ss.throwFirstException(); 291 LOG.info("Connector {} stopped", getName()); 292 } 293 294 // Implementation methods 295 // ------------------------------------------------------------------------- 296 protected Connection createConnection(Transport transport) throws IOException { 297 // prefer to use task runner from broker service as stop task runner, as we can then 298 // tie it to the lifecycle of the broker service 299 TransportConnection answer = new TransportConnection(this, transport, broker, disableAsyncDispatch ? null 300 : taskRunnerFactory, brokerService.getTaskRunnerFactory()); 301 boolean statEnabled = this.getStatistics().isEnabled(); 302 answer.getStatistics().setEnabled(statEnabled); 303 answer.setMessageAuthorizationPolicy(messageAuthorizationPolicy); 304 return answer; 305 } 306 307 protected TransportServer createTransportServer() throws IOException, URISyntaxException { 308 if (uri == null) { 309 throw new IllegalArgumentException("You must specify either a server or uri property"); 310 } 311 if (brokerService == null) { 312 throw new IllegalArgumentException( 313 "You must specify the brokerService property. Maybe this connector should be added to a broker?"); 314 } 315 return TransportFactorySupport.bind(brokerService, uri); 316 } 317 318 public DiscoveryAgent getDiscoveryAgent() throws IOException { 319 if (discoveryAgent == null) { 320 discoveryAgent = createDiscoveryAgent(); 321 } 322 return discoveryAgent; 323 } 324 325 protected DiscoveryAgent createDiscoveryAgent() throws IOException { 326 if (discoveryUri != null) { 327 DiscoveryAgent agent = DiscoveryAgentFactory.createDiscoveryAgent(discoveryUri); 328 329 if (agent != null && agent instanceof BrokerServiceAware) { 330 ((BrokerServiceAware) agent).setBrokerService(brokerService); 331 } 332 333 return agent; 334 } 335 return null; 336 } 337 338 public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) { 339 this.discoveryAgent = discoveryAgent; 340 } 341 342 public URI getDiscoveryUri() { 343 return discoveryUri; 344 } 345 346 public void setDiscoveryUri(URI discoveryUri) { 347 this.discoveryUri = discoveryUri; 348 } 349 350 public URI getConnectUri() throws IOException, URISyntaxException { 351 if (server != null) { 352 return server.getConnectURI(); 353 } else { 354 return uri; 355 } 356 } 357 358 public void onStarted(TransportConnection connection) { 359 connections.add(connection); 360 } 361 362 public void onStopped(TransportConnection connection) { 363 connections.remove(connection); 364 } 365 366 public String getName() { 367 if (name == null) { 368 uri = getUri(); 369 if (uri != null) { 370 name = uri.toString(); 371 } 372 } 373 return name; 374 } 375 376 public void setName(String name) { 377 this.name = name; 378 } 379 380 @Override 381 public String toString() { 382 String rc = getName(); 383 if (rc == null) { 384 rc = super.toString(); 385 } 386 return rc; 387 } 388 389 protected ConnectionControl getConnectionControl() { 390 boolean rebalance = isRebalanceClusterClients(); 391 String connectedBrokers = ""; 392 String separator = ""; 393 394 if (isUpdateClusterClients()) { 395 synchronized (peerBrokers) { 396 for (String uri : getPeerBrokers()) { 397 connectedBrokers += separator + uri; 398 separator = ","; 399 } 400 401 if (rebalance) { 402 String shuffle = peerBrokers.removeFirst(); 403 peerBrokers.addLast(shuffle); 404 } 405 } 406 } 407 ConnectionControl control = new ConnectionControl(); 408 control.setConnectedBrokers(connectedBrokers); 409 control.setRebalanceConnection(rebalance); 410 return control; 411 } 412 413 public void addPeerBroker(BrokerInfo info) { 414 if (isMatchesClusterFilter(info.getBrokerName())) { 415 synchronized (peerBrokers) { 416 getPeerBrokers().addLast(info.getBrokerURL()); 417 } 418 } 419 } 420 421 public void removePeerBroker(BrokerInfo info) { 422 synchronized (peerBrokers) { 423 getPeerBrokers().remove(info.getBrokerURL()); 424 } 425 } 426 427 public LinkedList<String> getPeerBrokers() { 428 synchronized (peerBrokers) { 429 if (peerBrokers.isEmpty()) { 430 peerBrokers.add(brokerService.getDefaultSocketURIString()); 431 } 432 return peerBrokers; 433 } 434 } 435 436 @Override 437 public void updateClientClusterInfo() { 438 if (isRebalanceClusterClients() || isUpdateClusterClients()) { 439 ConnectionControl control = getConnectionControl(); 440 for (Connection c : this.connections) { 441 c.updateClient(control); 442 if (isRebalanceClusterClients()) { 443 control = getConnectionControl(); 444 } 445 } 446 } 447 } 448 449 private boolean isMatchesClusterFilter(String brokerName) { 450 boolean result = true; 451 String filter = getUpdateClusterFilter(); 452 if (filter != null) { 453 filter = filter.trim(); 454 if (filter.length() > 0) { 455 result = false; 456 StringTokenizer tokenizer = new StringTokenizer(filter, ","); 457 while (!result && tokenizer.hasMoreTokens()) { 458 String token = tokenizer.nextToken(); 459 result = isMatchesClusterFilter(brokerName, token); 460 } 461 } 462 } 463 464 return result; 465 } 466 467 private boolean isMatchesClusterFilter(String brokerName, String match) { 468 boolean result = false; 469 if (brokerName != null && match != null && brokerName.length() > 0 && match.length() > 0) { 470 result = Pattern.matches(match, brokerName); 471 } 472 return result; 473 } 474 475 public boolean isDisableAsyncDispatch() { 476 return disableAsyncDispatch; 477 } 478 479 public void setDisableAsyncDispatch(boolean disableAsyncDispatch) { 480 this.disableAsyncDispatch = disableAsyncDispatch; 481 } 482 483 /** 484 * @return the enableStatusMonitor 485 */ 486 public boolean isEnableStatusMonitor() { 487 return enableStatusMonitor; 488 } 489 490 /** 491 * @param enableStatusMonitor 492 * the enableStatusMonitor to set 493 */ 494 public void setEnableStatusMonitor(boolean enableStatusMonitor) { 495 this.enableStatusMonitor = enableStatusMonitor; 496 } 497 498 /** 499 * This is called by the BrokerService right before it starts the transport. 500 */ 501 @Override 502 public void setBrokerService(BrokerService brokerService) { 503 this.brokerService = brokerService; 504 } 505 506 public Broker getBroker() { 507 return broker; 508 } 509 510 public BrokerService getBrokerService() { 511 return brokerService; 512 } 513 514 /** 515 * @return the updateClusterClients 516 */ 517 @Override 518 public boolean isUpdateClusterClients() { 519 return this.updateClusterClients; 520 } 521 522 /** 523 * @param updateClusterClients 524 * the updateClusterClients to set 525 */ 526 public void setUpdateClusterClients(boolean updateClusterClients) { 527 this.updateClusterClients = updateClusterClients; 528 } 529 530 /** 531 * @return the rebalanceClusterClients 532 */ 533 @Override 534 public boolean isRebalanceClusterClients() { 535 return this.rebalanceClusterClients; 536 } 537 538 /** 539 * @param rebalanceClusterClients 540 * the rebalanceClusterClients to set 541 */ 542 public void setRebalanceClusterClients(boolean rebalanceClusterClients) { 543 this.rebalanceClusterClients = rebalanceClusterClients; 544 } 545 546 /** 547 * @return the updateClusterClientsOnRemove 548 */ 549 @Override 550 public boolean isUpdateClusterClientsOnRemove() { 551 return this.updateClusterClientsOnRemove; 552 } 553 554 /** 555 * @param updateClusterClientsOnRemove the updateClusterClientsOnRemove to set 556 */ 557 public void setUpdateClusterClientsOnRemove(boolean updateClusterClientsOnRemove) { 558 this.updateClusterClientsOnRemove = updateClusterClientsOnRemove; 559 } 560 561 /** 562 * @return the updateClusterFilter 563 */ 564 public String getUpdateClusterFilter() { 565 return this.updateClusterFilter; 566 } 567 568 /** 569 * @param updateClusterFilter 570 * the updateClusterFilter to set 571 */ 572 public void setUpdateClusterFilter(String updateClusterFilter) { 573 this.updateClusterFilter = updateClusterFilter; 574 } 575 576 @Override 577 public int connectionCount() { 578 return connections.size(); 579 } 580 581 @Override 582 public boolean isAllowLinkStealing() { 583 return server.isAllowLinkStealing(); 584 } 585 586 public void setAllowLinkStealing (boolean allowLinkStealing) { 587 this.allowLinkStealing=allowLinkStealing; 588 } 589 590 public boolean isAuditNetworkProducers() { 591 return auditNetworkProducers; 592 } 593 594 /** 595 * Enable a producer audit on network connections, Traps the case of a missing send reply and resend. 596 * Note: does not work with conduit=false, networked composite destinations or networked virtual topics 597 * @param auditNetworkProducers 598 */ 599 public void setAuditNetworkProducers(boolean auditNetworkProducers) { 600 this.auditNetworkProducers = auditNetworkProducers; 601 } 602 603 public int getMaximumProducersAllowedPerConnection() { 604 return maximumProducersAllowedPerConnection; 605 } 606 607 public void setMaximumProducersAllowedPerConnection(int maximumProducersAllowedPerConnection) { 608 this.maximumProducersAllowedPerConnection = maximumProducersAllowedPerConnection; 609 } 610 611 public int getMaximumConsumersAllowedPerConnection() { 612 return maximumConsumersAllowedPerConnection; 613 } 614 615 public void setMaximumConsumersAllowedPerConnection(int maximumConsumersAllowedPerConnection) { 616 this.maximumConsumersAllowedPerConnection = maximumConsumersAllowedPerConnection; 617 } 618 619 /** 620 * Gets the currently configured policy for creating the published connection address of this 621 * TransportConnector. 622 * 623 * @return the publishedAddressPolicy 624 */ 625 public PublishedAddressPolicy getPublishedAddressPolicy() { 626 return publishedAddressPolicy; 627 } 628 629 /** 630 * Sets the configured policy for creating the published connection address of this 631 * TransportConnector. 632 * 633 * @return the publishedAddressPolicy 634 */ 635 public void setPublishedAddressPolicy(PublishedAddressPolicy publishedAddressPolicy) { 636 this.publishedAddressPolicy = publishedAddressPolicy; 637 } 638}