001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.discovery.simple;
018
019import java.io.IOException;
020import java.net.URI;
021import java.util.concurrent.atomic.AtomicBoolean;
022
023import org.apache.activemq.command.DiscoveryEvent;
024import org.apache.activemq.thread.TaskRunnerFactory;
025import org.apache.activemq.transport.discovery.DiscoveryAgent;
026import org.apache.activemq.transport.discovery.DiscoveryListener;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029
030/**
031 * A simple DiscoveryAgent that allows static configuration of the discovered
032 * services.
033 *
034 *
035 */
036public class SimpleDiscoveryAgent implements DiscoveryAgent {
037
038    private final static Logger LOG = LoggerFactory.getLogger(SimpleDiscoveryAgent.class);
039    private long initialReconnectDelay = 1000;
040    private long maxReconnectDelay = 1000 * 30;
041    private long backOffMultiplier = 2;
042    private boolean useExponentialBackOff=true;
043    private int maxReconnectAttempts;
044    private final Object sleepMutex = new Object();
045    private long minConnectTime = 5000;
046    private DiscoveryListener listener;
047    private String services[] = new String[] {};
048    private final AtomicBoolean running = new AtomicBoolean(false);
049    private TaskRunnerFactory taskRunner;
050
051    class SimpleDiscoveryEvent extends DiscoveryEvent {
052
053        private int connectFailures;
054        private long reconnectDelay = -1;
055        private long connectTime = System.currentTimeMillis();
056        private final AtomicBoolean failed = new AtomicBoolean(false);
057
058        public SimpleDiscoveryEvent(String service) {
059            super(service);
060        }
061
062        public SimpleDiscoveryEvent(SimpleDiscoveryEvent copy) {
063            super(copy);
064            connectFailures = copy.connectFailures;
065            reconnectDelay = copy.reconnectDelay;
066            connectTime = copy.connectTime;
067            failed.set(copy.failed.get());
068        }
069
070        @Override
071        public String toString() {
072            return "[" + serviceName + ", failed:" + failed + ", connectionFailures:" + connectFailures + "]";
073        }
074    }
075
076    @Override
077    public void setDiscoveryListener(DiscoveryListener listener) {
078        this.listener = listener;
079    }
080
081    @Override
082    public void registerService(String name) throws IOException {
083    }
084
085    @Override
086    public void start() throws Exception {
087        taskRunner = new TaskRunnerFactory();
088        taskRunner.init();
089
090        running.set(true);
091        for (int i = 0; i < services.length; i++) {
092            listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
093        }
094    }
095
096    @Override
097    public void stop() throws Exception {
098        running.set(false);
099
100        if (taskRunner != null) {
101            taskRunner.shutdown();
102        }
103
104        // TODO: Should we not remove the services on the listener?
105
106        synchronized (sleepMutex) {
107            sleepMutex.notifyAll();
108        }
109    }
110
111    public String[] getServices() {
112        return services;
113    }
114
115    public void setServices(String services) {
116        this.services = services.split(",");
117    }
118
119    public void setServices(String services[]) {
120        this.services = services;
121    }
122
123    public void setServices(URI services[]) {
124        this.services = new String[services.length];
125        for (int i = 0; i < services.length; i++) {
126            this.services[i] = services[i].toString();
127        }
128    }
129
130    @Override
131    public void serviceFailed(DiscoveryEvent devent) throws IOException {
132
133        final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent;
134        if (sevent.failed.compareAndSet(false, true)) {
135
136            listener.onServiceRemove(sevent);
137            taskRunner.execute(new Runnable() {
138                @Override
139                public void run() {
140                    SimpleDiscoveryEvent event = new SimpleDiscoveryEvent(sevent);
141
142                    // We detect a failed connection attempt because the service
143                    // fails right away.
144                    if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
145                        LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: {}", event);
146
147                        event.connectFailures++;
148
149                        if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
150                            LOG.warn("Reconnect attempts exceeded {} tries.  Reconnecting has been disabled for: {}", maxReconnectAttempts, event);
151                            return;
152                        }
153
154                        if (!useExponentialBackOff || event.reconnectDelay == -1) {
155                            event.reconnectDelay = initialReconnectDelay;
156                        } else {
157                            // Exponential increment of reconnect delay.
158                            event.reconnectDelay *= backOffMultiplier;
159                            if (event.reconnectDelay > maxReconnectDelay) {
160                                event.reconnectDelay = maxReconnectDelay;
161                            }
162                        }
163
164                        doReconnectDelay(event);
165
166                    } else {
167                        LOG.trace("Failure occurred to long after the discovery event was generated.  " +
168                                  "It will not be classified as a connection failure: {}", event);
169                        event.connectFailures = 0;
170                        event.reconnectDelay = initialReconnectDelay;
171
172                        doReconnectDelay(event);
173                    }
174
175                    if (!running.get()) {
176                        LOG.debug("Reconnecting disabled: stopped");
177                        return;
178                    }
179
180                    event.connectTime = System.currentTimeMillis();
181                    event.failed.set(false);
182                    listener.onServiceAdd(event);
183                }
184            }, "Simple Discovery Agent");
185        }
186    }
187
188    protected void doReconnectDelay(SimpleDiscoveryEvent event) {
189        synchronized (sleepMutex) {
190            try {
191                if (!running.get()) {
192                    LOG.debug("Reconnecting disabled: stopped");
193                    return;
194                }
195
196                LOG.debug("Waiting {}ms before attempting to reconnect.", event.reconnectDelay);
197                sleepMutex.wait(event.reconnectDelay);
198            } catch (InterruptedException ie) {
199                LOG.debug("Reconnecting disabled: ", ie);
200                Thread.currentThread().interrupt();
201                return;
202            }
203        }
204    }
205
206    public long getBackOffMultiplier() {
207        return backOffMultiplier;
208    }
209
210    public void setBackOffMultiplier(long backOffMultiplier) {
211        this.backOffMultiplier = backOffMultiplier;
212    }
213
214    public long getInitialReconnectDelay() {
215        return initialReconnectDelay;
216    }
217
218    public void setInitialReconnectDelay(long initialReconnectDelay) {
219        this.initialReconnectDelay = initialReconnectDelay;
220    }
221
222    public int getMaxReconnectAttempts() {
223        return maxReconnectAttempts;
224    }
225
226    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
227        this.maxReconnectAttempts = maxReconnectAttempts;
228    }
229
230    public long getMaxReconnectDelay() {
231        return maxReconnectDelay;
232    }
233
234    public void setMaxReconnectDelay(long maxReconnectDelay) {
235        this.maxReconnectDelay = maxReconnectDelay;
236    }
237
238    public long getMinConnectTime() {
239        return minConnectTime;
240    }
241
242    public void setMinConnectTime(long minConnectTime) {
243        this.minConnectTime = minConnectTime;
244    }
245
246    public boolean isUseExponentialBackOff() {
247        return useExponentialBackOff;
248    }
249
250    public void setUseExponentialBackOff(boolean useExponentialBackOff) {
251        this.useExponentialBackOff = useExponentialBackOff;
252    }
253}