001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one or more
004 * contributor license agreements.  See the NOTICE file distributed with
005 * this work for additional information regarding copyright ownership.
006 * The ASF licenses this file to You under the Apache License, Version 2.0
007 * (the "License"); you may not use this file except in compliance with
008 * the License.  You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.activemq.transport.nio;
019
020import java.io.IOException;
021import java.io.InputStream;
022import java.nio.ByteBuffer;
023import java.nio.channels.Channel;
024import java.nio.channels.ClosedChannelException;
025import java.nio.channels.ReadableByteChannel;
026import java.nio.channels.SelectionKey;
027import java.nio.channels.Selector;
028import java.nio.channels.SocketChannel;
029
030/**
031 * Implementation of InputStream using Java NIO channel,direct buffer and
032 * Selector
033 */
034public class NIOBufferedInputStream extends InputStream {
035
036    private final static int BUFFER_SIZE = 8192;
037
038    private SocketChannel sc = null;
039
040    private ByteBuffer bb = null;
041
042    private Selector rs = null;
043
044    public NIOBufferedInputStream(ReadableByteChannel channel, int size)
045            throws ClosedChannelException, IOException {
046
047        if (size <= 0) {
048            throw new IllegalArgumentException("Buffer size <= 0");
049        }
050
051        this.bb = ByteBuffer.allocateDirect(size);
052        this.sc = (SocketChannel) channel;
053
054        this.sc.configureBlocking(false);
055
056        this.rs = Selector.open();
057
058        sc.register(rs, SelectionKey.OP_READ);
059
060        bb.position(0);
061        bb.limit(0);
062    }
063
064    public NIOBufferedInputStream(ReadableByteChannel channel)
065            throws ClosedChannelException, IOException {
066        this(channel, BUFFER_SIZE);
067    }
068
069    public int available() throws IOException {
070        if (!rs.isOpen())
071            throw new IOException("Input Stream Closed");
072
073        return bb.remaining();
074    }
075
076    public void close() throws IOException {
077        if (rs.isOpen()) {
078            rs.close();
079
080            if (sc.isOpen()) {
081                sc.socket().shutdownInput();
082                sc.socket().close();
083            }
084
085            bb = null;
086            sc = null;
087        }
088    }
089
090    public int read() throws IOException {
091        if (!rs.isOpen())
092            throw new IOException("Input Stream Closed");
093
094        if (!bb.hasRemaining()) {
095            try {
096                fill(1);
097            } catch (ClosedChannelException e) {
098                close();
099                return -1;
100            }
101        }
102
103        return (bb.get() & 0xFF);
104    }
105
106    public int read(byte[] b, int off, int len) throws IOException {
107        int bytesCopied = -1;
108
109        if (!rs.isOpen())
110            throw new IOException("Input Stream Closed");
111
112        while (bytesCopied == -1) {
113            if (bb.hasRemaining()) {
114                bytesCopied = (len < bb.remaining() ? len : bb.remaining());
115                bb.get(b, off, bytesCopied);
116            } else {
117                try {
118                    fill(1);
119                } catch (ClosedChannelException e) {
120                    close();
121                    return -1;
122                }
123            }
124        }
125
126        return bytesCopied;
127    }
128
129    public long skip(long n) throws IOException {
130        long skiped = 0;
131
132        if (!rs.isOpen())
133            throw new IOException("Input Stream Closed");
134
135        while (n > 0) {
136            if (n <= bb.remaining()) {
137                skiped += n;
138                bb.position(bb.position() + (int) n);
139                n = 0;
140            } else {
141                skiped += bb.remaining();
142                n -= bb.remaining();
143
144                bb.position(bb.limit());
145
146                try {
147                    fill((int) n);
148                } catch (ClosedChannelException e) {
149                    close();
150                    return skiped;
151                }
152            }
153        }
154
155        return skiped;
156    }
157
158    private void fill(int n) throws IOException, ClosedChannelException {
159        int bytesRead = -1;
160
161        if ((n <= 0) || (n <= bb.remaining()))
162            return;
163
164        bb.compact();
165
166        n = (bb.remaining() < n ? bb.remaining() : n);
167
168        for (;;) {
169            bytesRead = sc.read(bb);
170
171            if (bytesRead == -1)
172                throw new ClosedChannelException();
173
174            n -= bytesRead;
175
176            if (n <= 0)
177                break;
178
179            rs.select(0);
180            rs.selectedKeys().clear();
181        }
182
183        bb.flip();
184    }
185}