001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 *  Unless required by applicable law or agreed to in writing, software
012 *  distributed under the License is distributed on an "AS IS" BASIS,
013 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 *  See the License for the specific language governing permissions and
015 *  limitations under the License.
016 */
017package org.apache.activemq.util.osgi;
018
019import java.io.BufferedReader;
020import java.io.IOException;
021import java.io.InputStream;
022import java.io.InputStreamReader;
023import java.net.URL;
024import java.util.ArrayList;
025import java.util.List;
026import java.util.Properties;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.concurrent.ConcurrentMap;
029
030import org.apache.activemq.Service;
031import org.apache.activemq.store.PersistenceAdapter;
032import org.apache.activemq.transport.Transport;
033import org.apache.activemq.transport.discovery.DiscoveryAgent;
034import org.apache.activemq.util.FactoryFinder;
035import org.apache.activemq.util.FactoryFinder.ObjectFactory;
036import org.osgi.framework.Bundle;
037import org.osgi.framework.BundleActivator;
038import org.osgi.framework.BundleContext;
039import org.osgi.framework.BundleEvent;
040import org.osgi.framework.SynchronousBundleListener;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * An OSGi bundle activator for ActiveMQ which adapts the {@link org.apache.activemq.util.FactoryFinder}
046 * to the OSGi environment.
047 *
048 */
049public class Activator implements BundleActivator, SynchronousBundleListener, ObjectFactory {
050
051    private static final Logger LOG = LoggerFactory.getLogger(Activator.class);
052
053    private final ConcurrentMap<String, Class> serviceCache = new ConcurrentHashMap<String, Class>();
054    private final ConcurrentMap<Long, BundleWrapper> bundleWrappers = new ConcurrentHashMap<Long, BundleWrapper>();
055    private BundleContext bundleContext;
056
057    // ================================================================
058    // BundleActivator interface impl
059    // ================================================================
060
061    @Override
062    public synchronized void start(BundleContext bundleContext) throws Exception {
063
064        // This is how we replace the default FactoryFinder strategy
065        // with one that is more compatible in an OSGi env.
066        FactoryFinder.setObjectFactory(this);
067
068        debug("activating");
069        this.bundleContext = bundleContext;
070        debug("checking existing bundles");
071        bundleContext.addBundleListener(this);
072        for (Bundle bundle : bundleContext.getBundles()) {
073            if (bundle.getState() == Bundle.RESOLVED || bundle.getState() == Bundle.STARTING ||
074                bundle.getState() == Bundle.ACTIVE || bundle.getState() == Bundle.STOPPING) {
075                register(bundle);
076            }
077        }
078        debug("activated");
079    }
080
081
082    @Override
083    public synchronized void stop(BundleContext bundleContext) throws Exception {
084        debug("deactivating");
085        bundleContext.removeBundleListener(this);
086        while (!bundleWrappers.isEmpty()) {
087            unregister(bundleWrappers.keySet().iterator().next());
088        }
089        debug("deactivated");
090        this.bundleContext = null;
091    }
092
093    // ================================================================
094    // SynchronousBundleListener interface impl
095    // ================================================================
096
097    @Override
098    public void bundleChanged(BundleEvent event) {
099        if (event.getType() == BundleEvent.RESOLVED) {
100            register(event.getBundle());
101        } else if (event.getType() == BundleEvent.UNRESOLVED || event.getType() == BundleEvent.UNINSTALLED) {
102            unregister(event.getBundle().getBundleId());
103        }
104    }
105
106    protected void register(final Bundle bundle) {
107        debug("checking bundle " + bundle.getBundleId());
108        if( !isImportingUs(bundle) ) {
109            debug("The bundle does not import us: "+ bundle.getBundleId());
110            return;
111        }
112        bundleWrappers.put(bundle.getBundleId(), new BundleWrapper(bundle));
113    }
114
115    /**
116     * When bundles unload.. we remove them thier cached Class entries from the
117     * serviceCache.  Future service lookups for the service will fail.
118     *
119     * TODO: consider a way to get the Broker release any references to
120     * instances of the service.
121     *
122     * @param bundleId
123     */
124    protected void unregister(long bundleId) {
125        BundleWrapper bundle = bundleWrappers.remove(bundleId);
126        if (bundle != null) {
127            for (String path : bundle.cachedServices) {
128                debug("unregistering service for key: " +path );
129                serviceCache.remove(path);
130            }
131        }
132    }
133
134    // ================================================================
135    // ObjectFactory interface impl
136    // ================================================================
137
138    @Override
139    public Object create(String path) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException {
140        Class clazz = serviceCache.get(path);
141        if (clazz == null) {
142            StringBuffer warnings = new StringBuffer();
143            // We need to look for a bundle that has that class.
144            int wrrningCounter=1;
145            for (BundleWrapper wrapper : bundleWrappers.values()) {
146                URL resource = wrapper.bundle.getResource(path);
147                if( resource == null ) {
148                    continue;
149                }
150
151                Properties properties = loadProperties(resource);
152
153                String className = properties.getProperty("class");
154                if (className == null) {
155                    warnings.append("("+(wrrningCounter++)+") Invalid service file in bundle "+wrapper+": 'class' property not defined.");
156                    continue;
157                }
158
159                try {
160                    clazz = wrapper.bundle.loadClass(className);
161                } catch (ClassNotFoundException e) {
162                    warnings.append("("+(wrrningCounter++)+") Bundle "+wrapper+" could not load "+className+": "+e);
163                    continue;
164                }
165
166                // Yay.. the class was found.  Now cache it.
167                serviceCache.put(path, clazz);
168                wrapper.cachedServices.add(path);
169                break;
170            }
171
172            if( clazz == null ) {
173                // Since OSGi is such a tricky environment to work in.. lets give folks the
174                // most information we can in the error message.
175                String msg = "Service not found: '" + path + "'";
176                if (warnings.length()!= 0) {
177                    msg += ", "+warnings;
178                }
179                throw new IOException(msg);
180            }
181        }
182        return clazz.newInstance();
183    }
184
185    // ================================================================
186    // Internal Helper Methods
187    // ================================================================
188
189    private void debug(Object msg) {
190        LOG.debug(msg.toString());
191    }
192
193    private Properties loadProperties(URL resource) throws IOException {
194        InputStream in = resource.openStream();
195        try {
196            BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
197            Properties properties = new Properties();
198            properties.load(in);
199            return properties;
200        } finally {
201            try {
202                in.close();
203            } catch (Exception e) {
204            }
205        }
206    }
207
208    private boolean isImportingUs(Bundle bundle) {
209        return isImportingClass(bundle, Service.class)
210                || isImportingClass(bundle, Transport.class)
211                || isImportingClass(bundle, DiscoveryAgent.class)
212                || isImportingClass(bundle, PersistenceAdapter.class);
213    }
214
215    private boolean isImportingClass(Bundle bundle, Class clazz) {
216        try {
217            return bundle.loadClass(clazz.getName())==clazz;
218        } catch (ClassNotFoundException e) {
219            return false;
220        }
221    }
222
223    private static class BundleWrapper {
224        private final Bundle bundle;
225        private final List<String> cachedServices = new ArrayList<String>();
226
227        public BundleWrapper(Bundle bundle) {
228            this.bundle = bundle;
229        }
230    }
231}