001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp;
018
019import java.io.IOException;
020import java.nio.ByteBuffer;
021
022import org.apache.activemq.transport.amqp.AmqpWireFormat.ResetListener;
023import org.apache.activemq.transport.tcp.TcpTransport;
024import org.apache.activemq.util.IOExceptionSupport;
025import org.fusesource.hawtbuf.Buffer;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/**
030 * State based Frame reader that is used in the NIO based transports where
031 * AMQP frames can come in in partial or overlapping forms.
032 */
033public class AmqpFrameParser {
034
035    private static final Logger LOG = LoggerFactory.getLogger(AmqpFrameParser.class);
036
037    public interface AMQPFrameSink {
038        void onFrame(Object frame);
039    }
040
041    private static final byte AMQP_FRAME_SIZE_BYTES = 4;
042    private static final byte AMQP_HEADER_BYTES = 8;
043
044    private final AMQPFrameSink frameSink;
045
046    private FrameParser currentParser;
047    private AmqpWireFormat wireFormat;
048
049    public AmqpFrameParser(AMQPFrameSink sink) {
050        this.frameSink = sink;
051    }
052
053    public AmqpFrameParser(final TcpTransport transport) {
054        this.frameSink = new AMQPFrameSink() {
055
056            @Override
057            public void onFrame(Object frame) {
058                transport.doConsume(frame);
059            }
060        };
061    }
062
063    public void parse(ByteBuffer incoming) throws Exception {
064
065        if (incoming == null || !incoming.hasRemaining()) {
066            return;
067        }
068
069        if (currentParser == null) {
070            currentParser = initializeHeaderParser();
071        }
072
073        // Parser stack will run until current incoming data has all been consumed.
074        currentParser.parse(incoming);
075    }
076
077    public void reset() {
078        currentParser = initializeHeaderParser();
079    }
080
081    private void validateFrameSize(int frameSize) throws IOException {
082        long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE;
083        if (wireFormat != null) {
084            maxFrameSize = wireFormat.getMaxFrameSize();
085        }
086
087        if (frameSize > maxFrameSize) {
088            throw IOExceptionSupport.createFrameSizeException(frameSize, maxFrameSize);
089        }
090    }
091
092    public void setWireFormat(AmqpWireFormat wireFormat) {
093        this.wireFormat = wireFormat;
094        if (wireFormat != null) {
095            wireFormat.setProtocolResetListener(new ResetListener() {
096
097                @Override
098                public void onProtocolReset() {
099                    reset();
100                }
101            });
102        }
103    }
104
105    public AmqpWireFormat getWireFormat() {
106        return this.wireFormat;
107    }
108
109    //----- Prepare the current frame parser for use -------------------------//
110
111    private FrameParser initializeHeaderParser() {
112        headerReader.reset(AMQP_HEADER_BYTES);
113        return headerReader;
114    }
115
116    private FrameParser initializeFrameLengthParser() {
117        frameSizeReader.reset(AMQP_FRAME_SIZE_BYTES);
118        return frameSizeReader;
119    }
120
121    private FrameParser initializeContentReader(int contentLength) {
122        contentReader.reset(contentLength);
123        return contentReader;
124    }
125
126    //----- Frame parser implementations -------------------------------------//
127
128    private interface FrameParser {
129
130        void parse(ByteBuffer incoming) throws IOException;
131
132        void reset(int nextExpectedReadSize);
133    }
134
135    private final FrameParser headerReader = new FrameParser() {
136
137        private final Buffer header = new Buffer(AMQP_HEADER_BYTES);
138
139        @Override
140        public void parse(ByteBuffer incoming) throws IOException {
141            int length = Math.min(incoming.remaining(), header.length - header.offset);
142
143            incoming.get(header.data, header.offset, length);
144            header.offset += length;
145
146            if (header.offset == AMQP_HEADER_BYTES) {
147                header.reset();
148                AmqpHeader amqpHeader = new AmqpHeader(header.deepCopy(), false);
149                currentParser = initializeFrameLengthParser();
150                frameSink.onFrame(amqpHeader);
151                if (incoming.hasRemaining()) {
152                    currentParser.parse(incoming);
153                }
154            }
155        }
156
157        @Override
158        public void reset(int nextExpectedReadSize) {
159            header.reset();
160        }
161    };
162
163    private final FrameParser frameSizeReader = new FrameParser() {
164
165        private int frameSize;
166        private int multiplier;
167
168        @Override
169        public void parse(ByteBuffer incoming) throws IOException {
170
171            while (incoming.hasRemaining()) {
172                frameSize += ((incoming.get() & 0xFF) << --multiplier * Byte.SIZE);
173
174                if (multiplier == 0) {
175                    LOG.trace("Next incoming frame length: {}", frameSize);
176                    validateFrameSize(frameSize);
177                    currentParser = initializeContentReader(frameSize);
178                    if (incoming.hasRemaining()) {
179                        currentParser.parse(incoming);
180                        return;
181                    }
182                }
183            }
184        }
185
186        @Override
187        public void reset(int nextExpectedReadSize) {
188            multiplier = AMQP_FRAME_SIZE_BYTES;
189            frameSize = 0;
190        }
191    };
192
193    private final FrameParser contentReader = new FrameParser() {
194
195        private Buffer frame;
196
197        @Override
198        public void parse(ByteBuffer incoming) throws IOException {
199            int length = Math.min(incoming.remaining(), frame.getLength() - frame.offset);
200            incoming.get(frame.data, frame.offset, length);
201            frame.offset += length;
202
203            if (frame.offset == frame.length) {
204                LOG.trace("Contents of size {} have been read", frame.length);
205                frame.reset();
206                frameSink.onFrame(frame);
207                if (currentParser == this) {
208                    currentParser = initializeFrameLengthParser();
209                }
210                if (incoming.hasRemaining()) {
211                    currentParser.parse(incoming);
212                }
213            }
214        }
215
216        @Override
217        public void reset(int nextExpectedReadSize) {
218            // Allocate a new Buffer to hold the incoming frame.  We must write
219            // back the frame size value before continue on to read the indicated
220            // frame size minus the size of the AMQP frame size header value.
221            frame = new Buffer(nextExpectedReadSize);
222            frame.bigEndianEditor().writeInt(nextExpectedReadSize);
223
224            // Reset the length to total length as we do direct write after this.
225            frame.length = frame.data.length;
226        }
227    };
228}