001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.nio;
018
019import java.io.EOFException;
020import java.io.IOException;
021import java.io.InterruptedIOException;
022import java.io.OutputStream;
023import java.nio.ByteBuffer;
024import java.nio.channels.WritableByteChannel;
025
026import javax.net.ssl.SSLEngine;
027
028import org.apache.activemq.transport.tcp.TimeStampStream;
029
030/**
031 * An optimized buffered OutputStream for TCP/IP
032 */
033public class NIOOutputStream extends OutputStream implements TimeStampStream {
034
035    private static final int BUFFER_SIZE = 8196;
036
037    private final WritableByteChannel out;
038    private final byte[] buffer;
039    private final ByteBuffer byteBuffer;
040
041    private int count;
042    private boolean closed;
043    private volatile long writeTimestamp = -1; // concurrent reads of this value
044
045    private SSLEngine engine;
046
047    /**
048     * Constructor
049     *
050     * @param out
051     *        the channel to write data to.
052     */
053    public NIOOutputStream(WritableByteChannel out) {
054        this(out, BUFFER_SIZE);
055    }
056
057    /**
058     * Creates a new buffered output stream to write data to the specified
059     * underlying output stream with the specified buffer size.
060     *
061     * @param out
062     *        the underlying output stream.
063     * @param size
064     *        the buffer size.
065     *
066     * @throws IllegalArgumentException if size <= 0.
067     */
068    public NIOOutputStream(WritableByteChannel out, int size) {
069        this.out = out;
070        if (size <= 0) {
071            throw new IllegalArgumentException("Buffer size <= 0");
072        }
073        buffer = new byte[size];
074        byteBuffer = ByteBuffer.wrap(buffer);
075    }
076
077    /**
078     * write a byte on to the stream
079     *
080     * @param b
081     *        byte to write
082     *
083     * @throws IOException if an error occurs while writing the data.
084     */
085    @Override
086    public void write(int b) throws IOException {
087        checkClosed();
088        if (availableBufferToWrite() < 1) {
089            flush();
090        }
091        buffer[count++] = (byte) b;
092    }
093
094    /**
095     * write a byte array to the stream
096     *
097     * @param b
098     *        the byte buffer
099     * @param off
100     *        the offset into the buffer
101     * @param len
102     *        the length of data to write
103     *
104     * @throws IOException if an error occurs while writing the data.
105     */
106    @Override
107    public void write(byte b[], int off, int len) throws IOException {
108        checkClosed();
109        if (availableBufferToWrite() < len) {
110            flush();
111        }
112        if (buffer.length >= len) {
113            System.arraycopy(b, off, buffer, count, len);
114            count += len;
115        } else {
116            write(ByteBuffer.wrap(b, off, len));
117        }
118    }
119
120    /**
121     * flush the data to the output stream This doesn't call flush on the
122     * underlying OutputStream, because TCP/IP is particularly efficient at doing
123     * this itself ....
124     *
125     * @throws IOException if an error occurs while writing the data.
126     */
127    @Override
128    public void flush() throws IOException {
129        if (count > 0 && out != null) {
130            byteBuffer.position(0);
131            byteBuffer.limit(count);
132            write(byteBuffer);
133            count = 0;
134        }
135    }
136
137    /**
138     * close this stream
139     *
140     * @throws IOException
141     */
142    @Override
143    public void close() throws IOException {
144        super.close();
145        if (engine != null) {
146            engine.closeOutbound();
147        }
148        closed = true;
149    }
150
151    /**
152     * Checks that the stream has not been closed
153     *
154     * @throws IOException
155     */
156    protected void checkClosed() throws IOException {
157        if (closed) {
158            throw new EOFException("Cannot write to the stream any more it has already been closed");
159        }
160    }
161
162    /**
163     * @return the amount free space in the buffer
164     */
165    private int availableBufferToWrite() {
166        return buffer.length - count;
167    }
168
169    protected void write(ByteBuffer data) throws IOException {
170        ByteBuffer plain;
171        if (engine != null) {
172            plain = ByteBuffer.allocate(engine.getSession().getPacketBufferSize());
173            plain.clear();
174            engine.wrap(data, plain);
175            plain.flip();
176        } else {
177            plain = data;
178        }
179
180        int remaining = plain.remaining();
181        long delay = 1;
182        int lastWriteSize = -1;
183        try {
184            writeTimestamp = System.currentTimeMillis();
185            while (remaining > 0) {
186
187                // We may need to do a little bit of sleeping to avoid a busy
188                // loop. Slow down if no data was written out..
189                if (lastWriteSize == 0) {
190                    try {
191                        // Use exponential growth to increase sleep time.
192                        Thread.sleep(delay);
193                        delay *= 2;
194                        if (delay > 1000) {
195                            delay = 1000;
196                        }
197                    } catch (InterruptedException e) {
198                        throw new InterruptedIOException();
199                    }
200                } else {
201                    delay = 1;
202                }
203
204                // Since the write is non-blocking, all the data may not have
205                // been written.
206                lastWriteSize = out.write(plain);
207
208                // if the data buffer was larger than the packet buffer we might
209                // need to wrap more packets until we reach the end of data, but only
210                // when plain has no more space since we are non-blocking and a write
211                // might not have written anything.
212                if (engine != null && data.hasRemaining() && !plain.hasRemaining()) {
213                    plain.clear();
214                    engine.wrap(data, plain);
215                    plain.flip();
216                }
217
218                remaining = plain.remaining();
219            }
220        } finally {
221            writeTimestamp = -1;
222        }
223    }
224
225    /*
226     * (non-Javadoc)
227     *
228     * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
229     */
230    @Override
231    public boolean isWriting() {
232        return writeTimestamp > 0;
233    }
234
235    /*
236     * (non-Javadoc)
237     *
238     * @see
239     * org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
240     */
241    @Override
242    public long getWriteTimestamp() {
243        return writeTimestamp;
244    }
245
246    public void setEngine(SSLEngine engine) {
247        this.engine = engine;
248    }
249}