001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.udp; 018 019import java.io.IOException; 020import java.net.InetSocketAddress; 021import java.net.SocketAddress; 022import java.net.URI; 023import java.util.HashMap; 024import java.util.Map; 025 026import org.apache.activemq.command.BrokerInfo; 027import org.apache.activemq.command.Command; 028import org.apache.activemq.openwire.OpenWireFormat; 029import org.apache.activemq.transport.CommandJoiner; 030import org.apache.activemq.transport.InactivityMonitor; 031import org.apache.activemq.transport.Transport; 032import org.apache.activemq.transport.TransportListener; 033import org.apache.activemq.transport.TransportServer; 034import org.apache.activemq.transport.TransportServerSupport; 035import org.apache.activemq.transport.reliable.ReliableTransport; 036import org.apache.activemq.transport.reliable.ReplayStrategy; 037import org.apache.activemq.util.ServiceStopper; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041/** 042 * A UDP based implementation of {@link TransportServer} 043 * 044 * @deprecated 045 */ 046@Deprecated 047public class UdpTransportServer extends TransportServerSupport { 048 private static final Logger LOG = LoggerFactory.getLogger(UdpTransportServer.class); 049 050 private final UdpTransport serverTransport; 051 private final ReplayStrategy replayStrategy; 052 private final Transport configuredTransport; 053 private boolean usingWireFormatNegotiation; 054 private final Map<DatagramEndpoint, Transport> transports = new HashMap<DatagramEndpoint, Transport>(); 055 private boolean allowLinkStealing; 056 057 public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) { 058 super(connectURI); 059 this.serverTransport = serverTransport; 060 this.configuredTransport = configuredTransport; 061 this.replayStrategy = replayStrategy; 062 } 063 064 @Override 065 public String toString() { 066 return "UdpTransportServer@" + serverTransport; 067 } 068 069 public void run() { 070 } 071 072 public UdpTransport getServerTransport() { 073 return serverTransport; 074 } 075 076 @Override 077 public void setBrokerInfo(BrokerInfo brokerInfo) { 078 } 079 080 @Override 081 protected void doStart() throws Exception { 082 LOG.info("Starting " + this); 083 084 configuredTransport.setTransportListener(new TransportListener() { 085 @Override 086 public void onCommand(Object o) { 087 final Command command = (Command)o; 088 processInboundConnection(command); 089 } 090 091 @Override 092 public void onException(IOException error) { 093 LOG.error("Caught: " + error, error); 094 } 095 096 @Override 097 public void transportInterupted() { 098 } 099 100 @Override 101 public void transportResumed() { 102 } 103 }); 104 configuredTransport.start(); 105 } 106 107 @Override 108 protected void doStop(ServiceStopper stopper) throws Exception { 109 configuredTransport.stop(); 110 } 111 112 protected void processInboundConnection(Command command) { 113 DatagramEndpoint endpoint = (DatagramEndpoint)command.getFrom(); 114 if (LOG.isDebugEnabled()) { 115 LOG.debug("Received command on: " + this + " from address: " + endpoint + " command: " + command); 116 } 117 Transport transport = null; 118 synchronized (transports) { 119 transport = transports.get(endpoint); 120 if (transport == null) { 121 if (usingWireFormatNegotiation && !command.isWireFormatInfo()) { 122 LOG.error("Received inbound server communication from: " + command.getFrom() + " expecting WireFormatInfo but was command: " + command); 123 } else { 124 if (LOG.isDebugEnabled()) { 125 LOG.debug("Creating a new UDP server connection"); 126 } 127 try { 128 transport = createTransport(command, endpoint); 129 transport = configureTransport(transport); 130 transports.put(endpoint, transport); 131 } catch (IOException e) { 132 LOG.error("Caught: " + e, e); 133 getAcceptListener().onAcceptError(e); 134 } 135 } 136 } else { 137 LOG.warn("Discarding duplicate command to server from: " + endpoint + " command: " + command); 138 } 139 } 140 } 141 142 protected Transport configureTransport(Transport transport) { 143 transport = new InactivityMonitor(transport, serverTransport.getWireFormat()); 144 getAcceptListener().onAccept(transport); 145 return transport; 146 } 147 148 protected Transport createTransport(final Command command, DatagramEndpoint endpoint) throws IOException { 149 if (endpoint == null) { 150 throw new IOException("No endpoint available for command: " + command); 151 } 152 final SocketAddress address = endpoint.getAddress(); 153 final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy(); 154 final UdpTransport transport = new UdpTransport(connectionWireFormat, address); 155 156 final ReliableTransport reliableTransport = new ReliableTransport(transport, transport); 157 reliableTransport.getReplayer(); 158 reliableTransport.setReplayStrategy(replayStrategy); 159 160 // Joiner must be on outside as the inbound messages must be processed 161 // by the reliable transport first 162 return new CommandJoiner(reliableTransport, connectionWireFormat) { 163 @Override 164 public void start() throws Exception { 165 super.start(); 166 reliableTransport.onCommand(command); 167 } 168 }; 169 170 /** 171 * final WireFormatNegotiator wireFormatNegotiator = new 172 * WireFormatNegotiator(configuredTransport, transport.getWireFormat(), 173 * serverTransport .getMinmumWireFormatVersion()) { public void start() 174 * throws Exception { super.start(); log.debug("Starting a new server 175 * transport: " + this + " with command: " + command); 176 * onCommand(command); } // lets use the specific addressing of wire 177 * format protected void sendWireFormat(WireFormatInfo info) throws 178 * IOException { log.debug("#### we have negotiated the wireformat; 179 * sending a wireformat to: " + address); transport.oneway(info, 180 * address); } }; return wireFormatNegotiator; 181 */ 182 } 183 184 @Override 185 public InetSocketAddress getSocketAddress() { 186 return serverTransport.getLocalSocketAddress(); 187 } 188 189 @Override 190 public boolean isSslServer() { 191 return false; 192 } 193 194 @Override 195 public boolean isAllowLinkStealing() { 196 return allowLinkStealing; 197 } 198 199 public void setAllowLinkStealing(boolean allowLinkStealing) { 200 this.allowLinkStealing = allowLinkStealing; 201 } 202}