/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.udp;


import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.DatagramPacket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.activemq.command.Command;
import org.apache.activemq.command.Endpoint;

/**
 * Resolves the {@link Endpoint} associated with the remote address of a
 * datagram and provides hooks for reading and writing a per-datagram header.
 * <p>
 * In this implementation the header hooks are no-ops: nothing is read from the
 * incoming buffer/stream and nothing is written to the outgoing one. Endpoints
 * are cached per {@link SocketAddress} for the lifetime of this object.
 */
public class DatagramHeaderMarshaller {

    // TODO for large dynamic networks
    // we may want to evict endpoints that disconnect
    // from a transport - e.g. for multicast
    private final Map<SocketAddress, Endpoint> endpoints = new HashMap<SocketAddress, Endpoint>();

    /**
     * Reads any header if applicable and then creates an endpoint object.
     * <p>
     * This implementation does not read from {@code readBuffer}; it only
     * resolves the endpoint for {@code address}.
     *
     * @param readBuffer the buffer holding the received datagram (unused here)
     * @param address the address the datagram was received from
     * @return the cached endpoint for {@code address}, created on first use
     */
    public Endpoint createEndpoint(ByteBuffer readBuffer, SocketAddress address) {
        return getEndpoint(address);
    }

    /**
     * Stream-based counterpart of
     * {@link #createEndpoint(ByteBuffer, SocketAddress)}.
     * <p>
     * This implementation does not read from {@code dataIn}; the endpoint is
     * resolved from {@link DatagramPacket#getSocketAddress()}.
     *
     * @param datagram the received datagram
     * @param dataIn a stream over the datagram contents (unused here)
     * @return the cached endpoint for the datagram's socket address, created
     *         on first use
     */
    public Endpoint createEndpoint(DatagramPacket datagram, DataInputStream dataIn) {
        return getEndpoint(datagram.getSocketAddress());
    }

    /**
     * Hook for writing a header for {@code command} into {@code writeBuffer}.
     * <p>
     * Currently writes nothing; the previous header layout is kept below,
     * disabled, for reference.
     *
     * @param command the command about to be sent
     * @param writeBuffer the buffer the datagram is being assembled in
     */
    public void writeHeader(Command command, ByteBuffer writeBuffer) {
        /*
        writeBuffer.putLong(command.getCounter());
        writeBuffer.putInt(command.getDataSize());
        byte flags = command.getFlags();
        //System.out.println("Writing header with counter: " + header.getCounter() + " size: " + header.getDataSize() + " with flags: " + flags);
        writeBuffer.put(flags);
        */
    }

    /**
     * Hook for writing a header for {@code command} to {@code dataOut}.
     * <p>
     * Currently writes nothing.
     *
     * @param command the command about to be sent
     * @param dataOut the stream the datagram is being written to
     */
    public void writeHeader(Command command, DataOutputStream dataOut) {
    }

    /**
     * Gets the current endpoint object for this address or creates one if not
     * available.
     * <p>
     * Note from the original author: this method does not need to be
     * synchronized as it is only ever going to be used by the
     * already-synchronized read() method of a CommandChannel. The backing map
     * is a plain {@link HashMap}, so callers outside that path must provide
     * their own synchronization.
     *
     * @param address the remote address to look up
     * @return the endpoint cached for {@code address}; a new one is created
     *         via {@link #createEndpoint(SocketAddress)} and cached if none
     *         exists yet
     */
    protected Endpoint getEndpoint(SocketAddress address) {
        Endpoint endpoint = endpoints.get(address);
        if (endpoint == null) {
            endpoint = createEndpoint(address);
            endpoints.put(address, endpoint);
        }
        return endpoint;
    }

    /**
     * Factory for a new endpoint; subclasses may override it to supply a
     * different {@link Endpoint} implementation.
     *
     * @param address the remote address the endpoint represents
     * @return a new {@link DatagramEndpoint} constructed from
     *         {@code address.toString()} and {@code address}
     */
    protected Endpoint createEndpoint(SocketAddress address) {
        return new DatagramEndpoint(address.toString(), address);
    }
}