001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.transport.nio;
019
020import java.io.DataInputStream;
021import java.io.DataOutputStream;
022import java.io.EOFException;
023import java.io.IOException;
024import java.net.Socket;
025import java.net.SocketTimeoutException;
026import java.net.URI;
027import java.net.UnknownHostException;
028import java.nio.ByteBuffer;
029import java.nio.channels.SelectionKey;
030import java.nio.channels.Selector;
031import java.security.cert.X509Certificate;
032
033import javax.net.SocketFactory;
034import javax.net.ssl.SSLContext;
035import javax.net.ssl.SSLEngine;
036import javax.net.ssl.SSLEngineResult;
037import javax.net.ssl.SSLEngineResult.HandshakeStatus;
038import javax.net.ssl.SSLPeerUnverifiedException;
039import javax.net.ssl.SSLSession;
040
041import org.apache.activemq.command.ConnectionInfo;
042import org.apache.activemq.openwire.OpenWireFormat;
043import org.apache.activemq.thread.TaskRunnerFactory;
044import org.apache.activemq.util.IOExceptionSupport;
045import org.apache.activemq.util.ServiceStopper;
046import org.apache.activemq.wireformat.WireFormat;
047import org.slf4j.Logger;
048import org.slf4j.LoggerFactory;
049
050public class NIOSSLTransport extends NIOTransport {
051
052    private static final Logger LOG = LoggerFactory.getLogger(NIOSSLTransport.class);
053
054    protected boolean needClientAuth;
055    protected boolean wantClientAuth;
056    protected String[] enabledCipherSuites;
057    protected String[] enabledProtocols;
058
059    protected SSLContext sslContext;
060    protected SSLEngine sslEngine;
061    protected SSLSession sslSession;
062
063    protected volatile boolean handshakeInProgress = false;
064    protected SSLEngineResult.Status status = null;
065    protected SSLEngineResult.HandshakeStatus handshakeStatus = null;
066    protected TaskRunnerFactory taskRunnerFactory;
067
068    public NIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
069        super(wireFormat, socketFactory, remoteLocation, localLocation);
070    }
071
072    public NIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
073        super(wireFormat, socket);
074    }
075
076    public void setSslContext(SSLContext sslContext) {
077        this.sslContext = sslContext;
078    }
079
080    @Override
081    protected void initializeStreams() throws IOException {
082        NIOOutputStream outputStream = null;
083        try {
084            channel = socket.getChannel();
085            channel.configureBlocking(false);
086
087            if (sslContext == null) {
088                sslContext = SSLContext.getDefault();
089            }
090
091            String remoteHost = null;
092            int remotePort = -1;
093
094            try {
095                URI remoteAddress = new URI(this.getRemoteAddress());
096                remoteHost = remoteAddress.getHost();
097                remotePort = remoteAddress.getPort();
098            } catch (Exception e) {
099            }
100
101            // initialize engine, the initial sslSession we get will need to be
102            // updated once the ssl handshake process is completed.
103            if (remoteHost != null && remotePort != -1) {
104                sslEngine = sslContext.createSSLEngine(remoteHost, remotePort);
105            } else {
106                sslEngine = sslContext.createSSLEngine();
107            }
108
109            sslEngine.setUseClientMode(false);
110            if (enabledCipherSuites != null) {
111                sslEngine.setEnabledCipherSuites(enabledCipherSuites);
112            }
113
114            if (enabledProtocols != null) {
115                sslEngine.setEnabledProtocols(enabledProtocols);
116            }
117
118            if (wantClientAuth) {
119                sslEngine.setWantClientAuth(wantClientAuth);
120            }
121
122            if (needClientAuth) {
123                sslEngine.setNeedClientAuth(needClientAuth);
124            }
125
126            sslSession = sslEngine.getSession();
127
128            inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
129            inputBuffer.clear();
130
131            outputStream = new NIOOutputStream(channel);
132            outputStream.setEngine(sslEngine);
133            this.dataOut = new DataOutputStream(outputStream);
134            this.buffOut = outputStream;
135            sslEngine.beginHandshake();
136            handshakeStatus = sslEngine.getHandshakeStatus();
137            doHandshake();
138
139            if (inputBuffer.position() > 0 && inputBuffer.hasRemaining() && wireFormat instanceof OpenWireFormat) {
140                // can only deal with this pending openwire command after we have sent negotiation start command
141                // with default wireformat options from this thread.
142                // org.apache.activemq.transport.WireFormatNegotiator.sendWireFormat()
143                // read will block till wireformat is sent.
144                taskRunnerFactory.execute(new Runnable() {
145                    @Override
146                    public void run() {
147                        serviceRead();
148                    }
149                });
150            }
151        } catch (Exception e) {
152            try {
153                if(outputStream != null) {
154                    outputStream.close();
155                }
156                super.closeStreams();
157            } catch (Exception ex) {}
158            throw new IOException(e);
159        }
160    }
161
162    protected void finishHandshake() throws Exception {
163        if (handshakeInProgress) {
164            handshakeInProgress = false;
165            nextFrameSize = -1;
166
167            // Once handshake completes we need to ask for the now real sslSession
168            // otherwise the session would return 'SSL_NULL_WITH_NULL_NULL' for the
169            // cipher suite.
170            sslSession = sslEngine.getSession();
171
172            // listen for events telling us when the socket is readable.
173            selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
174                @Override
175                public void onSelect(SelectorSelection selection) {
176                    serviceRead();
177                }
178
179                @Override
180                public void onError(SelectorSelection selection, Throwable error) {
181                    if (error instanceof IOException) {
182                        onException((IOException) error);
183                    } else {
184                        onException(IOExceptionSupport.create(error));
185                    }
186                }
187            });
188        }
189    }
190
191    @Override
192    protected void serviceRead() {
193        try {
194            if (handshakeInProgress) {
195                doHandshake();
196            }
197
198            ByteBuffer plain = ByteBuffer.allocate(sslSession.getApplicationBufferSize());
199            plain.position(plain.limit());
200
201            while (true) {
202                if (!plain.hasRemaining()) {
203
204                    int readCount = secureRead(plain);
205
206                    if (readCount == 0) {
207                        break;
208                    }
209
210                    // channel is closed, cleanup
211                    if (readCount == -1) {
212                        onException(new EOFException());
213                        selection.close();
214                        break;
215                    }
216
217                    receiveCounter += readCount;
218                }
219
220                if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
221                    processCommand(plain);
222                }
223            }
224        } catch (IOException e) {
225            onException(e);
226        } catch (Throwable e) {
227            onException(IOExceptionSupport.create(e));
228        }
229    }
230
231    protected void processCommand(ByteBuffer plain) throws Exception {
232
233        // Are we waiting for the next Command or are we building on the current one
234        if (nextFrameSize == -1) {
235
236            // We can get small packets that don't give us enough for the frame size
237            // so allocate enough for the initial size value and
238            if (plain.remaining() < Integer.SIZE) {
239                if (currentBuffer == null) {
240                    currentBuffer = ByteBuffer.allocate(4);
241                }
242
243                // Go until we fill the integer sized current buffer.
244                while (currentBuffer.hasRemaining() && plain.hasRemaining()) {
245                    currentBuffer.put(plain.get());
246                }
247
248                // Didn't we get enough yet to figure out next frame size.
249                if (currentBuffer.hasRemaining()) {
250                    return;
251                } else {
252                    currentBuffer.flip();
253                    nextFrameSize = currentBuffer.getInt();
254                }
255
256            } else {
257
258                // Either we are completing a previous read of the next frame size or its
259                // fully contained in plain already.
260                if (currentBuffer != null) {
261
262                    // Finish the frame size integer read and get from the current buffer.
263                    while (currentBuffer.hasRemaining()) {
264                        currentBuffer.put(plain.get());
265                    }
266
267                    currentBuffer.flip();
268                    nextFrameSize = currentBuffer.getInt();
269
270                } else {
271                    nextFrameSize = plain.getInt();
272                }
273            }
274
275            if (wireFormat instanceof OpenWireFormat) {
276                long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize();
277                if (nextFrameSize > maxFrameSize) {
278                    throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) +
279                                          " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
280                }
281            }
282
283            // now we got the data, lets reallocate and store the size for the marshaler.
284            // if there's more data in plain, then the next call will start processing it.
285            currentBuffer = ByteBuffer.allocate(nextFrameSize + 4);
286            currentBuffer.putInt(nextFrameSize);
287
288        } else {
289
290            // If its all in one read then we can just take it all, otherwise take only
291            // the current frame size and the next iteration starts a new command.
292            if (currentBuffer.remaining() >= plain.remaining()) {
293                currentBuffer.put(plain);
294            } else {
295                byte[] fill = new byte[currentBuffer.remaining()];
296                plain.get(fill);
297                currentBuffer.put(fill);
298            }
299
300            // Either we have enough data for a new command or we have to wait for some more.
301            if (currentBuffer.hasRemaining()) {
302                return;
303            } else {
304                currentBuffer.flip();
305                Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer)));
306                doConsume(command);
307                nextFrameSize = -1;
308                currentBuffer = null;
309            }
310        }
311    }
312
313    protected int secureRead(ByteBuffer plain) throws Exception {
314
315        if (!(inputBuffer.position() != 0 && inputBuffer.hasRemaining()) || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
316            int bytesRead = channel.read(inputBuffer);
317
318            if (bytesRead == 0 && !(sslEngine.getHandshakeStatus().equals(SSLEngineResult.HandshakeStatus.NEED_UNWRAP))) {
319                return 0;
320            }
321
322            if (bytesRead == -1) {
323                sslEngine.closeInbound();
324                if (inputBuffer.position() == 0 || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
325                    return -1;
326                }
327            }
328        }
329
330        plain.clear();
331
332        inputBuffer.flip();
333        SSLEngineResult res;
334        do {
335            res = sslEngine.unwrap(inputBuffer, plain);
336        } while (res.getStatus() == SSLEngineResult.Status.OK && res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP
337                && res.bytesProduced() == 0);
338
339        if (res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED) {
340            finishHandshake();
341        }
342
343        status = res.getStatus();
344        handshakeStatus = res.getHandshakeStatus();
345
346        // TODO deal with BUFFER_OVERFLOW
347
348        if (status == SSLEngineResult.Status.CLOSED) {
349            sslEngine.closeInbound();
350            return -1;
351        }
352
353        inputBuffer.compact();
354        plain.flip();
355
356        return plain.remaining();
357    }
358
359    protected void doHandshake() throws Exception {
360        handshakeInProgress = true;
361        Selector selector = null;
362        SelectionKey key = null;
363        boolean readable = true;
364        try {
365            while (true) {
366                HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
367                switch (handshakeStatus) {
368                    case NEED_UNWRAP:
369                        if (readable) {
370                            secureRead(ByteBuffer.allocate(sslSession.getApplicationBufferSize()));
371                        }
372                        if (this.status == SSLEngineResult.Status.BUFFER_UNDERFLOW) {
373                            long now = System.currentTimeMillis();
374                            if (selector == null) {
375                                selector = Selector.open();
376                                key = channel.register(selector, SelectionKey.OP_READ);
377                            } else {
378                                key.interestOps(SelectionKey.OP_READ);
379                            }
380                            int keyCount = selector.select(this.getSoTimeout());
381                            if (keyCount == 0 && this.getSoTimeout() > 0 && ((System.currentTimeMillis() - now) >= this.getSoTimeout())) {
382                                throw new SocketTimeoutException("Timeout during handshake");
383                            }
384                            readable = key.isReadable();
385                        }
386                        break;
387                    case NEED_TASK:
388                        Runnable task;
389                        while ((task = sslEngine.getDelegatedTask()) != null) {
390                            task.run();
391                        }
392                        break;
393                    case NEED_WRAP:
394                        ((NIOOutputStream) buffOut).write(ByteBuffer.allocate(0));
395                        break;
396                    case FINISHED:
397                    case NOT_HANDSHAKING:
398                        finishHandshake();
399                        return;
400                }
401            }
402        } finally {
403            if (key!=null) try {key.cancel();} catch (Exception ignore) {}
404            if (selector!=null) try {selector.close();} catch (Exception ignore) {}
405        }
406    }
407
408    @Override
409    protected void doStart() throws Exception {
410        taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task");
411        // no need to init as we can delay that until demand (eg in doHandshake)
412        super.doStart();
413    }
414
415    @Override
416    protected void doStop(ServiceStopper stopper) throws Exception {
417        if (taskRunnerFactory != null) {
418            taskRunnerFactory.shutdownNow();
419            taskRunnerFactory = null;
420        }
421        if (channel != null) {
422            channel.close();
423            channel = null;
424        }
425        super.doStop(stopper);
426    }
427
428    /**
429     * Overriding in order to add the client's certificates to ConnectionInfo Commands.
430     *
431     * @param command
432     *            The Command coming in.
433     */
434    @Override
435    public void doConsume(Object command) {
436        if (command instanceof ConnectionInfo) {
437            ConnectionInfo connectionInfo = (ConnectionInfo) command;
438            connectionInfo.setTransportContext(getPeerCertificates());
439        }
440        super.doConsume(command);
441    }
442
443    /**
444     * @return peer certificate chain associated with the ssl socket
445     */
446    public X509Certificate[] getPeerCertificates() {
447
448        X509Certificate[] clientCertChain = null;
449        try {
450            if (sslEngine.getSession() != null) {
451                clientCertChain = (X509Certificate[]) sslEngine.getSession().getPeerCertificates();
452            }
453        } catch (SSLPeerUnverifiedException e) {
454            if (LOG.isTraceEnabled()) {
455                LOG.trace("Failed to get peer certificates.", e);
456            }
457        }
458
459        return clientCertChain;
460    }
461
462    public boolean isNeedClientAuth() {
463        return needClientAuth;
464    }
465
466    public void setNeedClientAuth(boolean needClientAuth) {
467        this.needClientAuth = needClientAuth;
468    }
469
470    public boolean isWantClientAuth() {
471        return wantClientAuth;
472    }
473
474    public void setWantClientAuth(boolean wantClientAuth) {
475        this.wantClientAuth = wantClientAuth;
476    }
477
478    public String[] getEnabledCipherSuites() {
479        return enabledCipherSuites;
480    }
481
482    public void setEnabledCipherSuites(String[] enabledCipherSuites) {
483        this.enabledCipherSuites = enabledCipherSuites;
484    }
485
486    public String[] getEnabledProtocols() {
487        return enabledProtocols;
488    }
489
490    public void setEnabledProtocols(String[] enabledProtocols) {
491        this.enabledProtocols = enabledProtocols;
492    }
493}