001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.transport.discovery.http; 018 019import java.io.IOException; 020import java.util.HashMap; 021import java.util.HashSet; 022import java.util.Map; 023import java.util.Scanner; 024import java.util.Set; 025import java.util.concurrent.atomic.AtomicBoolean; 026import java.util.concurrent.atomic.AtomicInteger; 027import java.util.concurrent.atomic.AtomicReference; 028 029import org.apache.activemq.Service; 030import org.apache.activemq.command.DiscoveryEvent; 031import org.apache.activemq.transport.discovery.DiscoveryAgent; 032import org.apache.activemq.transport.discovery.DiscoveryListener; 033import org.apache.activemq.util.IntrospectionSupport; 034import org.apache.activemq.util.Suspendable; 035import org.apache.http.client.HttpClient; 036import org.apache.http.client.ResponseHandler; 037import org.apache.http.client.methods.HttpDelete; 038import org.apache.http.client.methods.HttpGet; 039import org.apache.http.client.methods.HttpPut; 040import org.apache.http.impl.client.BasicResponseHandler; 041import org.apache.http.impl.client.DefaultHttpClient; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045public class HTTPDiscoveryAgent implements DiscoveryAgent, Suspendable { 046 047 static enum UpdateState { 048 SUSPENDED, 049 RESUMING, 050 RESUMED 051 } 052 053 private static final Logger LOG = LoggerFactory.getLogger(HTTPDiscoveryAgent.class); 054 055 private String registryURL = "http://localhost:8080/discovery-registry/default"; 056 private HttpClient httpClient = new DefaultHttpClient(); 057 private AtomicBoolean running = new AtomicBoolean(); 058 private final AtomicReference<DiscoveryListener> discoveryListener = new AtomicReference<DiscoveryListener>(); 059 private final HashSet<String> registeredServices = new HashSet<String>(); 060 private final HashMap<String, SimpleDiscoveryEvent> discoveredServices = new HashMap<String, SimpleDiscoveryEvent>(); 061 private Thread thread; 062 private long updateInterval = 1000 * 10; 063 @SuppressWarnings("unused") 064 private String brokerName; 065 private boolean startEmbeddRegistry = false; 066 private Service jetty; 067 private AtomicInteger startCounter = new AtomicInteger(0); 068 069 private long initialReconnectDelay = 1000; 070 private long maxReconnectDelay = 1000 * 30; 071 private long backOffMultiplier = 2; 072 private boolean useExponentialBackOff = true; 073 private int maxReconnectAttempts; 074 private final Object sleepMutex = new Object(); 075 private final Object updateMutex = new Object(); 076 private UpdateState updateState = UpdateState.RESUMED; 077 private long minConnectTime = 5000; 078 079 class SimpleDiscoveryEvent extends DiscoveryEvent { 080 081 private int connectFailures; 082 private long reconnectDelay = initialReconnectDelay; 083 private long connectTime = System.currentTimeMillis(); 084 private AtomicBoolean failed = new AtomicBoolean(false); 085 private AtomicBoolean removed = new AtomicBoolean(false); 086 087 public SimpleDiscoveryEvent(String service) { 088 super(service); 089 } 090 } 091 092 public String getGroup() { 093 return null; 094 } 095 096 public void registerService(String service) throws IOException { 097 synchronized (registeredServices) { 098 registeredServices.add(service); 099 } 100 doRegister(service); 101 } 102 103 synchronized private void doRegister(String service) { 104 String url = registryURL; 105 try { 106 HttpPut method = new HttpPut(url); 107 method.addHeader("service", service); 108 ResponseHandler<String> handler = new BasicResponseHandler(); 109 String responseBody = httpClient.execute(method, handler); 110 LOG.debug("PUT to " + url + " got a " + responseBody); 111 } catch (Exception e) { 112 LOG.debug("PUT to " + url + " failed with: " + e); 113 } 114 } 115 116 @SuppressWarnings("unused") 117 synchronized private void doUnRegister(String service) { 118 String url = registryURL; 119 try { 120 HttpDelete method = new HttpDelete(url); 121 method.addHeader("service", service); 122 ResponseHandler<String> handler = new BasicResponseHandler(); 123 String responseBody = httpClient.execute(method, handler); 124 LOG.debug("DELETE to " + url + " got a " + responseBody); 125 } catch (Exception e) { 126 LOG.debug("DELETE to " + url + " failed with: " + e); 127 } 128 } 129 130 synchronized private Set<String> doLookup(long freshness) { 131 String url = registryURL + "?freshness=" + freshness; 132 try { 133 HttpGet method = new HttpGet(url); 134 ResponseHandler<String> handler = new BasicResponseHandler(); 135 String response = httpClient.execute(method, handler); 136 LOG.debug("GET to " + url + " got a " + response); 137 Set<String> rc = new HashSet<String>(); 138 Scanner scanner = new Scanner(response); 139 while (scanner.hasNextLine()) { 140 String service = scanner.nextLine(); 141 if (service.trim().length() != 0) { 142 rc.add(service); 143 } 144 } 145 scanner.close(); 146 return rc; 147 } catch (Exception e) { 148 LOG.debug("GET to " + url + " failed with: " + e); 149 return null; 150 } 151 } 152 153 public void serviceFailed(DiscoveryEvent devent) throws IOException { 154 155 final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent) devent; 156 if (event.failed.compareAndSet(false, true)) { 157 discoveryListener.get().onServiceRemove(event); 158 if (!event.removed.get()) { 159 // Setup a thread to re-raise the event... 160 Thread thread = new Thread() { 161 public void run() { 162 163 // We detect a failed connection attempt because the 164 // service 165 // fails right away. 166 if (event.connectTime + minConnectTime > System.currentTimeMillis()) { 167 LOG.debug("Failure occured soon after the discovery event was generated. " + 168 "It will be clasified as a connection failure: " + event); 169 170 event.connectFailures++; 171 172 if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) { 173 LOG.debug("Reconnect attempts exceeded " + maxReconnectAttempts + 174 " tries. Reconnecting has been disabled."); 175 return; 176 } 177 178 synchronized (sleepMutex) { 179 try { 180 if (!running.get() || event.removed.get()) { 181 return; 182 } 183 LOG.debug("Waiting " + event.reconnectDelay + 184 " ms before attepting to reconnect."); 185 sleepMutex.wait(event.reconnectDelay); 186 } catch (InterruptedException ie) { 187 Thread.currentThread().interrupt(); 188 return; 189 } 190 } 191 192 if (!useExponentialBackOff) { 193 event.reconnectDelay = initialReconnectDelay; 194 } else { 195 // Exponential increment of reconnect delay. 196 event.reconnectDelay *= backOffMultiplier; 197 if (event.reconnectDelay > maxReconnectDelay) { 198 event.reconnectDelay = maxReconnectDelay; 199 } 200 } 201 202 } else { 203 event.connectFailures = 0; 204 event.reconnectDelay = initialReconnectDelay; 205 } 206 207 if (!running.get() || event.removed.get()) { 208 return; 209 } 210 211 event.connectTime = System.currentTimeMillis(); 212 event.failed.set(false); 213 discoveryListener.get().onServiceAdd(event); 214 } 215 }; 216 thread.setDaemon(true); 217 thread.start(); 218 } 219 } 220 } 221 222 public void setBrokerName(String brokerName) { 223 this.brokerName = brokerName; 224 } 225 226 public void setDiscoveryListener(DiscoveryListener discoveryListener) { 227 this.discoveryListener.set(discoveryListener); 228 } 229 230 public void setGroup(String group) { 231 } 232 233 public void start() throws Exception { 234 if (startCounter.addAndGet(1) == 1) { 235 if (startEmbeddRegistry) { 236 jetty = createEmbeddedJettyServer(); 237 Map<String, Object> props = new HashMap<String, Object>(); 238 props.put("agent", this); 239 IntrospectionSupport.setProperties(jetty, props); 240 jetty.start(); 241 } 242 243 running.set(true); 244 thread = new Thread("HTTPDiscovery Agent") { 245 @Override 246 public void run() { 247 while (running.get()) { 248 try { 249 update(); 250 synchronized (updateMutex) { 251 do { 252 if( updateState == UpdateState.RESUMING ) { 253 updateState = UpdateState.RESUMED; 254 } else { 255 updateMutex.wait(updateInterval); 256 } 257 } while( updateState==UpdateState.SUSPENDED && running.get()); 258 } 259 } catch (InterruptedException e) { 260 return; 261 } 262 } 263 } 264 }; 265 thread.setDaemon(true); 266 thread.start(); 267 } 268 } 269 270 /** 271 * Create the EmbeddedJettyServer instance via reflection so that we can 272 * avoid a hard runtime dependency on jetty. 273 * 274 * @return 275 * @throws Exception 276 */ 277 private Service createEmbeddedJettyServer() throws Exception { 278 Class<?> clazz = HTTPDiscoveryAgent.class.getClassLoader().loadClass("org.apache.activemq.transport.discovery.http.EmbeddedJettyServer"); 279 return (Service) clazz.newInstance(); 280 } 281 282 private void update() { 283 // Register all our services... 284 synchronized (registeredServices) { 285 for (String service : registeredServices) { 286 doRegister(service); 287 } 288 } 289 290 // Find new registered services... 291 DiscoveryListener discoveryListener = this.discoveryListener.get(); 292 if (discoveryListener != null) { 293 Set<String> activeServices = doLookup(updateInterval * 3); 294 // If there is error talking the the central server, then 295 // activeServices == null 296 if (activeServices != null) { 297 synchronized (discoveredServices) { 298 299 HashSet<String> removedServices = new HashSet<String>(discoveredServices.keySet()); 300 removedServices.removeAll(activeServices); 301 302 HashSet<String> addedServices = new HashSet<String>(activeServices); 303 addedServices.removeAll(discoveredServices.keySet()); 304 addedServices.removeAll(removedServices); 305 306 for (String service : addedServices) { 307 SimpleDiscoveryEvent e = new SimpleDiscoveryEvent(service); 308 discoveredServices.put(service, e); 309 discoveryListener.onServiceAdd(e); 310 } 311 312 for (String service : removedServices) { 313 SimpleDiscoveryEvent e = discoveredServices.remove(service); 314 if (e != null) { 315 e.removed.set(true); 316 } 317 discoveryListener.onServiceRemove(e); 318 } 319 } 320 } 321 } 322 } 323 324 public void stop() throws Exception { 325 if (startCounter.decrementAndGet() == 0) { 326 resume(); 327 running.set(false); 328 if (thread != null) { 329 thread.join(updateInterval * 3); 330 thread = null; 331 } 332 if (jetty != null) { 333 jetty.stop(); 334 jetty = null; 335 } 336 } 337 } 338 339 public String getRegistryURL() { 340 return registryURL; 341 } 342 343 public void setRegistryURL(String discoveryRegistryURL) { 344 this.registryURL = discoveryRegistryURL; 345 } 346 347 public long getUpdateInterval() { 348 return updateInterval; 349 } 350 351 public void setUpdateInterval(long updateInterval) { 352 this.updateInterval = updateInterval; 353 } 354 355 public boolean isStartEmbeddRegistry() { 356 return startEmbeddRegistry; 357 } 358 359 public void setStartEmbeddRegistry(boolean startEmbeddRegistry) { 360 this.startEmbeddRegistry = startEmbeddRegistry; 361 } 362 363 364 @Override 365 public void suspend() throws Exception { 366 synchronized (updateMutex) { 367 updateState = UpdateState.SUSPENDED; 368 } 369 } 370 371 @Override 372 public void resume() throws Exception { 373 synchronized (updateMutex) { 374 updateState = UpdateState.RESUMING; 375 updateMutex.notify(); 376 } 377 } 378 379}