001/**
002gxfdgvdfg * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import org.apache.activemq.Service;
020import org.apache.activemq.TransportLoggerSupport;
021import org.apache.activemq.thread.TaskRunnerFactory;
022import org.apache.activemq.transport.Transport;
023import org.apache.activemq.transport.TransportThreadSupport;
024import org.apache.activemq.util.InetAddressUtil;
025import org.apache.activemq.util.IntrospectionSupport;
026import org.apache.activemq.util.ServiceStopper;
027import org.apache.activemq.wireformat.WireFormat;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031import javax.net.SocketFactory;
032import java.io.DataInputStream;
033import java.io.DataOutputStream;
034import java.io.IOException;
035import java.io.InterruptedIOException;
036import java.net.*;
037import java.util.HashMap;
038import java.util.Map;
039import java.util.concurrent.CountDownLatch;
040import java.util.concurrent.TimeUnit;
041import java.util.concurrent.atomic.AtomicReference;
042
043/**
044 * An implementation of the {@link Transport} interface using raw tcp/ip
045 *
046 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
047 *
048 */
049public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
050    private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
051    protected final URI remoteLocation;
052    protected final URI localLocation;
053    protected final WireFormat wireFormat;
054
055    protected int connectionTimeout = 30000;
056    protected int soTimeout;
057    protected int socketBufferSize = 64 * 1024;
058    protected int ioBufferSize = 8 * 1024;
059    protected boolean closeAsync=true;
060    protected Socket socket;
061    protected DataOutputStream dataOut;
062    protected DataInputStream dataIn;
063    protected TimeStampStream buffOut = null;
064
065    /**
066     * The Traffic Class to be set on the socket.
067     */
068    protected int trafficClass = 0;
069    /**
070     * Keeps track of attempts to set the Traffic Class on the socket.
071     */
072    private boolean trafficClassSet = false;
073    /**
074     * Prevents setting both the Differentiated Services and Type of Service
075     * transport options at the same time, since they share the same spot in
076     * the TCP/IP packet headers.
077     */
078    protected boolean diffServChosen = false;
079    protected boolean typeOfServiceChosen = false;
080    /**
081     * trace=true -> the Transport stack where this TcpTransport
082     * object will be, will have a TransportLogger layer
083     * trace=false -> the Transport stack where this TcpTransport
084     * object will be, will NOT have a TransportLogger layer, and therefore
085     * will never be able to print logging messages.
086     * This parameter is most probably set in Connection or TransportConnector URIs.
087     */
088    protected boolean trace = false;
089    /**
090     * Name of the LogWriter implementation to use.
091     * Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
092     * This parameter is most probably set in Connection or TransportConnector URIs.
093     */
094    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
095    /**
096     * Specifies if the TransportLogger will be manageable by JMX or not.
097     * Also, as long as there is at least 1 TransportLogger which is manageable,
098     * a TransportLoggerControl MBean will me created.
099     */
100    protected boolean dynamicManagement = false;
101    /**
102     * startLogging=true -> the TransportLogger object of the Transport stack
103     * will initially write messages to the log.
104     * startLogging=false -> the TransportLogger object of the Transport stack
105     * will initially NOT write messages to the log.
106     * This parameter only has an effect if trace == true.
107     * This parameter is most probably set in Connection or TransportConnector URIs.
108     */
109    protected boolean startLogging = true;
110    /**
111     * Specifies the port that will be used by the JMX server to manage
112     * the TransportLoggers.
113     * This should only be set in an URI by a client (producer or consumer) since
114     * a broker will already create a JMX server.
115     * It is useful for people who test a broker and clients in the same machine
116     * and want to control both via JMX; a different port will be needed.
117     */
118    protected int jmxPort = 1099;
119    protected boolean useLocalHost = false;
120    protected int minmumWireFormatVersion;
121    protected SocketFactory socketFactory;
122    protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
123    protected volatile int receiveCounter;
124
125    private Map<String, Object> socketOptions;
126    private int soLinger = Integer.MIN_VALUE;
127    private Boolean keepAlive;
128    private Boolean tcpNoDelay;
129    private Thread runnerThread;
130
131    /**
132     * Connect to a remote Node - e.g. a Broker
133     *
134     * @param wireFormat
135     * @param socketFactory
136     * @param remoteLocation
137     * @param localLocation - e.g. local InetAddress and local port
138     * @throws IOException
139     * @throws UnknownHostException
140     */
141    public TcpTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
142                        URI localLocation) throws UnknownHostException, IOException {
143        this.wireFormat = wireFormat;
144        this.socketFactory = socketFactory;
145        try {
146            this.socket = socketFactory.createSocket();
147        } catch (SocketException e) {
148            this.socket = null;
149        }
150        this.remoteLocation = remoteLocation;
151        this.localLocation = localLocation;
152        setDaemon(false);
153    }
154
155    /**
156     * Initialize from a server Socket
157     *
158     * @param wireFormat
159     * @param socket
160     * @throws IOException
161     */
162    public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
163        this.wireFormat = wireFormat;
164        this.socket = socket;
165        this.remoteLocation = null;
166        this.localLocation = null;
167        setDaemon(true);
168    }
169
170    /**
171     * A one way asynchronous send
172     */
173    public void oneway(Object command) throws IOException {
174        checkStarted();
175        wireFormat.marshal(command, dataOut);
176        dataOut.flush();
177    }
178
179    /**
180     * @return pretty print of 'this'
181     */
182    @Override
183    public String toString() {
184        return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort() + "@" + socket.getLocalPort()
185                : (localLocation != null ? localLocation : remoteLocation)) ;
186    }
187
188    /**
189     * reads packets from a Socket
190     */
191    public void run() {
192        LOG.trace("TCP consumer thread for " + this + " starting");
193        this.runnerThread=Thread.currentThread();
194        try {
195            while (!isStopped()) {
196                doRun();
197            }
198        } catch (IOException e) {
199            stoppedLatch.get().countDown();
200            onException(e);
201        } catch (Throwable e){
202            stoppedLatch.get().countDown();
203            IOException ioe=new IOException("Unexpected error occurred: " + e);
204            ioe.initCause(e);
205            onException(ioe);
206        }finally {
207            stoppedLatch.get().countDown();
208        }
209    }
210
211    protected void doRun() throws IOException {
212        try {
213            Object command = readCommand();
214            doConsume(command);
215        } catch (SocketTimeoutException e) {
216        } catch (InterruptedIOException e) {
217        }
218    }
219
220    protected Object readCommand() throws IOException {
221        return wireFormat.unmarshal(dataIn);
222    }
223
224    // Properties
225    // -------------------------------------------------------------------------
226    public String getDiffServ() {
227        // This is the value requested by the user by setting the Tcp Transport
228        // options. If the socket hasn't been created, then this value may not
229        // reflect the value returned by Socket.getTrafficClass().
230        return Integer.toString(this.trafficClass);
231    }
232
233    public void setDiffServ(String diffServ) throws IllegalArgumentException {
234        this.trafficClass = QualityOfServiceUtils.getDSCP(diffServ);
235        this.diffServChosen = true;
236    }
237
238    public int getTypeOfService() {
239        // This is the value requested by the user by setting the Tcp Transport
240        // options. If the socket hasn't been created, then this value may not
241        // reflect the value returned by Socket.getTrafficClass().
242        return this.trafficClass;
243    }
244
245    public void setTypeOfService(int typeOfService) {
246        this.trafficClass = QualityOfServiceUtils.getToS(typeOfService);
247        this.typeOfServiceChosen = true;
248    }
249
250    public boolean isTrace() {
251        return trace;
252    }
253
254    public void setTrace(boolean trace) {
255        this.trace = trace;
256    }
257
258    public String getLogWriterName() {
259        return logWriterName;
260    }
261
262    public void setLogWriterName(String logFormat) {
263        this.logWriterName = logFormat;
264    }
265
266    public boolean isDynamicManagement() {
267        return dynamicManagement;
268    }
269
270    public void setDynamicManagement(boolean useJmx) {
271        this.dynamicManagement = useJmx;
272    }
273
274    public boolean isStartLogging() {
275        return startLogging;
276    }
277
278    public void setStartLogging(boolean startLogging) {
279        this.startLogging = startLogging;
280    }
281
282    public int getJmxPort() {
283        return jmxPort;
284    }
285
286    public void setJmxPort(int jmxPort) {
287        this.jmxPort = jmxPort;
288    }
289
290    public int getMinmumWireFormatVersion() {
291        return minmumWireFormatVersion;
292    }
293
294    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
295        this.minmumWireFormatVersion = minmumWireFormatVersion;
296    }
297
298    public boolean isUseLocalHost() {
299        return useLocalHost;
300    }
301
302    /**
303     * Sets whether 'localhost' or the actual local host name should be used to
304     * make local connections. On some operating systems such as Macs its not
305     * possible to connect as the local host name so localhost is better.
306     */
307    public void setUseLocalHost(boolean useLocalHost) {
308        this.useLocalHost = useLocalHost;
309    }
310
311    public int getSocketBufferSize() {
312        return socketBufferSize;
313    }
314
315    /**
316     * Sets the buffer size to use on the socket
317     */
318    public void setSocketBufferSize(int socketBufferSize) {
319        this.socketBufferSize = socketBufferSize;
320    }
321
322    public int getSoTimeout() {
323        return soTimeout;
324    }
325
326    /**
327     * Sets the socket timeout
328     */
329    public void setSoTimeout(int soTimeout) {
330        this.soTimeout = soTimeout;
331    }
332
333    public int getConnectionTimeout() {
334        return connectionTimeout;
335    }
336
337    /**
338     * Sets the timeout used to connect to the socket
339     */
340    public void setConnectionTimeout(int connectionTimeout) {
341        this.connectionTimeout = connectionTimeout;
342    }
343
344    public Boolean getKeepAlive() {
345        return keepAlive;
346    }
347
348    /**
349     * Enable/disable TCP KEEP_ALIVE mode
350     */
351    public void setKeepAlive(Boolean keepAlive) {
352        this.keepAlive = keepAlive;
353    }
354
355    /**
356     * Enable/disable soLinger
357     * @param soLinger enabled if > -1, disabled if == -1, system default otherwise
358     */
359    public void setSoLinger(int soLinger) {
360        this.soLinger = soLinger;
361    }
362
363    public int getSoLinger() {
364        return soLinger;
365    }
366
367    public Boolean getTcpNoDelay() {
368        return tcpNoDelay;
369    }
370
371    /**
372     * Enable/disable the TCP_NODELAY option on the socket
373     */
374    public void setTcpNoDelay(Boolean tcpNoDelay) {
375        this.tcpNoDelay = tcpNoDelay;
376    }
377
378    /**
379     * @return the ioBufferSize
380     */
381    public int getIoBufferSize() {
382        return this.ioBufferSize;
383    }
384
385    /**
386     * @param ioBufferSize the ioBufferSize to set
387     */
388    public void setIoBufferSize(int ioBufferSize) {
389        this.ioBufferSize = ioBufferSize;
390    }
391
392    /**
393     * @return the closeAsync
394     */
395    public boolean isCloseAsync() {
396        return closeAsync;
397    }
398
399    /**
400     * @param closeAsync the closeAsync to set
401     */
402    public void setCloseAsync(boolean closeAsync) {
403        this.closeAsync = closeAsync;
404    }
405
406    // Implementation methods
407    // -------------------------------------------------------------------------
408    protected String resolveHostName(String host) throws UnknownHostException {
409        if (isUseLocalHost()) {
410            String localName = InetAddressUtil.getLocalHostName();
411            if (localName != null && localName.equals(host)) {
412                return "localhost";
413            }
414        }
415        return host;
416    }
417
418    /**
419     * Configures the socket for use
420     *
421     * @param sock  the socket
422     * @throws SocketException, IllegalArgumentException if setting the options
423     *         on the socket failed.
424     */
425    protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
426        if (socketOptions != null) {
427            // copy the map as its used values is being removed when calling setProperties
428            // and we need to be able to set the options again in case socket is re-initailized
429            Map<String, Object> copy = new HashMap<String, Object>(socketOptions);
430            IntrospectionSupport.setProperties(socket, copy);
431            if (!copy.isEmpty()) {
432                throw new IllegalArgumentException("Invalid socket parameters: " + copy);
433            }
434        }
435
436        try {
437            sock.setReceiveBufferSize(socketBufferSize);
438            sock.setSendBufferSize(socketBufferSize);
439        } catch (SocketException se) {
440            LOG.warn("Cannot set socket buffer size = " + socketBufferSize);
441            LOG.debug("Cannot set socket buffer size. Reason: " + se.getMessage() + ". This exception is ignored.", se);
442        }
443        sock.setSoTimeout(soTimeout);
444
445        if (keepAlive != null) {
446            sock.setKeepAlive(keepAlive.booleanValue());
447        }
448
449        if (soLinger > -1) {
450            sock.setSoLinger(true, soLinger);
451        } else if (soLinger == -1) {
452            sock.setSoLinger(false, 0);
453        }
454        if (tcpNoDelay != null) {
455            sock.setTcpNoDelay(tcpNoDelay.booleanValue());
456        }
457        if (!this.trafficClassSet) {
458            this.trafficClassSet = setTrafficClass(sock);
459        }
460    }
461
462    @Override
463    protected void doStart() throws Exception {
464        connect();
465        stoppedLatch.set(new CountDownLatch(1));
466        super.doStart();
467    }
468
469    protected void connect() throws Exception {
470
471        if (socket == null && socketFactory == null) {
472            throw new IllegalStateException("Cannot connect if the socket or socketFactory have not been set");
473        }
474
475        InetSocketAddress localAddress = null;
476        InetSocketAddress remoteAddress = null;
477
478        if (localLocation != null) {
479            localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()),
480                                                 localLocation.getPort());
481        }
482
483        if (remoteLocation != null) {
484            String host = resolveHostName(remoteLocation.getHost());
485            remoteAddress = new InetSocketAddress(host, remoteLocation.getPort());
486        }
487        // Set the traffic class before the socket is connected when possible so
488        // that the connection packets are given the correct traffic class.
489        this.trafficClassSet = setTrafficClass(socket);
490
491        if (socket != null) {
492
493            if (localAddress != null) {
494                socket.bind(localAddress);
495            }
496
497            // If it's a server accepted socket.. we don't need to connect it
498            // to a remote address.
499            if (remoteAddress != null) {
500                if (connectionTimeout >= 0) {
501                    socket.connect(remoteAddress, connectionTimeout);
502                } else {
503                    socket.connect(remoteAddress);
504                }
505            }
506
507        } else {
508            // For SSL sockets.. you can't create an unconnected socket :(
509            // This means the timout option are not supported either.
510            if (localAddress != null) {
511                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(),
512                                                    localAddress.getAddress(), localAddress.getPort());
513            } else {
514                socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
515            }
516        }
517
518        initialiseSocket(socket);
519        initializeStreams();
520    }
521
522    @Override
523    protected void doStop(ServiceStopper stopper) throws Exception {
524        if (LOG.isDebugEnabled()) {
525            LOG.debug("Stopping transport " + this);
526        }
527
528        // Closing the streams flush the sockets before closing.. if the socket
529        // is hung.. then this hangs the close.
530        // closeStreams();
531        if (socket != null) {
532            if (closeAsync) {
533                //closing the socket can hang also
534                final CountDownLatch latch = new CountDownLatch(1);
535
536                // need a async task for this
537                final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
538                taskRunnerFactory.execute(new Runnable() {
539                    public void run() {
540                        LOG.trace("Closing socket {}", socket);
541                        try {
542                            socket.close();
543                            LOG.debug("Closed socket {}", socket);
544                        } catch (IOException e) {
545                            if (LOG.isDebugEnabled()) {
546                                LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
547                            }
548                        } finally {
549                            latch.countDown();
550                        }
551                    }
552                });
553
554                try {
555                    latch.await(1,TimeUnit.SECONDS);
556                } catch (InterruptedException e) {
557                    Thread.currentThread().interrupt();
558                } finally {
559                    taskRunnerFactory.shutdownNow();
560                }
561
562            } else {
563                // close synchronously
564                LOG.trace("Closing socket {}", socket);
565                try {
566                    socket.close();
567                    LOG.debug("Closed socket {}", socket);
568                } catch (IOException e) {
569                    if (LOG.isDebugEnabled()) {
570                        LOG.debug("Caught exception closing socket " + socket + ". This exception will be ignored.", e);
571                    }
572                }
573            }
574        }
575    }
576
577    /**
578     * Override so that stop() blocks until the run thread is no longer running.
579     */
580    @Override
581    public void stop() throws Exception {
582        super.stop();
583        CountDownLatch countDownLatch = stoppedLatch.get();
584        if (countDownLatch != null && Thread.currentThread() != this.runnerThread) {
585            countDownLatch.await(1,TimeUnit.SECONDS);
586        }
587    }
588
589    protected void initializeStreams() throws Exception {
590        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
591            @Override
592            public int read() throws IOException {
593                receiveCounter++;
594                return super.read();
595            }
596            @Override
597            public int read(byte[] b, int off, int len) throws IOException {
598                receiveCounter++;
599                return super.read(b, off, len);
600            }
601            @Override
602            public long skip(long n) throws IOException {
603                receiveCounter++;
604                return super.skip(n);
605            }
606            @Override
607            protected void fill() throws IOException {
608                receiveCounter++;
609                super.fill();
610            }
611        };
612        this.dataIn = new DataInputStream(buffIn);
613        TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
614        this.dataOut = new DataOutputStream(outputStream);
615        this.buffOut = outputStream;
616    }
617
618    protected void closeStreams() throws IOException {
619        if (dataOut != null) {
620            dataOut.close();
621        }
622        if (dataIn != null) {
623            dataIn.close();
624        }
625    }
626
627    public void setSocketOptions(Map<String, Object> socketOptions) {
628        this.socketOptions = new HashMap<String, Object>(socketOptions);
629    }
630
631    public String getRemoteAddress() {
632        if (socket != null) {
633            SocketAddress address = socket.getRemoteSocketAddress();
634            if (address instanceof InetSocketAddress) {
635                return "tcp://" + ((InetSocketAddress)address).getAddress().getHostAddress() + ":" + ((InetSocketAddress)address).getPort();
636            } else {
637                return "" + socket.getRemoteSocketAddress();
638            }
639        }
640        return null;
641    }
642
643    @Override
644    public <T> T narrow(Class<T> target) {
645        if (target == Socket.class) {
646            return target.cast(socket);
647        } else if ( target == TimeStampStream.class) {
648            return target.cast(buffOut);
649        }
650        return super.narrow(target);
651    }
652
653    public int getReceiveCounter() {
654        return receiveCounter;
655    }
656
657    /**
658     * @param sock The socket on which to set the Traffic Class.
659     * @return Whether or not the Traffic Class was set on the given socket.
660     * @throws SocketException if the system does not support setting the
661     *         Traffic Class.
662     * @throws IllegalArgumentException if both the Differentiated Services and
663     *         Type of Services transport options have been set on the same
664     *         connection.
665     */
666    private boolean setTrafficClass(Socket sock) throws SocketException,
667            IllegalArgumentException {
668        if (sock == null
669            || (!this.diffServChosen && !this.typeOfServiceChosen)) {
670            return false;
671        }
672        if (this.diffServChosen && this.typeOfServiceChosen) {
673            throw new IllegalArgumentException("Cannot set both the "
674                + " Differentiated Services and Type of Services transport "
675                + " options on the same connection.");
676        }
677
678        sock.setTrafficClass(this.trafficClass);
679
680        int resultTrafficClass = sock.getTrafficClass();
681        if (this.trafficClass != resultTrafficClass) {
682            // In the case where the user has specified the ECN bits (e.g. in
683            // Type of Service) but the system won't allow the ECN bits to be
684            // set or in the case where setting the traffic class failed for
685            // other reasons, emit a warning.
686            if ((this.trafficClass >> 2) == (resultTrafficClass >> 2)
687                    && (this.trafficClass & 3) != (resultTrafficClass & 3)) {
688                LOG.warn("Attempted to set the Traffic Class to "
689                    + this.trafficClass + " but the result Traffic Class was "
690                    + resultTrafficClass + ". Please check that your system "
691                    + "allows you to set the ECN bits (the first two bits).");
692            } else {
693                LOG.warn("Attempted to set the Traffic Class to "
694                    + this.trafficClass + " but the result Traffic Class was "
695                    + resultTrafficClass + ". Please check that your system "
696                         + "supports java.net.setTrafficClass.");
697            }
698            return false;
699        }
700        // Reset the guards that prevent both the Differentiated Services
701        // option and the Type of Service option from being set on the same
702        // connection.
703        this.diffServChosen = false;
704        this.typeOfServiceChosen = false;
705        return true;
706    }
707
708    public WireFormat getWireFormat() {
709        return wireFormat;
710    }
711}