001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.stomp;
018
019import java.io.ByteArrayInputStream;
020import java.util.Arrays;
021import java.util.Collections;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.Map;
025import java.util.concurrent.atomic.AtomicLong;
026
027import org.apache.activemq.transport.tcp.TcpTransport;
028import org.apache.activemq.util.ByteArrayOutputStream;
029import org.apache.activemq.util.DataByteArrayInputStream;
030
031public class StompCodec {
032
033    final static byte[] crlfcrlf = new byte[]{'\r','\n','\r','\n'};
034    TcpTransport transport;
035    StompWireFormat wireFormat;
036
037    AtomicLong frameSize = new AtomicLong(); 
038    ByteArrayOutputStream currentCommand = new ByteArrayOutputStream();
039    boolean processedHeaders = false;
040    String action;
041    HashMap<String, String> headers;
042    int contentLength = -1;
043    int readLength = 0;
044    int previousByte = -1;
045    boolean awaitingCommandStart = true;
046    String version = Stomp.DEFAULT_VERSION;
047
048    public StompCodec(TcpTransport transport) {
049        this.transport = transport;
050        this.wireFormat = (StompWireFormat) transport.getWireFormat();
051    }
052
053    public void parse(ByteArrayInputStream input, int readSize) throws Exception {
054       int i = 0;
055       int b;
056       while(i++ < readSize) {
057           b = input.read();
058           // skip repeating nulls
059           if (!processedHeaders && previousByte == 0 && b == 0) {
060               continue;
061           }
062
063           if (!processedHeaders) {
064
065               // skip heart beat commands.
066               if (awaitingCommandStart && b == '\n') {
067                   continue;
068               } else {
069                   awaitingCommandStart = false;   // non-newline indicates next frame.
070               }
071
072               currentCommand.write(b);
073               // end of headers section, parse action and header
074               if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) {
075                   DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
076
077                   try {
078                       action = wireFormat.parseAction(data, frameSize);
079                       headers = wireFormat.parseHeaders(data, frameSize);
080                       
081                       String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
082                       if ((action.equals(Stomp.Commands.SEND) || action.equals(Stomp.Responses.MESSAGE)) && contentLengthHeader != null) {
083                           contentLength = wireFormat.parseContentLength(contentLengthHeader, frameSize);
084                       } else {
085                           contentLength = -1;
086                       }
087                   } catch (ProtocolException e) {
088                       transport.doConsume(new StompFrameError(e));
089                       return;
090                   }
091                   processedHeaders = true;
092                   currentCommand.reset();
093               }
094
095           } else {
096
097               if (contentLength == -1) {
098                   // end of command reached, unmarshal
099                   if (b == 0) {
100                       processCommand();
101                   } else {
102                       currentCommand.write(b);
103                       if (currentCommand.size() > wireFormat.getMaxDataLength()) {
104                           transport.doConsume(new StompFrameError(new ProtocolException("The maximum data length was exceeded", true)));
105                           return;
106                       }
107                       if (frameSize.incrementAndGet() > wireFormat.getMaxFrameSize()) {
108                           transport.doConsume(new StompFrameError(new ProtocolException("The maximum frame size was exceeded", true)));
109                           return;
110                       }
111                   }
112               } else {
113                   // read desired content length
114                   if (readLength++ == contentLength) {
115                       processCommand();
116                       readLength = 0;
117                   } else {
118                       currentCommand.write(b);
119                   }
120               }
121           }
122
123           previousByte = b;
124       }
125    }
126
127    protected void processCommand() throws Exception {
128        StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
129        transport.doConsume(frame);
130        processedHeaders = false;
131        awaitingCommandStart = true;
132        currentCommand.reset();
133        contentLength = -1;
134        frameSize.set(0);
135    }
136
137    public static String detectVersion(Map<String, String> headers) throws ProtocolException {
138        String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
139
140        if (accepts == null) {
141            accepts = Stomp.DEFAULT_VERSION;
142        }
143        HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.trim().split(Stomp.COMMA)));
144        acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
145        if (acceptsVersions.isEmpty()) {
146            throw new ProtocolException("Invalid Protocol version[" + accepts +"], supported versions are: " +
147                    Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
148        } else {
149            return Collections.max(acceptsVersions);
150        }
151    }
152}