/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.transport.discovery.multicast;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.InterfaceAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DiscoveryAgent} using a multicast address and heartbeat packets
 * encoded using any wireformat, but openwire by default.
 * <p>
 * Each heartbeat datagram has the textual form
 * <code>&lt;group&gt;.ActiveMQ-4.&lt;alive.|dead.&gt;%&lt;brokerName&gt;%&lt;service&gt;</code>.
 * A background thread (see {@link #run()}) receives these datagrams,
 * periodically re-advertises the locally registered service and expires
 * remote services whose heartbeats have stopped.
 */
public class MulticastDiscoveryAgent implements DiscoveryAgent, Runnable {

    public static final String DEFAULT_DISCOVERY_URI_STRING = "multicast://239.255.2.3:6155";
    public static final String DEFAULT_HOST_STR = "default";
    public static final String DEFAULT_HOST_IP = System.getProperty("activemq.partition.discovery", "239.255.2.3");
    public static final int DEFAULT_PORT = 6155;

    private static final Logger LOG = LoggerFactory.getLogger(MulticastDiscoveryAgent.class);
    private static final String TYPE_SUFFIX = "ActiveMQ-4.";
    private static final String ALIVE = "alive.";
    private static final String DEAD = "dead.";
    private static final String DELIMITER = "%";
    private static final int BUFF_SIZE = 8192;
    private static final int DEFAULT_IDLE_TIME = 500;
    // A remote service is expired after this many keep-alive intervals
    // without a heartbeat (see doExpireOldServices()).
    private static final int HEARTBEAT_MISS_BEFORE_DEATH = 10;

    private long initialReconnectDelay = 1000 * 5;
    private long maxReconnectDelay = 1000 * 30;
    private long backOffMultiplier = 2;
    private boolean useExponentialBackOff;
    private int maxReconnectAttempts;

    private int timeToLive = 1;
    private boolean loopBackMode;
    private Map<String, RemoteBrokerData> brokersByService = new ConcurrentHashMap<String, RemoteBrokerData>();
    private String group = "default";
    private URI discoveryURI;
    private InetAddress inetAddress;
    private SocketAddress sockAddress;
    private DiscoveryListener discoveryListener;
    private String selfService;
    private MulticastSocket mcast;
    private Thread runner;
    private long keepAliveInterval = DEFAULT_IDLE_TIME;
    private String mcInterface;
    private String mcNetworkInterface;
    private String mcJoinNetworkInterface;
    private long lastAdvertizeTime;
    private AtomicBoolean started = new AtomicBoolean(false);
    private boolean reportAdvertizeFailed = true;
    private ExecutorService executor = null;

    /**
     * Book-keeping for one remote service that has been seen on the
     * multicast group: when it was last heard from and, if the listener
     * reported it as failed, when its events may be advertised again.
     */
    class RemoteBrokerData extends DiscoveryEvent {
        long lastHeartBeat;
        long recoveryTime;
        int failureCount;
        boolean failed;

        public RemoteBrokerData(String brokerName, String service) {
            super(service);
            setBrokerName(brokerName);
            this.lastHeartBeat = System.currentTimeMillis();
        }

        /**
         * Records that a heartbeat was just received and clears the failure
         * history once the service has stayed healthy for a minute.
         */
        public synchronized void updateHeartBeat() {
            lastHeartBeat = System.currentTimeMillis();

            // Consider that the broker recovery has succeeded if it has not
            // failed in 60 seconds.
            if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("I now think that the " + serviceName + " service has recovered.");
                }
                failureCount = 0;
                recoveryTime = 0;
            }
        }

        /**
         * @return the time, in milliseconds since the epoch, at which the
         *         last heartbeat was recorded.
         */
        public synchronized long getLastHeartBeat() {
            return lastHeartBeat;
        }

        /**
         * Marks this service as failed and schedules the earliest time at
         * which {@link #doRecovery()} may resume advertising it.
         *
         * @return true if the state changed from healthy to failed, false if
         *         the service was already marked failed.
         */
        public synchronized boolean markFailed() {
            if (!failed) {
                failed = true;
                failureCount++;

                long reconnectDelay;
                if (!useExponentialBackOff) {
                    reconnectDelay = initialReconnectDelay;
                } else {
                    // Scale the configured initial delay: the first failure
                    // waits initialReconnectDelay and every further failure
                    // multiplies it by backOffMultiplier. (Previously the
                    // delay was just multiplier^failureCount milliseconds,
                    // which ignored initialReconnectDelay altogether.)
                    double scaled = initialReconnectDelay * Math.pow(backOffMultiplier, failureCount - 1);
                    // The cast saturates at Long.MAX_VALUE, so the cap below
                    // also covers overflow.
                    reconnectDelay = (long)scaled;
                    if (reconnectDelay > maxReconnectDelay) {
                        reconnectDelay = maxReconnectDelay;
                    }
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Remote failure of " + serviceName + " while still receiving multicast advertisements.  Advertising events will be suppressed for " + reconnectDelay
                              + " ms, the current failure count is: " + failureCount);
                }

                recoveryTime = System.currentTimeMillis() + reconnectDelay;
                return true;
            }
            return false;
        }

        /**
         * @return true if this broker is marked failed and it is now the right
         *         time to start recovery.
         */
        public synchronized boolean doRecovery() {
            if (!failed) {
                return false;
            }

            // Are we done trying to recover this guy?
            if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Max reconnect attempts of the " + serviceName + " service has been reached.");
                }
                return false;
            }

            // Is it not yet time?
            if (System.currentTimeMillis() < recoveryTime) {
                return false;
            }

            if (LOG.isDebugEnabled()) {
                LOG.debug("Resuming event advertisement of the " + serviceName + " service.");
            }
            failed = false;
            return true;
        }

        /**
         * @return true while the service is marked failed. Synchronized
         *         because the flag is written under this object's lock by
         *         {@link #markFailed()} and {@link #doRecovery()}.
         */
        public synchronized boolean isFailed() {
            return failed;
        }
    }

    /**
     * Set the discovery listener
     *
     * @param listener the listener notified when remote services are added
     *                 or removed
     */
    public void setDiscoveryListener(DiscoveryListener listener) {
        this.discoveryListener = listener;
    }

    /**
     * Registers the service advertised by this agent. If the agent is
     * already started the service is advertised immediately.
     *
     * @param name the service string to advertise
     */
    public void registerService(String name) throws IOException {
        this.selfService = name;
        if (started.get()) {
            doAdvertizeSelf();
        }
    }

    /**
     * @return Returns the loopBackMode.
     */
    public boolean isLoopBackMode() {
        return loopBackMode;
    }

    /**
     * @param loopBackMode The loopBackMode to set.
     */
    public void setLoopBackMode(boolean loopBackMode) {
        this.loopBackMode = loopBackMode;
    }

    /**
     * @return Returns the timeToLive.
     */
    public int getTimeToLive() {
        return timeToLive;
    }

    /**
     * @param timeToLive The timeToLive to set.
     */
    public void setTimeToLive(int timeToLive) {
        this.timeToLive = timeToLive;
    }

    /**
     * @return the discoveryURI
     */
    public URI getDiscoveryURI() {
        return discoveryURI;
    }

    /**
     * Set the discoveryURI
     *
     * @param discoveryURI the multicast URI whose host and port are used
     *                     when the agent is started
     */
    public void setDiscoveryURI(URI discoveryURI) {
        this.discoveryURI = discoveryURI;
    }

    /**
     * @return the interval, in milliseconds, between self advertisements;
     *         it is also used as the socket receive timeout.
     */
    public long getKeepAliveInterval() {
        return keepAliveInterval;
    }

    public void setKeepAliveInterval(long keepAliveInterval) {
        this.keepAliveInterval = keepAliveInterval;
    }

    /**
     * @param mcInterface address passed to
     *                    {@link MulticastSocket#setInterface(InetAddress)}
     *                    on start, if not null
     */
    public void setInterface(String mcInterface) {
        this.mcInterface = mcInterface;
    }

    /**
     * @param mcNetworkInterface name of the interface passed to
     *                           {@link MulticastSocket#setNetworkInterface(NetworkInterface)}
     *                           on start, if not null
     */
    public void setNetworkInterface(String mcNetworkInterface) {
        this.mcNetworkInterface = mcNetworkInterface;
    }

    /**
     * @param mcJoinNetwrokInterface name of the interface on which the
     *                               multicast group is joined on start, if
     *                               not null
     */
    public void setJoinNetworkInterface(String mcJoinNetwrokInterface) {
        this.mcJoinNetworkInterface = mcJoinNetwrokInterface;
    }

    /**
     * start the discovery agent
     * <p>
     * Opens and configures the multicast socket, joins the group, starts the
     * receive thread and advertises the registered service. Calling this on
     * an already started agent is a no-op. If initialisation fails the
     * socket is closed and the agent is left stopped, so that start can be
     * attempted again.
     *
     * @throws Exception if no group is configured or the socket cannot be
     *                   set up
     */
    public void start() throws Exception {
        if (!started.compareAndSet(false, true)) {
            return;
        }

        boolean initialized = false;
        try {
            if (group == null || group.length() == 0) {
                throw new IOException("You must specify a group to discover");
            }
            String type = getType();
            if (!type.endsWith(".")) {
                LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type");
            }

            if (discoveryURI == null) {
                discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING);
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("start - discoveryURI = " + discoveryURI);
            }

            String myHost = discoveryURI.getHost();
            int myPort = discoveryURI.getPort();

            if (DEFAULT_HOST_STR.equals(myHost)) {
                myHost = DEFAULT_HOST_IP;
            }

            if (myPort < 0) {
                myPort = DEFAULT_PORT;
            }

            if (LOG.isTraceEnabled()) {
                LOG.trace("start - myHost = " + myHost);
                LOG.trace("start - myPort = " + myPort);
                LOG.trace("start - group  = " + group);
                LOG.trace("start - interface  = " + mcInterface);
                LOG.trace("start - network interface  = " + mcNetworkInterface);
                LOG.trace("start - join network interface  = " + mcJoinNetworkInterface);
            }

            this.inetAddress = InetAddress.getByName(myHost);
            this.sockAddress = new InetSocketAddress(this.inetAddress, myPort);
            mcast = new MulticastSocket(myPort);
            mcast.setLoopbackMode(loopBackMode);
            mcast.setTimeToLive(getTimeToLive());
            if (mcJoinNetworkInterface != null) {
                mcast.joinGroup(sockAddress, NetworkInterface.getByName(mcJoinNetworkInterface));
            } else {
                // Only pin an interface when a suitable one was found;
                // otherwise leave the socket on the platform default rather
                // than failing start-up with a null argument.
                NetworkInterface candidate = findNetworkInterface();
                if (candidate != null) {
                    mcast.setNetworkInterface(candidate);
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("No suitable multicast network interface found, using the default interface");
                }
                mcast.joinGroup(inetAddress);
            }
            // The receive timeout lets run() wake up regularly to do its
            // time keeping even when no packets arrive.
            mcast.setSoTimeout((int)keepAliveInterval);
            if (mcInterface != null) {
                mcast.setInterface(InetAddress.getByName(mcInterface));
            }
            if (mcNetworkInterface != null) {
                mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface));
            }
            runner = new Thread(this);
            runner.setName(this.toString() + ":" + runner.getName());
            runner.setDaemon(true);
            runner.start();
            initialized = true;
        } finally {
            if (!initialized) {
                // Roll back so we neither leak the socket nor stay stuck in
                // the "started" state after a failed start.
                if (mcast != null) {
                    mcast.close();
                    mcast = null;
                }
                started.set(false);
            }
        }
        doAdvertizeSelf();
    }

    /**
     * Picks a network interface for multicast traffic: the last enumerated
     * interface that is up, supports multicast, has a non-loopback IPv4
     * address and whose display name does not start with "vnic".
     *
     * @return the chosen interface, or null if none qualifies
     * @throws SocketException if the interfaces cannot be enumerated
     */
    private NetworkInterface findNetworkInterface() throws SocketException {
        Enumeration<NetworkInterface> ifcs = NetworkInterface.getNetworkInterfaces();
        if (ifcs == null) {
            // getNetworkInterfaces() may return null when the machine has no
            // interfaces at all.
            return null;
        }
        List<NetworkInterface> possibles = new ArrayList<NetworkInterface>();
        while (ifcs.hasMoreElements()) {
            NetworkInterface ni = ifcs.nextElement();
            try {
                if (ni.supportsMulticast() && ni.isUp()) {
                    for (InterfaceAddress ia : ni.getInterfaceAddresses()) {
                        if (ia.getAddress() instanceof java.net.Inet4Address
                                && !ia.getAddress().isLoopbackAddress()
                                && !ni.getDisplayName().startsWith("vnic")) {
                            possibles.add(ni);
                            // One matching address is enough; do not add the
                            // same interface several times.
                            break;
                        }
                    }
                }
            } catch (SocketException ignored) {
                // Best effort: an interface that cannot be queried is simply
                // not a candidate.
            }
        }
        return possibles.isEmpty() ? null : possibles.get(possibles.size() - 1);
    }

    /**
     * stop the channel
     * <p>
     * Advertises the registered service as dead, closes the socket,
     * interrupts the receive thread and shuts down the notification
     * executor. Calling this on an agent that is not started is a no-op.
     *
     * @throws Exception
     */
    public void stop() throws Exception {
        if (started.compareAndSet(true, false)) {
            // started is already false here, so this sends a "dead." packet.
            doAdvertizeSelf();
            if (mcast != null) {
                mcast.close();
            }
            if (runner != null) {
                runner.interrupt();
            }
            // Serialized with getExecutor() so a concurrent lazy creation
            // cannot slip past the shutdown.
            synchronized (this) {
                if (executor != null) {
                    ThreadPoolUtils.shutdownNow(executor);
                    executor = null;
                }
            }
        }
    }

    /**
     * @return the packet prefix identifying this discovery group: the group
     *         name followed by "." and the type suffix.
     */
    public String getType() {
        return group + "." + TYPE_SUFFIX;
    }

    /**
     * Receive loop executed by the thread created in {@link #start()}.
     * Until the agent is stopped it alternates between time keeping
     * (re-advertising, expiring stale services) and waiting up to the socket
     * timeout for an incoming heartbeat packet.
     */
    public void run() {
        byte[] buf = new byte[BUFF_SIZE];
        DatagramPacket packet = new DatagramPacket(buf, 0, buf.length);
        while (started.get()) {
            doTimeKeepingServices();
            try {
                mcast.receive(packet);
                if (packet.getLength() > 0) {
                    String str = new String(packet.getData(), packet.getOffset(), packet.getLength());
                    processData(str);
                }
            } catch (SocketTimeoutException se) {
                // ignore - the timeout only exists to trigger time keeping
            } catch (IOException e) {
                // An error after stop() is expected: the socket was closed.
                if (started.get()) {
                    LOG.error("failed to process packet: " + e, e);
                }
            }
        }
    }

    /**
     * Parses one received heartbeat and dispatches it to
     * {@link #processAlive(String, String)} or {@link #processDead(String)}.
     * Packets from other groups and packets that do not follow the
     * <code>alive.|dead.%brokerName%service</code> layout are ignored, so
     * that stray or malformed datagrams cannot terminate the receive thread
     * with an unchecked exception.
     *
     * @param str the decoded datagram content
     */
    private void processData(String str) {
        if (discoveryListener == null) {
            return;
        }
        String type = getType();
        if (!str.startsWith(type)) {
            return;
        }

        String payload = str.substring(type.length());
        boolean alive;
        String body;
        if (payload.startsWith(ALIVE)) {
            alive = true;
            body = payload.substring(ALIVE.length());
        } else if (payload.startsWith(DEAD)) {
            alive = false;
            body = payload.substring(DEAD.length());
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignoring discovery packet with unknown state: " + str);
            }
            return;
        }

        String brokerName = getBrokerName(body);
        if (brokerName == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ignoring malformed discovery packet: " + str);
            }
            return;
        }

        // The service is everything after the delimiter that closes the
        // broker name: <first delimiter> + brokerName + <second delimiter>.
        int serviceStart = body.indexOf(DELIMITER) + brokerName.length() + 2;
        String service = body.substring(serviceStart);
        if (alive) {
            processAlive(brokerName, service);
        } else {
            processDead(service);
        }
    }

    /**
     * Re-advertises the local service when the keep-alive interval has
     * elapsed (or the clock moved backwards) and expires remote services
     * that have gone quiet. Does nothing once the agent is stopped.
     */
    private void doTimeKeepingServices() {
        if (started.get()) {
            long currentTime = System.currentTimeMillis();
            if (currentTime < lastAdvertizeTime || ((currentTime - keepAliveInterval) > lastAdvertizeTime)) {
                doAdvertizeSelf();
                lastAdvertizeTime = currentTime;
            }
            doExpireOldServices();
        }
    }

    /**
     * Sends one heartbeat for the registered service: "alive." while the
     * agent is started, "dead." otherwise. Does nothing if no service is
     * registered or the socket has not been created yet. Only the first
     * send failure is logged.
     */
    private void doAdvertizeSelf() {
        MulticastSocket socket = mcast;
        if (selfService == null || socket == null) {
            return;
        }

        String payload = getType();
        payload += started.get() ? ALIVE : DEAD;
        payload += DELIMITER + "localhost" + DELIMITER;
        payload += selfService;
        try {
            byte[] data = payload.getBytes();
            DatagramPacket packet = new DatagramPacket(data, 0, data.length, sockAddress);
            socket.send(packet);
        } catch (IOException e) {
            // If a send fails, chances are all subsequent sends will fail
            // too.. No need to keep reporting the
            // same error over and over.
            if (reportAdvertizeFailed) {
                reportAdvertizeFailed = false;
                LOG.error("Failed to advertise our service: " + payload, e);
                if ("Operation not permitted".equals(e.getMessage())) {
                    LOG.error("The 'Operation not permitted' error has been know to be caused by improper firewall/network setup.  "
                              + "Please make sure that the OS is properly configured to allow multicast traffic over: " + socket.getLocalAddress());
                }
            }
        }
    }

    /**
     * Handles an "alive" heartbeat from a service other than our own. A
     * previously unknown service is recorded, reported to the listener and
     * answered with our own advertisement; a known one has its heartbeat
     * refreshed and is re-reported if its failure back-off has elapsed.
     */
    private void processAlive(String brokerName, String service) {
        if (selfService == null || !service.equals(selfService)) {
            RemoteBrokerData data = brokersByService.get(service);
            if (data == null) {
                data = new RemoteBrokerData(brokerName, service);
                brokersByService.put(service, data);
                fireServiceAddEvent(data);
                doAdvertizeSelf();
            } else {
                data.updateHeartBeat();
                if (data.doRecovery()) {
                    fireServiceAddEvent(data);
                }
            }
        }
    }

    /**
     * Forgets a remote service. The listener is only notified if the
     * service was not already marked failed (in which case a removal event
     * has been fired by {@link #serviceFailed(DiscoveryEvent)}).
     */
    private void processDead(String service) {
        if (!service.equals(selfService)) {
            RemoteBrokerData data = brokersByService.remove(service);
            if (data != null && !data.isFailed()) {
                fireServiceRemovedEvent(data);
            }
        }
    }

    /**
     * Treats every remote service whose last heartbeat is older than
     * HEARTBEAT_MISS_BEFORE_DEATH keep-alive intervals as dead.
     */
    private void doExpireOldServices() {
        long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
        for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
            RemoteBrokerData data = i.next();
            if (data.getLastHeartBeat() < expireTime) {
                processDead(data.getServiceName());
            }
        }
    }

    /**
     * Extracts the text between the first two delimiters of the given
     * string.
     *
     * @param str the packet content following the alive/dead marker
     * @return the broker name, or null if the string does not contain two
     *         delimiters
     */
    private String getBrokerName(String str) {
        int start = str.indexOf(DELIMITER);
        if (start < 0) {
            return null;
        }
        int end = str.indexOf(DELIMITER, start + 1);
        if (end < 0) {
            return null;
        }
        return str.substring(start + 1, end);
    }

    /**
     * Called when a connection to a discovered service has failed. Marks
     * the service failed, which suppresses its advertisements for the
     * reconnect delay, and reports it as removed to the listener.
     *
     * @param event the event identifying the failed service
     */
    public void serviceFailed(DiscoveryEvent event) throws IOException {
        RemoteBrokerData data = brokersByService.get(event.getServiceName());
        if (data != null && data.markFailed()) {
            fireServiceRemovedEvent(data);
        }
    }

    /**
     * Asynchronously notifies the listener that a service was removed.
     */
    private void fireServiceRemovedEvent(final RemoteBrokerData data) {
        if (discoveryListener != null && started.get()) {
            // Have the listener process the event async so that
            // he does not block this thread since we are doing time sensitive
            // processing of events.
            getExecutor().execute(new Runnable() {
                public void run() {
                    DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
                    if (discoveryListener != null) {
                        discoveryListener.onServiceRemove(data);
                    }
                }
            });
        }
    }

    /**
     * Asynchronously notifies the listener that a service was added.
     */
    private void fireServiceAddEvent(final RemoteBrokerData data) {
        if (discoveryListener != null && started.get()) {

            // Have the listener process the event async so that
            // he does not block this thread since we are doing time sensitive
            // processing of events.
            getExecutor().execute(new Runnable() {
                public void run() {
                    DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
                    if (discoveryListener != null) {
                        discoveryListener.onServiceAdd(data);
                    }
                }
            });
        }
    }

    /**
     * Lazily creates the single-threaded executor used to deliver listener
     * notifications. Synchronized because it is reached both from the
     * receive thread and from callers of
     * {@link #serviceFailed(DiscoveryEvent)}, and must not race with the
     * shutdown in {@link #stop()}.
     */
    private synchronized ExecutorService getExecutor() {
        if (executor == null) {
            final String threadName = "Notifier-" + this.toString();
            executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
                public Thread newThread(Runnable runable) {
                    Thread t = new Thread(runable, threadName);
                    t.setDaemon(true);
                    return t;
                }
            });
        }
        return executor;
    }

    public long getBackOffMultiplier() {
        return backOffMultiplier;
    }

    public void setBackOffMultiplier(long backOffMultiplier) {
        this.backOffMultiplier = backOffMultiplier;
    }

    public long getInitialReconnectDelay() {
        return initialReconnectDelay;
    }

    public void setInitialReconnectDelay(long initialReconnectDelay) {
        this.initialReconnectDelay = initialReconnectDelay;
    }

    public int getMaxReconnectAttempts() {
        return maxReconnectAttempts;
    }

    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
        this.maxReconnectAttempts = maxReconnectAttempts;
    }

    public long getMaxReconnectDelay() {
        return maxReconnectDelay;
    }

    public void setMaxReconnectDelay(long maxReconnectDelay) {
        this.maxReconnectDelay = maxReconnectDelay;
    }

    public boolean isUseExponentialBackOff() {
        return useExponentialBackOff;
    }

    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
        this.useExponentialBackOff = useExponentialBackOff;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    @Override
    public String toString() {
        return "MulticastDiscoveryAgent-"
            + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
    }
}