001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.IOException;
020import java.net.InetAddress;
021import java.net.InetSocketAddress;
022import java.net.ServerSocket;
023import java.net.Socket;
024import java.net.SocketException;
025import java.net.SocketTimeoutException;
026import java.net.URI;
027import java.net.URISyntaxException;
028import java.net.UnknownHostException;
029import java.nio.channels.SelectionKey;
030import java.nio.channels.Selector;
031import java.nio.channels.ServerSocketChannel;
032import java.nio.channels.SocketChannel;
033import java.util.HashMap;
034import java.util.Iterator;
035import java.util.Set;
036import java.util.concurrent.BlockingQueue;
037import java.util.concurrent.LinkedBlockingQueue;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.atomic.AtomicInteger;
040
041import javax.net.ServerSocketFactory;
042import javax.net.ssl.SSLServerSocket;
043
044import org.apache.activemq.Service;
045import org.apache.activemq.ThreadPriorities;
046import org.apache.activemq.TransportLoggerSupport;
047import org.apache.activemq.command.BrokerInfo;
048import org.apache.activemq.openwire.OpenWireFormatFactory;
049import org.apache.activemq.transport.Transport;
050import org.apache.activemq.transport.TransportServer;
051import org.apache.activemq.transport.TransportServerThreadSupport;
052import org.apache.activemq.transport.nio.SelectorManager;
053import org.apache.activemq.transport.nio.SelectorSelection;
054import org.apache.activemq.util.IOExceptionSupport;
055import org.apache.activemq.util.InetAddressUtil;
056import org.apache.activemq.util.IntrospectionSupport;
057import org.apache.activemq.util.ServiceListener;
058import org.apache.activemq.util.ServiceStopper;
059import org.apache.activemq.util.ServiceSupport;
060import org.apache.activemq.wireformat.WireFormat;
061import org.apache.activemq.wireformat.WireFormatFactory;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065/**
066 * A TCP based implementation of {@link TransportServer}
067 */
068public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
069
070    private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
071    protected ServerSocket serverSocket;
072    protected Selector selector;
073    protected int backlog = 5000;
074    protected WireFormatFactory wireFormatFactory = new OpenWireFormatFactory();
075    protected final TcpTransportFactory transportFactory;
076    protected long maxInactivityDuration = 30000;
077    protected long maxInactivityDurationInitalDelay = 10000;
078    protected int minmumWireFormatVersion;
079    protected boolean useQueueForAccept = true;
080    protected boolean allowLinkStealing;
081
082    /**
083     * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
084     * trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
085     * and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
086     * TransportConnector URIs.
087     */
088    protected boolean trace = false;
089
090    protected int soTimeout = 0;
091    protected int socketBufferSize = 64 * 1024;
092    protected int connectionTimeout = 30000;
093
094    /**
095     * Name of the LogWriter implementation to use. Names are mapped to classes in the
096     * resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
097     * set in Connection or TransportConnector URIs.
098     */
099    protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
100
101    /**
102     * Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
103     * TransportLogger which is manageable, a TransportLoggerControl MBean will me created.
104     */
105    protected boolean dynamicManagement = false;
106
107    /**
108     * startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
109     * startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
110     * log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
111     * TransportConnector URIs.
112     */
113    protected boolean startLogging = true;
114    protected int jmxPort = TransportLoggerSupport.defaultJmxPort;
115    protected final ServerSocketFactory serverSocketFactory;
116    protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
117    protected Thread socketHandlerThread;
118
119    /**
120     * The maximum number of sockets allowed for this server
121     */
122    protected int maximumConnections = Integer.MAX_VALUE;
123    protected AtomicInteger currentTransportCount = new AtomicInteger();
124
125    public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
126        URISyntaxException {
127        super(location);
128        this.transportFactory = transportFactory;
129        this.serverSocketFactory = serverSocketFactory;
130    }
131
132    public void bind() throws IOException {
133        URI bind = getBindLocation();
134
135        String host = bind.getHost();
136        host = (host == null || host.length() == 0) ? "localhost" : host;
137        InetAddress addr = InetAddress.getByName(host);
138
139        try {
140            this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
141            configureServerSocket(this.serverSocket);
142        } catch (IOException e) {
143            throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
144        }
145        try {
146            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
147                bind.getQuery(), bind.getFragment()));
148        } catch (URISyntaxException e) {
149
150            // it could be that the host name contains invalid characters such
151            // as _ on unix platforms so lets try use the IP address instead
152            try {
153                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
154                    bind.getQuery(), bind.getFragment()));
155            } catch (URISyntaxException e2) {
156                throw IOExceptionSupport.create(e2);
157            }
158        }
159    }
160
161    private void configureServerSocket(ServerSocket socket) throws SocketException {
162        socket.setSoTimeout(2000);
163        if (transportOptions != null) {
164
165            // If the enabledCipherSuites option is invalid we don't want to ignore it as the call
166            // to SSLServerSocket to configure it has a side effect on the socket rendering it
167            // useless as all suites are enabled many of which are considered as insecure.  We
168            // instead trap that option here and throw an exception.  We should really consider
169            // all invalid options as breaking and not start the transport but the current design
170            // doesn't really allow for this.
171            //
172            //  see: https://issues.apache.org/jira/browse/AMQ-4582
173            //
174            if (socket instanceof SSLServerSocket) {
175                if (transportOptions.containsKey("enabledCipherSuites")) {
176                    Object cipherSuites = transportOptions.remove("enabledCipherSuites");
177
178                    if (!IntrospectionSupport.setProperty(socket, "enabledCipherSuites", cipherSuites)) {
179                        throw new SocketException(String.format(
180                            "Invalid transport options {enabledCipherSuites=%s}", cipherSuites));
181                    }
182                }
183            }
184
185            IntrospectionSupport.setProperties(socket, transportOptions);
186        }
187    }
188
189    /**
190     * @return Returns the wireFormatFactory.
191     */
192    public WireFormatFactory getWireFormatFactory() {
193        return wireFormatFactory;
194    }
195
196    /**
197     * @param wireFormatFactory
198     *            The wireFormatFactory to set.
199     */
200    public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
201        this.wireFormatFactory = wireFormatFactory;
202    }
203
204    /**
205     * Associates a broker info with the transport server so that the transport can do discovery advertisements of the
206     * broker.
207     *
208     * @param brokerInfo
209     */
210    @Override
211    public void setBrokerInfo(BrokerInfo brokerInfo) {
212    }
213
214    public long getMaxInactivityDuration() {
215        return maxInactivityDuration;
216    }
217
218    public void setMaxInactivityDuration(long maxInactivityDuration) {
219        this.maxInactivityDuration = maxInactivityDuration;
220    }
221
222    public long getMaxInactivityDurationInitalDelay() {
223        return this.maxInactivityDurationInitalDelay;
224    }
225
226    public void setMaxInactivityDurationInitalDelay(long maxInactivityDurationInitalDelay) {
227        this.maxInactivityDurationInitalDelay = maxInactivityDurationInitalDelay;
228    }
229
230    public int getMinmumWireFormatVersion() {
231        return minmumWireFormatVersion;
232    }
233
234    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
235        this.minmumWireFormatVersion = minmumWireFormatVersion;
236    }
237
238    public boolean isTrace() {
239        return trace;
240    }
241
242    public void setTrace(boolean trace) {
243        this.trace = trace;
244    }
245
246    public String getLogWriterName() {
247        return logWriterName;
248    }
249
250    public void setLogWriterName(String logFormat) {
251        this.logWriterName = logFormat;
252    }
253
254    public boolean isDynamicManagement() {
255        return dynamicManagement;
256    }
257
258    public void setDynamicManagement(boolean useJmx) {
259        this.dynamicManagement = useJmx;
260    }
261
262    public void setJmxPort(int jmxPort) {
263        this.jmxPort = jmxPort;
264    }
265
266    public int getJmxPort() {
267        return jmxPort;
268    }
269
270    public boolean isStartLogging() {
271        return startLogging;
272    }
273
274    public void setStartLogging(boolean startLogging) {
275        this.startLogging = startLogging;
276    }
277
278    /**
279     * @return the backlog
280     */
281    public int getBacklog() {
282        return backlog;
283    }
284
285    /**
286     * @param backlog
287     *            the backlog to set
288     */
289    public void setBacklog(int backlog) {
290        this.backlog = backlog;
291    }
292
293    /**
294     * @return the useQueueForAccept
295     */
296    public boolean isUseQueueForAccept() {
297        return useQueueForAccept;
298    }
299
300    /**
301     * @param useQueueForAccept
302     *            the useQueueForAccept to set
303     */
304    public void setUseQueueForAccept(boolean useQueueForAccept) {
305        this.useQueueForAccept = useQueueForAccept;
306    }
307
308    /**
309     * pull Sockets from the ServerSocket
310     */
311    @Override
312    public void run() {
313        final ServerSocketChannel chan = serverSocket.getChannel();
314        if (chan != null) {
315            try {
316                chan.configureBlocking(false);
317                selector = Selector.open();
318                chan.register(selector, SelectionKey.OP_ACCEPT);
319                while (!isStopped()) {
320                    int count = selector.select(10);
321
322                    if (count == 0) {
323                        continue;
324                    }
325
326                    Set<SelectionKey> keys = selector.selectedKeys();
327
328                    for (Iterator<SelectionKey> i = keys.iterator(); i.hasNext(); ) {
329                        final SelectionKey key = i.next();
330                        if (key.isAcceptable()) {
331                            try {
332                                SocketChannel sc = chan.accept();
333                                if (sc != null) {
334                                    if (isStopped() || getAcceptListener() == null) {
335                                        sc.close();
336                                    } else {
337                                        if (useQueueForAccept) {
338                                            socketQueue.put(sc.socket());
339                                        } else {
340                                            handleSocket(sc.socket());
341                                        }
342                                    }
343                                }
344
345                            } catch (SocketTimeoutException ste) {
346                                // expect this to happen
347                            } catch (Exception e) {
348                                e.printStackTrace();
349                                if (!isStopping()) {
350                                    onAcceptError(e);
351                                } else if (!isStopped()) {
352                                    LOG.warn("run()", e);
353                                    onAcceptError(e);
354                                }
355                            }
356                        }
357                        i.remove();
358                    }
359
360                }
361            } catch (IOException ex) {
362                if (selector != null) {
363                    try {
364                        selector.close();
365                    } catch (IOException ioe) {}
366                    selector = null;
367                }
368            }
369        } else {
370            while (!isStopped()) {
371                Socket socket = null;
372                try {
373                    socket = serverSocket.accept();
374                    if (socket != null) {
375                        if (isStopped() || getAcceptListener() == null) {
376                            socket.close();
377                        } else {
378                            if (useQueueForAccept) {
379                                socketQueue.put(socket);
380                            } else {
381                                handleSocket(socket);
382                            }
383                        }
384                    }
385                } catch (SocketTimeoutException ste) {
386                    // expect this to happen
387                } catch (Exception e) {
388                    if (!isStopping()) {
389                        onAcceptError(e);
390                    } else if (!isStopped()) {
391                        LOG.warn("run()", e);
392                        onAcceptError(e);
393                    }
394                }
395            }
396        }
397    }
398
399    /**
400     * Allow derived classes to override the Transport implementation that this transport server creates.
401     *
402     * @param socket
403     * @param format
404     * @return
405     * @throws IOException
406     */
407    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
408        return new TcpTransport(format, socket);
409    }
410
411    /**
412     * @return pretty print of this
413     */
414    @Override
415    public String toString() {
416        return "" + getBindLocation();
417    }
418
419    /**
420     * @param socket
421     * @param bindAddress
422     * @return real hostName
423     * @throws UnknownHostException
424     */
425    protected String resolveHostName(ServerSocket socket, InetAddress bindAddress) throws UnknownHostException {
426        String result = null;
427        if (socket.isBound()) {
428            if (socket.getInetAddress().isAnyLocalAddress()) {
429                // make it more human readable and useful, an alternative to 0.0.0.0
430                result = InetAddressUtil.getLocalHostName();
431            } else {
432                result = socket.getInetAddress().getCanonicalHostName();
433            }
434        } else {
435            result = bindAddress.getCanonicalHostName();
436        }
437        return result;
438    }
439
440    @Override
441    protected void doStart() throws Exception {
442        if (useQueueForAccept) {
443            Runnable run = new Runnable() {
444                @Override
445                public void run() {
446                    try {
447                        while (!isStopped() && !isStopping()) {
448                            Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
449                            if (sock != null) {
450                                try {
451                                    handleSocket(sock);
452                                } catch (Throwable thrown) {
453                                    if (!isStopping()) {
454                                        onAcceptError(new Exception(thrown));
455                                    } else if (!isStopped()) {
456                                        LOG.warn("Unexpected error thrown during accept handling: ", thrown);
457                                        onAcceptError(new Exception(thrown));
458                                    }
459                                }
460                            }
461                        }
462
463                    } catch (InterruptedException e) {
464                        LOG.info("socketQueue interuppted - stopping");
465                        if (!isStopping()) {
466                            onAcceptError(e);
467                        }
468                    }
469                }
470            };
471            socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
472            socketHandlerThread.setDaemon(true);
473            socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
474            socketHandlerThread.start();
475        }
476        super.doStart();
477    }
478
479    @Override
480    protected void doStop(ServiceStopper stopper) throws Exception {
481        if (selector != null) {
482            selector.close();
483            selector = null;
484        }
485        if (serverSocket != null) {
486            serverSocket.close();
487            serverSocket = null;
488        }
489        super.doStop(stopper);
490    }
491
492    @Override
493    public InetSocketAddress getSocketAddress() {
494        return (InetSocketAddress) serverSocket.getLocalSocketAddress();
495    }
496
497    protected final void handleSocket(Socket socket) {
498        boolean closeSocket = true;
499        try {
500            if (this.currentTransportCount.get() >= this.maximumConnections) {
501                throw new ExceededMaximumConnectionsException(
502                    "Exceeded the maximum number of allowed client connections. See the '" +
503                    "maximumConnections' property on the TCP transport configuration URI " +
504                    "in the ActiveMQ configuration file (e.g., activemq.xml)");
505            } else {
506                HashMap<String, Object> options = new HashMap<String, Object>();
507                options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
508                options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
509                options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
510                options.put("trace", Boolean.valueOf(trace));
511                options.put("soTimeout", Integer.valueOf(soTimeout));
512                options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
513                options.put("connectionTimeout", Integer.valueOf(connectionTimeout));
514                options.put("logWriterName", logWriterName);
515                options.put("dynamicManagement", Boolean.valueOf(dynamicManagement));
516                options.put("startLogging", Boolean.valueOf(startLogging));
517                options.put("jmxPort", Integer.valueOf(jmxPort));
518                options.putAll(transportOptions);
519
520                WireFormat format = wireFormatFactory.createWireFormat();
521                Transport transport = createTransport(socket, format);
522                closeSocket = false;
523
524                if (transport instanceof ServiceSupport) {
525                    ((ServiceSupport) transport).addServiceListener(this);
526                }
527
528                Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
529
530                getAcceptListener().onAccept(configuredTransport);
531                currentTransportCount.incrementAndGet();
532            }
533        } catch (SocketTimeoutException ste) {
534            // expect this to happen
535        } catch (Exception e) {
536            if (closeSocket) {
537                try {
538                    socket.close();
539                } catch (Exception ignore) {
540                }
541            }
542
543            if (!isStopping()) {
544                onAcceptError(e);
545            } else if (!isStopped()) {
546                LOG.warn("run()", e);
547                onAcceptError(e);
548            }
549        }
550    }
551
552    public int getSoTimeout() {
553        return soTimeout;
554    }
555
556    public void setSoTimeout(int soTimeout) {
557        this.soTimeout = soTimeout;
558    }
559
560    public int getSocketBufferSize() {
561        return socketBufferSize;
562    }
563
564    public void setSocketBufferSize(int socketBufferSize) {
565        this.socketBufferSize = socketBufferSize;
566    }
567
568    public int getConnectionTimeout() {
569        return connectionTimeout;
570    }
571
572    public void setConnectionTimeout(int connectionTimeout) {
573        this.connectionTimeout = connectionTimeout;
574    }
575
576    /**
577     * @return the maximumConnections
578     */
579    public int getMaximumConnections() {
580        return maximumConnections;
581    }
582
583    /**
584     * @param maximumConnections
585     *            the maximumConnections to set
586     */
587    public void setMaximumConnections(int maximumConnections) {
588        this.maximumConnections = maximumConnections;
589    }
590
591    @Override
592    public void started(Service service) {
593    }
594
595    @Override
596    public void stopped(Service service) {
597        this.currentTransportCount.decrementAndGet();
598    }
599
600    @Override
601    public boolean isSslServer() {
602        return false;
603    }
604
605    @Override
606    public boolean isAllowLinkStealing() {
607        return allowLinkStealing;
608    }
609
610    @Override
611    public void setAllowLinkStealing(boolean allowLinkStealing) {
612        this.allowLinkStealing = allowLinkStealing;
613    }
614}