001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.discovery;
018
019import java.net.URI;
020import java.net.URISyntaxException;
021import java.util.Map;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.ConcurrentMap;
024
025import org.apache.activemq.command.DiscoveryEvent;
026import org.apache.activemq.transport.CompositeTransport;
027import org.apache.activemq.transport.TransportFilter;
028import org.apache.activemq.util.ServiceStopper;
029import org.apache.activemq.util.Suspendable;
030import org.apache.activemq.util.URISupport;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034/**
035 * A {@link ReliableTransportChannel} which uses a {@link DiscoveryAgent} to
036 * discover remote broker instances and dynamically connect to them.
037 *
038 *
039 */
040public class DiscoveryTransport extends TransportFilter implements DiscoveryListener {
041
042    private static final Logger LOG = LoggerFactory.getLogger(DiscoveryTransport.class);
043
044    private final CompositeTransport next;
045    private DiscoveryAgent discoveryAgent;
046    private final ConcurrentMap<String, URI> serviceURIs = new ConcurrentHashMap<String, URI>();
047
048    private Map<String, String> parameters;
049
050    public DiscoveryTransport(CompositeTransport next) {
051        super(next);
052        this.next = next;
053    }
054
055    @Override
056    public void start() throws Exception {
057        if (discoveryAgent == null) {
058            throw new IllegalStateException("discoveryAgent not configured");
059        }
060
061        // lets pass into the agent the broker name and connection details
062        discoveryAgent.setDiscoveryListener(this);
063        discoveryAgent.start();
064        next.start();
065    }
066
067    @Override
068    public void stop() throws Exception {
069        ServiceStopper ss = new ServiceStopper();
070        ss.stop(discoveryAgent);
071        ss.stop(next);
072        ss.throwFirstException();
073    }
074
075    @Override
076    public void onServiceAdd(DiscoveryEvent event) {
077        String url = event.getServiceName();
078        if (url != null) {
079            try {
080                URI uri = new URI(url);
081                LOG.info("Adding new broker connection URL: " + uri);
082                uri = URISupport.applyParameters(uri, parameters, DISCOVERED_OPTION_PREFIX);
083                serviceURIs.put(event.getServiceName(), uri);
084                next.add(false,new URI[] {uri});
085            } catch (URISyntaxException e) {
086                LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
087            }
088        }
089    }
090
091    @Override
092    public void onServiceRemove(DiscoveryEvent event) {
093        URI uri = serviceURIs.get(event.getServiceName());
094        if (uri != null) {
095            next.remove(false,new URI[] {uri});
096        }
097    }
098
099    public DiscoveryAgent getDiscoveryAgent() {
100        return discoveryAgent;
101    }
102
103    public void setDiscoveryAgent(DiscoveryAgent discoveryAgent) {
104        this.discoveryAgent = discoveryAgent;
105    }
106
107    public void setParameters(Map<String, String> parameters) {
108       this.parameters = parameters;
109    }
110
111    @Override
112    public void transportResumed() {
113        if( discoveryAgent instanceof Suspendable ) {
114            try {
115                ((Suspendable)discoveryAgent).suspend();
116            } catch (Exception e) {
117                e.printStackTrace();
118            }
119        }
120        super.transportResumed();
121    }
122
123    @Override
124    public void transportInterupted() {
125        if( discoveryAgent instanceof Suspendable ) {
126            try {
127                ((Suspendable)discoveryAgent).resume();
128            } catch (Exception e) {
129                e.printStackTrace();
130            }
131        }
132        super.transportInterupted();
133    }
134}