001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.udp;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.net.DatagramPacket;
023import java.net.DatagramSocket;
024import java.net.SocketAddress;
025
026import org.apache.activemq.command.Command;
027import org.apache.activemq.command.Endpoint;
028import org.apache.activemq.command.LastPartialCommand;
029import org.apache.activemq.command.PartialCommand;
030import org.apache.activemq.openwire.BooleanStream;
031import org.apache.activemq.openwire.OpenWireFormat;
032import org.apache.activemq.transport.reliable.ReplayBuffer;
033import org.apache.activemq.util.ByteArrayInputStream;
034import org.apache.activemq.util.ByteArrayOutputStream;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * A strategy for reading datagrams and de-fragmenting them together.
040 * 
041 * 
042 */
043public class CommandDatagramSocket extends CommandChannelSupport {
044
045    private static final Logger LOG = LoggerFactory.getLogger(CommandDatagramSocket.class);
046
047    private DatagramSocket channel;
048    private Object readLock = new Object();
049    private Object writeLock = new Object();
050
051    private volatile int receiveCounter;
052
053    public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
054                                 DatagramSocket channel) {
055        super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
056        this.channel = channel;
057    }
058
059    public void start() throws Exception {
060    }
061
062    public void stop() throws Exception {
063    }
064
065    public Command read() throws IOException {
066        Command answer = null;
067        Endpoint from = null;
068        synchronized (readLock) {
069            while (true) {
070                DatagramPacket datagram = createDatagramPacket();
071                channel.receive(datagram);
072
073                // TODO could use a DataInput implementation that talks direct
074                // to the byte[] to avoid object allocation
075                receiveCounter++;
076                DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData(), 0, datagram.getLength()));
077                
078                from = headerMarshaller.createEndpoint(datagram, dataIn);
079                answer = (Command)wireFormat.unmarshal(dataIn);
080                break;
081            }
082        }
083        if (answer != null) {
084            answer.setFrom(from);
085
086            if (LOG.isDebugEnabled()) {
087                LOG.debug("Channel: " + name + " about to process: " + answer);
088            }
089        }
090        return answer;
091    }
092
093    public void write(Command command, SocketAddress address) throws IOException {
094        synchronized (writeLock) {
095
096            ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
097            DataOutputStream dataOut = new DataOutputStream(writeBuffer);
098            headerMarshaller.writeHeader(command, dataOut);
099
100            int offset = writeBuffer.size();
101
102            wireFormat.marshal(command, dataOut);
103
104            if (remaining(writeBuffer) >= 0) {
105                sendWriteBuffer(address, writeBuffer, command.getCommandId());
106            } else {
107                // lets split the command up into chunks
108                byte[] data = writeBuffer.toByteArray();
109                boolean lastFragment = false;
110                int length = data.length;
111                for (int fragment = 0; !lastFragment; fragment++) {
112                    writeBuffer = createByteArrayOutputStream();
113                    headerMarshaller.writeHeader(command, dataOut);
114
115                    int chunkSize = remaining(writeBuffer);
116
117                    // we need to remove the amount of overhead to write the
118                    // partial command
119
120                    // lets write the flags in there
121                    BooleanStream bs = null;
122                    if (wireFormat.isTightEncodingEnabled()) {
123                        bs = new BooleanStream();
124                        bs.writeBoolean(true); // the partial data byte[] is
125                        // never null
126                    }
127
128                    // lets remove the header of the partial command
129                    // which is the byte for the type and an int for the size of
130                    // the byte[]
131
132                    // data type + the command ID + size of the partial data
133                    chunkSize -= 1 + 4 + 4;
134
135                    // the boolean flags
136                    if (bs != null) {
137                        chunkSize -= bs.marshalledSize();
138                    } else {
139                        chunkSize -= 1;
140                    }
141
142                    if (!wireFormat.isSizePrefixDisabled()) {
143                        // lets write the size of the command buffer
144                        dataOut.writeInt(chunkSize);
145                        chunkSize -= 4;
146                    }
147
148                    lastFragment = offset + chunkSize >= length;
149                    if (chunkSize + offset > length) {
150                        chunkSize = length - offset;
151                    }
152
153                    if (lastFragment) {
154                        dataOut.write(LastPartialCommand.DATA_STRUCTURE_TYPE);
155                    } else {
156                        dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
157                    }
158
159                    if (bs != null) {
160                        bs.marshal(dataOut);
161                    }
162
163                    int commandId = command.getCommandId();
164                    if (fragment > 0) {
165                        commandId = sequenceGenerator.getNextSequenceId();
166                    }
167                    dataOut.writeInt(commandId);
168                    if (bs == null) {
169                        dataOut.write((byte)1);
170                    }
171
172                    // size of byte array
173                    dataOut.writeInt(chunkSize);
174
175                    // now the data
176                    dataOut.write(data, offset, chunkSize);
177
178                    offset += chunkSize;
179                    sendWriteBuffer(address, writeBuffer, commandId);
180                }
181            }
182        }
183    }
184
185    public int getDatagramSize() {
186        return datagramSize;
187    }
188
189    public void setDatagramSize(int datagramSize) {
190        this.datagramSize = datagramSize;
191    }
192
193    // Implementation methods
194    // -------------------------------------------------------------------------
195    protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId) throws IOException {
196        byte[] data = writeBuffer.toByteArray();
197        sendWriteBuffer(commandId, address, data, false);
198    }
199
200    protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery) throws IOException {
201        // lets put the datagram into the replay buffer first to prevent timing
202        // issues
203        ReplayBuffer bufferCache = getReplayBuffer();
204        if (bufferCache != null && !redelivery) {
205            bufferCache.addBuffer(commandId, data);
206        }
207
208        if (LOG.isDebugEnabled()) {
209            String text = redelivery ? "REDELIVERING" : "sending";
210            LOG.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address);
211        }
212        DatagramPacket packet = new DatagramPacket(data, 0, data.length, address);
213        channel.send(packet);
214    }
215
216    public void sendBuffer(int commandId, Object buffer) throws IOException {
217        if (buffer != null) {
218            byte[] data = (byte[])buffer;
219            sendWriteBuffer(commandId, replayAddress, data, true);
220        } else {
221            if (LOG.isWarnEnabled()) {
222                LOG.warn("Request for buffer: " + commandId + " is no longer present");
223            }
224        }
225    }
226
227    protected DatagramPacket createDatagramPacket() {
228        return new DatagramPacket(new byte[datagramSize], datagramSize);
229    }
230
231    protected int remaining(ByteArrayOutputStream buffer) {
232        return datagramSize - buffer.size();
233    }
234
235    protected ByteArrayOutputStream createByteArrayOutputStream() {
236        return new ByteArrayOutputStream(datagramSize);
237    }
238
239    public int getReceiveCounter() {
240        return receiveCounter;
241    }
242}