001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.amqp;
018
019import java.io.DataInput;
020import java.io.DataInputStream;
021import java.io.DataOutput;
022import java.io.DataOutputStream;
023import java.io.IOException;
024import java.io.OutputStream;
025import java.nio.ByteBuffer;
026import java.nio.channels.Channels;
027import java.nio.channels.WritableByteChannel;
028
029import org.apache.activemq.transport.amqp.message.InboundTransformer;
030import org.apache.activemq.util.ByteArrayInputStream;
031import org.apache.activemq.util.ByteArrayOutputStream;
032import org.apache.activemq.util.ByteSequence;
033import org.apache.activemq.wireformat.WireFormat;
034import org.fusesource.hawtbuf.Buffer;
035
036public class AmqpWireFormat implements WireFormat {
037
038    public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
039    public static final int NO_AMQP_MAX_FRAME_SIZE = -1;
040    public static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
041    public static final int DEFAULT_IDLE_TIMEOUT = 30000;
042    public static final int DEFAULT_PRODUCER_CREDIT = 1000;
043
044    private static final int SASL_PROTOCOL = 3;
045
046    private int version = 1;
047    private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
048    private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
049    private int connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT;
050    private int idelTimeout = DEFAULT_IDLE_TIMEOUT;
051    private int producerCredit = DEFAULT_PRODUCER_CREDIT;
052    private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
053
054    private boolean magicRead = false;
055    private ResetListener resetListener;
056
057    public interface ResetListener {
058        void onProtocolReset();
059    }
060
061    private boolean allowNonSaslConnections = true;
062
063    @Override
064    public ByteSequence marshal(Object command) throws IOException {
065        ByteArrayOutputStream baos = new ByteArrayOutputStream();
066        DataOutputStream dos = new DataOutputStream(baos);
067        marshal(command, dos);
068        dos.close();
069        return baos.toByteSequence();
070    }
071
072    @Override
073    public Object unmarshal(ByteSequence packet) throws IOException {
074        ByteArrayInputStream stream = new ByteArrayInputStream(packet);
075        DataInputStream dis = new DataInputStream(stream);
076        return unmarshal(dis);
077    }
078
079    @Override
080    public void marshal(Object command, DataOutput dataOut) throws IOException {
081        if (command instanceof ByteBuffer) {
082            ByteBuffer buffer = (ByteBuffer) command;
083
084            if (dataOut instanceof OutputStream) {
085                WritableByteChannel channel = Channels.newChannel((OutputStream) dataOut);
086                channel.write(buffer);
087            } else {
088                while (buffer.hasRemaining()) {
089                    dataOut.writeByte(buffer.get());
090                }
091            }
092        } else {
093            Buffer frame = (Buffer) command;
094            frame.writeTo(dataOut);
095        }
096    }
097
098    @Override
099    public Object unmarshal(DataInput dataIn) throws IOException {
100        if (!magicRead) {
101            Buffer magic = new Buffer(8);
102            magic.readFrom(dataIn);
103            magicRead = true;
104            return new AmqpHeader(magic, false);
105        } else {
106            int size = dataIn.readInt();
107            if (size > maxFrameSize) {
108                throw new AmqpProtocolException("Frame size exceeded max frame length.");
109            } else if (size <= 0) {
110                throw new AmqpProtocolException("Frame size value was invalid: " + size);
111            }
112            Buffer frame = new Buffer(size);
113            frame.bigEndianEditor().writeInt(size);
114            frame.readFrom(dataIn);
115            frame.clear();
116            return frame;
117        }
118    }
119
120    /**
121     * Given an AMQP header validate that the AMQP magic is present and
122     * if so that the version and protocol values align with what we support.
123     *
124     * @param header
125     *        the header instance received from the client.
126     *
127     * @return true if the header is valid against the current WireFormat.
128     */
129    public boolean isHeaderValid(AmqpHeader header) {
130        if (!header.hasValidPrefix()) {
131            return false;
132        }
133
134        if (!isAllowNonSaslConnections() && header.getProtocolId() != SASL_PROTOCOL) {
135            return false;
136        }
137
138        if (header.getMajor() != 1 || header.getMinor() != 0 || header.getRevision() != 0) {
139            return false;
140        }
141
142        return true;
143    }
144
145    /**
146     * Returns an AMQP Header object that represents the minimally protocol
147     * versions supported by this transport.  A client that attempts to
148     * connect with an AMQP version that doesn't at least meat this value
149     * will receive this prior to the connection being closed.
150     *
151     * @return the minimal AMQP version needed from the client.
152     */
153    public AmqpHeader getMinimallySupportedHeader() {
154        AmqpHeader header = new AmqpHeader();
155        if (!isAllowNonSaslConnections()) {
156            header.setProtocolId(3);
157        }
158
159        return header;
160    }
161
162    @Override
163    public void setVersion(int version) {
164        this.version = version;
165    }
166
167    @Override
168    public int getVersion() {
169        return this.version;
170    }
171
172    public void resetMagicRead() {
173        this.magicRead = false;
174        if (resetListener != null) {
175            resetListener.onProtocolReset();
176        }
177    }
178
179    public void setProtocolResetListener(ResetListener listener) {
180        this.resetListener = listener;
181    }
182
183    public boolean isMagicRead() {
184        return this.magicRead;
185    }
186
187    public long getMaxFrameSize() {
188        return maxFrameSize;
189    }
190
191    public void setMaxFrameSize(long maxFrameSize) {
192        this.maxFrameSize = maxFrameSize;
193    }
194
195    public int getMaxAmqpFrameSize() {
196        return maxAmqpFrameSize;
197    }
198
199    public void setMaxAmqpFrameSize(int maxAmqpFrameSize) {
200        this.maxAmqpFrameSize = maxAmqpFrameSize;
201    }
202
203    public boolean isAllowNonSaslConnections() {
204        return allowNonSaslConnections;
205    }
206
207    public void setAllowNonSaslConnections(boolean allowNonSaslConnections) {
208        this.allowNonSaslConnections = allowNonSaslConnections;
209    }
210
211    public int getConnectAttemptTimeout() {
212        return connectAttemptTimeout;
213    }
214
215    public void setConnectAttemptTimeout(int connectAttemptTimeout) {
216        this.connectAttemptTimeout = connectAttemptTimeout;
217    }
218
219    public void setProducerCredit(int producerCredit) {
220        this.producerCredit = producerCredit;
221    }
222
223    public int getProducerCredit() {
224        return producerCredit;
225    }
226
227    public String getTransformer() {
228        return transformer;
229    }
230
231    public void setTransformer(String transformer) {
232        this.transformer = transformer;
233    }
234
235    public int getIdleTimeout() {
236        return idelTimeout;
237    }
238
239    public void setIdleTimeout(int idelTimeout) {
240        this.idelTimeout = idelTimeout;
241    }
242}