001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.network;
018
019import java.net.URI;
020import java.util.Hashtable;
021import java.util.Map;
022import java.util.concurrent.ConcurrentHashMap;
023
024import javax.naming.CommunicationException;
025import javax.naming.Context;
026import javax.naming.NamingEnumeration;
027import javax.naming.directory.Attributes;
028import javax.naming.directory.DirContext;
029import javax.naming.directory.InitialDirContext;
030import javax.naming.directory.SearchControls;
031import javax.naming.directory.SearchResult;
032import javax.naming.event.EventDirContext;
033import javax.naming.event.NamespaceChangeListener;
034import javax.naming.event.NamingEvent;
035import javax.naming.event.NamingExceptionEvent;
036import javax.naming.event.ObjectChangeListener;
037
038import org.apache.activemq.util.URISupport;
039import org.apache.activemq.util.URISupport.CompositeData;
040import org.slf4j.Logger;
041import org.slf4j.LoggerFactory;
042
043/**
044 * class to create dynamic network connectors listed in an directory server
045 * using the LDAP v3 protocol as defined in RFC 2251, the entries listed in the
046 * directory server must implement the ipHost and ipService objectClasses as
047 * defined in RFC 2307.
048 *
049 * @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a>
050 * @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a>
051 *
052 * @org.apache.xbean.XBean element="ldapNetworkConnector"
053 */
054public class LdapNetworkConnector extends NetworkConnector implements NamespaceChangeListener, ObjectChangeListener {
055    private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class);
056
057    // force returned entries to implement the ipHost and ipService object classes (RFC 2307)
058    private static final String REQUIRED_OBJECT_CLASS_FILTER =
059            "(&(objectClass=ipHost)(objectClass=ipService))";
060
061    // connection
062    private URI[] availableURIs = null;
063    private int availableURIsIndex = 0;
064    private String base = null;
065    private boolean failover = false;
066    private long curReconnectDelay = 1000; /* 1 sec */
067    private long maxReconnectDelay = 30000; /* 30 sec */
068
069    // authentication
070    private String user = null;
071    private String password = null;
072    private boolean anonymousAuthentication = false;
073
074    // search
075    private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */);
076    private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER;
077    private boolean searchEventListener = false;
078
079    // connector management
080    private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap<URI, NetworkConnector>();
081    private Map<URI, Integer> referenceMap = new ConcurrentHashMap<URI, Integer>();
082    private Map<String, URI> uuidMap = new ConcurrentHashMap<String, URI>();
083
084    // local context
085    private DirContext context = null;
086    // currently in use URI
087    private URI ldapURI = null;
088
089    /**
090     * returns the next URI from the configured list
091     *
092     * @return random URI from the configured list
093     */
094    public URI getUri() {
095        return availableURIs[++availableURIsIndex % availableURIs.length];
096    }
097
098    /**
099     * sets the LDAP server URI
100     *
101     * @param _uri
102     *            LDAP server URI
103     */
104    public void setUri(URI uri) throws Exception {
105        CompositeData data = URISupport.parseComposite(uri);
106        if (data.getScheme().equals("failover")) {
107            availableURIs = data.getComponents();
108            failover = true;
109        } else {
110            availableURIs = new URI[] { uri };
111        }
112    }
113
114    /**
115     * sets the base LDAP dn used for lookup operations
116     *
117     * @param _base
118     *            LDAP base dn
119     */
120    public void setBase(String base) {
121        this.base = base;
122    }
123
124    /**
125     * sets the LDAP user for access credentials
126     *
127     * @param _user
128     *            LDAP dn of user
129     */
130    public void setUser(String user) {
131        this.user = user;
132    }
133
134    /**
135     * sets the LDAP password for access credentials
136     *
137     * @param _password
138     *            user password
139     */
140    public void setPassword(String password) {
141        this.password = password;
142    }
143
144    /**
145     * sets LDAP anonymous authentication access credentials
146     *
147     * @param _anonymousAuthentication
148     *            set to true to use anonymous authentication
149     */
150    public void setAnonymousAuthentication(boolean anonymousAuthentication) {
151        this.anonymousAuthentication = anonymousAuthentication;
152    }
153
154    /**
155     * sets the LDAP search scope
156     *
157     * @param _searchScope
158     *            LDAP JNDI search scope
159     */
160    public void setSearchScope(String searchScope) throws Exception {
161        int scope;
162        if (searchScope.equals("OBJECT_SCOPE")) {
163            scope = SearchControls.OBJECT_SCOPE;
164        } else if (searchScope.equals("ONELEVEL_SCOPE")) {
165            scope = SearchControls.ONELEVEL_SCOPE;
166        } else if (searchScope.equals("SUBTREE_SCOPE")) {
167            scope = SearchControls.SUBTREE_SCOPE;
168        } else {
169            throw new Exception("ERR: unknown LDAP search scope specified: " + searchScope);
170        }
171        searchControls.setSearchScope(scope);
172    }
173
174    /**
175     * sets the LDAP search filter as defined in RFC 2254
176     *
177     * @param _searchFilter
178     *            LDAP search filter
179     * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a>
180     */
181    public void setSearchFilter(String searchFilter) {
182        this.searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + searchFilter + "))";
183    }
184
185    /**
186     * enables/disable a persistent search to the LDAP server as defined in
187     * draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3)
188     *
189     * @param _searchEventListener
190     *            enable = true, disable = false (default)
191     * @see <a
192     *      href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a>
193     */
194    public void setSearchEventListener(boolean searchEventListener) {
195        this.searchEventListener = searchEventListener;
196    }
197
198    /**
199     * start the connector
200     */
201    public void start() throws Exception {
202        LOG.info("connecting...");
203        Hashtable<String, String> env = new Hashtable<String, String>();
204        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
205        this.ldapURI = getUri();
206        LOG.debug("    URI [{}]", this.ldapURI);
207        env.put(Context.PROVIDER_URL, this.ldapURI.toString());
208        if (anonymousAuthentication) {
209            LOG.debug("    login credentials [anonymous]");
210            env.put(Context.SECURITY_AUTHENTICATION, "none");
211        } else {
212            LOG.debug("    login credentials [{}:******]", user);
213            if (user != null && !"".equals(user)) {
214                env.put(Context.SECURITY_PRINCIPAL, user);
215            } else {
216                throw new Exception("Empty username is not allowed");
217            }
218            if (password != null && !"".equals(password)) {
219                env.put(Context.SECURITY_CREDENTIALS, password);
220            } else {
221                throw new Exception("Empty password is not allowed");
222            }
223        }
224        boolean isConnected = false;
225        while (!isConnected) {
226            try {
227                context = new InitialDirContext(env);
228                isConnected = true;
229            } catch (CommunicationException err) {
230                if (failover) {
231                    this.ldapURI = getUri();
232                    LOG.error("connection error [{}], failover connection to [{}]", env.get(Context.PROVIDER_URL), this.ldapURI.toString());
233                    env.put(Context.PROVIDER_URL, this.ldapURI.toString());
234                    Thread.sleep(curReconnectDelay);
235                    curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay);
236                } else {
237                    throw err;
238                }
239            }
240        }
241
242        // add connectors from search results
243        LOG.info("searching for network connectors...");
244        LOG.debug("    base   [{}]", base);
245        LOG.debug("    filter [{}]", searchFilter);
246        LOG.debug("    scope  [{}]", searchControls.getSearchScope());
247        NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls);
248        while (results.hasMore()) {
249            addConnector(results.next());
250        }
251
252        // register persistent search event listener
253        if (searchEventListener) {
254            LOG.info("registering persistent search listener...");
255            EventDirContext eventContext = (EventDirContext) context.lookup("");
256            eventContext.addNamingListener(base, searchFilter, searchControls, this);
257        } else { // otherwise close context (i.e. connection as it is no longer needed)
258            context.close();
259        }
260    }
261
262    /**
263     * stop the connector
264     */
265    public void stop() throws Exception {
266        LOG.info("stopping context...");
267        for (NetworkConnector connector : connectorMap.values()) {
268            connector.stop();
269        }
270        connectorMap.clear();
271        referenceMap.clear();
272        uuidMap.clear();
273        context.close();
274    }
275
276    public String toString() {
277        return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]";
278    }
279
280    /**
281     * add connector of the given URI
282     *
283     * @param result
284     *            search result of connector to add
285     */
286    protected synchronized void addConnector(SearchResult result) throws Exception {
287        String uuid = toUUID(result);
288        if (uuidMap.containsKey(uuid)) {
289            LOG.warn("connector already regsitered for UUID [{}]", uuid);
290            return;
291        }
292
293        URI connectorURI = toURI(result);
294        if (connectorMap.containsKey(connectorURI)) {
295            int referenceCount = referenceMap.get(connectorURI) + 1;
296            LOG.warn("connector reference added for URI [{}], UUID [{}], total reference(s) [{}]", new Object[]{ connectorURI, uuid, referenceCount });
297            referenceMap.put(connectorURI, referenceCount);
298            uuidMap.put(uuid, connectorURI);
299            return;
300        }
301
302        // FIXME: disable JMX listing of LDAP managed connectors, we will
303        // want to map/manage these differently in the future
304        // boolean useJMX = getBrokerService().isUseJmx();
305        // getBrokerService().setUseJmx(false);
306        NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI);
307        // getBrokerService().setUseJmx(useJMX);
308
309        // Propagate standard connector properties that may have been set via XML
310        connector.setDynamicOnly(isDynamicOnly());
311        connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
312        connector.setNetworkTTL(getNetworkTTL());
313        connector.setConsumerTTL(getConsumerTTL());
314        connector.setMessageTTL(getMessageTTL());
315        connector.setConduitSubscriptions(isConduitSubscriptions());
316        connector.setExcludedDestinations(getExcludedDestinations());
317        connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
318        connector.setDuplex(isDuplex());
319
320        // XXX: set in the BrokerService.startAllConnectors method and is
321        // required to prevent remote broker exceptions upon connection
322        connector.setLocalUri(getBrokerService().getVmConnectorURI());
323        connector.setBrokerName(getBrokerService().getBrokerName());
324        connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations());
325
326        // start network connector
327        connectorMap.put(connectorURI, connector);
328        referenceMap.put(connectorURI, 1);
329        uuidMap.put(uuid, connectorURI);
330        connector.start();
331        LOG.info("connector added with URI [{}]", connectorURI);
332    }
333
334    /**
335     * remove connector of the given URI
336     *
337     * @param result
338     *            search result of connector to remove
339     */
340    protected synchronized void removeConnector(SearchResult result) throws Exception {
341        String uuid = toUUID(result);
342        if (!uuidMap.containsKey(uuid)) {
343            LOG.warn("connector not registered for UUID [{}]", uuid);
344            return;
345        }
346
347        URI connectorURI = uuidMap.get(uuid);
348        if (!connectorMap.containsKey(connectorURI)) {
349            LOG.warn("connector not registered for URI [{}]", connectorURI);
350            return;
351        }
352
353        int referenceCount = referenceMap.get(connectorURI) - 1;
354        referenceMap.put(connectorURI, referenceCount);
355        uuidMap.remove(uuid);
356        LOG.debug("connector referenced removed for URI [{}], UUID[{}], remaining reference(s) [{}]", new Object[]{ connectorURI, uuid, referenceCount });
357
358        if (referenceCount > 0) {
359            return;
360        }
361
362        NetworkConnector connector = connectorMap.remove(connectorURI);
363        connector.stop();
364        LOG.info("connector removed with URI [{}]", connectorURI);
365    }
366
367    /**
368     * convert search result into URI
369     *
370     * @param result
371     *            search result to convert to URI
372     */
373    protected URI toURI(SearchResult result) throws Exception {
374        Attributes attributes = result.getAttributes();
375        String address = (String) attributes.get("iphostnumber").get();
376        String port = (String) attributes.get("ipserviceport").get();
377        String protocol = (String) attributes.get("ipserviceprotocol").get();
378        URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")");
379        LOG.debug("retrieved URI from SearchResult [{}]", connectorURI);
380        return connectorURI;
381    }
382
383    /**
384     * convert search result into URI
385     *
386     * @param result
387     *            search result to convert to URI
388     */
389    protected String toUUID(SearchResult result) {
390        String uuid = result.getNameInNamespace();
391        LOG.debug("retrieved UUID from SearchResult [{}]", uuid);
392        return uuid;
393    }
394
395    /**
396     * invoked when an entry has been added during a persistent search
397     */
398    public void objectAdded(NamingEvent event) {
399        LOG.debug("entry added");
400        try {
401            addConnector((SearchResult) event.getNewBinding());
402        } catch (Exception err) {
403            LOG.error("ERR: caught unexpected exception", err);
404        }
405    }
406
407    /**
408     * invoked when an entry has been removed during a persistent search
409     */
410    public void objectRemoved(NamingEvent event) {
411        LOG.debug("entry removed");
412        try {
413            removeConnector((SearchResult) event.getOldBinding());
414        } catch (Exception err) {
415            LOG.error("ERR: caught unexpected exception", err);
416        }
417    }
418
419    /**
420     * invoked when an entry has been renamed during a persistent search
421     */
422    public void objectRenamed(NamingEvent event) {
423        LOG.debug("entry renamed");
424        // XXX: getNameInNamespace method does not seem to work properly,
425        // but getName seems to provide the result we want
426        String uuidOld = event.getOldBinding().getName();
427        String uuidNew = event.getNewBinding().getName();
428        URI connectorURI = uuidMap.remove(uuidOld);
429        uuidMap.put(uuidNew, connectorURI);
430        LOG.debug("connector reference renamed for URI [{}], Old UUID [{}], New UUID [{}]", new Object[]{ connectorURI, uuidOld, uuidNew });
431    }
432
433    /**
434     * invoked when an entry has been changed during a persistent search
435     */
436    public void objectChanged(NamingEvent event) {
437        LOG.debug("entry changed");
438        try {
439            SearchResult result = (SearchResult) event.getNewBinding();
440            removeConnector(result);
441            addConnector(result);
442        } catch (Exception err) {
443            LOG.error("ERR: caught unexpected exception", err);
444        }
445    }
446
447    /**
448     * invoked when an exception has occurred during a persistent search
449     */
450    public void namingExceptionThrown(NamingExceptionEvent event) {
451        LOG.error("ERR: caught unexpected exception", event.getException());
452    }
453}