001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.udp; 018 019import java.io.IOException; 020import java.net.URI; 021import java.net.URISyntaxException; 022import java.net.UnknownHostException; 023import java.util.HashMap; 024import java.util.Map; 025 026import org.apache.activemq.TransportLoggerSupport; 027import org.apache.activemq.openwire.OpenWireFormat; 028import org.apache.activemq.transport.CommandJoiner; 029import org.apache.activemq.transport.InactivityMonitor; 030import org.apache.activemq.transport.Transport; 031import org.apache.activemq.transport.TransportFactory; 032import org.apache.activemq.transport.TransportServer; 033import org.apache.activemq.transport.reliable.DefaultReplayStrategy; 034import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy; 035import org.apache.activemq.transport.reliable.ReliableTransport; 036import org.apache.activemq.transport.reliable.ReplayStrategy; 037import org.apache.activemq.transport.reliable.Replayer; 038import org.apache.activemq.transport.tcp.TcpTransportFactory; 039import org.apache.activemq.util.IOExceptionSupport; 040import org.apache.activemq.util.IntrospectionSupport; 041import org.apache.activemq.util.URISupport; 042import org.apache.activemq.wireformat.WireFormat; 043import org.slf4j.Logger; 044import org.slf4j.LoggerFactory; 045 046/** 047 * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications) 048 * 049 * @deprecated 050 */ 051@Deprecated 052public class UdpTransportFactory extends TransportFactory { 053 054 private static final Logger log = LoggerFactory.getLogger(TcpTransportFactory.class); 055 056 @Override 057 public TransportServer doBind(final URI location) throws IOException { 058 try { 059 Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); 060 if (options.containsKey("port")) { 061 throw new IllegalArgumentException("The port property cannot be specified on a UDP server transport - please use the port in the URI syntax"); 062 } 063 WireFormat wf = createWireFormat(options); 064 int port = location.getPort(); 065 OpenWireFormat openWireFormat = asOpenWireFormat(wf); 066 UdpTransport transport = (UdpTransport) createTransport(location.getPort(), wf); 067 068 Transport configuredTransport = configure(transport, wf, options, true); 069 UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport, createReplayStrategy()); 070 return server; 071 } catch (URISyntaxException e) { 072 throw IOExceptionSupport.create(e); 073 } catch (Exception e) { 074 throw IOExceptionSupport.create(e); 075 } 076 } 077 078 @Override 079 public Transport configure(Transport transport, WireFormat format, Map options) throws Exception { 080 return configure(transport, format, options, false); 081 } 082 083 @Override 084 public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { 085 IntrospectionSupport.setProperties(transport, options); 086 final UdpTransport udpTransport = (UdpTransport)transport; 087 088 // deal with fragmentation 089 transport = new CommandJoiner(transport, asOpenWireFormat(format)); 090 091 if (udpTransport.isTrace()) { 092 try { 093 transport = TransportLoggerSupport.createTransportLogger(transport); 094 } catch (Throwable e) { 095 log.error("Could not create TransportLogger, reason: " + e, e); 096 } 097 } 098 099 transport = new InactivityMonitor(transport, format); 100 101 if (format instanceof OpenWireFormat) { 102 transport = configureClientSideNegotiator(transport, format, udpTransport); 103 } 104 105 return transport; 106 } 107 108 @Override 109 protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException { 110 OpenWireFormat wireFormat = asOpenWireFormat(wf); 111 return new UdpTransport(wireFormat, location); 112 } 113 114 protected Transport createTransport(int port, WireFormat wf) throws UnknownHostException, IOException { 115 OpenWireFormat wireFormat = asOpenWireFormat(wf); 116 return new UdpTransport(wireFormat, port); 117 } 118 119 /** 120 * Configures the transport 121 * 122 * @param acceptServer true if this transport is used purely as an 'accept' 123 * transport for new connections which work like TCP 124 * SocketServers where new connections spin up a new separate 125 * UDP transport 126 */ 127 protected Transport configure(Transport transport, WireFormat format, Map options, boolean acceptServer) throws Exception { 128 IntrospectionSupport.setProperties(transport, options); 129 UdpTransport udpTransport = (UdpTransport)transport; 130 131 OpenWireFormat openWireFormat = asOpenWireFormat(format); 132 133 if (udpTransport.isTrace()) { 134 transport = TransportLoggerSupport.createTransportLogger(transport); 135 } 136 137 transport = new InactivityMonitor(transport, format); 138 139 if (!acceptServer && format instanceof OpenWireFormat) { 140 transport = configureClientSideNegotiator(transport, format, udpTransport); 141 } 142 143 // deal with fragmentation 144 145 if (acceptServer) { 146 // lets not support a buffer of messages to enable reliable 147 // messaging on the 'accept server' transport 148 udpTransport.setReplayEnabled(false); 149 150 // we don't want to do reliable checks on this transport as we 151 // delegate to one that does 152 transport = new CommandJoiner(transport, openWireFormat); 153 return transport; 154 } else { 155 ReliableTransport reliableTransport = new ReliableTransport(transport, udpTransport); 156 Replayer replayer = reliableTransport.getReplayer(); 157 reliableTransport.setReplayStrategy(createReplayStrategy(replayer)); 158 159 // Joiner must be on outside as the inbound messages must be 160 // processed by the reliable transport first 161 return new CommandJoiner(reliableTransport, openWireFormat); 162 } 163 } 164 165 166 protected ReplayStrategy createReplayStrategy(Replayer replayer) { 167 if (replayer != null) { 168 return new DefaultReplayStrategy(5); 169 } 170 return new ExceptionIfDroppedReplayStrategy(1); 171 } 172 173 protected ReplayStrategy createReplayStrategy() { 174 return new DefaultReplayStrategy(5); 175 } 176 177 protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) { 178 return new ResponseRedirectInterceptor(transport, udpTransport); 179 } 180 181 protected OpenWireFormat asOpenWireFormat(WireFormat wf) { 182 OpenWireFormat answer = (OpenWireFormat)wf; 183 return answer; 184 } 185}