001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.amqp; 018 019import java.io.DataInput; 020import java.io.DataInputStream; 021import java.io.DataOutput; 022import java.io.DataOutputStream; 023import java.io.IOException; 024import java.io.OutputStream; 025import java.nio.ByteBuffer; 026import java.nio.channels.Channels; 027import java.nio.channels.WritableByteChannel; 028 029import org.apache.activemq.transport.amqp.message.InboundTransformer; 030import org.apache.activemq.util.ByteArrayInputStream; 031import org.apache.activemq.util.ByteArrayOutputStream; 032import org.apache.activemq.util.ByteSequence; 033import org.apache.activemq.wireformat.WireFormat; 034import org.fusesource.hawtbuf.Buffer; 035 036public class AmqpWireFormat implements WireFormat { 037 038 public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE; 039 public static final int NO_AMQP_MAX_FRAME_SIZE = -1; 040 public static final int DEFAULT_CONNECTION_TIMEOUT = 30000; 041 public static final int DEFAULT_IDLE_TIMEOUT = 30000; 042 public static final int DEFAULT_PRODUCER_CREDIT = 1000; 043 044 private static final int SASL_PROTOCOL = 3; 045 046 private int version = 1; 047 private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE; 048 private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE; 049 private int connectAttemptTimeout = DEFAULT_CONNECTION_TIMEOUT; 050 private int idelTimeout = DEFAULT_IDLE_TIMEOUT; 051 private int producerCredit = DEFAULT_PRODUCER_CREDIT; 052 private String transformer = InboundTransformer.TRANSFORMER_NATIVE; 053 054 private boolean magicRead = false; 055 private ResetListener resetListener; 056 057 public interface ResetListener { 058 void onProtocolReset(); 059 } 060 061 private boolean allowNonSaslConnections = true; 062 063 @Override 064 public ByteSequence marshal(Object command) throws IOException { 065 ByteArrayOutputStream baos = new ByteArrayOutputStream(); 066 DataOutputStream dos = new DataOutputStream(baos); 067 marshal(command, dos); 068 dos.close(); 069 return baos.toByteSequence(); 070 } 071 072 @Override 073 public Object unmarshal(ByteSequence packet) throws IOException { 074 ByteArrayInputStream stream = new ByteArrayInputStream(packet); 075 DataInputStream dis = new DataInputStream(stream); 076 return unmarshal(dis); 077 } 078 079 @Override 080 public void marshal(Object command, DataOutput dataOut) throws IOException { 081 if (command instanceof ByteBuffer) { 082 ByteBuffer buffer = (ByteBuffer) command; 083 084 if (dataOut instanceof OutputStream) { 085 WritableByteChannel channel = Channels.newChannel((OutputStream) dataOut); 086 channel.write(buffer); 087 } else { 088 while (buffer.hasRemaining()) { 089 dataOut.writeByte(buffer.get()); 090 } 091 } 092 } else { 093 Buffer frame = (Buffer) command; 094 frame.writeTo(dataOut); 095 } 096 } 097 098 @Override 099 public Object unmarshal(DataInput dataIn) throws IOException { 100 if (!magicRead) { 101 Buffer magic = new Buffer(8); 102 magic.readFrom(dataIn); 103 magicRead = true; 104 return new AmqpHeader(magic, false); 105 } else { 106 int size = dataIn.readInt(); 107 if (size > maxFrameSize) { 108 throw new AmqpProtocolException("Frame size exceeded max frame length."); 109 } else if (size <= 0) { 110 throw new AmqpProtocolException("Frame size value was invalid: " + size); 111 } 112 Buffer frame = new Buffer(size); 113 frame.bigEndianEditor().writeInt(size); 114 frame.readFrom(dataIn); 115 frame.clear(); 116 return frame; 117 } 118 } 119 120 /** 121 * Given an AMQP header validate that the AMQP magic is present and 122 * if so that the version and protocol values align with what we support. 123 * 124 * @param header 125 * the header instance received from the client. 126 * 127 * @return true if the header is valid against the current WireFormat. 128 */ 129 public boolean isHeaderValid(AmqpHeader header) { 130 if (!header.hasValidPrefix()) { 131 return false; 132 } 133 134 if (!isAllowNonSaslConnections() && header.getProtocolId() != SASL_PROTOCOL) { 135 return false; 136 } 137 138 if (header.getMajor() != 1 || header.getMinor() != 0 || header.getRevision() != 0) { 139 return false; 140 } 141 142 return true; 143 } 144 145 /** 146 * Returns an AMQP Header object that represents the minimally protocol 147 * versions supported by this transport. A client that attempts to 148 * connect with an AMQP version that doesn't at least meat this value 149 * will receive this prior to the connection being closed. 150 * 151 * @return the minimal AMQP version needed from the client. 152 */ 153 public AmqpHeader getMinimallySupportedHeader() { 154 AmqpHeader header = new AmqpHeader(); 155 if (!isAllowNonSaslConnections()) { 156 header.setProtocolId(3); 157 } 158 159 return header; 160 } 161 162 @Override 163 public void setVersion(int version) { 164 this.version = version; 165 } 166 167 @Override 168 public int getVersion() { 169 return this.version; 170 } 171 172 public void resetMagicRead() { 173 this.magicRead = false; 174 if (resetListener != null) { 175 resetListener.onProtocolReset(); 176 } 177 } 178 179 public void setProtocolResetListener(ResetListener listener) { 180 this.resetListener = listener; 181 } 182 183 public boolean isMagicRead() { 184 return this.magicRead; 185 } 186 187 public long getMaxFrameSize() { 188 return maxFrameSize; 189 } 190 191 public void setMaxFrameSize(long maxFrameSize) { 192 this.maxFrameSize = maxFrameSize; 193 } 194 195 public int getMaxAmqpFrameSize() { 196 return maxAmqpFrameSize; 197 } 198 199 public void setMaxAmqpFrameSize(int maxAmqpFrameSize) { 200 this.maxAmqpFrameSize = maxAmqpFrameSize; 201 } 202 203 public boolean isAllowNonSaslConnections() { 204 return allowNonSaslConnections; 205 } 206 207 public void setAllowNonSaslConnections(boolean allowNonSaslConnections) { 208 this.allowNonSaslConnections = allowNonSaslConnections; 209 } 210 211 public int getConnectAttemptTimeout() { 212 return connectAttemptTimeout; 213 } 214 215 public void setConnectAttemptTimeout(int connectAttemptTimeout) { 216 this.connectAttemptTimeout = connectAttemptTimeout; 217 } 218 219 public void setProducerCredit(int producerCredit) { 220 this.producerCredit = producerCredit; 221 } 222 223 public int getProducerCredit() { 224 return producerCredit; 225 } 226 227 public String getTransformer() { 228 return transformer; 229 } 230 231 public void setTransformer(String transformer) { 232 this.transformer = transformer; 233 } 234 235 public int getIdleTimeout() { 236 return idelTimeout; 237 } 238 239 public void setIdleTimeout(int idelTimeout) { 240 this.idelTimeout = idelTimeout; 241 } 242}