001/** 002gxfdgvdfg * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.tcp; 018 019import org.apache.activemq.Service; 020import org.apache.activemq.TransportLoggerSupport; 021import org.apache.activemq.thread.TaskRunnerFactory; 022import org.apache.activemq.transport.Transport; 023import org.apache.activemq.transport.TransportThreadSupport; 024import org.apache.activemq.util.InetAddressUtil; 025import org.apache.activemq.util.IntrospectionSupport; 026import org.apache.activemq.util.ServiceStopper; 027import org.apache.activemq.wireformat.WireFormat; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031import javax.net.SocketFactory; 032import java.io.DataInputStream; 033import java.io.DataOutputStream; 034import java.io.IOException; 035import java.io.InterruptedIOException; 036import java.net.*; 037import java.util.HashMap; 038import java.util.Map; 039import java.util.concurrent.CountDownLatch; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.atomic.AtomicReference; 042 043/** 044 * An implementation of the {@link Transport} interface using raw tcp/ip 045 * 046 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) 047 * 048 */ 049public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable { 050 private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class); 051 protected final URI remoteLocation; 052 protected final URI localLocation; 053 protected final WireFormat wireFormat; 054 055 protected int connectionTimeout = 30000; 056 protected int soTimeout; 057 protected int socketBufferSize = 64 * 1024; 058 protected int ioBufferSize = 8 * 1024; 059 protected boolean closeAsync=true; 060 protected Socket socket; 061 protected DataOutputStream dataOut; 062 protected DataInputStream dataIn; 063 protected TimeStampStream buffOut = null; 064 065 /** 066 * The Traffic Class to be set on the socket. 067 */ 068 protected int trafficClass = 0; 069 /** 070 * Keeps track of attempts to set the Traffic Class on the socket. 071 */ 072 private boolean trafficClassSet = false; 073 /** 074 * Prevents setting both the Differentiated Services and Type of Service 075 * transport options at the same time, since they share the same spot in 076 * the TCP/IP packet headers. 077 */ 078 protected boolean diffServChosen = false; 079 protected boolean typeOfServiceChosen = false; 080 /** 081 * trace=true -> the Transport stack where this TcpTransport 082 * object will be, will have a TransportLogger layer 083 * trace=false -> the Transport stack where this TcpTransport 084 * object will be, will NOT have a TransportLogger layer, and therefore 085 * will never be able to print logging messages. 086 * This parameter is most probably set in Connection or TransportConnector URIs. 087 */ 088 protected boolean trace = false; 089 /** 090 * Name of the LogWriter implementation to use. 091 * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory. 092 * This parameter is most probably set in Connection or TransportConnector URIs. 093 */ 094 protected String logWriterName = TransportLoggerSupport.defaultLogWriterName; 095 /** 096 * Specifies if the TransportLogger will be manageable by JMX or not. 097 * Also, as long as there is at least 1 TransportLogger which is manageable, 098 * a TransportLoggerControl MBean will me created. 099 */ 100 protected boolean dynamicManagement = false; 101 /** 102 * startLogging=true -> the TransportLogger object of the Transport stack 103 * will initially write messages to the log. 104 * startLogging=false -> the TransportLogger object of the Transport stack 105 * will initially NOT write messages to the log. 106 * This parameter only has an effect if trace == true. 107 * This parameter is most probably set in Connection or TransportConnector URIs. 108 */ 109 protected boolean startLogging = true; 110 /** 111 * Specifies the port that will be used by the JMX server to manage 112 * the TransportLoggers. 113 * This should only be set in an URI by a client (producer or consumer) since 114 * a broker will already create a JMX server. 115 * It is useful for people who test a broker and clients in the same machine 116 * and want to control both via JMX; a different port will be needed. 117 */ 118 protected int jmxPort = 1099; 119 protected boolean useLocalHost = false; 120 protected int minmumWireFormatVersion; 121 protected SocketFactory socketFactory; 122 protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>(); 123 protected volatile int receiveCounter; 124 125 private Map<String, Object> socketOptions; 126 private int soLinger = Integer.MIN_VALUE; 127 private Boolean keepAlive; 128 private Boolean tcpNoDelay; 129 private Thread runnerThread; 130 131 /** 132 * Connect to a remote Node - e.g. a Broker 133 * 134 * @param wireFormat 135 * @param socketFactory 136 * @param remoteLocation 137 * @param localLocation - e.g. local InetAddress and local port 138 * @throws IOException 139 * @throws UnknownHostException 140 */ 141 public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, 142 URI localLocation) throws UnknownHostException, IOException { 143 this.wireFormat = wireFormat; 144 this.socketFactory = socketFactory; 145 try { 146 this.socket = socketFactory.createSocket(); 147 } catch (SocketException e) { 148 this.socket = null; 149 } 150 this.remoteLocation = remoteLocation; 151 this.localLocation = localLocation; 152 setDaemon(false); 153 } 154 155 /** 156 * Initialize from a server Socket 157 * 158 * @param wireFormat 159 * @param socket 160 * @throws IOException 161 */ 162 public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException { 163 this.wireFormat = wireFormat; 164 this.socket = socket; 165 this.remoteLocation = null; 166 this.localLocation = null; 167 setDaemon(true); 168 } 169 170 /** 171 * A one way asynchronous send 172 */ 173 public void oneway(Object command) throws IOException { 174 checkStarted(); 175 wireFormat.marshal(command, dataOut); 176 dataOut.flush(); 177 } 178 179 /** 180 * @return pretty print of 'this' 181 */ 182 @Override 183 public String toString() { 184 return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort() + "@" + socket.getLocalPort() 185 : (localLocation != null ? localLocation : remoteLocation)) ; 186 } 187 188 /** 189 * reads packets from a Socket 190 */ 191 public void run() { 192 LOG.trace("TCP consumer thread for " + this + " starting"); 193 this.runnerThread=Thread.currentThread(); 194 try { 195 while (!isStopped()) { 196 doRun(); 197 } 198 } catch (IOException e) { 199 stoppedLatch.get().countDown(); 200 onException(e); 201 } catch (Throwable e){ 202 stoppedLatch.get().countDown(); 203 IOException ioe=new IOException("Unexpected error occurred: " + e); 204 ioe.initCause(e); 205 onException(ioe); 206 }finally { 207 stoppedLatch.get().countDown(); 208 } 209 } 210 211 protected void doRun() throws IOException { 212 try { 213 Object command = readCommand(); 214 doConsume(command); 215 } catch (SocketTimeoutException e) { 216 } catch (InterruptedIOException e) { 217 } 218 } 219 220 protected Object readCommand() throws IOException { 221 return wireFormat.unmarshal(dataIn); 222 } 223 224 // Properties 225 // ------------------------------------------------------------------------- 226 public String getDiffServ() { 227 // This is the value requested by the user by setting the Tcp Transport 228 // options. If the socket hasn't been created, then this value may not 229 // reflect the value returned by Socket.getTrafficClass(). 230 return Integer.toString(this.trafficClass); 231 } 232 233 public void setDiffServ(String diffServ) throws IllegalArgumentException { 234 this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ); 235 this.diffServChosen = true; 236 } 237 238 public int getTypeOfService() { 239 // This is the value requested by the user by setting the Tcp Transport 240 // options. If the socket hasn't been created, then this value may not 241 // reflect the value returned by Socket.getTrafficClass(). 242 return this.trafficClass; 243 } 244 245 public void setTypeOfService(int typeOfService) { 246 this.trafficClass = QualityOfServiceUtils.getToS(typeOfService); 247 this.typeOfServiceChosen = true; 248 } 249 250 public boolean isTrace() { 251 return trace; 252 } 253 254 public void setTrace(boolean trace) { 255 this.trace = trace; 256 } 257 258 public String getLogWriterName() { 259 return logWriterName; 260 } 261 262 public void setLogWriterName(String logFormat) { 263 this.logWriterName = logFormat; 264 } 265 266 public boolean isDynamicManagement() { 267 return dynamicManagement; 268 } 269 270 public void setDynamicManagement(boolean useJmx) { 271 this.dynamicManagement = useJmx; 272 } 273 274 public boolean isStartLogging() { 275 return startLogging; 276 } 277 278 public void setStartLogging(boolean startLogging) { 279 this.startLogging = startLogging; 280 } 281 282 public int getJmxPort() { 283 return jmxPort; 284 } 285 286 public void setJmxPort(int jmxPort) { 287 this.jmxPort = jmxPort; 288 } 289 290 public int getMinmumWireFormatVersion() { 291 return minmumWireFormatVersion; 292 } 293 294 public void setMinmumWireFormatVersion(int minmumWireFormatVersion) { 295 this.minmumWireFormatVersion = minmumWireFormatVersion; 296 } 297 298 public boolean isUseLocalHost() { 299 return useLocalHost; 300 } 301 302 /** 303 * Sets whether 'localhost' or the actual local host name should be used to 304 * make local connections. On some operating systems such as Macs its not 305 * possible to connect as the local host name so localhost is better. 306 */ 307 public void setUseLocalHost(boolean useLocalHost) { 308 this.useLocalHost = useLocalHost; 309 } 310 311 public int getSocketBufferSize() { 312 return socketBufferSize; 313 } 314 315 /** 316 * Sets the buffer size to use on the socket 317 */ 318 public void setSocketBufferSize(int socketBufferSize) { 319 this.socketBufferSize = socketBufferSize; 320 } 321 322 public int getSoTimeout() { 323 return soTimeout; 324 } 325 326 /** 327 * Sets the socket timeout 328 */ 329 public void setSoTimeout(int soTimeout) { 330 this.soTimeout = soTimeout; 331 } 332 333 public int getConnectionTimeout() { 334 return connectionTimeout; 335 } 336 337 /** 338 * Sets the timeout used to connect to the socket 339 */ 340 public void setConnectionTimeout(int connectionTimeout) { 341 this.connectionTimeout = connectionTimeout; 342 } 343 344 public Boolean getKeepAlive() { 345 return keepAlive; 346 } 347 348 /** 349 * Enable/disable TCP KEEP_ALIVE mode 350 */ 351 public void setKeepAlive(Boolean keepAlive) { 352 this.keepAlive = keepAlive; 353 } 354 355 /** 356 * Enable/disable soLinger 357 * @param soLinger enabled if > -1, disabled if == -1, system default otherwise 358 */ 359 public void setSoLinger(int soLinger) { 360 this.soLinger = soLinger; 361 } 362 363 public int getSoLinger() { 364 return soLinger; 365 } 366 367 public Boolean getTcpNoDelay() { 368 return tcpNoDelay; 369 } 370 371 /** 372 * Enable/disable the TCP_NODELAY option on the socket 373 */ 374 public void setTcpNoDelay(Boolean tcpNoDelay) { 375 this.tcpNoDelay = tcpNoDelay; 376 } 377 378 /** 379 * @return the ioBufferSize 380 */ 381 public int getIoBufferSize() { 382 return this.ioBufferSize; 383 } 384 385 /** 386 * @param ioBufferSize the ioBufferSize to set 387 */ 388 public void setIoBufferSize(int ioBufferSize) { 389 this.ioBufferSize = ioBufferSize; 390 } 391 392 /** 393 * @return the closeAsync 394 */ 395 public boolean isCloseAsync() { 396 return closeAsync; 397 } 398 399 /** 400 * @param closeAsync the closeAsync to set 401 */ 402 public void setCloseAsync(boolean closeAsync) { 403 this.closeAsync = closeAsync; 404 } 405 406 // Implementation methods 407 // ------------------------------------------------------------------------- 408 protected String resolveHostName(String host) throws UnknownHostException { 409 if (isUseLocalHost()) { 410 String localName = InetAddressUtil.getLocalHostName(); 411 if (localName != null && localName.equals(host)) { 412 return "localhost"; 413 } 414 } 415 return host; 416 } 417 418 /** 419 * Configures the socket for use 420 * 421 * @param sock the socket 422 * @throws SocketException, IllegalArgumentException if setting the options 423 * on the socket failed. 424 */ 425 protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException { 426 if (socketOptions != null) { 427 // copy the map as its used values is being removed when calling setProperties 428 // and we need to be able to set the options again in case socket is re-initailized 429 Map<String, Object> copy = new HashMap<String, Object>(socketOptions); 430 IntrospectionSupport.setProperties(socket, copy); 431 if (!copy.isEmpty()) { 432 throw new IllegalArgumentException("Invalid socket parameters: " + copy); 433 } 434 } 435 436 try { 437 sock.setReceiveBufferSize(socketBufferSize); 438 sock.setSendBufferSize(socketBufferSize); 439 } catch (SocketException se) { 440 LOG.warn("Cannot set socket buffer size = " + socketBufferSize); 441 LOG.debug("Cannot set socket buffer size. Reason: " + se.getMessage() + ". This exception is ignored.", se); 442 } 443 sock.setSoTimeout(soTimeout); 444 445 if (keepAlive != null) { 446 sock.setKeepAlive(keepAlive.booleanValue()); 447 } 448 449 if (soLinger > -1) { 450 sock.setSoLinger(true, soLinger); 451 } else if (soLinger == -1) { 452 sock.setSoLinger(false, 0); 453 } 454 if (tcpNoDelay != null) { 455 sock.setTcpNoDelay(tcpNoDelay.booleanValue()); 456 } 457 if (!this.trafficClassSet) { 458 this.trafficClassSet = setTrafficClass(sock); 459 } 460 } 461 462 @Override 463 protected void doStart() throws Exception { 464 connect(); 465 stoppedLatch.set(new CountDownLatch(1)); 466 super.doStart(); 467 } 468 469 protected void connect() throws Exception { 470 471 if (socket == null && socketFactory == null) { 472 throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set"); 473 } 474 475 InetSocketAddress localAddress = null; 476 InetSocketAddress remoteAddress = null; 477 478 if (localLocation != null) { 479 localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), 480 localLocation.getPort()); 481 } 482 483 if (remoteLocation != null) { 484 String host = resolveHostName(remoteLocation.getHost()); 485 remoteAddress = new InetSocketAddress(host, remoteLocation.getPort()); 486 } 487 // Set the traffic class before the socket is connected when possible so 488 // that the connection packets are given the correct traffic class. 489 this.trafficClassSet = setTrafficClass(socket); 490 491 if (socket != null) { 492 493 if (localAddress != null) { 494 socket.bind(localAddress); 495 } 496 497 // If it's a server accepted socket.. we don't need to connect it 498 // to a remote address. 499 if (remoteAddress != null) { 500 if (connectionTimeout >= 0) { 501 socket.connect(remoteAddress, connectionTimeout); 502 } else { 503 socket.connect(remoteAddress); 504 } 505 } 506 507 } else { 508 // For SSL sockets.. you can't create an unconnected socket :( 509 // This means the timout option are not supported either. 510 if (localAddress != null) { 511 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), 512 localAddress.getAddress(), localAddress.getPort()); 513 } else { 514 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort()); 515 } 516 } 517 518 initialiseSocket(socket); 519 initializeStreams(); 520 } 521 522 @Override 523 protected void doStop(ServiceStopper stopper) throws Exception { 524 if (LOG.isDebugEnabled()) { 525 LOG.debug("Stopping transport " + this); 526 } 527 528 // Closing the streams flush the sockets before closing.. if the socket 529 // is hung.. then this hangs the close. 530 // closeStreams(); 531 if (socket != null) { 532 if (closeAsync) { 533 //closing the socket can hang also 534 final CountDownLatch latch = new CountDownLatch(1); 535 536 // need a async task for this 537 final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory(); 538 taskRunnerFactory.execute(new Runnable() { 539 public void run() { 540 LOG.trace("Closing socket {}", socket); 541 try { 542 socket.close(); 543 LOG.debug("Closed socket {}", socket); 544 } catch (IOException e) { 545 if (LOG.isDebugEnabled()) { 546 LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e); 547 } 548 } finally { 549 latch.countDown(); 550 } 551 } 552 }); 553 554 try { 555 latch.await(1,TimeUnit.SECONDS); 556 } catch (InterruptedException e) { 557 Thread.currentThread().interrupt(); 558 } finally { 559 taskRunnerFactory.shutdownNow(); 560 } 561 562 } else { 563 // close synchronously 564 LOG.trace("Closing socket {}", socket); 565 try { 566 socket.close(); 567 LOG.debug("Closed socket {}", socket); 568 } catch (IOException e) { 569 if (LOG.isDebugEnabled()) { 570 LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e); 571 } 572 } 573 } 574 } 575 } 576 577 /** 578 * Override so that stop() blocks until the run thread is no longer running. 579 */ 580 @Override 581 public void stop() throws Exception { 582 super.stop(); 583 CountDownLatch countDownLatch = stoppedLatch.get(); 584 if (countDownLatch != null && Thread.currentThread() != this.runnerThread) { 585 countDownLatch.await(1,TimeUnit.SECONDS); 586 } 587 } 588 589 protected void initializeStreams() throws Exception { 590 TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) { 591 @Override 592 public int read() throws IOException { 593 receiveCounter++; 594 return super.read(); 595 } 596 @Override 597 public int read(byte[] b, int off, int len) throws IOException { 598 receiveCounter++; 599 return super.read(b, off, len); 600 } 601 @Override 602 public long skip(long n) throws IOException { 603 receiveCounter++; 604 return super.skip(n); 605 } 606 @Override 607 protected void fill() throws IOException { 608 receiveCounter++; 609 super.fill(); 610 } 611 }; 612 this.dataIn = new DataInputStream(buffIn); 613 TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize); 614 this.dataOut = new DataOutputStream(outputStream); 615 this.buffOut = outputStream; 616 } 617 618 protected void closeStreams() throws IOException { 619 if (dataOut != null) { 620 dataOut.close(); 621 } 622 if (dataIn != null) { 623 dataIn.close(); 624 } 625 } 626 627 public void setSocketOptions(Map<String, Object> socketOptions) { 628 this.socketOptions = new HashMap<String, Object>(socketOptions); 629 } 630 631 public String getRemoteAddress() { 632 if (socket != null) { 633 SocketAddress address = socket.getRemoteSocketAddress(); 634 if (address instanceof InetSocketAddress) { 635 return "tcp://" + ((InetSocketAddress)address).getAddress().getHostAddress() + ":" + ((InetSocketAddress)address).getPort(); 636 } else { 637 return "" + socket.getRemoteSocketAddress(); 638 } 639 } 640 return null; 641 } 642 643 @Override 644 public <T> T narrow(Class<T> target) { 645 if (target == Socket.class) { 646 return target.cast(socket); 647 } else if ( target == TimeStampStream.class) { 648 return target.cast(buffOut); 649 } 650 return super.narrow(target); 651 } 652 653 public int getReceiveCounter() { 654 return receiveCounter; 655 } 656 657 /** 658 * @param sock The socket on which to set the Traffic Class. 659 * @return Whether or not the Traffic Class was set on the given socket. 660 * @throws SocketException if the system does not support setting the 661 * Traffic Class. 662 * @throws IllegalArgumentException if both the Differentiated Services and 663 * Type of Services transport options have been set on the same 664 * connection. 665 */ 666 private boolean setTrafficClass(Socket sock) throws SocketException, 667 IllegalArgumentException { 668 if (sock == null 669 || (!this.diffServChosen && !this.typeOfServiceChosen)) { 670 return false; 671 } 672 if (this.diffServChosen && this.typeOfServiceChosen) { 673 throw new IllegalArgumentException("Cannot set both the " 674 + " Differentiated Services and Type of Services transport " 675 + " options on the same connection."); 676 } 677 678 sock.setTrafficClass(this.trafficClass); 679 680 int resultTrafficClass = sock.getTrafficClass(); 681 if (this.trafficClass != resultTrafficClass) { 682 // In the case where the user has specified the ECN bits (e.g. in 683 // Type of Service) but the system won't allow the ECN bits to be 684 // set or in the case where setting the traffic class failed for 685 // other reasons, emit a warning. 686 if ((this.trafficClass >> 2) == (resultTrafficClass >> 2) 687 && (this.trafficClass & 3) != (resultTrafficClass & 3)) { 688 LOG.warn("Attempted to set the Traffic Class to " 689 + this.trafficClass + " but the result Traffic Class was " 690 + resultTrafficClass + ". Please check that your system " 691 + "allows you to set the ECN bits (the first two bits)."); 692 } else { 693 LOG.warn("Attempted to set the Traffic Class to " 694 + this.trafficClass + " but the result Traffic Class was " 695 + resultTrafficClass + ". Please check that your system " 696 + "supports java.net.setTrafficClass."); 697 } 698 return false; 699 } 700 // Reset the guards that prevent both the Differentiated Services 701 // option and the Type of Service option from being set on the same 702 // connection. 703 this.diffServChosen = false; 704 this.typeOfServiceChosen = false; 705 return true; 706 } 707 708 public WireFormat getWireFormat() { 709 return wireFormat; 710 } 711}