001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.udp; 018 019import java.io.DataInputStream; 020import java.io.DataOutputStream; 021import java.io.IOException; 022import java.net.SocketAddress; 023import java.nio.ByteBuffer; 024import java.nio.channels.DatagramChannel; 025 026import org.apache.activemq.command.Command; 027import org.apache.activemq.command.Endpoint; 028import org.apache.activemq.command.LastPartialCommand; 029import org.apache.activemq.command.PartialCommand; 030import org.apache.activemq.openwire.BooleanStream; 031import org.apache.activemq.openwire.OpenWireFormat; 032import org.apache.activemq.transport.reliable.ReplayBuffer; 033import org.apache.activemq.util.ByteArrayInputStream; 034import org.apache.activemq.util.ByteArrayOutputStream; 035import org.slf4j.Logger; 036import org.slf4j.LoggerFactory; 037 038/** 039 * A strategy for reading datagrams and de-fragmenting them together. 040 * 041 * 042 */ 043public class CommandDatagramChannel extends CommandChannelSupport { 044 045 private static final Logger LOG = LoggerFactory.getLogger(CommandDatagramChannel.class); 046 047 private DatagramChannel channel; 048 private ByteBufferPool bufferPool; 049 050 // reading 051 private Object readLock = new Object(); 052 private ByteBuffer readBuffer; 053 054 // writing 055 private Object writeLock = new Object(); 056 private int defaultMarshalBufferSize = 64 * 1024; 057 private volatile int receiveCounter; 058 059 public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, 060 DatagramChannel channel, ByteBufferPool bufferPool) { 061 super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller); 062 this.channel = channel; 063 this.bufferPool = bufferPool; 064 } 065 066 public void start() throws Exception { 067 bufferPool.setDefaultSize(datagramSize); 068 bufferPool.start(); 069 readBuffer = bufferPool.borrowBuffer(); 070 } 071 072 public void stop() throws Exception { 073 bufferPool.stop(); 074 } 075 076 public Command read() throws IOException { 077 Command answer = null; 078 Endpoint from = null; 079 synchronized (readLock) { 080 while (true) { 081 readBuffer.clear(); 082 SocketAddress address = channel.receive(readBuffer); 083 084 readBuffer.flip(); 085 086 if (readBuffer.limit() == 0) { 087 continue; 088 } 089 090 receiveCounter++; 091 from = headerMarshaller.createEndpoint(readBuffer, address); 092 093 int remaining = readBuffer.remaining(); 094 byte[] data = new byte[remaining]; 095 readBuffer.get(data); 096 097 // TODO could use a DataInput implementation that talks direct 098 // to 099 // the ByteBuffer to avoid object allocation and unnecessary 100 // buffering? 101 DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data)); 102 answer = (Command)wireFormat.unmarshal(dataIn); 103 break; 104 } 105 } 106 if (answer != null) { 107 answer.setFrom(from); 108 109 if (LOG.isDebugEnabled()) { 110 LOG.debug("Channel: " + name + " received from: " + from + " about to process: " + answer); 111 } 112 } 113 return answer; 114 } 115 116 public void write(Command command, SocketAddress address) throws IOException { 117 synchronized (writeLock) { 118 119 ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize); 120 wireFormat.marshal(command, new DataOutputStream(largeBuffer)); 121 byte[] data = largeBuffer.toByteArray(); 122 int size = data.length; 123 124 ByteBuffer writeBuffer = bufferPool.borrowBuffer(); 125 writeBuffer.clear(); 126 headerMarshaller.writeHeader(command, writeBuffer); 127 128 if (size > writeBuffer.remaining()) { 129 // lets split the command up into chunks 130 int offset = 0; 131 boolean lastFragment = false; 132 int length = data.length; 133 for (int fragment = 0; !lastFragment; fragment++) { 134 // write the header 135 if (fragment > 0) { 136 writeBuffer = bufferPool.borrowBuffer(); 137 writeBuffer.clear(); 138 headerMarshaller.writeHeader(command, writeBuffer); 139 } 140 141 int chunkSize = writeBuffer.remaining(); 142 143 // we need to remove the amount of overhead to write the 144 // partial command 145 146 // lets write the flags in there 147 BooleanStream bs = null; 148 if (wireFormat.isTightEncodingEnabled()) { 149 bs = new BooleanStream(); 150 bs.writeBoolean(true); // the partial data byte[] is 151 // never null 152 } 153 154 // lets remove the header of the partial command 155 // which is the byte for the type and an int for the size of 156 // the byte[] 157 158 // data type + the command ID + size of the partial data 159 chunkSize -= 1 + 4 + 4; 160 161 // the boolean flags 162 if (bs != null) { 163 chunkSize -= bs.marshalledSize(); 164 } else { 165 chunkSize -= 1; 166 } 167 168 if (!wireFormat.isSizePrefixDisabled()) { 169 // lets write the size of the command buffer 170 writeBuffer.putInt(chunkSize); 171 chunkSize -= 4; 172 } 173 174 lastFragment = offset + chunkSize >= length; 175 if (chunkSize + offset > length) { 176 chunkSize = length - offset; 177 } 178 179 if (lastFragment) { 180 writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE); 181 } else { 182 writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE); 183 } 184 185 if (bs != null) { 186 bs.marshal(writeBuffer); 187 } 188 189 int commandId = command.getCommandId(); 190 if (fragment > 0) { 191 commandId = sequenceGenerator.getNextSequenceId(); 192 } 193 writeBuffer.putInt(commandId); 194 if (bs == null) { 195 writeBuffer.put((byte)1); 196 } 197 198 // size of byte array 199 writeBuffer.putInt(chunkSize); 200 201 // now the data 202 writeBuffer.put(data, offset, chunkSize); 203 204 offset += chunkSize; 205 sendWriteBuffer(commandId, address, writeBuffer, false); 206 } 207 } else { 208 writeBuffer.put(data); 209 sendWriteBuffer(command.getCommandId(), address, writeBuffer, false); 210 } 211 } 212 } 213 214 // Properties 215 // ------------------------------------------------------------------------- 216 217 public ByteBufferPool getBufferPool() { 218 return bufferPool; 219 } 220 221 /** 222 * Sets the implementation of the byte buffer pool to use 223 */ 224 public void setBufferPool(ByteBufferPool bufferPool) { 225 this.bufferPool = bufferPool; 226 } 227 228 // Implementation methods 229 // ------------------------------------------------------------------------- 230 protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery) throws IOException { 231 // lets put the datagram into the replay buffer first to prevent timing 232 // issues 233 ReplayBuffer bufferCache = getReplayBuffer(); 234 if (bufferCache != null && !redelivery) { 235 bufferCache.addBuffer(commandId, writeBuffer); 236 } 237 238 writeBuffer.flip(); 239 240 if (LOG.isDebugEnabled()) { 241 String text = redelivery ? "REDELIVERING" : "sending"; 242 LOG.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address); 243 } 244 channel.send(writeBuffer, address); 245 } 246 247 public void sendBuffer(int commandId, Object buffer) throws IOException { 248 if (buffer != null) { 249 ByteBuffer writeBuffer = (ByteBuffer)buffer; 250 sendWriteBuffer(commandId, getReplayAddress(), writeBuffer, true); 251 } else { 252 if (LOG.isWarnEnabled()) { 253 LOG.warn("Request for buffer: " + commandId + " is no longer present"); 254 } 255 } 256 } 257 258 public int getReceiveCounter() { 259 return receiveCounter; 260 } 261 262}