001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.transport.nio; 019 020import java.io.DataInputStream; 021import java.io.DataOutputStream; 022import java.io.EOFException; 023import java.io.IOException; 024import java.net.Socket; 025import java.net.SocketTimeoutException; 026import java.net.URI; 027import java.net.UnknownHostException; 028import java.nio.ByteBuffer; 029import java.nio.channels.SelectionKey; 030import java.nio.channels.Selector; 031import java.security.cert.X509Certificate; 032 033import javax.net.SocketFactory; 034import javax.net.ssl.SSLContext; 035import javax.net.ssl.SSLEngine; 036import javax.net.ssl.SSLEngineResult; 037import javax.net.ssl.SSLEngineResult.HandshakeStatus; 038import javax.net.ssl.SSLPeerUnverifiedException; 039import javax.net.ssl.SSLSession; 040 041import org.apache.activemq.command.ConnectionInfo; 042import org.apache.activemq.openwire.OpenWireFormat; 043import org.apache.activemq.thread.TaskRunnerFactory; 044import org.apache.activemq.util.IOExceptionSupport; 045import org.apache.activemq.util.ServiceStopper; 046import org.apache.activemq.wireformat.WireFormat; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050public class NIOSSLTransport extends NIOTransport { 051 052 private static final Logger LOG = LoggerFactory.getLogger(NIOSSLTransport.class); 053 054 protected boolean needClientAuth; 055 protected boolean wantClientAuth; 056 protected String[] enabledCipherSuites; 057 protected String[] enabledProtocols; 058 059 protected SSLContext sslContext; 060 protected SSLEngine sslEngine; 061 protected SSLSession sslSession; 062 063 protected volatile boolean handshakeInProgress = false; 064 protected SSLEngineResult.Status status = null; 065 protected SSLEngineResult.HandshakeStatus handshakeStatus = null; 066 protected TaskRunnerFactory taskRunnerFactory; 067 068 public NIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { 069 super(wireFormat, socketFactory, remoteLocation, localLocation); 070 } 071 072 public NIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException { 073 super(wireFormat, socket); 074 } 075 076 public void setSslContext(SSLContext sslContext) { 077 this.sslContext = sslContext; 078 } 079 080 @Override 081 protected void initializeStreams() throws IOException { 082 NIOOutputStream outputStream = null; 083 try { 084 channel = socket.getChannel(); 085 channel.configureBlocking(false); 086 087 if (sslContext == null) { 088 sslContext = SSLContext.getDefault(); 089 } 090 091 String remoteHost = null; 092 int remotePort = -1; 093 094 try { 095 URI remoteAddress = new URI(this.getRemoteAddress()); 096 remoteHost = remoteAddress.getHost(); 097 remotePort = remoteAddress.getPort(); 098 } catch (Exception e) { 099 } 100 101 // initialize engine, the initial sslSession we get will need to be 102 // updated once the ssl handshake process is completed. 103 if (remoteHost != null && remotePort != -1) { 104 sslEngine = sslContext.createSSLEngine(remoteHost, remotePort); 105 } else { 106 sslEngine = sslContext.createSSLEngine(); 107 } 108 109 sslEngine.setUseClientMode(false); 110 if (enabledCipherSuites != null) { 111 sslEngine.setEnabledCipherSuites(enabledCipherSuites); 112 } 113 114 if (enabledProtocols != null) { 115 sslEngine.setEnabledProtocols(enabledProtocols); 116 } 117 118 if (wantClientAuth) { 119 sslEngine.setWantClientAuth(wantClientAuth); 120 } 121 122 if (needClientAuth) { 123 sslEngine.setNeedClientAuth(needClientAuth); 124 } 125 126 sslSession = sslEngine.getSession(); 127 128 inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize()); 129 inputBuffer.clear(); 130 131 outputStream = new NIOOutputStream(channel); 132 outputStream.setEngine(sslEngine); 133 this.dataOut = new DataOutputStream(outputStream); 134 this.buffOut = outputStream; 135 sslEngine.beginHandshake(); 136 handshakeStatus = sslEngine.getHandshakeStatus(); 137 doHandshake(); 138 } catch (Exception e) { 139 try { 140 if(outputStream != null) { 141 outputStream.close(); 142 } 143 super.closeStreams(); 144 } catch (Exception ex) {} 145 throw new IOException(e); 146 } 147 } 148 149 protected void finishHandshake() throws Exception { 150 if (handshakeInProgress) { 151 handshakeInProgress = false; 152 nextFrameSize = -1; 153 154 // Once handshake completes we need to ask for the now real sslSession 155 // otherwise the session would return 'SSL_NULL_WITH_NULL_NULL' for the 156 // cipher suite. 157 sslSession = sslEngine.getSession(); 158 159 // listen for events telling us when the socket is readable. 160 selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() { 161 @Override 162 public void onSelect(SelectorSelection selection) { 163 serviceRead(); 164 } 165 166 @Override 167 public void onError(SelectorSelection selection, Throwable error) { 168 if (error instanceof IOException) { 169 onException((IOException) error); 170 } else { 171 onException(IOExceptionSupport.create(error)); 172 } 173 } 174 }); 175 } 176 } 177 178 @Override 179 protected void serviceRead() { 180 try { 181 if (handshakeInProgress) { 182 doHandshake(); 183 } 184 185 ByteBuffer plain = ByteBuffer.allocate(sslSession.getApplicationBufferSize()); 186 plain.position(plain.limit()); 187 188 while (true) { 189 if (!plain.hasRemaining()) { 190 191 int readCount = secureRead(plain); 192 193 if (readCount == 0) { 194 break; 195 } 196 197 // channel is closed, cleanup 198 if (readCount == -1) { 199 onException(new EOFException()); 200 selection.close(); 201 break; 202 } 203 204 receiveCounter += readCount; 205 } 206 207 if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) { 208 processCommand(plain); 209 } 210 } 211 } catch (IOException e) { 212 onException(e); 213 } catch (Throwable e) { 214 onException(IOExceptionSupport.create(e)); 215 } 216 } 217 218 protected void processCommand(ByteBuffer plain) throws Exception { 219 220 // Are we waiting for the next Command or are we building on the current one 221 if (nextFrameSize == -1) { 222 223 // We can get small packets that don't give us enough for the frame size 224 // so allocate enough for the initial size value and 225 if (plain.remaining() < Integer.SIZE) { 226 if (currentBuffer == null) { 227 currentBuffer = ByteBuffer.allocate(4); 228 } 229 230 // Go until we fill the integer sized current buffer. 231 while (currentBuffer.hasRemaining() && plain.hasRemaining()) { 232 currentBuffer.put(plain.get()); 233 } 234 235 // Didn't we get enough yet to figure out next frame size. 236 if (currentBuffer.hasRemaining()) { 237 return; 238 } else { 239 currentBuffer.flip(); 240 nextFrameSize = currentBuffer.getInt(); 241 } 242 243 } else { 244 245 // Either we are completing a previous read of the next frame size or its 246 // fully contained in plain already. 247 if (currentBuffer != null) { 248 249 // Finish the frame size integer read and get from the current buffer. 250 while (currentBuffer.hasRemaining()) { 251 currentBuffer.put(plain.get()); 252 } 253 254 currentBuffer.flip(); 255 nextFrameSize = currentBuffer.getInt(); 256 257 } else { 258 nextFrameSize = plain.getInt(); 259 } 260 } 261 262 if (wireFormat instanceof OpenWireFormat) { 263 long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize(); 264 if (nextFrameSize > maxFrameSize) { 265 throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + 266 " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); 267 } 268 } 269 270 // now we got the data, lets reallocate and store the size for the marshaler. 271 // if there's more data in plain, then the next call will start processing it. 272 currentBuffer = ByteBuffer.allocate(nextFrameSize + 4); 273 currentBuffer.putInt(nextFrameSize); 274 275 } else { 276 277 // If its all in one read then we can just take it all, otherwise take only 278 // the current frame size and the next iteration starts a new command. 279 if (currentBuffer.remaining() >= plain.remaining()) { 280 currentBuffer.put(plain); 281 } else { 282 byte[] fill = new byte[currentBuffer.remaining()]; 283 plain.get(fill); 284 currentBuffer.put(fill); 285 } 286 287 // Either we have enough data for a new command or we have to wait for some more. 288 if (currentBuffer.hasRemaining()) { 289 return; 290 } else { 291 currentBuffer.flip(); 292 Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer))); 293 doConsume(command); 294 nextFrameSize = -1; 295 currentBuffer = null; 296 } 297 } 298 } 299 300 protected int secureRead(ByteBuffer plain) throws Exception { 301 302 if (!(inputBuffer.position() != 0 && inputBuffer.hasRemaining()) || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) { 303 int bytesRead = channel.read(inputBuffer); 304 305 if (bytesRead == 0 && !(sslEngine.getHandshakeStatus().equals(SSLEngineResult.HandshakeStatus.NEED_UNWRAP))) { 306 return 0; 307 } 308 309 if (bytesRead == -1) { 310 sslEngine.closeInbound(); 311 if (inputBuffer.position() == 0 || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) { 312 return -1; 313 } 314 } 315 } 316 317 plain.clear(); 318 319 inputBuffer.flip(); 320 SSLEngineResult res; 321 do { 322 res = sslEngine.unwrap(inputBuffer, plain); 323 } while (res.getStatus() == SSLEngineResult.Status.OK && res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP 324 && res.bytesProduced() == 0); 325 326 if (res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED) { 327 finishHandshake(); 328 } 329 330 status = res.getStatus(); 331 handshakeStatus = res.getHandshakeStatus(); 332 333 // TODO deal with BUFFER_OVERFLOW 334 335 if (status == SSLEngineResult.Status.CLOSED) { 336 sslEngine.closeInbound(); 337 return -1; 338 } 339 340 inputBuffer.compact(); 341 plain.flip(); 342 343 return plain.remaining(); 344 } 345 346 protected void doHandshake() throws Exception { 347 handshakeInProgress = true; 348 Selector selector = null; 349 SelectionKey key = null; 350 boolean readable = true; 351 try { 352 while (true) { 353 HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus(); 354 switch (handshakeStatus) { 355 case NEED_UNWRAP: 356 if (readable) { 357 secureRead(ByteBuffer.allocate(sslSession.getApplicationBufferSize())); 358 } 359 if (this.status == SSLEngineResult.Status.BUFFER_UNDERFLOW) { 360 long now = System.currentTimeMillis(); 361 if (selector == null) { 362 selector = Selector.open(); 363 key = channel.register(selector, SelectionKey.OP_READ); 364 } else { 365 key.interestOps(SelectionKey.OP_READ); 366 } 367 int keyCount = selector.select(this.getSoTimeout()); 368 if (keyCount == 0 && this.getSoTimeout() > 0 && ((System.currentTimeMillis() - now) >= this.getSoTimeout())) { 369 throw new SocketTimeoutException("Timeout during handshake"); 370 } 371 readable = key.isReadable(); 372 } 373 break; 374 case NEED_TASK: 375 Runnable task; 376 while ((task = sslEngine.getDelegatedTask()) != null) { 377 task.run(); 378 } 379 break; 380 case NEED_WRAP: 381 ((NIOOutputStream) buffOut).write(ByteBuffer.allocate(0)); 382 break; 383 case FINISHED: 384 case NOT_HANDSHAKING: 385 finishHandshake(); 386 return; 387 } 388 } 389 } finally { 390 if (key!=null) try {key.cancel();} catch (Exception ignore) {} 391 if (selector!=null) try {selector.close();} catch (Exception ignore) {} 392 } 393 } 394 395 @Override 396 protected void doStart() throws Exception { 397 taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task"); 398 // no need to init as we can delay that until demand (eg in doHandshake) 399 super.doStart(); 400 } 401 402 @Override 403 protected void doStop(ServiceStopper stopper) throws Exception { 404 if (taskRunnerFactory != null) { 405 taskRunnerFactory.shutdownNow(); 406 taskRunnerFactory = null; 407 } 408 if (channel != null) { 409 channel.close(); 410 channel = null; 411 } 412 super.doStop(stopper); 413 } 414 415 /** 416 * Overriding in order to add the client's certificates to ConnectionInfo Commands. 417 * 418 * @param command 419 * The Command coming in. 420 */ 421 @Override 422 public void doConsume(Object command) { 423 if (command instanceof ConnectionInfo) { 424 ConnectionInfo connectionInfo = (ConnectionInfo) command; 425 connectionInfo.setTransportContext(getPeerCertificates()); 426 } 427 super.doConsume(command); 428 } 429 430 /** 431 * @return peer certificate chain associated with the ssl socket 432 */ 433 public X509Certificate[] getPeerCertificates() { 434 435 X509Certificate[] clientCertChain = null; 436 try { 437 if (sslEngine.getSession() != null) { 438 clientCertChain = (X509Certificate[]) sslEngine.getSession().getPeerCertificates(); 439 } 440 } catch (SSLPeerUnverifiedException e) { 441 if (LOG.isTraceEnabled()) { 442 LOG.trace("Failed to get peer certificates.", e); 443 } 444 } 445 446 return clientCertChain; 447 } 448 449 public boolean isNeedClientAuth() { 450 return needClientAuth; 451 } 452 453 public void setNeedClientAuth(boolean needClientAuth) { 454 this.needClientAuth = needClientAuth; 455 } 456 457 public boolean isWantClientAuth() { 458 return wantClientAuth; 459 } 460 461 public void setWantClientAuth(boolean wantClientAuth) { 462 this.wantClientAuth = wantClientAuth; 463 } 464 465 public String[] getEnabledCipherSuites() { 466 return enabledCipherSuites; 467 } 468 469 public void setEnabledCipherSuites(String[] enabledCipherSuites) { 470 this.enabledCipherSuites = enabledCipherSuites; 471 } 472 473 public String[] getEnabledProtocols() { 474 return enabledProtocols; 475 } 476 477 public void setEnabledProtocols(String[] enabledProtocols) { 478 this.enabledProtocols = enabledProtocols; 479 } 480}