001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.udp;
018
019import java.io.DataInputStream;
020import java.io.DataOutputStream;
021import java.io.IOException;
022import java.net.SocketAddress;
023import java.nio.ByteBuffer;
024import java.nio.channels.DatagramChannel;
025
026import org.apache.activemq.command.Command;
027import org.apache.activemq.command.Endpoint;
028import org.apache.activemq.command.LastPartialCommand;
029import org.apache.activemq.command.PartialCommand;
030import org.apache.activemq.openwire.BooleanStream;
031import org.apache.activemq.openwire.OpenWireFormat;
032import org.apache.activemq.transport.reliable.ReplayBuffer;
033import org.apache.activemq.util.ByteArrayInputStream;
034import org.apache.activemq.util.ByteArrayOutputStream;
035import org.slf4j.Logger;
036import org.slf4j.LoggerFactory;
037
038/**
039 * A strategy for reading datagrams and de-fragmenting them together.
040 * 
041 * 
042 */
043public class CommandDatagramChannel extends CommandChannelSupport {
044
045    private static final Logger LOG = LoggerFactory.getLogger(CommandDatagramChannel.class);
046
047    private DatagramChannel channel;
048    private ByteBufferPool bufferPool;
049
050    // reading
051    private Object readLock = new Object();
052    private ByteBuffer readBuffer;
053
054    // writing
055    private Object writeLock = new Object();
056    private int defaultMarshalBufferSize = 64 * 1024;
057    private volatile int receiveCounter;
058
059    public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
060                                  DatagramChannel channel, ByteBufferPool bufferPool) {
061        super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
062        this.channel = channel;
063        this.bufferPool = bufferPool;
064    }
065
066    public void start() throws Exception {
067        bufferPool.setDefaultSize(datagramSize);
068        bufferPool.start();
069        readBuffer = bufferPool.borrowBuffer();
070    }
071
072    public void stop() throws Exception {
073        bufferPool.stop();
074    }
075
076    public Command read() throws IOException {
077        Command answer = null;
078        Endpoint from = null;
079        synchronized (readLock) {
080            while (true) {
081                readBuffer.clear();
082                SocketAddress address = channel.receive(readBuffer);
083
084                readBuffer.flip();
085
086                if (readBuffer.limit() == 0) {
087                    continue;
088                }
089                
090                receiveCounter++;
091                from = headerMarshaller.createEndpoint(readBuffer, address);
092
093                int remaining = readBuffer.remaining();
094                byte[] data = new byte[remaining];
095                readBuffer.get(data);
096
097                // TODO could use a DataInput implementation that talks direct
098                // to
099                // the ByteBuffer to avoid object allocation and unnecessary
100                // buffering?
101                DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
102                answer = (Command)wireFormat.unmarshal(dataIn);
103                break;
104            }
105        }
106        if (answer != null) {
107            answer.setFrom(from);
108
109            if (LOG.isDebugEnabled()) {
110                LOG.debug("Channel: " + name + " received from: " + from + " about to process: " + answer);
111            }
112        }
113        return answer;
114    }
115
116    public void write(Command command, SocketAddress address) throws IOException {
117        synchronized (writeLock) {
118
119            ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
120            wireFormat.marshal(command, new DataOutputStream(largeBuffer));
121            byte[] data = largeBuffer.toByteArray();
122            int size = data.length;
123
124            ByteBuffer writeBuffer = bufferPool.borrowBuffer();
125            writeBuffer.clear();
126            headerMarshaller.writeHeader(command, writeBuffer);
127
128            if (size > writeBuffer.remaining()) {
129                // lets split the command up into chunks
130                int offset = 0;
131                boolean lastFragment = false;
132                int length = data.length;
133                for (int fragment = 0; !lastFragment; fragment++) {
134                    // write the header
135                    if (fragment > 0) {
136                        writeBuffer = bufferPool.borrowBuffer();
137                        writeBuffer.clear();
138                        headerMarshaller.writeHeader(command, writeBuffer);
139                    }
140
141                    int chunkSize = writeBuffer.remaining();
142
143                    // we need to remove the amount of overhead to write the
144                    // partial command
145
146                    // lets write the flags in there
147                    BooleanStream bs = null;
148                    if (wireFormat.isTightEncodingEnabled()) {
149                        bs = new BooleanStream();
150                        bs.writeBoolean(true); // the partial data byte[] is
151                        // never null
152                    }
153
154                    // lets remove the header of the partial command
155                    // which is the byte for the type and an int for the size of
156                    // the byte[]
157
158                    // data type + the command ID + size of the partial data
159                    chunkSize -= 1 + 4 + 4;
160
161                    // the boolean flags
162                    if (bs != null) {
163                        chunkSize -= bs.marshalledSize();
164                    } else {
165                        chunkSize -= 1;
166                    }
167
168                    if (!wireFormat.isSizePrefixDisabled()) {
169                        // lets write the size of the command buffer
170                        writeBuffer.putInt(chunkSize);
171                        chunkSize -= 4;
172                    }
173
174                    lastFragment = offset + chunkSize >= length;
175                    if (chunkSize + offset > length) {
176                        chunkSize = length - offset;
177                    }
178
179                    if (lastFragment) {
180                        writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE);
181                    } else {
182                        writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
183                    }
184
185                    if (bs != null) {
186                        bs.marshal(writeBuffer);
187                    }
188
189                    int commandId = command.getCommandId();
190                    if (fragment > 0) {
191                        commandId = sequenceGenerator.getNextSequenceId();
192                    }
193                    writeBuffer.putInt(commandId);
194                    if (bs == null) {
195                        writeBuffer.put((byte)1);
196                    }
197
198                    // size of byte array
199                    writeBuffer.putInt(chunkSize);
200
201                    // now the data
202                    writeBuffer.put(data, offset, chunkSize);
203
204                    offset += chunkSize;
205                    sendWriteBuffer(commandId, address, writeBuffer, false);
206                }
207            } else {
208                writeBuffer.put(data);
209                sendWriteBuffer(command.getCommandId(), address, writeBuffer, false);
210            }
211        }
212    }
213
214    // Properties
215    // -------------------------------------------------------------------------
216
217    public ByteBufferPool getBufferPool() {
218        return bufferPool;
219    }
220
221    /**
222     * Sets the implementation of the byte buffer pool to use
223     */
224    public void setBufferPool(ByteBufferPool bufferPool) {
225        this.bufferPool = bufferPool;
226    }
227
228    // Implementation methods
229    // -------------------------------------------------------------------------
230    protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery) throws IOException {
231        // lets put the datagram into the replay buffer first to prevent timing
232        // issues
233        ReplayBuffer bufferCache = getReplayBuffer();
234        if (bufferCache != null && !redelivery) {
235            bufferCache.addBuffer(commandId, writeBuffer);
236        }
237
238        writeBuffer.flip();
239
240        if (LOG.isDebugEnabled()) {
241            String text = redelivery ? "REDELIVERING" : "sending";
242            LOG.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address);
243        }
244        channel.send(writeBuffer, address);
245    }
246
247    public void sendBuffer(int commandId, Object buffer) throws IOException {
248        if (buffer != null) {
249            ByteBuffer writeBuffer = (ByteBuffer)buffer;
250            sendWriteBuffer(commandId, getReplayAddress(), writeBuffer, true);
251        } else {
252            if (LOG.isWarnEnabled()) {
253                LOG.warn("Request for buffer: " + commandId + " is no longer present");
254            }
255        }
256    }
257
258    public int getReceiveCounter() {
259        return receiveCounter;
260    }
261
262}