001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.mqtt;
018
019import java.io.IOException;
020
021import org.apache.activemq.transport.tcp.TcpTransport;
022import org.fusesource.hawtbuf.Buffer;
023import org.fusesource.hawtbuf.DataByteArrayInputStream;
024import org.fusesource.mqtt.codec.MQTTFrame;
025
026public class MQTTCodec {
027
028    private final MQTTFrameSink frameSink;
029    private final MQTTWireFormat wireFormat;
030
031    private byte header;
032    private int contentLength = -1;
033
034    private FrameParser currentParser;
035
036    private final Buffer scratch = new Buffer(8 * 1024);
037    private Buffer currentBuffer;
038
039    /**
040     * Sink for newly decoded MQTT Frames.
041     */
042    public interface MQTTFrameSink {
043        void onFrame(MQTTFrame mqttFrame);
044    }
045
046    public MQTTCodec(MQTTFrameSink sink) {
047        this(sink, null);
048    }
049
050    public MQTTCodec(MQTTFrameSink sink, MQTTWireFormat wireFormat) {
051        this.frameSink = sink;
052        this.wireFormat = wireFormat;
053    }
054
055    public MQTTCodec(final TcpTransport transport) {
056        this(transport, null);
057    }
058
059    public MQTTCodec(final TcpTransport transport, MQTTWireFormat wireFormat) {
060        this.wireFormat = wireFormat;
061        this.frameSink = new MQTTFrameSink() {
062
063            @Override
064            public void onFrame(MQTTFrame mqttFrame) {
065                transport.doConsume(mqttFrame);
066            }
067        };
068    }
069
070    public void parse(DataByteArrayInputStream input, int readSize) throws Exception {
071        if (currentParser == null) {
072            currentParser = initializeHeaderParser();
073        }
074
075        // Parser stack will run until current incoming data has all been consumed.
076        currentParser.parse(input, readSize);
077    }
078
079    private void processCommand() throws IOException {
080
081        Buffer frameContents = null;
082        if (currentBuffer == scratch) {
083            frameContents = scratch.deepCopy();
084        } else {
085            frameContents = currentBuffer;
086            currentBuffer = null;
087        }
088
089        MQTTFrame frame = new MQTTFrame(frameContents).header(header);
090        frameSink.onFrame(frame);
091    }
092
093    private int getMaxFrameSize() {
094        return wireFormat != null ? wireFormat.getMaxFrameSize() : MQTTWireFormat.MAX_MESSAGE_LENGTH;
095    }
096
097    //----- Prepare the current frame parser for use -------------------------//
098
099    private FrameParser initializeHeaderParser() throws IOException {
100        headerParser.reset();
101        return headerParser;
102    }
103
104    private FrameParser initializeVariableLengthParser() throws IOException {
105        variableLengthParser.reset();
106        return variableLengthParser;
107    }
108
109    private FrameParser initializeContentParser() throws IOException {
110        contentParser.reset();
111        return contentParser;
112    }
113
114    //----- Frame parser implementations -------------------------------------//
115
116    private interface FrameParser {
117
118        void parse(DataByteArrayInputStream data, int readSize) throws IOException;
119
120        void reset() throws IOException;
121    }
122
123    private final FrameParser headerParser = new FrameParser() {
124
125        @Override
126        public void parse(DataByteArrayInputStream data, int readSize) throws IOException {
127            while (readSize-- > 0) {
128                byte b = data.readByte();
129                // skip repeating nulls
130                if (b == 0) {
131                    continue;
132                }
133
134                header = b;
135
136                currentParser = initializeVariableLengthParser();
137                if (readSize > 0) {
138                    currentParser.parse(data, readSize);
139                }
140                return;
141            }
142        }
143
144        @Override
145        public void reset() throws IOException {
146            header = -1;
147            contentLength = -1;
148        }
149    };
150
151    private final FrameParser variableLengthParser = new FrameParser() {
152
153        private byte digit;
154        private int multiplier = 1;
155        private int length;
156
157        @Override
158        public void parse(DataByteArrayInputStream data, int readSize) throws IOException {
159            int i = 0;
160            while (i++ < readSize) {
161                digit = data.readByte();
162                length += (digit & 0x7F) * multiplier;
163                multiplier <<= 7;
164                if ((digit & 0x80) == 0) {
165                    if (length == 0) {
166                        processCommand();
167                        currentParser = initializeHeaderParser();
168                    } else {
169                        if (length > getMaxFrameSize()) {
170                            throw new IOException("The maximum message length was exceeded");
171                        }
172
173                        currentParser = initializeContentParser();
174                        contentLength = length;
175                    }
176
177                    readSize = readSize - i;
178                    if (readSize > 0) {
179                        currentParser.parse(data, readSize);
180                    }
181                    return;
182                }
183            }
184        }
185
186        @Override
187        public void reset() throws IOException {
188            digit = 0;
189            multiplier = 1;
190            length = 0;
191        }
192    };
193
194    private final FrameParser contentParser = new FrameParser() {
195
196        private int payLoadRead = 0;
197
198        @Override
199        public void parse(DataByteArrayInputStream data, int readSize) throws IOException {
200            if (currentBuffer == null) {
201                if (contentLength < scratch.length()) {
202                    currentBuffer = scratch;
203                    currentBuffer.length = contentLength;
204                } else {
205                    currentBuffer = new Buffer(contentLength);
206                }
207            }
208
209            int length = Math.min(readSize, contentLength - payLoadRead);
210            payLoadRead += data.read(currentBuffer.data, payLoadRead, length);
211
212            if (payLoadRead == contentLength) {
213                processCommand();
214                currentParser = initializeHeaderParser();
215                readSize = readSize - length;
216                if (readSize > 0) {
217                    currentParser.parse(data, readSize);
218                }
219            }
220        }
221
222        @Override
223        public void reset() throws IOException {
224            contentLength = -1;
225            payLoadRead = 0;
226            scratch.reset();
227            currentBuffer = null;
228        }
229    };
230}