001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.tcp;
018
019import java.io.FilterInputStream;
020import java.io.IOException;
021import java.io.InputStream;
022
023/**
024 * An optimized buffered input stream for Tcp
025 * 
026 * 
027 */
028public class TcpBufferedInputStream extends FilterInputStream {
029    private static final int DEFAULT_BUFFER_SIZE = 8192;
030    protected byte internalBuffer[];
031    protected int count;
032    protected int position;
033
034    public TcpBufferedInputStream(InputStream in) {
035        this(in, DEFAULT_BUFFER_SIZE);
036    }
037
038    public TcpBufferedInputStream(InputStream in, int size) {
039        super(in);
040        if (size <= 0) {
041            throw new IllegalArgumentException("Buffer size <= 0");
042        }
043        internalBuffer = new byte[size];
044    }
045
046    protected void fill() throws IOException {
047        byte[] buffer = internalBuffer;
048        count = 0;
049        position = 0;
050        int n = in.read(buffer, position, buffer.length - position);
051        if (n > 0) {
052            count = n + position;
053        }
054    }
055
056    public int read() throws IOException {
057        if (position >= count) {
058            fill();
059            if (position >= count) {
060                return -1;
061            }
062        }
063        return internalBuffer[position++] & 0xff;
064    }
065
066    private int readStream(byte[] b, int off, int len) throws IOException {
067        int avail = count - position;
068        if (avail <= 0) {
069            if (len >= internalBuffer.length) {
070                return in.read(b, off, len);
071            }
072            fill();
073            avail = count - position;
074            if (avail <= 0) {
075                return -1;
076            }
077        }
078        int cnt = (avail < len) ? avail : len;
079        System.arraycopy(internalBuffer, position, b, off, cnt);
080        position += cnt;
081        return cnt;
082    }
083
084    public int read(byte b[], int off, int len) throws IOException {
085        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
086            throw new IndexOutOfBoundsException();
087        } else if (len == 0) {
088            return 0;
089        }
090        int n = 0;
091        for (;;) {
092            int nread = readStream(b, off + n, len - n);
093            if (nread <= 0) {
094                return (n == 0) ? nread : n;
095            }
096            n += nread;
097            if (n >= len) {
098                return n;
099            }
100            // if not closed but no bytes available, return
101            InputStream input = in;
102            if (input != null && input.available() <= 0) {
103                return n;
104            }
105        }
106    }
107
108    public long skip(long n) throws IOException {
109        if (n <= 0) {
110            return 0;
111        }
112        long avail = count - position;
113        if (avail <= 0) {
114            return in.skip(n);
115        }
116        long skipped = (avail < n) ? avail : n;
117        position += skipped;
118        return skipped;
119    }
120
121    public int available() throws IOException {
122        return in.available() + (count - position);
123    }
124
125    public boolean markSupported() {
126        return false;
127    }
128
129    public void close() throws IOException {
130        if (in != null) {
131            in.close();
132        }
133    }
134}