001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.IOException;
020import java.net.InetAddress;
021import java.net.InetSocketAddress;
022import java.net.ServerSocket;
023import java.net.Socket;
024import java.net.SocketException;
025import java.net.SocketTimeoutException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.UnknownHostException;
029import java.nio.channels.SelectionKey;
030import java.nio.channels.Selector;
031import java.nio.channels.ServerSocketChannel;
032import java.nio.channels.SocketChannel;
033import java.util.HashMap;
034import java.util.Iterator;
035import java.util.Set;
036import java.util.concurrent.BlockingQueue;
037import java.util.concurrent.LinkedBlockingQueue;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicInteger;
040
041import javax.net.ServerSocketFactory;
042import javax.net.ssl.SSLServerSocket;
043
044import org.apache.activemq.Service;
045import org.apache.activemq.ThreadPriorities;
046import org.apache.activemq.TransportLoggerSupport;
047import org.apache.activemq.command.BrokerInfo;
048import org.apache.activemq.openwire.OpenWireFormatFactory;
049import org.apache.activemq.transport.Transport;
050import org.apache.activemq.transport.TransportServer;
051import org.apache.activemq.transport.TransportServerThreadSupport;
052import org.apache.activemq.transport.nio.SelectorManager;
053import org.apache.activemq.transport.nio.SelectorSelection;
054import org.apache.activemq.util.IOExceptionSupport;
055import org.apache.activemq.util.InetAddressUtil;
056import org.apache.activemq.util.IntrospectionSupport;
057import org.apache.activemq.util.ServiceListener;
058import org.apache.activemq.util.ServiceStopper;
059import org.apache.activemq.util.ServiceSupport;
060import org.apache.activemq.wireformat.WireFormat;
061import org.apache.activemq.wireformat.WireFormatFactory;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * A TCP based implementation of {@link TransportServer}
067 */
068public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
069
070    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
071    protected ServerSocket serverSocket;
072    protected Selector selector;
073    protected int backlog = 5000;
074    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
075    protected final TcpTransportFactory transportFactory;
076    protected long maxInactivityDuration = 30000;
077    protected long maxInactivityDurationInitalDelay = 10000;
078    protected int minmumWireFormatVersion;
079    protected boolean useQueueForAccept = true;
080    protected boolean allowLinkStealing;
081
082    /**
083     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
084     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
085     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
086     * TransportConnector URIs.
087     */
088    protected boolean trace = false;
089
090    protected int soTimeout = 0;
091    protected int socketBufferSize = 64 * 1024;
092    protected int connectionTimeout = 30000;
093
094    /**
095     * Name of the LogWriter implementation to use. Names are mapped to classes in the
096     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
097     * set in Connection or TransportConnector URIs.
098     */
099    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
100
101    /**
102     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
103     * TransportLogger which is manageable, a TransportLoggerControl MBean will me created.
104     */
105    protected boolean dynamicManagement = false;
106
107    /**
108     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
109     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
110     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
111     * TransportConnector URIs.
112     */
113    protected boolean startLogging = true;
114    protected final ServerSocketFactory serverSocketFactory;
115    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
116    protected Thread socketHandlerThread;
117
118    /**
119     * The maximum number of sockets allowed for this server
120     */
121    protected int maximumConnections = Integer.MAX_VALUE;
122    protected AtomicInteger currentTransportCount = new AtomicInteger();
123
124    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
125        URISyntaxException {
126        super(location);
127        this.transportFactory = transportFactory;
128        this.serverSocketFactory = serverSocketFactory;
129    }
130
131    public void bind() throws IOException {
132        URI bind = getBindLocation();
133
134        String host = bind.getHost();
135        host = (host == null || host.length() == 0) ? "localhost" : host;
136        InetAddress addr = InetAddress.getByName(host);
137
138        try {
139            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
140            configureServerSocket(this.serverSocket);
141        } catch (IOException e) {
142            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
143        }
144        try {
145            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
146                bind.getQuery(), bind.getFragment()));
147        } catch (URISyntaxException e) {
148
149            // it could be that the host name contains invalid characters such
150            // as _ on unix platforms so lets try use the IP address instead
151            try {
152                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
153                    bind.getQuery(), bind.getFragment()));
154            } catch (URISyntaxException e2) {
155                throw IOExceptionSupport.create(e2);
156            }
157        }
158    }
159
160    private void configureServerSocket(ServerSocket socket) throws SocketException {
161        socket.setSoTimeout(2000);
162        if (transportOptions != null) {
163
164            // If the enabledCipherSuites option is invalid we don't want to ignore it as the call
165            // to SSLServerSocket to configure it has a side effect on the socket rendering it
166            // useless as all suites are enabled many of which are considered as insecure.  We
167            // instead trap that option here and throw an exception.  We should really consider
168            // all invalid options as breaking and not start the transport but the current design
169            // doesn't really allow for this.
170            //
171            //  see: https://issues.apache.org/jira/browse/AMQ-4582
172            //
173            if (socket instanceof SSLServerSocket) {
174                if (transportOptions.containsKey("enabledCipherSuites")) {
175                    Object cipherSuites = transportOptions.remove("enabledCipherSuites");
176
177                    if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) {
178                        throw new SocketException(String.format(
179                            "Invalid transport options {enabledCipherSuites=%s}", cipherSuites));
180                    }
181                }
182            }
183
184            IntrospectionSupport.setProperties(socket, transportOptions);
185        }
186    }
187
188    /**
189     * @return Returns the wireFormatFactory.
190     */
191    public WireFormatFactory getWireFormatFactory() {
192        return wireFormatFactory;
193    }
194
195    /**
196     * @param wireFormatFactory
197     *            The wireFormatFactory to set.
198     */
199    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
200        this.wireFormatFactory = wireFormatFactory;
201    }
202
203    /**
204     * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
205     * broker.
206     *
207     * @param brokerInfo
208     */
209    @Override
210    public void setBrokerInfo(BrokerInfo brokerInfo) {
211    }
212
213    public long getMaxInactivityDuration() {
214        return maxInactivityDuration;
215    }
216
217    public void setMaxInactivityDuration(long maxInactivityDuration) {
218        this.maxInactivityDuration = maxInactivityDuration;
219    }
220
221    public long getMaxInactivityDurationInitalDelay() {
222        return this.maxInactivityDurationInitalDelay;
223    }
224
225    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
226        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
227    }
228
229    public int getMinmumWireFormatVersion() {
230        return minmumWireFormatVersion;
231    }
232
233    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
234        this.minmumWireFormatVersion = minmumWireFormatVersion;
235    }
236
237    public boolean isTrace() {
238        return trace;
239    }
240
241    public void setTrace(boolean trace) {
242        this.trace = trace;
243    }
244
245    public String getLogWriterName() {
246        return logWriterName;
247    }
248
249    public void setLogWriterName(String logFormat) {
250        this.logWriterName = logFormat;
251    }
252
253    public boolean isDynamicManagement() {
254        return dynamicManagement;
255    }
256
257    public void setDynamicManagement(boolean useJmx) {
258        this.dynamicManagement = useJmx;
259    }
260
261    public boolean isStartLogging() {
262        return startLogging;
263    }
264
265    public void setStartLogging(boolean startLogging) {
266        this.startLogging = startLogging;
267    }
268
269    /**
270     * @return the backlog
271     */
272    public int getBacklog() {
273        return backlog;
274    }
275
276    /**
277     * @param backlog
278     *            the backlog to set
279     */
280    public void setBacklog(int backlog) {
281        this.backlog = backlog;
282    }
283
284    /**
285     * @return the useQueueForAccept
286     */
287    public boolean isUseQueueForAccept() {
288        return useQueueForAccept;
289    }
290
291    /**
292     * @param useQueueForAccept
293     *            the useQueueForAccept to set
294     */
295    public void setUseQueueForAccept(boolean useQueueForAccept) {
296        this.useQueueForAccept = useQueueForAccept;
297    }
298
299    /**
300     * pull Sockets from the ServerSocket
301     */
302    @Override
303    public void run() {
304        final ServerSocketChannel chan = serverSocket.getChannel();
305        if (chan != null) {
306            try {
307                chan.configureBlocking(false);
308                selector = Selector.open();
309                chan.register(selector, SelectionKey.OP_ACCEPT);
310                while (!isStopped()) {
311                    int count = selector.select(10);
312
313                    if (count == 0) {
314                        continue;
315                    }
316
317                    Set<SelectionKey> keys = selector.selectedKeys();
318
319                    for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
320                        final SelectionKey key = i.next();
321                        if (key.isAcceptable()) {
322                            try {
323                                SocketChannel sc = chan.accept();
324                                if (sc != null) {
325                                    if (isStopped() || getAcceptListener() == null) {
326                                        sc.close();
327                                    } else {
328                                        if (useQueueForAccept) {
329                                            socketQueue.put(sc.socket());
330                                        } else {
331                                            handleSocket(sc.socket());
332                                        }
333                                    }
334                                }
335
336                            } catch (SocketTimeoutException ste) {
337                                // expect this to happen
338                            } catch (Exception e) {
339                                e.printStackTrace();
340                                if (!isStopping()) {
341                                    onAcceptError(e);
342                                } else if (!isStopped()) {
343                                    LOG.warn("run()", e);
344                                    onAcceptError(e);
345                                }
346                            }
347                        }
348                        i.remove();
349                    }
350
351                }
352            } catch (IOException ex) {
353                if (selector != null) {
354                    try {
355                        selector.close();
356                    } catch (IOException ioe) {}
357                    selector = null;
358                }
359            }
360        } else {
361            while (!isStopped()) {
362                Socket socket = null;
363                try {
364                    socket = serverSocket.accept();
365                    if (socket != null) {
366                        if (isStopped() || getAcceptListener() == null) {
367                            socket.close();
368                        } else {
369                            if (useQueueForAccept) {
370                                socketQueue.put(socket);
371                            } else {
372                                handleSocket(socket);
373                            }
374                        }
375                    }
376                } catch (SocketTimeoutException ste) {
377                    // expect this to happen
378                } catch (Exception e) {
379                    if (!isStopping()) {
380                        onAcceptError(e);
381                    } else if (!isStopped()) {
382                        LOG.warn("run()", e);
383                        onAcceptError(e);
384                    }
385                }
386            }
387        }
388    }
389
390    /**
391     * Allow derived classes to override the Transport implementation that this transport server creates.
392     *
393     * @param socket
394     * @param format
395     * @return
396     * @throws IOException
397     */
398    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
399        return new TcpTransport(format, socket);
400    }
401
402    /**
403     * @return pretty print of this
404     */
405    @Override
406    public String toString() {
407        return "" + getBindLocation();
408    }
409
410    /**
411     * @param socket
412     * @param bindAddress
413     * @return real hostName
414     * @throws UnknownHostException
415     */
416    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
417        String result = null;
418        if (socket.isBound()) {
419            if (socket.getInetAddress().isAnyLocalAddress()) {
420                // make it more human readable and useful, an alternative to 0.0.0.0
421                result = InetAddressUtil.getLocalHostName();
422            } else {
423                result = socket.getInetAddress().getCanonicalHostName();
424            }
425        } else {
426            result = bindAddress.getCanonicalHostName();
427        }
428        return result;
429    }
430
431    @Override
432    protected void doStart() throws Exception {
433        if (useQueueForAccept) {
434            Runnable run = new Runnable() {
435                @Override
436                public void run() {
437                    try {
438                        while (!isStopped() && !isStopping()) {
439                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
440                            if (sock != null) {
441                                try {
442                                    handleSocket(sock);
443                                } catch (Throwable thrown) {
444                                    if (!isStopping()) {
445                                        onAcceptError(new Exception(thrown));
446                                    } else if (!isStopped()) {
447                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
448                                        onAcceptError(new Exception(thrown));
449                                    }
450                                }
451                            }
452                        }
453
454                    } catch (InterruptedException e) {
455                        LOG.info("socketQueue interuppted - stopping");
456                        if (!isStopping()) {
457                            onAcceptError(e);
458                        }
459                    }
460                }
461            };
462            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
463            socketHandlerThread.setDaemon(true);
464            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
465            socketHandlerThread.start();
466        }
467        super.doStart();
468    }
469
470    @Override
471    protected void doStop(ServiceStopper stopper) throws Exception {
472        if (selector != null) {
473            selector.close();
474            selector = null;
475        }
476        if (serverSocket != null) {
477            serverSocket.close();
478            serverSocket = null;
479        }
480        super.doStop(stopper);
481    }
482
483    @Override
484    public InetSocketAddress getSocketAddress() {
485        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
486    }
487
488    protected final void handleSocket(Socket socket) {
489        boolean closeSocket = true;
490        try {
491            if (this.currentTransportCount.get() >= this.maximumConnections) {
492                throw new ExceededMaximumConnectionsException(
493                    "Exceeded the maximum number of allowed client connections. See the '" +
494                    "maximumConnections' property on the TCP transport configuration URI " +
495                    "in the ActiveMQ configuration file (e.g., activemq.xml)");
496            } else {
497                HashMap<String, Object> options = new HashMap<String, Object>();
498                options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
499                options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
500                options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
501                options.put("trace", Boolean.valueOf(trace));
502                options.put("soTimeout", Integer.valueOf(soTimeout));
503                options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
504                options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
505                options.put("logWriterName", logWriterName);
506                options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
507                options.put("startLogging", Boolean.valueOf(startLogging));
508                options.putAll(transportOptions);
509
510                WireFormat format = wireFormatFactory.createWireFormat();
511                Transport transport = createTransport(socket, format);
512                closeSocket = false;
513
514                if (transport instanceof ServiceSupport) {
515                    ((ServiceSupport) transport).addServiceListener(this);
516                }
517
518                Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
519
520                getAcceptListener().onAccept(configuredTransport);
521                currentTransportCount.incrementAndGet();
522            }
523        } catch (SocketTimeoutException ste) {
524            // expect this to happen
525        } catch (Exception e) {
526            if (closeSocket) {
527                try {
528                    socket.close();
529                } catch (Exception ignore) {
530                }
531            }
532
533            if (!isStopping()) {
534                onAcceptError(e);
535            } else if (!isStopped()) {
536                LOG.warn("run()", e);
537                onAcceptError(e);
538            }
539        }
540    }
541
542    public int getSoTimeout() {
543        return soTimeout;
544    }
545
546    public void setSoTimeout(int soTimeout) {
547        this.soTimeout = soTimeout;
548    }
549
550    public int getSocketBufferSize() {
551        return socketBufferSize;
552    }
553
554    public void setSocketBufferSize(int socketBufferSize) {
555        this.socketBufferSize = socketBufferSize;
556    }
557
558    public int getConnectionTimeout() {
559        return connectionTimeout;
560    }
561
562    public void setConnectionTimeout(int connectionTimeout) {
563        this.connectionTimeout = connectionTimeout;
564    }
565
566    /**
567     * @return the maximumConnections
568     */
569    public int getMaximumConnections() {
570        return maximumConnections;
571    }
572
573    /**
574     * @param maximumConnections
575     *            the maximumConnections to set
576     */
577    public void setMaximumConnections(int maximumConnections) {
578        this.maximumConnections = maximumConnections;
579    }
580
581    @Override
582    public void started(Service service) {
583    }
584
585    @Override
586    public void stopped(Service service) {
587        this.currentTransportCount.decrementAndGet();
588    }
589
590    @Override
591    public boolean isSslServer() {
592        return false;
593    }
594
595    @Override
596    public boolean isAllowLinkStealing() {
597        return allowLinkStealing;
598    }
599
600    @Override
601    public void setAllowLinkStealing(boolean allowLinkStealing) {
602        this.allowLinkStealing = allowLinkStealing;
603    }
604}