/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.amqp;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.activemq.transport.amqp.AmqpWireFormat.ResetListener;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.util.IOExceptionSupport;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * State based Frame reader that is used in the NIO based transports where
 * AMQP frames can arrive in partial or overlapping forms.
 * <p>
 * The parser moves through three states: it first collects the fixed size
 * protocol header, then the four byte frame size, then the remainder of the
 * frame.  Every completed header or frame is handed to the configured
 * {@link AMQPFrameSink}, after which the parser goes back to reading a frame
 * size.  State is kept between calls to {@link #parse(ByteBuffer)} so a header
 * or frame may be split across any number of incoming buffers.
 */
public class AmqpFrameParser {

    private static final Logger LOG = LoggerFactory.getLogger(AmqpFrameParser.class);

    /**
     * Receiver of the objects produced by the parser.
     */
    public interface AMQPFrameSink {

        /**
         * Called once for each fully read element.
         *
         * @param frame
         *        an {@link AmqpHeader} for the protocol header, or a
         *        {@link Buffer} holding one complete frame including its
         *        four byte size prefix.
         */
        void onFrame(Object frame);
    }

    private static final byte AMQP_FRAME_SIZE_BYTES = 4;
    private static final byte AMQP_HEADER_BYTES = 8;

    // Smallest frame size this parser accepts: an AMQP frame always carries
    // an eight byte frame header (size, doff, type, channel), so anything
    // smaller is treated as malformed.
    private static final int AMQP_MIN_FRAME_SIZE = 8;

    private final AMQPFrameSink frameSink;

    private FrameParser currentParser;
    private AmqpWireFormat wireFormat;

    /**
     * Creates a parser that delivers its output to the given sink.
     *
     * @param sink
     *        the receiver of parsed headers and frames.
     */
    public AmqpFrameParser(AMQPFrameSink sink) {
        this.frameSink = sink;
    }

    /**
     * Creates a parser that passes its output to the given transport's
     * {@code doConsume} method.
     *
     * @param transport
     *        the transport that consumes parsed headers and frames.
     */
    public AmqpFrameParser(final TcpTransport transport) {
        this.frameSink = new AMQPFrameSink() {

            @Override
            public void onFrame(Object frame) {
                transport.doConsume(frame);
            }
        };
    }

    /**
     * Feeds newly received bytes into the parser.  A null or empty buffer is
     * ignored.
     *
     * @param incoming
     *        the bytes that were read from the remote peer.
     *
     * @throws Exception
     *         if the incoming data announces a frame size that is not
     *         acceptable, or if the frame sink fails.
     */
    public void parse(ByteBuffer incoming) throws Exception {

        if (incoming == null || !incoming.hasRemaining()) {
            return;
        }

        if (currentParser == null) {
            currentParser = initializeHeaderParser();
        }

        // Parser stack will run until current incoming data has all been consumed.
        currentParser.parse(incoming);
    }

    /**
     * Returns the parser to its initial state where the next bytes received
     * are expected to be a protocol header.
     */
    public void reset() {
        currentParser = initializeHeaderParser();
    }

    /**
     * Checks a decoded frame size before any buffer is allocated for it.
     *
     * @param frameSize
     *        the four size bytes read from the wire, interpreted as an int.
     *
     * @throws IOException
     *         if the size is negative (the unsigned wire value exceeds
     *         {@link Integer#MAX_VALUE}), smaller than the minimum frame
     *         size, or larger than the maximum frame size in effect.
     */
    private void validateFrameSize(int frameSize) throws IOException {
        // The wire value is unsigned; a negative int means the peer announced
        // more bytes than a Java array can hold.  Without this check the value
        // slips past the upper bound test below and fails on allocation with
        // an unchecked exception.
        if (frameSize < 0) {
            throw new IOException("Frame size of " + (frameSize & 0xFFFFFFFFL) +
                                  " bytes is larger than the largest frame that can be buffered");
        }

        // Anything shorter than the frame header is malformed, and a size
        // below four bytes cannot even hold the size prefix that is written
        // back into the frame buffer.
        if (frameSize < AMQP_MIN_FRAME_SIZE) {
            throw new IOException("Frame size of " + frameSize +
                                  " bytes is smaller than the minimum AMQP frame size of " +
                                  AMQP_MIN_FRAME_SIZE + " bytes");
        }

        long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE;
        if (wireFormat != null) {
            maxFrameSize = wireFormat.getMaxFrameSize();
        }

        if (frameSize > maxFrameSize) {
            throw IOExceptionSupport.createFrameSizeException(frameSize, maxFrameSize);
        }
    }

    /**
     * Sets the wire format that supplies the maximum frame size.  When the
     * given wire format is not null this parser also registers itself as the
     * wire format's protocol reset listener so that a protocol reset returns
     * the parser to header reading.
     *
     * @param wireFormat
     *        the wire format to use, may be null.
     */
    public void setWireFormat(AmqpWireFormat wireFormat) {
        this.wireFormat = wireFormat;
        if (wireFormat != null) {
            wireFormat.setProtocolResetListener(new ResetListener() {

                @Override
                public void onProtocolReset() {
                    reset();
                }
            });
        }
    }

    /**
     * @return the wire format last given to {@link #setWireFormat(AmqpWireFormat)}, or null.
     */
    public AmqpWireFormat getWireFormat() {
        return this.wireFormat;
    }

    //----- Prepare the current frame parser for use -------------------------//

    private FrameParser initializeHeaderParser() {
        headerReader.reset(AMQP_HEADER_BYTES);
        return headerReader;
    }

    private FrameParser initializeFrameLengthParser() {
        frameSizeReader.reset(AMQP_FRAME_SIZE_BYTES);
        return frameSizeReader;
    }

    private FrameParser initializeContentReader(int contentLength) {
        contentReader.reset(contentLength);
        return contentReader;
    }

    //----- Frame parser implementations -------------------------------------//

    /**
     * One state of the parser.  An implementation consumes what it needs from
     * the incoming buffer, installs the next state in {@code currentParser}
     * and passes any bytes that are left over on to that state.
     */
    private interface FrameParser {

        void parse(ByteBuffer incoming) throws IOException;

        void reset(int nextExpectedReadSize);
    }

    /**
     * Collects the protocol header bytes and emits them as an AmqpHeader.
     */
    private final FrameParser headerReader = new FrameParser() {

        private final Buffer header = new Buffer(AMQP_HEADER_BYTES);

        @Override
        public void parse(ByteBuffer incoming) throws IOException {
            int length = Math.min(incoming.remaining(), header.length - header.offset);

            incoming.get(header.data, header.offset, length);
            header.offset += length;

            if (header.offset == AMQP_HEADER_BYTES) {
                header.reset();
                AmqpHeader amqpHeader = new AmqpHeader(header.deepCopy(), false);

                // The next state is installed before the sink is called so
                // that a parser reset triggered from inside the sink is not
                // overwritten afterwards.
                currentParser = initializeFrameLengthParser();
                frameSink.onFrame(amqpHeader);
                if (incoming.hasRemaining()) {
                    currentParser.parse(incoming);
                }
            }
        }

        @Override
        public void reset(int nextExpectedReadSize) {
            header.reset();
        }
    };

    /**
     * Assembles the big endian frame size one byte at a time.
     */
    private final FrameParser frameSizeReader = new FrameParser() {

        private int frameSize;
        private int multiplier;

        @Override
        public void parse(ByteBuffer incoming) throws IOException {

            while (incoming.hasRemaining()) {
                // Most significant byte arrives first, so each byte is moved
                // into the highest position that has not been filled yet.
                frameSize |= ((incoming.get() & 0xFF) << --multiplier * Byte.SIZE);

                if (multiplier == 0) {
                    LOG.trace("Next incoming frame length: {}", frameSize);
                    validateFrameSize(frameSize);
                    currentParser = initializeContentReader(frameSize);
                    if (incoming.hasRemaining()) {
                        currentParser.parse(incoming);
                        return;
                    }
                }
            }
        }

        @Override
        public void reset(int nextExpectedReadSize) {
            multiplier = AMQP_FRAME_SIZE_BYTES;
            frameSize = 0;
        }
    };

    /**
     * Reads the remainder of a frame whose size has already been decoded.
     */
    private final FrameParser contentReader = new FrameParser() {

        private Buffer frame;

        @Override
        public void parse(ByteBuffer incoming) throws IOException {
            int length = Math.min(incoming.remaining(), frame.getLength() - frame.offset);
            incoming.get(frame.data, frame.offset, length);
            frame.offset += length;

            if (frame.offset == frame.length) {
                LOG.trace("Contents of size {} have been read", frame.length);
                frame.reset();
                frameSink.onFrame(frame);

                // Only move on to the next frame size when the sink did not
                // switch the parser state itself, e.g. through reset().
                if (currentParser == this) {
                    currentParser = initializeFrameLengthParser();
                }
                if (incoming.hasRemaining()) {
                    currentParser.parse(incoming);
                }
            }
        }

        @Override
        public void reset(int nextExpectedReadSize) {
            // Allocate a new Buffer to hold the incoming frame.  We must write
            // back the frame size value before continuing on to read the
            // indicated frame size minus the size of the AMQP frame size
            // header value.
            frame = new Buffer(nextExpectedReadSize);
            frame.bigEndianEditor().writeInt(nextExpectedReadSize);

            // Reset the length to total length as we do direct write after this.
            frame.length = frame.data.length;
        }
    };
}