001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.tcp; 018 019import java.io.IOException; 020import java.net.InetAddress; 021import java.net.InetSocketAddress; 022import java.net.ServerSocket; 023import java.net.Socket; 024import java.net.SocketException; 025import java.net.SocketTimeoutException; 026import java.net.URI; 027import java.net.URISyntaxException; 028import java.net.UnknownHostException; 029import java.nio.channels.SelectionKey; 030import java.nio.channels.Selector; 031import java.nio.channels.ServerSocketChannel; 032import java.nio.channels.SocketChannel; 033import java.util.HashMap; 034import java.util.Iterator; 035import java.util.Set; 036import java.util.concurrent.BlockingQueue; 037import java.util.concurrent.LinkedBlockingQueue; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicInteger; 040 041import javax.net.ServerSocketFactory; 042import javax.net.ssl.SSLServerSocket; 043 044import org.apache.activemq.Service; 045import org.apache.activemq.ThreadPriorities; 046import org.apache.activemq.TransportLoggerSupport; 047import org.apache.activemq.command.BrokerInfo; 048import org.apache.activemq.openwire.OpenWireFormatFactory; 049import org.apache.activemq.transport.Transport; 050import org.apache.activemq.transport.TransportServer; 051import org.apache.activemq.transport.TransportServerThreadSupport; 052import org.apache.activemq.transport.nio.SelectorManager; 053import org.apache.activemq.transport.nio.SelectorSelection; 054import org.apache.activemq.util.IOExceptionSupport; 055import org.apache.activemq.util.InetAddressUtil; 056import org.apache.activemq.util.IntrospectionSupport; 057import org.apache.activemq.util.ServiceListener; 058import org.apache.activemq.util.ServiceStopper; 059import org.apache.activemq.util.ServiceSupport; 060import org.apache.activemq.wireformat.WireFormat; 061import org.apache.activemq.wireformat.WireFormatFactory; 062import org.slf4j.Logger; 063import org.slf4j.LoggerFactory; 064 065/** 066 * A TCP based implementation of {@link TransportServer} 067 */ 068public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener { 069 070 private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class); 071 protected ServerSocket serverSocket; 072 protected Selector selector; 073 protected int backlog = 5000; 074 protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory(); 075 protected final TcpTransportFactory transportFactory; 076 protected long maxInactivityDuration = 30000; 077 protected long maxInactivityDurationInitalDelay = 10000; 078 protected int minmumWireFormatVersion; 079 protected boolean useQueueForAccept = true; 080 protected boolean allowLinkStealing; 081 082 /** 083 * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer 084 * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer, 085 * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or 086 * TransportConnector URIs. 087 */ 088 protected boolean trace = false; 089 090 protected int soTimeout = 0; 091 protected int socketBufferSize = 64 * 1024; 092 protected int connectionTimeout = 30000; 093 094 /** 095 * Name of the LogWriter implementation to use. Names are mapped to classes in the 096 * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably 097 * set in Connection or TransportConnector URIs. 098 */ 099 protected String logWriterName = TransportLoggerSupport.defaultLogWriterName; 100 101 /** 102 * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1 103 * TransportLogger which is manageable, a TransportLoggerControl MBean will me created. 104 */ 105 protected boolean dynamicManagement = false; 106 107 /** 108 * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log. 109 * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the 110 * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or 111 * TransportConnector URIs. 112 */ 113 protected boolean startLogging = true; 114 protected int jmxPort = TransportLoggerSupport.defaultJmxPort; 115 protected final ServerSocketFactory serverSocketFactory; 116 protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); 117 protected Thread socketHandlerThread; 118 119 /** 120 * The maximum number of sockets allowed for this server 121 */ 122 protected int maximumConnections = Integer.MAX_VALUE; 123 protected AtomicInteger currentTransportCount = new AtomicInteger(); 124 125 public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, 126 URISyntaxException { 127 super(location); 128 this.transportFactory = transportFactory; 129 this.serverSocketFactory = serverSocketFactory; 130 } 131 132 public void bind() throws IOException { 133 URI bind = getBindLocation(); 134 135 String host = bind.getHost(); 136 host = (host == null || host.length() == 0) ? "localhost" : host; 137 InetAddress addr = InetAddress.getByName(host); 138 139 try { 140 this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr); 141 configureServerSocket(this.serverSocket); 142 } catch (IOException e) { 143 throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e); 144 } 145 try { 146 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), 147 bind.getQuery(), bind.getFragment())); 148 } catch (URISyntaxException e) { 149 150 // it could be that the host name contains invalid characters such 151 // as _ on unix platforms so lets try use the IP address instead 152 try { 153 setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), 154 bind.getQuery(), bind.getFragment())); 155 } catch (URISyntaxException e2) { 156 throw IOExceptionSupport.create(e2); 157 } 158 } 159 } 160 161 private void configureServerSocket(ServerSocket socket) throws SocketException { 162 socket.setSoTimeout(2000); 163 if (transportOptions != null) { 164 165 // If the enabledCipherSuites option is invalid we don't want to ignore it as the call 166 // to SSLServerSocket to configure it has a side effect on the socket rendering it 167 // useless as all suites are enabled many of which are considered as insecure. We 168 // instead trap that option here and throw an exception. We should really consider 169 // all invalid options as breaking and not start the transport but the current design 170 // doesn't really allow for this. 171 // 172 // see: https://issues.apache.org/jira/browse/AMQ-4582 173 // 174 if (socket instanceof SSLServerSocket) { 175 if (transportOptions.containsKey("enabledCipherSuites")) { 176 Object cipherSuites = transportOptions.remove("enabledCipherSuites"); 177 178 if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) { 179 throw new SocketException(String.format( 180 "Invalid transport options {enabledCipherSuites=%s}", cipherSuites)); 181 } 182 } 183 } 184 185 IntrospectionSupport.setProperties(socket, transportOptions); 186 } 187 } 188 189 /** 190 * @return Returns the wireFormatFactory. 191 */ 192 public WireFormatFactory getWireFormatFactory() { 193 return wireFormatFactory; 194 } 195 196 /** 197 * @param wireFormatFactory 198 * The wireFormatFactory to set. 199 */ 200 public void setWireFormatFactory(WireFormatFactory wireFormatFactory) { 201 this.wireFormatFactory = wireFormatFactory; 202 } 203 204 /** 205 * Associates a broker info with the transport server so that the transport can do discovery advertisements of the 206 * broker. 207 * 208 * @param brokerInfo 209 */ 210 @Override 211 public void setBrokerInfo(BrokerInfo brokerInfo) { 212 } 213 214 public long getMaxInactivityDuration() { 215 return maxInactivityDuration; 216 } 217 218 public void setMaxInactivityDuration(long maxInactivityDuration) { 219 this.maxInactivityDuration = maxInactivityDuration; 220 } 221 222 public long getMaxInactivityDurationInitalDelay() { 223 return this.maxInactivityDurationInitalDelay; 224 } 225 226 public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) { 227 this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay; 228 } 229 230 public int getMinmumWireFormatVersion() { 231 return minmumWireFormatVersion; 232 } 233 234 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { 235 this.minmumWireFormatVersion = minmumWireFormatVersion; 236 } 237 238 public boolean isTrace() { 239 return trace; 240 } 241 242 public void setTrace(boolean trace) { 243 this.trace = trace; 244 } 245 246 public String getLogWriterName() { 247 return logWriterName; 248 } 249 250 public void setLogWriterName(String logFormat) { 251 this.logWriterName = logFormat; 252 } 253 254 public boolean isDynamicManagement() { 255 return dynamicManagement; 256 } 257 258 public void setDynamicManagement(boolean useJmx) { 259 this.dynamicManagement = useJmx; 260 } 261 262 public void setJmxPort(int jmxPort) { 263 this.jmxPort = jmxPort; 264 } 265 266 public int getJmxPort() { 267 return jmxPort; 268 } 269 270 public boolean isStartLogging() { 271 return startLogging; 272 } 273 274 public void setStartLogging(boolean startLogging) { 275 this.startLogging = startLogging; 276 } 277 278 /** 279 * @return the backlog 280 */ 281 public int getBacklog() { 282 return backlog; 283 } 284 285 /** 286 * @param backlog 287 * the backlog to set 288 */ 289 public void setBacklog(int backlog) { 290 this.backlog = backlog; 291 } 292 293 /** 294 * @return the useQueueForAccept 295 */ 296 public boolean isUseQueueForAccept() { 297 return useQueueForAccept; 298 } 299 300 /** 301 * @param useQueueForAccept 302 * the useQueueForAccept to set 303 */ 304 public void setUseQueueForAccept(boolean useQueueForAccept) { 305 this.useQueueForAccept = useQueueForAccept; 306 } 307 308 /** 309 * pull Sockets from the ServerSocket 310 */ 311 @Override 312 public void run() { 313 final ServerSocketChannel chan = serverSocket.getChannel(); 314 if (chan != null) { 315 try { 316 chan.configureBlocking(false); 317 selector = Selector.open(); 318 chan.register(selector, SelectionKey.OP_ACCEPT); 319 while (!isStopped()) { 320 int count = selector.select(10); 321 322 if (count == 0) { 323 continue; 324 } 325 326 Set<SelectionKey> keys = selector.selectedKeys(); 327 328 for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) { 329 final SelectionKey key = i.next(); 330 if (key.isAcceptable()) { 331 try { 332 SocketChannel sc = chan.accept(); 333 if (sc != null) { 334 if (isStopped() || getAcceptListener() == null) { 335 sc.close(); 336 } else { 337 if (useQueueForAccept) { 338 socketQueue.put(sc.socket()); 339 } else { 340 handleSocket(sc.socket()); 341 } 342 } 343 } 344 345 } catch (SocketTimeoutException ste) { 346 // expect this to happen 347 } catch (Exception e) { 348 e.printStackTrace(); 349 if (!isStopping()) { 350 onAcceptError(e); 351 } else if (!isStopped()) { 352 LOG.warn("run()", e); 353 onAcceptError(e); 354 } 355 } 356 } 357 i.remove(); 358 } 359 360 } 361 } catch (IOException ex) { 362 if (selector != null) { 363 try { 364 selector.close(); 365 } catch (IOException ioe) {} 366 selector = null; 367 } 368 } 369 } else { 370 while (!isStopped()) { 371 Socket socket = null; 372 try { 373 socket = serverSocket.accept(); 374 if (socket != null) { 375 if (isStopped() || getAcceptListener() == null) { 376 socket.close(); 377 } else { 378 if (useQueueForAccept) { 379 socketQueue.put(socket); 380 } else { 381 handleSocket(socket); 382 } 383 } 384 } 385 } catch (SocketTimeoutException ste) { 386 // expect this to happen 387 } catch (Exception e) { 388 if (!isStopping()) { 389 onAcceptError(e); 390 } else if (!isStopped()) { 391 LOG.warn("run()", e); 392 onAcceptError(e); 393 } 394 } 395 } 396 } 397 } 398 399 /** 400 * Allow derived classes to override the Transport implementation that this transport server creates. 401 * 402 * @param socket 403 * @param format 404 * @return 405 * @throws IOException 406 */ 407 protected Transport createTransport(Socket socket, WireFormat format) throws IOException { 408 return new TcpTransport(format, socket); 409 } 410 411 /** 412 * @return pretty print of this 413 */ 414 @Override 415 public String toString() { 416 return "" + getBindLocation(); 417 } 418 419 /** 420 * @param socket 421 * @param bindAddress 422 * @return real hostName 423 * @throws UnknownHostException 424 */ 425 protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException { 426 String result = null; 427 if (socket.isBound()) { 428 if (socket.getInetAddress().isAnyLocalAddress()) { 429 // make it more human readable and useful, an alternative to 0.0.0.0 430 result = InetAddressUtil.getLocalHostName(); 431 } else { 432 result = socket.getInetAddress().getCanonicalHostName(); 433 } 434 } else { 435 result = bindAddress.getCanonicalHostName(); 436 } 437 return result; 438 } 439 440 @Override 441 protected void doStart() throws Exception { 442 if (useQueueForAccept) { 443 Runnable run = new Runnable() { 444 @Override 445 public void run() { 446 try { 447 while (!isStopped() && !isStopping()) { 448 Socket sock = socketQueue.poll(1, TimeUnit.SECONDS); 449 if (sock != null) { 450 try { 451 handleSocket(sock); 452 } catch (Throwable thrown) { 453 if (!isStopping()) { 454 onAcceptError(new Exception(thrown)); 455 } else if (!isStopped()) { 456 LOG.warn("Unexpected error thrown during accept handling: ", thrown); 457 onAcceptError(new Exception(thrown)); 458 } 459 } 460 } 461 } 462 463 } catch (InterruptedException e) { 464 LOG.info("socketQueue interuppted - stopping"); 465 if (!isStopping()) { 466 onAcceptError(e); 467 } 468 } 469 } 470 }; 471 socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize()); 472 socketHandlerThread.setDaemon(true); 473 socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1); 474 socketHandlerThread.start(); 475 } 476 super.doStart(); 477 } 478 479 @Override 480 protected void doStop(ServiceStopper stopper) throws Exception { 481 if (selector != null) { 482 selector.close(); 483 selector = null; 484 } 485 if (serverSocket != null) { 486 serverSocket.close(); 487 serverSocket = null; 488 } 489 super.doStop(stopper); 490 } 491 492 @Override 493 public InetSocketAddress getSocketAddress() { 494 return (InetSocketAddress) serverSocket.getLocalSocketAddress(); 495 } 496 497 protected final void handleSocket(Socket socket) { 498 boolean closeSocket = true; 499 try { 500 if (this.currentTransportCount.get() >= this.maximumConnections) { 501 throw new ExceededMaximumConnectionsException( 502 "Exceeded the maximum number of allowed client connections. See the '" + 503 "maximumConnections' property on the TCP transport configuration URI " + 504 "in the ActiveMQ configuration file (e.g., activemq.xml)"); 505 } else { 506 HashMap<String, Object> options = new HashMap<String, Object>(); 507 options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration)); 508 options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay)); 509 options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion)); 510 options.put("trace", Boolean.valueOf(trace)); 511 options.put("soTimeout", Integer.valueOf(soTimeout)); 512 options.put("socketBufferSize", Integer.valueOf(socketBufferSize)); 513 options.put("connectionTimeout", Integer.valueOf(connectionTimeout)); 514 options.put("logWriterName", logWriterName); 515 options.put("dynamicManagement", Boolean.valueOf(dynamicManagement)); 516 options.put("startLogging", Boolean.valueOf(startLogging)); 517 options.put("jmxPort", Integer.valueOf(jmxPort)); 518 options.putAll(transportOptions); 519 520 WireFormat format = wireFormatFactory.createWireFormat(); 521 Transport transport = createTransport(socket, format); 522 closeSocket = false; 523 524 if (transport instanceof ServiceSupport) { 525 ((ServiceSupport) transport).addServiceListener(this); 526 } 527 528 Transport configuredTransport = transportFactory.serverConfigure(transport, format, options); 529 530 getAcceptListener().onAccept(configuredTransport); 531 currentTransportCount.incrementAndGet(); 532 } 533 } catch (SocketTimeoutException ste) { 534 // expect this to happen 535 } catch (Exception e) { 536 if (closeSocket) { 537 try { 538 socket.close(); 539 } catch (Exception ignore) { 540 } 541 } 542 543 if (!isStopping()) { 544 onAcceptError(e); 545 } else if (!isStopped()) { 546 LOG.warn("run()", e); 547 onAcceptError(e); 548 } 549 } 550 } 551 552 public int getSoTimeout() { 553 return soTimeout; 554 } 555 556 public void setSoTimeout(int soTimeout) { 557 this.soTimeout = soTimeout; 558 } 559 560 public int getSocketBufferSize() { 561 return socketBufferSize; 562 } 563 564 public void setSocketBufferSize(int socketBufferSize) { 565 this.socketBufferSize = socketBufferSize; 566 } 567 568 public int getConnectionTimeout() { 569 return connectionTimeout; 570 } 571 572 public void setConnectionTimeout(int connectionTimeout) { 573 this.connectionTimeout = connectionTimeout; 574 } 575 576 /** 577 * @return the maximumConnections 578 */ 579 public int getMaximumConnections() { 580 return maximumConnections; 581 } 582 583 /** 584 * @param maximumConnections 585 * the maximumConnections to set 586 */ 587 public void setMaximumConnections(int maximumConnections) { 588 this.maximumConnections = maximumConnections; 589 } 590 591 @Override 592 public void started(Service service) { 593 } 594 595 @Override 596 public void stopped(Service service) { 597 this.currentTransportCount.decrementAndGet(); 598 } 599 600 @Override 601 public boolean isSslServer() { 602 return false; 603 } 604 605 @Override 606 public boolean isAllowLinkStealing() { 607 return allowLinkStealing; 608 } 609 610 @Override 611 public void setAllowLinkStealing(boolean allowLinkStealing) { 612 this.allowLinkStealing = allowLinkStealing; 613 } 614}