001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.network; 018 019import java.net.URI; 020import java.util.Hashtable; 021import java.util.Map; 022import java.util.concurrent.ConcurrentHashMap; 023 024import javax.naming.CommunicationException; 025import javax.naming.Context; 026import javax.naming.NamingEnumeration; 027import javax.naming.directory.Attributes; 028import javax.naming.directory.DirContext; 029import javax.naming.directory.InitialDirContext; 030import javax.naming.directory.SearchControls; 031import javax.naming.directory.SearchResult; 032import javax.naming.event.EventDirContext; 033import javax.naming.event.NamespaceChangeListener; 034import javax.naming.event.NamingEvent; 035import javax.naming.event.NamingExceptionEvent; 036import javax.naming.event.ObjectChangeListener; 037 038import org.apache.activemq.util.URISupport; 039import org.apache.activemq.util.URISupport.CompositeData; 040import org.slf4j.Logger; 041import org.slf4j.LoggerFactory; 042 043/** 044 * class to create dynamic network connectors listed in an directory server 045 * using the LDAP v3 protocol as defined in RFC 2251, the entries listed in the 046 * directory server must implement the ipHost and ipService objectClasses as 047 * defined in RFC 2307. 048 * 049 * @see <a href="http://www.faqs.org/rfcs/rfc2251.html">RFC 2251</a> 050 * @see <a href="http://www.faqs.org/rfcs/rfc2307.html">RFC 2307</a> 051 * 052 * @org.apache.xbean.XBean element="ldapNetworkConnector" 053 */ 054public class LdapNetworkConnector extends NetworkConnector implements NamespaceChangeListener, ObjectChangeListener { 055 private static final Logger LOG = LoggerFactory.getLogger(LdapNetworkConnector.class); 056 057 // force returned entries to implement the ipHost and ipService object classes (RFC 2307) 058 private static final String REQUIRED_OBJECT_CLASS_FILTER = 059 "(&(objectClass=ipHost)(objectClass=ipService))"; 060 061 // connection 062 private URI[] availableURIs = null; 063 private int availableURIsIndex = 0; 064 private String base = null; 065 private boolean failover = false; 066 private long curReconnectDelay = 1000; /* 1 sec */ 067 private long maxReconnectDelay = 30000; /* 30 sec */ 068 069 // authentication 070 private String user = null; 071 private String password = null; 072 private boolean anonymousAuthentication = false; 073 074 // search 075 private SearchControls searchControls = new SearchControls(/* ONELEVEL_SCOPE */); 076 private String searchFilter = REQUIRED_OBJECT_CLASS_FILTER; 077 private boolean searchEventListener = false; 078 079 // connector management 080 private Map<URI, NetworkConnector> connectorMap = new ConcurrentHashMap<URI, NetworkConnector>(); 081 private Map<URI, Integer> referenceMap = new ConcurrentHashMap<URI, Integer>(); 082 private Map<String, URI> uuidMap = new ConcurrentHashMap<String, URI>(); 083 084 // local context 085 private DirContext context = null; 086 // currently in use URI 087 private URI ldapURI = null; 088 089 /** 090 * returns the next URI from the configured list 091 * 092 * @return random URI from the configured list 093 */ 094 public URI getUri() { 095 return availableURIs[++availableURIsIndex % availableURIs.length]; 096 } 097 098 /** 099 * sets the LDAP server URI 100 * 101 * @param _uri 102 * LDAP server URI 103 */ 104 public void setUri(URI uri) throws Exception { 105 CompositeData data = URISupport.parseComposite(uri); 106 if (data.getScheme().equals("failover")) { 107 availableURIs = data.getComponents(); 108 failover = true; 109 } else { 110 availableURIs = new URI[] { uri }; 111 } 112 } 113 114 /** 115 * sets the base LDAP dn used for lookup operations 116 * 117 * @param _base 118 * LDAP base dn 119 */ 120 public void setBase(String base) { 121 this.base = base; 122 } 123 124 /** 125 * sets the LDAP user for access credentials 126 * 127 * @param _user 128 * LDAP dn of user 129 */ 130 public void setUser(String user) { 131 this.user = user; 132 } 133 134 /** 135 * sets the LDAP password for access credentials 136 * 137 * @param _password 138 * user password 139 */ 140 public void setPassword(String password) { 141 this.password = password; 142 } 143 144 /** 145 * sets LDAP anonymous authentication access credentials 146 * 147 * @param _anonymousAuthentication 148 * set to true to use anonymous authentication 149 */ 150 public void setAnonymousAuthentication(boolean anonymousAuthentication) { 151 this.anonymousAuthentication = anonymousAuthentication; 152 } 153 154 /** 155 * sets the LDAP search scope 156 * 157 * @param _searchScope 158 * LDAP JNDI search scope 159 */ 160 public void setSearchScope(String searchScope) throws Exception { 161 int scope; 162 if (searchScope.equals("OBJECT_SCOPE")) { 163 scope = SearchControls.OBJECT_SCOPE; 164 } else if (searchScope.equals("ONELEVEL_SCOPE")) { 165 scope = SearchControls.ONELEVEL_SCOPE; 166 } else if (searchScope.equals("SUBTREE_SCOPE")) { 167 scope = SearchControls.SUBTREE_SCOPE; 168 } else { 169 throw new Exception("ERR: unknown LDAP search scope specified: " + searchScope); 170 } 171 searchControls.setSearchScope(scope); 172 } 173 174 /** 175 * sets the LDAP search filter as defined in RFC 2254 176 * 177 * @param _searchFilter 178 * LDAP search filter 179 * @see <a href="http://www.faqs.org/rfcs/rfc2254.html">RFC 2254</a> 180 */ 181 public void setSearchFilter(String searchFilter) { 182 this.searchFilter = "(&" + REQUIRED_OBJECT_CLASS_FILTER + "(" + searchFilter + "))"; 183 } 184 185 /** 186 * enables/disable a persistent search to the LDAP server as defined in 187 * draft-ietf-ldapext-psearch-03.txt (2.16.840.1.113730.3.4.3) 188 * 189 * @param _searchEventListener 190 * enable = true, disable = false (default) 191 * @see <a 192 * href="http://www.ietf.org/proceedings/01mar/I-D/draft-ietf-ldapext-psearch-03.txt">draft-ietf-ldapext-psearch-03.txt</a> 193 */ 194 public void setSearchEventListener(boolean searchEventListener) { 195 this.searchEventListener = searchEventListener; 196 } 197 198 /** 199 * start the connector 200 */ 201 public void start() throws Exception { 202 LOG.info("connecting..."); 203 Hashtable<String, String> env = new Hashtable<String, String>(); 204 env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory"); 205 this.ldapURI = getUri(); 206 LOG.debug(" URI [{}]", this.ldapURI); 207 env.put(Context.PROVIDER_URL, this.ldapURI.toString()); 208 if (anonymousAuthentication) { 209 LOG.debug(" login credentials [anonymous]"); 210 env.put(Context.SECURITY_AUTHENTICATION, "none"); 211 } else { 212 LOG.debug(" login credentials [{}:******]", user); 213 if (user != null && !"".equals(user)) { 214 env.put(Context.SECURITY_PRINCIPAL, user); 215 } else { 216 throw new Exception("Empty username is not allowed"); 217 } 218 if (password != null && !"".equals(password)) { 219 env.put(Context.SECURITY_CREDENTIALS, password); 220 } else { 221 throw new Exception("Empty password is not allowed"); 222 } 223 } 224 boolean isConnected = false; 225 while (!isConnected) { 226 try { 227 context = new InitialDirContext(env); 228 isConnected = true; 229 } catch (CommunicationException err) { 230 if (failover) { 231 this.ldapURI = getUri(); 232 LOG.error("connection error [{}], failover connection to [{}]", env.get(Context.PROVIDER_URL), this.ldapURI.toString()); 233 env.put(Context.PROVIDER_URL, this.ldapURI.toString()); 234 Thread.sleep(curReconnectDelay); 235 curReconnectDelay = Math.min(curReconnectDelay * 2, maxReconnectDelay); 236 } else { 237 throw err; 238 } 239 } 240 } 241 242 // add connectors from search results 243 LOG.info("searching for network connectors..."); 244 LOG.debug(" base [{}]", base); 245 LOG.debug(" filter [{}]", searchFilter); 246 LOG.debug(" scope [{}]", searchControls.getSearchScope()); 247 NamingEnumeration<SearchResult> results = context.search(base, searchFilter, searchControls); 248 while (results.hasMore()) { 249 addConnector(results.next()); 250 } 251 252 // register persistent search event listener 253 if (searchEventListener) { 254 LOG.info("registering persistent search listener..."); 255 EventDirContext eventContext = (EventDirContext) context.lookup(""); 256 eventContext.addNamingListener(base, searchFilter, searchControls, this); 257 } else { // otherwise close context (i.e. connection as it is no longer needed) 258 context.close(); 259 } 260 } 261 262 /** 263 * stop the connector 264 */ 265 public void stop() throws Exception { 266 LOG.info("stopping context..."); 267 for (NetworkConnector connector : connectorMap.values()) { 268 connector.stop(); 269 } 270 connectorMap.clear(); 271 referenceMap.clear(); 272 uuidMap.clear(); 273 context.close(); 274 } 275 276 public String toString() { 277 return this.getClass().getName() + getName() + "[" + ldapURI.toString() + "]"; 278 } 279 280 /** 281 * add connector of the given URI 282 * 283 * @param result 284 * search result of connector to add 285 */ 286 protected synchronized void addConnector(SearchResult result) throws Exception { 287 String uuid = toUUID(result); 288 if (uuidMap.containsKey(uuid)) { 289 LOG.warn("connector already regsitered for UUID [{}]", uuid); 290 return; 291 } 292 293 URI connectorURI = toURI(result); 294 if (connectorMap.containsKey(connectorURI)) { 295 int referenceCount = referenceMap.get(connectorURI) + 1; 296 LOG.warn("connector reference added for URI [{}], UUID [{}], total reference(s) [{}]", new Object[]{ connectorURI, uuid, referenceCount }); 297 referenceMap.put(connectorURI, referenceCount); 298 uuidMap.put(uuid, connectorURI); 299 return; 300 } 301 302 // FIXME: disable JMX listing of LDAP managed connectors, we will 303 // want to map/manage these differently in the future 304 // boolean useJMX = getBrokerService().isUseJmx(); 305 // getBrokerService().setUseJmx(false); 306 NetworkConnector connector = getBrokerService().addNetworkConnector(connectorURI); 307 // getBrokerService().setUseJmx(useJMX); 308 309 // Propagate standard connector properties that may have been set via XML 310 connector.setDynamicOnly(isDynamicOnly()); 311 connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority()); 312 connector.setNetworkTTL(getNetworkTTL()); 313 connector.setConsumerTTL(getConsumerTTL()); 314 connector.setMessageTTL(getMessageTTL()); 315 connector.setConduitSubscriptions(isConduitSubscriptions()); 316 connector.setExcludedDestinations(getExcludedDestinations()); 317 connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations()); 318 connector.setDuplex(isDuplex()); 319 320 // XXX: set in the BrokerService.startAllConnectors method and is 321 // required to prevent remote broker exceptions upon connection 322 connector.setLocalUri(getBrokerService().getVmConnectorURI()); 323 connector.setBrokerName(getBrokerService().getBrokerName()); 324 connector.setDurableDestinations(getBrokerService().getBroker().getDurableDestinations()); 325 326 // start network connector 327 connectorMap.put(connectorURI, connector); 328 referenceMap.put(connectorURI, 1); 329 uuidMap.put(uuid, connectorURI); 330 connector.start(); 331 LOG.info("connector added with URI [{}]", connectorURI); 332 } 333 334 /** 335 * remove connector of the given URI 336 * 337 * @param result 338 * search result of connector to remove 339 */ 340 protected synchronized void removeConnector(SearchResult result) throws Exception { 341 String uuid = toUUID(result); 342 if (!uuidMap.containsKey(uuid)) { 343 LOG.warn("connector not registered for UUID [{}]", uuid); 344 return; 345 } 346 347 URI connectorURI = uuidMap.get(uuid); 348 if (!connectorMap.containsKey(connectorURI)) { 349 LOG.warn("connector not registered for URI [{}]", connectorURI); 350 return; 351 } 352 353 int referenceCount = referenceMap.get(connectorURI) - 1; 354 referenceMap.put(connectorURI, referenceCount); 355 uuidMap.remove(uuid); 356 LOG.debug("connector referenced removed for URI [{}], UUID[{}], remaining reference(s) [{}]", new Object[]{ connectorURI, uuid, referenceCount }); 357 358 if (referenceCount > 0) { 359 return; 360 } 361 362 NetworkConnector connector = connectorMap.remove(connectorURI); 363 connector.stop(); 364 LOG.info("connector removed with URI [{}]", connectorURI); 365 } 366 367 /** 368 * convert search result into URI 369 * 370 * @param result 371 * search result to convert to URI 372 */ 373 protected URI toURI(SearchResult result) throws Exception { 374 Attributes attributes = result.getAttributes(); 375 String address = (String) attributes.get("iphostnumber").get(); 376 String port = (String) attributes.get("ipserviceport").get(); 377 String protocol = (String) attributes.get("ipserviceprotocol").get(); 378 URI connectorURI = new URI("static:(" + protocol + "://" + address + ":" + port + ")"); 379 LOG.debug("retrieved URI from SearchResult [{}]", connectorURI); 380 return connectorURI; 381 } 382 383 /** 384 * convert search result into URI 385 * 386 * @param result 387 * search result to convert to URI 388 */ 389 protected String toUUID(SearchResult result) { 390 String uuid = result.getNameInNamespace(); 391 LOG.debug("retrieved UUID from SearchResult [{}]", uuid); 392 return uuid; 393 } 394 395 /** 396 * invoked when an entry has been added during a persistent search 397 */ 398 public void objectAdded(NamingEvent event) { 399 LOG.debug("entry added"); 400 try { 401 addConnector((SearchResult) event.getNewBinding()); 402 } catch (Exception err) { 403 LOG.error("ERR: caught unexpected exception", err); 404 } 405 } 406 407 /** 408 * invoked when an entry has been removed during a persistent search 409 */ 410 public void objectRemoved(NamingEvent event) { 411 LOG.debug("entry removed"); 412 try { 413 removeConnector((SearchResult) event.getOldBinding()); 414 } catch (Exception err) { 415 LOG.error("ERR: caught unexpected exception", err); 416 } 417 } 418 419 /** 420 * invoked when an entry has been renamed during a persistent search 421 */ 422 public void objectRenamed(NamingEvent event) { 423 LOG.debug("entry renamed"); 424 // XXX: getNameInNamespace method does not seem to work properly, 425 // but getName seems to provide the result we want 426 String uuidOld = event.getOldBinding().getName(); 427 String uuidNew = event.getNewBinding().getName(); 428 URI connectorURI = uuidMap.remove(uuidOld); 429 uuidMap.put(uuidNew, connectorURI); 430 LOG.debug("connector reference renamed for URI [{}], Old UUID [{}], New UUID [{}]", new Object[]{ connectorURI, uuidOld, uuidNew }); 431 } 432 433 /** 434 * invoked when an entry has been changed during a persistent search 435 */ 436 public void objectChanged(NamingEvent event) { 437 LOG.debug("entry changed"); 438 try { 439 SearchResult result = (SearchResult) event.getNewBinding(); 440 removeConnector(result); 441 addConnector(result); 442 } catch (Exception err) { 443 LOG.error("ERR: caught unexpected exception", err); 444 } 445 } 446 447 /** 448 * invoked when an exception has occurred during a persistent search 449 */ 450 public void namingExceptionThrown(NamingExceptionEvent event) { 451 LOG.error("ERR: caught unexpected exception", event.getException()); 452 } 453}