001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.udp;
018
019
020import java.io.DataInputStream;
021import java.io.DataOutputStream;
022import java.net.DatagramPacket;
023import java.net.SocketAddress;
024import java.nio.ByteBuffer;
025import java.util.HashMap;
026import java.util.Map;
027
028import org.apache.activemq.command.Command;
029import org.apache.activemq.command.Endpoint;
030
031/**
032 * 
033 * 
034 */
035public class DatagramHeaderMarshaller {
036
037    // TODO for large dynamic networks
038    // we may want to evict endpoints that disconnect
039    // from a transport - e.g. for multicast
040    private Map<SocketAddress, Endpoint> endpoints = new HashMap<SocketAddress, Endpoint>();
041    
042    /**
043     * Reads any header if applicable and then creates an endpoint object
044     */
045    public Endpoint createEndpoint(ByteBuffer readBuffer, SocketAddress address) {
046        return getEndpoint(address);
047    }
048
049    public Endpoint createEndpoint(DatagramPacket datagram, DataInputStream dataIn) {
050        return getEndpoint(datagram.getSocketAddress());
051    }
052
053    public void writeHeader(Command command, ByteBuffer writeBuffer) {
054        /*
055        writeBuffer.putLong(command.getCounter());
056        writeBuffer.putInt(command.getDataSize());
057        byte flags = command.getFlags();
058        //System.out.println("Writing header with counter: " + header.getCounter() + " size: " + header.getDataSize() + " with flags: " + flags);
059        writeBuffer.put(flags);
060        */
061    }
062
063    public void writeHeader(Command command, DataOutputStream dataOut) {
064    }
065
066    /**
067     * Gets the current endpoint object for this address or creates one if not available.
068     * 
069     * Note that this method does not need to be synchronized as its only ever going to be
070     * used by the already-synchronized read() method of a CommandChannel 
071     * 
072     */
073    protected Endpoint getEndpoint(SocketAddress address) {
074        Endpoint endpoint = endpoints.get(address);
075        if (endpoint == null) {
076            endpoint = createEndpoint(address);
077            endpoints.put(address, endpoint);
078        }
079        return endpoint;
080    }
081
082    protected Endpoint createEndpoint(SocketAddress address) {
083        return new DatagramEndpoint(address.toString(), address);
084    }
085}