001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.osgi;
018
019import java.util.Collections;
020import java.util.Dictionary;
021import java.util.Enumeration;
022import java.util.HashMap;
023import java.util.Map;
024import java.util.Properties;
025
026import org.apache.activemq.broker.BrokerService;
027import org.apache.activemq.spring.SpringBrokerContext;
028import org.apache.activemq.spring.Utils;
029import org.osgi.framework.BundleContext;
030import org.osgi.service.cm.ConfigurationException;
031import org.osgi.service.cm.ManagedServiceFactory;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034import org.springframework.beans.factory.config.PropertyPlaceholderConfigurer;
035import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;
036import org.springframework.core.io.Resource;
037import org.springframework.osgi.context.support.OsgiBundleXmlApplicationContext;
038
039public class ActiveMQServiceFactory implements ManagedServiceFactory {
040
041    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQServiceFactory.class);
042
043    BundleContext bundleContext;
044    HashMap<String, BrokerService> brokers = new HashMap<String, BrokerService>();
045
046    @Override
047    public String getName() {
048        return "ActiveMQ Server Controller";
049    }
050
051    public Map<String, BrokerService> getBrokersMap() {
052        return Collections.unmodifiableMap(brokers);
053    }
054
055    @SuppressWarnings("rawtypes")
056    @Override
057    synchronized public void updated(String pid, Dictionary properties) throws ConfigurationException {
058
059        // First stop currently running broker (if any)
060        deleted(pid);
061
062        String config = (String) properties.get("config");
063        if (config == null) {
064            throw new ConfigurationException("config", "Property must be set");
065        }
066        String name = (String) properties.get("broker-name");
067        if (name == null) {
068            throw new ConfigurationException("broker-name", "Property must be set");
069        }
070
071        LOG.info("Starting broker " + name);
072
073        try {
074            Thread.currentThread().setContextClassLoader(BrokerService.class.getClassLoader());
075            Resource resource = Utils.resourceFromString(config);
076
077            // when camel is embedded it needs a bundle context
078            OsgiBundleXmlApplicationContext ctx = new OsgiBundleXmlApplicationContext(new String[]{resource.getURL().toExternalForm()}) {
079                @Override
080                protected void initBeanDefinitionReader(XmlBeanDefinitionReader reader) {
081                    reader.setValidating(false);
082                }
083            };
084            ctx.setBundleContext(bundleContext);
085
086            // Handle properties in configuration
087            PropertyPlaceholderConfigurer configurator = new PropertyPlaceholderConfigurer();
088
089            // convert dictionary to properties. Is there a better way?
090            Properties props = new Properties();
091            Enumeration<?> elements = properties.keys();
092            while (elements.hasMoreElements()) {
093                Object key = elements.nextElement();
094                props.put(key, properties.get(key));
095            }
096
097            configurator.setProperties(props);
098            configurator.setIgnoreUnresolvablePlaceholders(true);
099
100            ctx.addBeanFactoryPostProcessor(configurator);
101
102            ctx.refresh();
103
104            // Start the broker
105            BrokerService broker = ctx.getBean(BrokerService.class);
106            if (broker == null) {
107                throw new ConfigurationException(null, "Broker not defined");
108            }
109            // TODO deal with multiple brokers
110
111            SpringBrokerContext brokerContext = new SpringBrokerContext();
112            brokerContext.setConfigurationUrl(resource.getURL().toExternalForm());
113            brokerContext.setApplicationContext(ctx);
114            broker.setBrokerContext(brokerContext);
115
116            broker.start();
117            broker.waitUntilStarted();
118            brokers.put(pid, broker);
119        } catch (Exception e) {
120            throw new ConfigurationException(null, "Cannot start the broker", e);
121        }
122    }
123
124    @Override
125    synchronized public void deleted(String pid) {
126        BrokerService broker = brokers.get(pid);
127        if (broker == null) {
128            return;
129        }
130        try {
131            LOG.info("Stopping broker " + pid);
132            broker.stop();
133            broker.waitUntilStopped();
134        } catch (Exception e) {
135            LOG.error("Exception on stopping broker", e);
136        }
137    }
138
139    synchronized public void destroy() {
140        for (String broker : brokers.keySet()) {
141            deleted(broker);
142        }
143    }
144
145    public BundleContext getBundleContext() {
146        return bundleContext;
147    }
148
149    public void setBundleContext(BundleContext bundleContext) {
150        this.bundleContext = bundleContext;
151    }
152}