001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.udp; 018 019import java.io.DataInputStream; 020import java.io.DataOutputStream; 021import java.io.IOException; 022import java.net.DatagramPacket; 023import java.net.DatagramSocket; 024import java.net.SocketAddress; 025 026import org.apache.activemq.command.Command; 027import org.apache.activemq.command.Endpoint; 028import org.apache.activemq.command.LastPartialCommand; 029import org.apache.activemq.command.PartialCommand; 030import org.apache.activemq.openwire.BooleanStream; 031import org.apache.activemq.openwire.OpenWireFormat; 032import org.apache.activemq.transport.reliable.ReplayBuffer; 033import org.apache.activemq.util.ByteArrayInputStream; 034import org.apache.activemq.util.ByteArrayOutputStream; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * A strategy for reading datagrams and de-fragmenting them together. 040 * 041 * 042 */ 043public class CommandDatagramSocket extends CommandChannelSupport { 044 045 private static final Logger LOG = LoggerFactory.getLogger(CommandDatagramSocket.class); 046 047 private DatagramSocket channel; 048 private Object readLock = new Object(); 049 private Object writeLock = new Object(); 050 051 private volatile int receiveCounter; 052 053 public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, 054 DatagramSocket channel) { 055 super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller); 056 this.channel = channel; 057 } 058 059 public void start() throws Exception { 060 } 061 062 public void stop() throws Exception { 063 } 064 065 public Command read() throws IOException { 066 Command answer = null; 067 Endpoint from = null; 068 synchronized (readLock) { 069 while (true) { 070 DatagramPacket datagram = createDatagramPacket(); 071 channel.receive(datagram); 072 073 // TODO could use a DataInput implementation that talks direct 074 // to the byte[] to avoid object allocation 075 receiveCounter++; 076 DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData(), 0, datagram.getLength())); 077 078 from = headerMarshaller.createEndpoint(datagram, dataIn); 079 answer = (Command)wireFormat.unmarshal(dataIn); 080 break; 081 } 082 } 083 if (answer != null) { 084 answer.setFrom(from); 085 086 if (LOG.isDebugEnabled()) { 087 LOG.debug("Channel: " + name + " about to process: " + answer); 088 } 089 } 090 return answer; 091 } 092 093 public void write(Command command, SocketAddress address) throws IOException { 094 synchronized (writeLock) { 095 096 ByteArrayOutputStream writeBuffer = createByteArrayOutputStream(); 097 DataOutputStream dataOut = new DataOutputStream(writeBuffer); 098 headerMarshaller.writeHeader(command, dataOut); 099 100 int offset = writeBuffer.size(); 101 102 wireFormat.marshal(command, dataOut); 103 104 if (remaining(writeBuffer) >= 0) { 105 sendWriteBuffer(address, writeBuffer, command.getCommandId()); 106 } else { 107 // lets split the command up into chunks 108 byte[] data = writeBuffer.toByteArray(); 109 boolean lastFragment = false; 110 int length = data.length; 111 for (int fragment = 0; !lastFragment; fragment++) { 112 writeBuffer = createByteArrayOutputStream(); 113 headerMarshaller.writeHeader(command, dataOut); 114 115 int chunkSize = remaining(writeBuffer); 116 117 // we need to remove the amount of overhead to write the 118 // partial command 119 120 // lets write the flags in there 121 BooleanStream bs = null; 122 if (wireFormat.isTightEncodingEnabled()) { 123 bs = new BooleanStream(); 124 bs.writeBoolean(true); // the partial data byte[] is 125 // never null 126 } 127 128 // lets remove the header of the partial command 129 // which is the byte for the type and an int for the size of 130 // the byte[] 131 132 // data type + the command ID + size of the partial data 133 chunkSize -= 1 + 4 + 4; 134 135 // the boolean flags 136 if (bs != null) { 137 chunkSize -= bs.marshalledSize(); 138 } else { 139 chunkSize -= 1; 140 } 141 142 if (!wireFormat.isSizePrefixDisabled()) { 143 // lets write the size of the command buffer 144 dataOut.writeInt(chunkSize); 145 chunkSize -= 4; 146 } 147 148 lastFragment = offset + chunkSize >= length; 149 if (chunkSize + offset > length) { 150 chunkSize = length - offset; 151 } 152 153 if (lastFragment) { 154 dataOut.write(LastPartialCommand.DATA_STRUCTURE_TYPE); 155 } else { 156 dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE); 157 } 158 159 if (bs != null) { 160 bs.marshal(dataOut); 161 } 162 163 int commandId = command.getCommandId(); 164 if (fragment > 0) { 165 commandId = sequenceGenerator.getNextSequenceId(); 166 } 167 dataOut.writeInt(commandId); 168 if (bs == null) { 169 dataOut.write((byte)1); 170 } 171 172 // size of byte array 173 dataOut.writeInt(chunkSize); 174 175 // now the data 176 dataOut.write(data, offset, chunkSize); 177 178 offset += chunkSize; 179 sendWriteBuffer(address, writeBuffer, commandId); 180 } 181 } 182 } 183 } 184 185 public int getDatagramSize() { 186 return datagramSize; 187 } 188 189 public void setDatagramSize(int datagramSize) { 190 this.datagramSize = datagramSize; 191 } 192 193 // Implementation methods 194 // ------------------------------------------------------------------------- 195 protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId) throws IOException { 196 byte[] data = writeBuffer.toByteArray(); 197 sendWriteBuffer(commandId, address, data, false); 198 } 199 200 protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery) throws IOException { 201 // lets put the datagram into the replay buffer first to prevent timing 202 // issues 203 ReplayBuffer bufferCache = getReplayBuffer(); 204 if (bufferCache != null && !redelivery) { 205 bufferCache.addBuffer(commandId, data); 206 } 207 208 if (LOG.isDebugEnabled()) { 209 String text = redelivery ? "REDELIVERING" : "sending"; 210 LOG.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address); 211 } 212 DatagramPacket packet = new DatagramPacket(data, 0, data.length, address); 213 channel.send(packet); 214 } 215 216 public void sendBuffer(int commandId, Object buffer) throws IOException { 217 if (buffer != null) { 218 byte[] data = (byte[])buffer; 219 sendWriteBuffer(commandId, replayAddress, data, true); 220 } else { 221 if (LOG.isWarnEnabled()) { 222 LOG.warn("Request for buffer: " + commandId + " is no longer present"); 223 } 224 } 225 } 226 227 protected DatagramPacket createDatagramPacket() { 228 return new DatagramPacket(new byte[datagramSize], datagramSize); 229 } 230 231 protected int remaining(ByteArrayOutputStream buffer) { 232 return datagramSize - buffer.size(); 233 } 234 235 protected ByteArrayOutputStream createByteArrayOutputStream() { 236 return new ByteArrayOutputStream(datagramSize); 237 } 238 239 public int getReceiveCounter() { 240 return receiveCounter; 241 } 242}