/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.osgi;

import java.util.Collections;
import java.util.Dictionary;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.spring.SpringBrokerContext;
import org.apache.activemq.spring.Utils;
import org.osgi.framework.BundleContext;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;
import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;
import org.springframework.core.io.Resource;
import org.springframework.osgi.context.support.OsgiBundleXmlApplicationContext;

/**
 * {@link ManagedServiceFactory} that starts one {@link BrokerService} per
 * configuration pid and stops it again when the configuration is updated or
 * deleted.
 * <p>
 * The lifecycle methods ({@link #updated}, {@link #deleted} and
 * {@link #destroy}) are synchronized on this instance; {@link #getBrokersMap()}
 * is not.
 */
public class ActiveMQServiceFactory implements ManagedServiceFactory {

    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQServiceFactory.class);

    /** Bundle context handed to every Spring application context created by {@link #updated}. */
    BundleContext bundleContext;

    /** Brokers started by this factory, keyed by configuration pid. */
    HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>();

    /**
     * @return the fixed descriptive name of this factory
     */
    @Override
    public String getName() {
        return "ActiveMQ Server Controller";
    }

    /**
     * Returns a read-only view of the brokers started by this factory.
     * <p>
     * The result is a live view of the internal map, not a snapshot, and this
     * accessor is not synchronized with the lifecycle methods.
     *
     * @return unmodifiable map from configuration pid to broker
     */
    public Map<String, BrokerService> getBrokersMap() {
        return Collections.unmodifiableMap(brokers);
    }

    /**
     * Stops the broker currently registered for {@code pid} (if any) and starts
     * a new one from the supplied configuration.
     * <p>
     * The Spring XML location is taken from the {@code config} property and is
     * loaded with XML validation disabled. Every entry of {@code properties} is
     * exposed to that XML as a placeholder value; placeholders that cannot be
     * resolved are left untouched.
     *
     * @param pid        configuration pid the broker is registered under
     * @param properties configuration dictionary; must contain the
     *                   {@code config} and {@code broker-name} entries
     * @throws ConfigurationException if a required property is missing, or if
     *                                anything fails while loading the context
     *                                or starting the broker (the original
     *                                failure is attached as the cause)
     */
    @SuppressWarnings("rawtypes")
    @Override
    synchronized public void updated(String pid, Dictionary properties) throws ConfigurationException {

        // First stop currently running broker (if any)
        deleted(pid);

        String config = (String) properties.get("config");
        if (config == null) {
            throw new ConfigurationException("config", "Property must be set");
        }
        String name = (String) properties.get("broker-name");
        if (name == null) {
            throw new ConfigurationException("broker-name", "Property must be set");
        }

        LOG.info("Starting broker " + name);

        Thread thread = Thread.currentThread();
        ClassLoader previousLoader = null;
        boolean loaderSwapped = false;
        try {
            // Load the broker configuration with the class loader that defines BrokerService.
            previousLoader = thread.getContextClassLoader();
            thread.setContextClassLoader(BrokerService.class.getClassLoader());
            loaderSwapped = true;

            Resource resource = Utils.resourceFromString(config);

            // when camel is embedded it needs a bundle context
            OsgiBundleXmlApplicationContext ctx = new OsgiBundleXmlApplicationContext(new String[]{resource.getURL().toExternalForm()}) {
                @Override
                protected void initBeanDefinitionReader(XmlBeanDefinitionReader reader) {
                    reader.setValidating(false);
                }
            };
            ctx.setBundleContext(bundleContext);

            // Handle properties in configuration
            PropertyPlaceholderConfigurer configurator = new PropertyPlaceholderConfigurer();

            // convert dictionary to properties. Is there a better way?
            Properties props = new Properties();
            Enumeration<?> elements = properties.keys();
            while (elements.hasMoreElements()) {
                Object key = elements.nextElement();
                props.put(key, properties.get(key));
            }

            configurator.setProperties(props);
            configurator.setIgnoreUnresolvablePlaceholders(true);

            ctx.addBeanFactoryPostProcessor(configurator);

            ctx.refresh();

            // Start the broker
            BrokerService broker = ctx.getBean(BrokerService.class);
            if (broker == null) {
                throw new ConfigurationException(null, "Broker not defined");
            }
            // TODO deal with multiple brokers

            SpringBrokerContext brokerContext = new SpringBrokerContext();
            brokerContext.setConfigurationUrl(resource.getURL().toExternalForm());
            brokerContext.setApplicationContext(ctx);
            broker.setBrokerContext(brokerContext);

            broker.start();
            broker.waitUntilStarted();
            brokers.put(pid, broker);
        } catch (Exception e) {
            // Every failure above, including "Broker not defined", is reported
            // under this message with the original exception as the cause.
            throw new ConfigurationException(null, "Cannot start the broker", e);
        } finally {
            // Do not leave the calling thread with the broker's class loader
            // installed once this callback returns.
            if (loaderSwapped) {
                thread.setContextClassLoader(previousLoader);
            }
        }
    }

    /**
     * Stops the broker registered for {@code pid} and forgets it. Does nothing
     * when no broker is registered under that pid.
     * <p>
     * Failures while stopping are logged and not propagated; the broker is
     * removed from the registry either way.
     *
     * @param pid configuration pid of the broker to stop
     */
    @Override
    synchronized public void deleted(String pid) {
        // Remove rather than just look up, so a stopped broker is not kept
        // registered (and is not stopped a second time later).
        BrokerService broker = brokers.remove(pid);
        if (broker == null) {
            return;
        }
        try {
            LOG.info("Stopping broker " + pid);
            broker.stop();
            broker.waitUntilStopped();
        } catch (Exception e) {
            LOG.error("Exception on stopping broker", e);
        }
    }

    /**
     * Stops and unregisters every broker started by this factory.
     */
    synchronized public void destroy() {
        // Iterate over a snapshot of the keys: deleted() removes entries from
        // the map, which would otherwise invalidate a live key-set iterator.
        String[] pids = brokers.keySet().toArray(new String[brokers.size()]);
        for (String pid : pids) {
            deleted(pid);
        }
    }

    /**
     * @return the bundle context passed to newly created application contexts
     */
    public BundleContext getBundleContext() {
        return bundleContext;
    }

    /**
     * @param bundleContext the bundle context to pass to application contexts
     *                      created by subsequent {@link #updated} calls
     */
    public void setBundleContext(BundleContext bundleContext) {
        this.bundleContext = bundleContext;
    }
}