001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.nio; 018 019import java.io.EOFException; 020import java.io.IOException; 021import java.io.InterruptedIOException; 022import java.io.OutputStream; 023import java.nio.ByteBuffer; 024import java.nio.channels.WritableByteChannel; 025 026import javax.net.ssl.SSLEngine; 027 028import org.apache.activemq.transport.tcp.TimeStampStream; 029 030/** 031 * An optimized buffered OutputStream for TCP/IP 032 */ 033public class NIOOutputStream extends OutputStream implements TimeStampStream { 034 035 private static final int BUFFER_SIZE = 8196; 036 037 private final WritableByteChannel out; 038 private final byte[] buffer; 039 private final ByteBuffer byteBuffer; 040 041 private int count; 042 private boolean closed; 043 private volatile long writeTimestamp = -1; // concurrent reads of this value 044 045 private SSLEngine engine; 046 047 /** 048 * Constructor 049 * 050 * @param out 051 * the channel to write data to. 052 */ 053 public NIOOutputStream(WritableByteChannel out) { 054 this(out, BUFFER_SIZE); 055 } 056 057 /** 058 * Creates a new buffered output stream to write data to the specified 059 * underlying output stream with the specified buffer size. 060 * 061 * @param out 062 * the underlying output stream. 063 * @param size 064 * the buffer size. 065 * 066 * @throws IllegalArgumentException if size <= 0. 067 */ 068 public NIOOutputStream(WritableByteChannel out, int size) { 069 this.out = out; 070 if (size <= 0) { 071 throw new IllegalArgumentException("Buffer size <= 0"); 072 } 073 buffer = new byte[size]; 074 byteBuffer = ByteBuffer.wrap(buffer); 075 } 076 077 /** 078 * write a byte on to the stream 079 * 080 * @param b 081 * byte to write 082 * 083 * @throws IOException if an error occurs while writing the data. 084 */ 085 @Override 086 public void write(int b) throws IOException { 087 checkClosed(); 088 if (availableBufferToWrite() < 1) { 089 flush(); 090 } 091 buffer[count++] = (byte) b; 092 } 093 094 /** 095 * write a byte array to the stream 096 * 097 * @param b 098 * the byte buffer 099 * @param off 100 * the offset into the buffer 101 * @param len 102 * the length of data to write 103 * 104 * @throws IOException if an error occurs while writing the data. 105 */ 106 @Override 107 public void write(byte b[], int off, int len) throws IOException { 108 checkClosed(); 109 if (availableBufferToWrite() < len) { 110 flush(); 111 } 112 if (buffer.length >= len) { 113 System.arraycopy(b, off, buffer, count, len); 114 count += len; 115 } else { 116 write(ByteBuffer.wrap(b, off, len)); 117 } 118 } 119 120 /** 121 * flush the data to the output stream This doesn't call flush on the 122 * underlying OutputStream, because TCP/IP is particularly efficient at doing 123 * this itself .... 124 * 125 * @throws IOException if an error occurs while writing the data. 126 */ 127 @Override 128 public void flush() throws IOException { 129 if (count > 0 && out != null) { 130 byteBuffer.position(0); 131 byteBuffer.limit(count); 132 write(byteBuffer); 133 count = 0; 134 } 135 } 136 137 /** 138 * close this stream 139 * 140 * @throws IOException 141 */ 142 @Override 143 public void close() throws IOException { 144 super.close(); 145 if (engine != null) { 146 engine.closeOutbound(); 147 } 148 closed = true; 149 } 150 151 /** 152 * Checks that the stream has not been closed 153 * 154 * @throws IOException 155 */ 156 protected void checkClosed() throws IOException { 157 if (closed) { 158 throw new EOFException("Cannot write to the stream any more it has already been closed"); 159 } 160 } 161 162 /** 163 * @return the amount free space in the buffer 164 */ 165 private int availableBufferToWrite() { 166 return buffer.length - count; 167 } 168 169 protected void write(ByteBuffer data) throws IOException { 170 ByteBuffer plain; 171 if (engine != null) { 172 plain = ByteBuffer.allocate(engine.getSession().getPacketBufferSize()); 173 plain.clear(); 174 engine.wrap(data, plain); 175 plain.flip(); 176 } else { 177 plain = data; 178 } 179 180 int remaining = plain.remaining(); 181 long delay = 1; 182 int lastWriteSize = -1; 183 try { 184 writeTimestamp = System.currentTimeMillis(); 185 while (remaining > 0) { 186 187 // We may need to do a little bit of sleeping to avoid a busy 188 // loop. Slow down if no data was written out.. 189 if (lastWriteSize == 0) { 190 try { 191 // Use exponential growth to increase sleep time. 192 Thread.sleep(delay); 193 delay *= 2; 194 if (delay > 1000) { 195 delay = 1000; 196 } 197 } catch (InterruptedException e) { 198 throw new InterruptedIOException(); 199 } 200 } else { 201 delay = 1; 202 } 203 204 // Since the write is non-blocking, all the data may not have 205 // been written. 206 lastWriteSize = out.write(plain); 207 208 // if the data buffer was larger than the packet buffer we might 209 // need to wrap more packets until we reach the end of data, but only 210 // when plain has no more space since we are non-blocking and a write 211 // might not have written anything. 212 if (engine != null && data.hasRemaining() && !plain.hasRemaining()) { 213 plain.clear(); 214 engine.wrap(data, plain); 215 plain.flip(); 216 } 217 218 remaining = plain.remaining(); 219 } 220 } finally { 221 writeTimestamp = -1; 222 } 223 } 224 225 /* 226 * (non-Javadoc) 227 * 228 * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting() 229 */ 230 @Override 231 public boolean isWriting() { 232 return writeTimestamp > 0; 233 } 234 235 /* 236 * (non-Javadoc) 237 * 238 * @see 239 * org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp() 240 */ 241 @Override 242 public long getWriteTimestamp() { 243 return writeTimestamp; 244 } 245 246 public void setEngine(SSLEngine engine) { 247 this.engine = engine; 248 } 249}