001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq.transport.nio; 019 020import java.io.DataInputStream; 021import java.io.DataOutputStream; 022import java.io.EOFException; 023import java.io.IOException; 024import java.net.Socket; 025import java.net.SocketTimeoutException; 026import java.net.URI; 027import java.net.UnknownHostException; 028import java.nio.ByteBuffer; 029import java.nio.channels.SelectionKey; 030import java.nio.channels.Selector; 031import java.security.cert.X509Certificate; 032 033import javax.net.SocketFactory; 034import javax.net.ssl.SSLContext; 035import javax.net.ssl.SSLEngine; 036import javax.net.ssl.SSLEngineResult; 037import javax.net.ssl.SSLEngineResult.HandshakeStatus; 038import javax.net.ssl.SSLPeerUnverifiedException; 039import javax.net.ssl.SSLSession; 040 041import org.apache.activemq.command.ConnectionInfo; 042import org.apache.activemq.openwire.OpenWireFormat; 043import org.apache.activemq.thread.TaskRunnerFactory; 044import org.apache.activemq.util.IOExceptionSupport; 045import org.apache.activemq.util.ServiceStopper; 046import org.apache.activemq.wireformat.WireFormat; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050public class NIOSSLTransport extends NIOTransport { 051 052 private static final Logger LOG = LoggerFactory.getLogger(NIOSSLTransport.class); 053 054 protected boolean needClientAuth; 055 protected boolean wantClientAuth; 056 protected String[] enabledCipherSuites; 057 protected String[] enabledProtocols; 058 059 protected SSLContext sslContext; 060 protected SSLEngine sslEngine; 061 protected SSLSession sslSession; 062 063 protected volatile boolean handshakeInProgress = false; 064 protected SSLEngineResult.Status status = null; 065 protected SSLEngineResult.HandshakeStatus handshakeStatus = null; 066 protected TaskRunnerFactory taskRunnerFactory; 067 068 public NIOSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException { 069 super(wireFormat, socketFactory, remoteLocation, localLocation); 070 } 071 072 public NIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException { 073 super(wireFormat, socket); 074 } 075 076 public void setSslContext(SSLContext sslContext) { 077 this.sslContext = sslContext; 078 } 079 080 @Override 081 protected void initializeStreams() throws IOException { 082 NIOOutputStream outputStream = null; 083 try { 084 channel = socket.getChannel(); 085 channel.configureBlocking(false); 086 087 if (sslContext == null) { 088 sslContext = SSLContext.getDefault(); 089 } 090 091 String remoteHost = null; 092 int remotePort = -1; 093 094 try { 095 URI remoteAddress = new URI(this.getRemoteAddress()); 096 remoteHost = remoteAddress.getHost(); 097 remotePort = remoteAddress.getPort(); 098 } catch (Exception e) { 099 } 100 101 // initialize engine, the initial sslSession we get will need to be 102 // updated once the ssl handshake process is completed. 103 if (remoteHost != null && remotePort != -1) { 104 sslEngine = sslContext.createSSLEngine(remoteHost, remotePort); 105 } else { 106 sslEngine = sslContext.createSSLEngine(); 107 } 108 109 sslEngine.setUseClientMode(false); 110 if (enabledCipherSuites != null) { 111 sslEngine.setEnabledCipherSuites(enabledCipherSuites); 112 } 113 114 if (enabledProtocols != null) { 115 sslEngine.setEnabledProtocols(enabledProtocols); 116 } 117 118 if (wantClientAuth) { 119 sslEngine.setWantClientAuth(wantClientAuth); 120 } 121 122 if (needClientAuth) { 123 sslEngine.setNeedClientAuth(needClientAuth); 124 } 125 126 sslSession = sslEngine.getSession(); 127 128 inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize()); 129 inputBuffer.clear(); 130 131 outputStream = new NIOOutputStream(channel); 132 outputStream.setEngine(sslEngine); 133 this.dataOut = new DataOutputStream(outputStream); 134 this.buffOut = outputStream; 135 sslEngine.beginHandshake(); 136 handshakeStatus = sslEngine.getHandshakeStatus(); 137 doHandshake(); 138 139 if (inputBuffer.position() > 0 && inputBuffer.hasRemaining() && wireFormat instanceof OpenWireFormat) { 140 // can only deal with this pending openwire command after we have sent negotiation start command 141 // with default wireformat options from this thread. 142 // org.apache.activemq.transport.WireFormatNegotiator.sendWireFormat() 143 // read will block till wireformat is sent. 144 taskRunnerFactory.execute(new Runnable() { 145 @Override 146 public void run() { 147 serviceRead(); 148 } 149 }); 150 } 151 } catch (Exception e) { 152 try { 153 if(outputStream != null) { 154 outputStream.close(); 155 } 156 super.closeStreams(); 157 } catch (Exception ex) {} 158 throw new IOException(e); 159 } 160 } 161 162 protected void finishHandshake() throws Exception { 163 if (handshakeInProgress) { 164 handshakeInProgress = false; 165 nextFrameSize = -1; 166 167 // Once handshake completes we need to ask for the now real sslSession 168 // otherwise the session would return 'SSL_NULL_WITH_NULL_NULL' for the 169 // cipher suite. 170 sslSession = sslEngine.getSession(); 171 172 // listen for events telling us when the socket is readable. 173 selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() { 174 @Override 175 public void onSelect(SelectorSelection selection) { 176 serviceRead(); 177 } 178 179 @Override 180 public void onError(SelectorSelection selection, Throwable error) { 181 if (error instanceof IOException) { 182 onException((IOException) error); 183 } else { 184 onException(IOExceptionSupport.create(error)); 185 } 186 } 187 }); 188 } 189 } 190 191 @Override 192 protected void serviceRead() { 193 try { 194 if (handshakeInProgress) { 195 doHandshake(); 196 } 197 198 ByteBuffer plain = ByteBuffer.allocate(sslSession.getApplicationBufferSize()); 199 plain.position(plain.limit()); 200 201 while (true) { 202 if (!plain.hasRemaining()) { 203 204 int readCount = secureRead(plain); 205 206 if (readCount == 0) { 207 break; 208 } 209 210 // channel is closed, cleanup 211 if (readCount == -1) { 212 onException(new EOFException()); 213 selection.close(); 214 break; 215 } 216 217 receiveCounter += readCount; 218 } 219 220 if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) { 221 processCommand(plain); 222 } 223 } 224 } catch (IOException e) { 225 onException(e); 226 } catch (Throwable e) { 227 onException(IOExceptionSupport.create(e)); 228 } 229 } 230 231 protected void processCommand(ByteBuffer plain) throws Exception { 232 233 // Are we waiting for the next Command or are we building on the current one 234 if (nextFrameSize == -1) { 235 236 // We can get small packets that don't give us enough for the frame size 237 // so allocate enough for the initial size value and 238 if (plain.remaining() < Integer.SIZE) { 239 if (currentBuffer == null) { 240 currentBuffer = ByteBuffer.allocate(4); 241 } 242 243 // Go until we fill the integer sized current buffer. 244 while (currentBuffer.hasRemaining() && plain.hasRemaining()) { 245 currentBuffer.put(plain.get()); 246 } 247 248 // Didn't we get enough yet to figure out next frame size. 249 if (currentBuffer.hasRemaining()) { 250 return; 251 } else { 252 currentBuffer.flip(); 253 nextFrameSize = currentBuffer.getInt(); 254 } 255 256 } else { 257 258 // Either we are completing a previous read of the next frame size or its 259 // fully contained in plain already. 260 if (currentBuffer != null) { 261 262 // Finish the frame size integer read and get from the current buffer. 263 while (currentBuffer.hasRemaining()) { 264 currentBuffer.put(plain.get()); 265 } 266 267 currentBuffer.flip(); 268 nextFrameSize = currentBuffer.getInt(); 269 270 } else { 271 nextFrameSize = plain.getInt(); 272 } 273 } 274 275 if (wireFormat instanceof OpenWireFormat) { 276 long maxFrameSize = ((OpenWireFormat) wireFormat).getMaxFrameSize(); 277 if (nextFrameSize > maxFrameSize) { 278 throw new IOException("Frame size of " + (nextFrameSize / (1024 * 1024)) + 279 " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB"); 280 } 281 } 282 283 // now we got the data, lets reallocate and store the size for the marshaler. 284 // if there's more data in plain, then the next call will start processing it. 285 currentBuffer = ByteBuffer.allocate(nextFrameSize + 4); 286 currentBuffer.putInt(nextFrameSize); 287 288 } else { 289 290 // If its all in one read then we can just take it all, otherwise take only 291 // the current frame size and the next iteration starts a new command. 292 if (currentBuffer.remaining() >= plain.remaining()) { 293 currentBuffer.put(plain); 294 } else { 295 byte[] fill = new byte[currentBuffer.remaining()]; 296 plain.get(fill); 297 currentBuffer.put(fill); 298 } 299 300 // Either we have enough data for a new command or we have to wait for some more. 301 if (currentBuffer.hasRemaining()) { 302 return; 303 } else { 304 currentBuffer.flip(); 305 Object command = wireFormat.unmarshal(new DataInputStream(new NIOInputStream(currentBuffer))); 306 doConsume(command); 307 nextFrameSize = -1; 308 currentBuffer = null; 309 } 310 } 311 } 312 313 protected int secureRead(ByteBuffer plain) throws Exception { 314 315 if (!(inputBuffer.position() != 0 && inputBuffer.hasRemaining()) || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) { 316 int bytesRead = channel.read(inputBuffer); 317 318 if (bytesRead == 0 && !(sslEngine.getHandshakeStatus().equals(SSLEngineResult.HandshakeStatus.NEED_UNWRAP))) { 319 return 0; 320 } 321 322 if (bytesRead == -1) { 323 sslEngine.closeInbound(); 324 if (inputBuffer.position() == 0 || status == SSLEngineResult.Status.BUFFER_UNDERFLOW) { 325 return -1; 326 } 327 } 328 } 329 330 plain.clear(); 331 332 inputBuffer.flip(); 333 SSLEngineResult res; 334 do { 335 res = sslEngine.unwrap(inputBuffer, plain); 336 } while (res.getStatus() == SSLEngineResult.Status.OK && res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_UNWRAP 337 && res.bytesProduced() == 0); 338 339 if (res.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED) { 340 finishHandshake(); 341 } 342 343 status = res.getStatus(); 344 handshakeStatus = res.getHandshakeStatus(); 345 346 // TODO deal with BUFFER_OVERFLOW 347 348 if (status == SSLEngineResult.Status.CLOSED) { 349 sslEngine.closeInbound(); 350 return -1; 351 } 352 353 inputBuffer.compact(); 354 plain.flip(); 355 356 return plain.remaining(); 357 } 358 359 protected void doHandshake() throws Exception { 360 handshakeInProgress = true; 361 Selector selector = null; 362 SelectionKey key = null; 363 boolean readable = true; 364 try { 365 while (true) { 366 HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus(); 367 switch (handshakeStatus) { 368 case NEED_UNWRAP: 369 if (readable) { 370 secureRead(ByteBuffer.allocate(sslSession.getApplicationBufferSize())); 371 } 372 if (this.status == SSLEngineResult.Status.BUFFER_UNDERFLOW) { 373 long now = System.currentTimeMillis(); 374 if (selector == null) { 375 selector = Selector.open(); 376 key = channel.register(selector, SelectionKey.OP_READ); 377 } else { 378 key.interestOps(SelectionKey.OP_READ); 379 } 380 int keyCount = selector.select(this.getSoTimeout()); 381 if (keyCount == 0 && this.getSoTimeout() > 0 && ((System.currentTimeMillis() - now) >= this.getSoTimeout())) { 382 throw new SocketTimeoutException("Timeout during handshake"); 383 } 384 readable = key.isReadable(); 385 } 386 break; 387 case NEED_TASK: 388 Runnable task; 389 while ((task = sslEngine.getDelegatedTask()) != null) { 390 task.run(); 391 } 392 break; 393 case NEED_WRAP: 394 ((NIOOutputStream) buffOut).write(ByteBuffer.allocate(0)); 395 break; 396 case FINISHED: 397 case NOT_HANDSHAKING: 398 finishHandshake(); 399 return; 400 } 401 } 402 } finally { 403 if (key!=null) try {key.cancel();} catch (Exception ignore) {} 404 if (selector!=null) try {selector.close();} catch (Exception ignore) {} 405 } 406 } 407 408 @Override 409 protected void doStart() throws Exception { 410 taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task"); 411 // no need to init as we can delay that until demand (eg in doHandshake) 412 super.doStart(); 413 } 414 415 @Override 416 protected void doStop(ServiceStopper stopper) throws Exception { 417 if (taskRunnerFactory != null) { 418 taskRunnerFactory.shutdownNow(); 419 taskRunnerFactory = null; 420 } 421 if (channel != null) { 422 channel.close(); 423 channel = null; 424 } 425 super.doStop(stopper); 426 } 427 428 /** 429 * Overriding in order to add the client's certificates to ConnectionInfo Commands. 430 * 431 * @param command 432 * The Command coming in. 433 */ 434 @Override 435 public void doConsume(Object command) { 436 if (command instanceof ConnectionInfo) { 437 ConnectionInfo connectionInfo = (ConnectionInfo) command; 438 connectionInfo.setTransportContext(getPeerCertificates()); 439 } 440 super.doConsume(command); 441 } 442 443 /** 444 * @return peer certificate chain associated with the ssl socket 445 */ 446 public X509Certificate[] getPeerCertificates() { 447 448 X509Certificate[] clientCertChain = null; 449 try { 450 if (sslEngine.getSession() != null) { 451 clientCertChain = (X509Certificate[]) sslEngine.getSession().getPeerCertificates(); 452 } 453 } catch (SSLPeerUnverifiedException e) { 454 if (LOG.isTraceEnabled()) { 455 LOG.trace("Failed to get peer certificates.", e); 456 } 457 } 458 459 return clientCertChain; 460 } 461 462 public boolean isNeedClientAuth() { 463 return needClientAuth; 464 } 465 466 public void setNeedClientAuth(boolean needClientAuth) { 467 this.needClientAuth = needClientAuth; 468 } 469 470 public boolean isWantClientAuth() { 471 return wantClientAuth; 472 } 473 474 public void setWantClientAuth(boolean wantClientAuth) { 475 this.wantClientAuth = wantClientAuth; 476 } 477 478 public String[] getEnabledCipherSuites() { 479 return enabledCipherSuites; 480 } 481 482 public void setEnabledCipherSuites(String[] enabledCipherSuites) { 483 this.enabledCipherSuites = enabledCipherSuites; 484 } 485 486 public String[] getEnabledProtocols() { 487 return enabledProtocols; 488 } 489 490 public void setEnabledProtocols(String[] enabledProtocols) { 491 this.enabledProtocols = enabledProtocols; 492 } 493}