/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.discovery.simple;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A simple DiscoveryAgent that allows static configuration of the discovered
 * services.
 * <p>
 * On {@link #start()} every configured service is announced to the registered
 * {@link DiscoveryListener}. When a service is reported as failed through
 * {@link #serviceFailed(DiscoveryEvent)} it is removed from the listener and
 * re-announced from a background task after a reconnect delay, optionally
 * using exponential back-off.
 */
public class SimpleDiscoveryAgent implements DiscoveryAgent {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class);

    /** Delay in milliseconds used for the first reconnect attempt. */
    private long initialReconnectDelay = 1000;
    /** Upper bound in milliseconds applied to the back-off delay. */
    private long maxReconnectDelay = 1000 * 30;
    /** Factor the previous delay is multiplied by on each consecutive connect failure. */
    private long backOffMultiplier = 2;
    private boolean useExponentialBackOff = true;
    /** Values less than or equal to zero disable the attempt limit. */
    private int maxReconnectAttempts;
    /** Monitor the reconnect task waits on; notified by {@link #stop()}. */
    private final Object sleepMutex = new Object();
    /** A failure reported sooner than this many milliseconds after the event was issued counts as a connect failure. */
    private long minConnectTime = 5000;
    private DiscoveryListener listener;
    private String[] services = new String[] {};
    private final AtomicBoolean running = new AtomicBoolean(false);
    private TaskRunnerFactory taskRunner;

    /**
     * Discovery event that additionally carries the reconnect bookkeeping for
     * one statically configured service.
     */
    class SimpleDiscoveryEvent extends DiscoveryEvent {

        private int connectFailures;
        /** -1 until the first reconnect delay has been computed. */
        private long reconnectDelay = -1;
        private long connectTime = System.currentTimeMillis();
        private final AtomicBoolean failed = new AtomicBoolean(false);

        public SimpleDiscoveryEvent(String service) {
            super(service);
        }

        /**
         * Copy constructor; carries over the failure counter, current delay,
         * connect time and failed flag of {@code copy}.
         */
        public SimpleDiscoveryEvent(SimpleDiscoveryEvent copy) {
            super(copy);
            connectFailures = copy.connectFailures;
            reconnectDelay = copy.reconnectDelay;
            connectTime = copy.connectTime;
            failed.set(copy.failed.get());
        }

        @Override
        public String toString() {
            return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]";
        }
    }

    @Override
    public void setDiscoveryListener(DiscoveryListener listener) {
        this.listener = listener;
    }

    /**
     * No-op: this agent only announces its statically configured services.
     */
    @Override
    public void registerService(String name) throws IOException {
    }

    /**
     * Creates the task runner, marks the agent as running and announces every
     * configured service to the listener.
     */
    @Override
    public void start() throws Exception {
        taskRunner = new TaskRunnerFactory();
        taskRunner.init();

        running.set(true);
        for (String service : services) {
            listener.onServiceAdd(new SimpleDiscoveryEvent(service));
        }
    }

    /**
     * Marks the agent as stopped, shuts the task runner down and wakes any
     * reconnect task currently waiting out its delay.
     */
    @Override
    public void stop() throws Exception {
        running.set(false);

        if (taskRunner != null) {
            taskRunner.shutdown();
        }

        // TODO: Should we not remove the services on the listener?

        synchronized (sleepMutex) {
            sleepMutex.notifyAll();
        }
    }

    public String[] getServices() {
        return services;
    }

    /**
     * @param services comma separated list of service URIs
     */
    public void setServices(String services) {
        this.services = services.split(",");
    }

    public void setServices(String[] services) {
        this.services = services;
    }

    /**
     * Stores the string form of each given URI as the configured services.
     */
    public void setServices(URI[] services) {
        String[] converted = new String[services.length];
        for (int i = 0; i < services.length; i++) {
            converted[i] = services[i].toString();
        }
        this.services = converted;
    }

    /**
     * Handles a failure report for a previously announced service: the service
     * is removed from the listener and a background task re-announces it after
     * the applicable reconnect delay.
     * <p>
     * The call is ignored when the agent is not running or when the event has
     * already been reported as failed.
     *
     * @param devent the event originally handed to the listener; must be a
     *               {@link SimpleDiscoveryEvent} created by this agent
     */
    @Override
    public void serviceFailed(DiscoveryEvent devent) throws IOException {

        final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent) devent;

        // Only act while running: before start() the task runner does not exist
        // yet, and after stop() it has been shut down, so no reconnect can be
        // scheduled in either state.
        if (running.get() && sevent.failed.compareAndSet(false, true)) {

            listener.onServiceRemove(sevent);
            taskRunner.execute(new Runnable() {
                @Override
                public void run() {
                    // Work on a copy so the event already handed out is not mutated.
                    SimpleDiscoveryEvent event = new SimpleDiscoveryEvent(sevent);

                    // We detect a failed connection attempt because the service
                    // fails right away.
                    if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
                        LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: {}", event);

                        event.connectFailures++;

                        if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
                            LOG.warn("Reconnect attempts exceeded {} tries.  Reconnecting has been disabled for: {}", maxReconnectAttempts, event);
                            return;
                        }

                        if (!useExponentialBackOff || event.reconnectDelay == -1) {
                            event.reconnectDelay = initialReconnectDelay;
                        } else {
                            // Exponential increment of reconnect delay.
                            event.reconnectDelay *= backOffMultiplier;
                            if (event.reconnectDelay > maxReconnectDelay) {
                                event.reconnectDelay = maxReconnectDelay;
                            }
                        }

                        doReconnectDelay(event);

                    } else {
                        LOG.trace("Failure occurred to long after the discovery event was generated.  " +
                                  "It will not be classified as a connection failure: {}", event);
                        event.connectFailures = 0;
                        event.reconnectDelay = initialReconnectDelay;

                        doReconnectDelay(event);
                    }

                    // stop() may have been called while we were waiting.
                    if (!running.get()) {
                        LOG.debug("Reconnecting disabled: stopped");
                        return;
                    }

                    event.connectTime = System.currentTimeMillis();
                    event.failed.set(false);
                    listener.onServiceAdd(event);
                }
            }, "Simple Discovery Agent");
        }
    }

    /**
     * Blocks the calling thread for {@code event.reconnectDelay} milliseconds,
     * returning early if the agent is stopped or the thread is interrupted.
     * A delay of zero or less returns immediately.
     *
     * @param event the event whose reconnect delay should be waited out
     */
    protected void doReconnectDelay(SimpleDiscoveryEvent event) {
        synchronized (sleepMutex) {
            try {
                if (!running.get()) {
                    LOG.debug("Reconnecting disabled: stopped");
                    return;
                }

                // Object.wait(0) waits until notified and a negative timeout
                // throws IllegalArgumentException, so a non-positive delay
                // must skip the wait to mean "reconnect immediately".
                if (event.reconnectDelay <= 0) {
                    return;
                }

                LOG.debug("Waiting {}ms before attempting to reconnect.", event.reconnectDelay);
                sleepMutex.wait(event.reconnectDelay);
            } catch (InterruptedException ie) {
                LOG.debug("Reconnecting disabled: ", ie);
                // Preserve the interrupt status for the code that owns this thread.
                Thread.currentThread().interrupt();
            }
        }
    }

    public long getBackOffMultiplier() {
        return backOffMultiplier;
    }

    public void setBackOffMultiplier(long backOffMultiplier) {
        this.backOffMultiplier = backOffMultiplier;
    }

    public long getInitialReconnectDelay() {
        return initialReconnectDelay;
    }

    public void setInitialReconnectDelay(long initialReconnectDelay) {
        this.initialReconnectDelay = initialReconnectDelay;
    }

    public int getMaxReconnectAttempts() {
        return maxReconnectAttempts;
    }

    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
        this.maxReconnectAttempts = maxReconnectAttempts;
    }

    public long getMaxReconnectDelay() {
        return maxReconnectDelay;
    }

    public void setMaxReconnectDelay(long maxReconnectDelay) {
        this.maxReconnectDelay = maxReconnectDelay;
    }

    public long getMinConnectTime() {
        return minConnectTime;
    }

    public void setMinConnectTime(long minConnectTime) {
        this.minConnectTime = minConnectTime;
    }

    public boolean isUseExponentialBackOff() {
        return useExponentialBackOff;
    }

    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
        this.useExponentialBackOff = useExponentialBackOff;
    }
}