/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.multicast;

import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URI;
import java.net.UnknownHostException;

import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.udp.CommandChannel;
import org.apache.activemq.transport.udp.CommandDatagramSocket;
import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
import org.apache.activemq.transport.udp.UdpTransport;
import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A multicast based transport.
 * <p>
 * Builds on {@link UdpTransport} but exchanges datagrams through a
 * {@link MulticastSocket} that joins the group address taken from the remote
 * location URI.
 */
public class MulticastTransport extends UdpTransport {

    private static final Logger LOG = LoggerFactory.getLogger(MulticastTransport.class);

    /** Default keep-alive interval; it is applied as the socket read timeout (milliseconds). */
    private static final int DEFAULT_IDLE_TIME = 5000;

    private MulticastSocket socket;
    private InetAddress mcastAddress;
    private int mcastPort;
    private int timeToLive = 1;
    private boolean loopBackMode;
    private long keepAliveInterval = DEFAULT_IDLE_TIME;

    /**
     * Creates a multicast transport for the given remote location.
     *
     * @param wireFormat the wire format handed to the UDP base transport
     * @param remoteLocation URI whose host and port identify the multicast group
     * @throws UnknownHostException declared by the base class constructor
     * @throws IOException declared by the base class constructor
     */
    public MulticastTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException, IOException {
        super(wireFormat, remoteLocation);
    }

    /**
     * @return the keep-alive interval; it is used as the socket read timeout
     *         when the command channel is created
     */
    public long getKeepAliveInterval() {
        return keepAliveInterval;
    }

    /**
     * @param keepAliveInterval the keep-alive interval; only takes effect for
     *        sockets created after this call
     */
    public void setKeepAliveInterval(long keepAliveInterval) {
        this.keepAliveInterval = keepAliveInterval;
    }

    /**
     * @return the flag that is passed to {@link MulticastSocket#setLoopbackMode(boolean)}
     */
    public boolean isLoopBackMode() {
        return loopBackMode;
    }

    /**
     * Sets the flag passed verbatim to {@link MulticastSocket#setLoopbackMode(boolean)}.
     * Note that in the JDK API a value of {@code true} <em>disables</em> local
     * loopback of multicast datagrams.
     *
     * @param loopBackMode value forwarded to the socket when it is created
     */
    public void setLoopBackMode(boolean loopBackMode) {
        this.loopBackMode = loopBackMode;
    }

    /**
     * @return the time-to-live applied to the multicast socket (defaults to 1)
     */
    public int getTimeToLive() {
        return timeToLive;
    }

    /**
     * @param timeToLive the time-to-live applied to the multicast socket when
     *        it is created
     */
    public void setTimeToLive(int timeToLive) {
        this.timeToLive = timeToLive;
    }

    /**
     * @return the human readable protocol name, {@code "Multicast"}
     */
    protected String getProtocolName() {
        return "Multicast";
    }

    /**
     * @return the URI scheme prefix of this transport, {@code "multicast://"}
     */
    protected String getProtocolUriScheme() {
        return "multicast://";
    }

    /**
     * Intentionally does nothing: this transport opens its own
     * {@link MulticastSocket} on the multicast port in
     * {@link #createCommandChannel()}.
     *
     * @param socket ignored
     * @param localAddress ignored
     * @throws SocketException never thrown by this implementation
     */
    protected void bind(DatagramSocket socket, SocketAddress localAddress) throws SocketException {
        // no explicit bind; see createCommandChannel()
    }

    /**
     * Stops the base transport, then leaves the multicast group and closes the
     * socket. The socket clean-up runs even when the base class stop fails, and
     * the socket is closed even when leaving the group fails.
     *
     * @param stopper collects any {@link IOException} raised while leaving the group
     * @throws Exception if the base class stop fails
     */
    protected void doStop(ServiceStopper stopper) throws Exception {
        try {
            super.doStop(stopper);
        } finally {
            if (socket != null) {
                try {
                    socket.leaveGroup(getMulticastAddress());
                } catch (IOException e) {
                    stopper.onException(this, e);
                } finally {
                    // always release the OS socket, whatever leaveGroup did
                    socket.close();
                }
            }
        }
    }

    /**
     * Opens the multicast socket on the multicast port, applies loopback mode,
     * time-to-live and read timeout, joins the group and wraps the socket in a
     * {@link CommandDatagramSocket}.
     * <p>
     * If any step after opening the socket fails, the socket is closed and the
     * field is cleared so that no half-configured socket is left behind.
     *
     * @return the command channel backed by the multicast socket
     * @throws IOException if the socket cannot be opened, configured or joined
     *         to the group
     */
    protected CommandChannel createCommandChannel() throws IOException {
        socket = new MulticastSocket(mcastPort);
        boolean ready = false;
        try {
            socket.setLoopbackMode(loopBackMode);
            socket.setTimeToLive(timeToLive);

            LOG.debug("Joining multicast address: {}", getMulticastAddress());
            socket.joinGroup(getMulticastAddress());
            socket.setSoTimeout(toSocketTimeout(keepAliveInterval));

            CommandChannel channel = new CommandDatagramSocket(this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), getSocket());
            ready = true;
            return channel;
        } finally {
            if (!ready) {
                // do not leak the socket when set-up fails part-way through
                socket.close();
                socket = null;
            }
        }
    }

    /**
     * Converts a millisecond interval to an {@code int} without wrap-around.
     * Values outside the {@code int} range saturate at the nearest bound, so
     * the sign of the interval is never flipped by the narrowing conversion;
     * values inside the range are returned unchanged.
     */
    private static int toSocketTimeout(long millis) {
        if (millis > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        if (millis < Integer.MIN_VALUE) {
            return Integer.MIN_VALUE;
        }
        return (int) millis;
    }

    /**
     * @return the multicast group address recorded by {@link #createAddress(URI)}
     */
    protected InetAddress getMulticastAddress() {
        return mcastAddress;
    }

    /**
     * @return the multicast socket, or {@code null} if none has been created or set
     */
    protected MulticastSocket getSocket() {
        return socket;
    }

    /**
     * @param socket the multicast socket to use
     */
    protected void setSocket(MulticastSocket socket) {
        this.socket = socket;
    }

    /**
     * Resolves the host of the given URI as the multicast group address and
     * records it together with the URI's port for later use by
     * {@link #createCommandChannel()}.
     *
     * @param remoteLocation URI carrying the group host and port
     * @return the socket address made of the resolved group address and port
     * @throws UnknownHostException if the host cannot be resolved
     * @throws IOException declared for compatibility; not thrown directly here
     */
    protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {
        this.mcastAddress = InetAddress.getByName(remoteLocation.getHost());
        this.mcastPort = remoteLocation.getPort();
        return new InetSocketAddress(mcastAddress, mcastPort);
    }

    /**
     * @return a {@link MulticastDatagramHeaderMarshaller} configured with a
     *         placeholder host name and this transport's port
     */
    protected DatagramHeaderMarshaller createDatagramHeaderMarshaller() {
        return new MulticastDatagramHeaderMarshaller("udp://dummyHostName:" + getPort());
    }

}