/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.transport.discovery.http;
018
019import java.io.IOException;
020import java.util.HashMap;
021import java.util.HashSet;
022import java.util.Map;
023import java.util.Scanner;
024import java.util.Set;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.concurrent.atomic.AtomicInteger;
027import java.util.concurrent.atomic.AtomicReference;
028
029import org.apache.activemq.Service;
030import org.apache.activemq.command.DiscoveryEvent;
031import org.apache.activemq.transport.discovery.DiscoveryAgent;
032import org.apache.activemq.transport.discovery.DiscoveryListener;
033import org.apache.activemq.util.IntrospectionSupport;
034import org.apache.activemq.util.Suspendable;
035import org.apache.http.client.HttpClient;
036import org.apache.http.client.ResponseHandler;
037import org.apache.http.client.methods.HttpDelete;
038import org.apache.http.client.methods.HttpGet;
039import org.apache.http.client.methods.HttpPut;
040import org.apache.http.impl.client.BasicResponseHandler;
041import org.apache.http.impl.client.DefaultHttpClient;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045public class HTTPDiscoveryAgent implements DiscoveryAgent, Suspendable {
046
047    static enum UpdateState {
048        SUSPENDED,
049        RESUMING,
050        RESUMED
051    }
052
053    private static final Logger LOG = LoggerFactory.getLogger(HTTPDiscoveryAgent.class);
054
055    private String registryURL = "http://localhost:8080/discovery-registry/default";
056    private HttpClient httpClient = new DefaultHttpClient();
057    private AtomicBoolean running = new AtomicBoolean();
058    private final AtomicReference<DiscoveryListener> discoveryListener = new AtomicReference<DiscoveryListener>();
059    private final HashSet<String> registeredServices = new HashSet<String>();
060    private final HashMap<String, SimpleDiscoveryEvent> discoveredServices = new HashMap<String, SimpleDiscoveryEvent>();
061    private Thread thread;
062    private long updateInterval = 1000 * 10;
063    @SuppressWarnings("unused")
064    private String brokerName;
065    private boolean startEmbeddRegistry = false;
066    private Service jetty;
067    private AtomicInteger startCounter = new AtomicInteger(0);
068
069    private long initialReconnectDelay = 1000;
070    private long maxReconnectDelay = 1000 * 30;
071    private long backOffMultiplier = 2;
072    private boolean useExponentialBackOff = true;
073    private int maxReconnectAttempts;
074    private final Object sleepMutex = new Object();
075    private final Object updateMutex = new Object();
076    private UpdateState updateState = UpdateState.RESUMED;
077    private long minConnectTime = 5000;
078
079    class SimpleDiscoveryEvent extends DiscoveryEvent {
080
081        private int connectFailures;
082        private long reconnectDelay = initialReconnectDelay;
083        private long connectTime = System.currentTimeMillis();
084        private AtomicBoolean failed = new AtomicBoolean(false);
085        private AtomicBoolean removed = new AtomicBoolean(false);
086
087        public SimpleDiscoveryEvent(String service) {
088            super(service);
089        }
090    }
091
092    public String getGroup() {
093        return null;
094    }
095
096    public void registerService(String service) throws IOException {
097        synchronized (registeredServices) {
098            registeredServices.add(service);
099        }
100        doRegister(service);
101    }
102
103    synchronized private void doRegister(String service) {
104        String url = registryURL;
105        try {
106            HttpPut method = new HttpPut(url);
107            method.addHeader("service", service);
108            ResponseHandler<String> handler = new BasicResponseHandler();
109            String responseBody = httpClient.execute(method, handler);
110            LOG.debug("PUT to " + url + " got a " + responseBody);
111        } catch (Exception e) {
112            LOG.debug("PUT to " + url + " failed with: " + e);
113        }
114    }
115
116    @SuppressWarnings("unused")
117    synchronized private void doUnRegister(String service) {
118        String url = registryURL;
119        try {
120            HttpDelete method = new HttpDelete(url);
121            method.addHeader("service", service);
122            ResponseHandler<String> handler = new BasicResponseHandler();
123            String responseBody = httpClient.execute(method, handler);
124            LOG.debug("DELETE to " + url + " got a " + responseBody);
125        } catch (Exception e) {
126            LOG.debug("DELETE to " + url + " failed with: " + e);
127        }
128    }
129
130    synchronized private Set<String> doLookup(long freshness) {
131        String url = registryURL + "?freshness=" + freshness;
132        try {
133            HttpGet method = new HttpGet(url);
134            ResponseHandler<String> handler = new BasicResponseHandler();
135            String response = httpClient.execute(method, handler);
136            LOG.debug("GET to " + url + " got a " + response);
137            Set<String> rc = new HashSet<String>();
138            Scanner scanner = new Scanner(response);
139            while (scanner.hasNextLine()) {
140                String service = scanner.nextLine();
141                if (service.trim().length() != 0) {
142                    rc.add(service);
143                }
144            }
145            scanner.close();
146            return rc;
147        } catch (Exception e) {
148            LOG.debug("GET to " + url + " failed with: " + e);
149            return null;
150        }
151    }
152
153    public void serviceFailed(DiscoveryEvent devent) throws IOException {
154
155        final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent) devent;
156        if (event.failed.compareAndSet(false, true)) {
157            discoveryListener.get().onServiceRemove(event);
158            if (!event.removed.get()) {
159                // Setup a thread to re-raise the event...
160                Thread thread = new Thread() {
161                    public void run() {
162
163                        // We detect a failed connection attempt because the
164                        // service
165                        // fails right away.
166                        if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
167                            LOG.debug("Failure occured soon after the discovery event was generated.  " +
168                                      "It will be clasified as a connection failure: " + event);
169
170                            event.connectFailures++;
171
172                            if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
173                                LOG.debug("Reconnect attempts exceeded " + maxReconnectAttempts +
174                                          " tries.  Reconnecting has been disabled.");
175                                return;
176                            }
177
178                            synchronized (sleepMutex) {
179                                try {
180                                    if (!running.get() || event.removed.get()) {
181                                        return;
182                                    }
183                                    LOG.debug("Waiting " + event.reconnectDelay +
184                                              " ms before attepting to reconnect.");
185                                    sleepMutex.wait(event.reconnectDelay);
186                                } catch (InterruptedException ie) {
187                                    Thread.currentThread().interrupt();
188                                    return;
189                                }
190                            }
191
192                            if (!useExponentialBackOff) {
193                                event.reconnectDelay = initialReconnectDelay;
194                            } else {
195                                // Exponential increment of reconnect delay.
196                                event.reconnectDelay *= backOffMultiplier;
197                                if (event.reconnectDelay > maxReconnectDelay) {
198                                    event.reconnectDelay = maxReconnectDelay;
199                                }
200                            }
201
202                        } else {
203                            event.connectFailures = 0;
204                            event.reconnectDelay = initialReconnectDelay;
205                        }
206
207                        if (!running.get() || event.removed.get()) {
208                            return;
209                        }
210
211                        event.connectTime = System.currentTimeMillis();
212                        event.failed.set(false);
213                        discoveryListener.get().onServiceAdd(event);
214                    }
215                };
216                thread.setDaemon(true);
217                thread.start();
218            }
219        }
220    }
221
222    public void setBrokerName(String brokerName) {
223        this.brokerName = brokerName;
224    }
225
226    public void setDiscoveryListener(DiscoveryListener discoveryListener) {
227        this.discoveryListener.set(discoveryListener);
228    }
229
230    public void setGroup(String group) {
231    }
232
233    public void start() throws Exception {
234        if (startCounter.addAndGet(1) == 1) {
235            if (startEmbeddRegistry) {
236                jetty = createEmbeddedJettyServer();
237                Map<String, Object> props = new HashMap<String, Object>();
238                props.put("agent", this);
239                IntrospectionSupport.setProperties(jetty, props);
240                jetty.start();
241            }
242
243            running.set(true);
244            thread = new Thread("HTTPDiscovery Agent") {
245                @Override
246                public void run() {
247                    while (running.get()) {
248                        try {
249                            update();
250                            synchronized (updateMutex) {
251                                do {
252                                    if( updateState == UpdateState.RESUMING ) {
253                                        updateState = UpdateState.RESUMED;
254                                    } else {
255                                        updateMutex.wait(updateInterval);
256                                    }
257                                } while( updateState==UpdateState.SUSPENDED && running.get());
258                            }
259                        } catch (InterruptedException e) {
260                            return;
261                        }
262                    }
263                }
264            };
265            thread.setDaemon(true);
266            thread.start();
267        }
268    }
269
270    /**
271     * Create the EmbeddedJettyServer instance via reflection so that we can
272     * avoid a hard runtime dependency on jetty.
273     *
274     * @return
275     * @throws Exception
276     */
277    private Service createEmbeddedJettyServer() throws Exception {
278        Class<?> clazz = HTTPDiscoveryAgent.class.getClassLoader().loadClass("org.apache.activemq.transport.discovery.http.EmbeddedJettyServer");
279        return (Service) clazz.newInstance();
280    }
281
282    private void update() {
283        // Register all our services...
284        synchronized (registeredServices) {
285            for (String service : registeredServices) {
286                doRegister(service);
287            }
288        }
289
290        // Find new registered services...
291        DiscoveryListener discoveryListener = this.discoveryListener.get();
292        if (discoveryListener != null) {
293            Set<String> activeServices = doLookup(updateInterval * 3);
294            // If there is error talking the the central server, then
295            // activeServices == null
296            if (activeServices != null) {
297                synchronized (discoveredServices) {
298
299                    HashSet<String> removedServices = new HashSet<String>(discoveredServices.keySet());
300                    removedServices.removeAll(activeServices);
301
302                    HashSet<String> addedServices = new HashSet<String>(activeServices);
303                    addedServices.removeAll(discoveredServices.keySet());
304                    addedServices.removeAll(removedServices);
305
306                    for (String service : addedServices) {
307                        SimpleDiscoveryEvent e = new SimpleDiscoveryEvent(service);
308                        discoveredServices.put(service, e);
309                        discoveryListener.onServiceAdd(e);
310                    }
311
312                    for (String service : removedServices) {
313                        SimpleDiscoveryEvent e = discoveredServices.remove(service);
314                        if (e != null) {
315                            e.removed.set(true);
316                        }
317                        discoveryListener.onServiceRemove(e);
318                    }
319                }
320            }
321        }
322    }
323
324    public void stop() throws Exception {
325        if (startCounter.decrementAndGet() == 0) {
326            resume();
327            running.set(false);
328            if (thread != null) {
329                thread.join(updateInterval * 3);
330                thread = null;
331            }
332            if (jetty != null) {
333                jetty.stop();
334                jetty = null;
335            }
336        }
337    }
338
339    public String getRegistryURL() {
340        return registryURL;
341    }
342
343    public void setRegistryURL(String discoveryRegistryURL) {
344        this.registryURL = discoveryRegistryURL;
345    }
346
347    public long getUpdateInterval() {
348        return updateInterval;
349    }
350
351    public void setUpdateInterval(long updateInterval) {
352        this.updateInterval = updateInterval;
353    }
354
355    public boolean isStartEmbeddRegistry() {
356        return startEmbeddRegistry;
357    }
358
359    public void setStartEmbeddRegistry(boolean startEmbeddRegistry) {
360        this.startEmbeddRegistry = startEmbeddRegistry;
361    }
362
363
364    @Override
365    public void suspend() throws Exception {
366        synchronized (updateMutex) {
367            updateState = UpdateState.SUSPENDED;
368        }
369    }
370
371    @Override
372    public void resume() throws Exception {
373        synchronized (updateMutex) {
374            updateState = UpdateState.RESUMING;
375            updateMutex.notify();
376        }
377    }
378
379}