001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.management;
018    
019    import java.io.IOException;
020    import java.lang.management.ManagementFactory;
021    import java.net.InetAddress;
022    import java.net.UnknownHostException;
023    import java.rmi.NoSuchObjectException;
024    import java.rmi.RemoteException;
025    import java.rmi.registry.LocateRegistry;
026    import java.rmi.registry.Registry;
027    import java.rmi.server.UnicastRemoteObject;
028    import java.util.HashMap;
029    import java.util.List;
030    import java.util.Map;
031    import javax.management.JMException;
032    import javax.management.MBeanServer;
033    import javax.management.MBeanServerFactory;
034    import javax.management.NotCompliantMBeanException;
035    import javax.management.ObjectInstance;
036    import javax.management.ObjectName;
037    import javax.management.remote.JMXConnectorServer;
038    import javax.management.remote.JMXConnectorServerFactory;
039    import javax.management.remote.JMXServiceURL;
040    
041    import org.apache.camel.CamelContext;
042    import org.apache.camel.CamelContextAware;
043    import org.apache.camel.spi.ManagementAgent;
044    import org.apache.camel.spi.ManagementMBeanAssembler;
045    import org.apache.camel.support.ServiceSupport;
046    import org.apache.camel.util.ObjectHelper;
047    import org.slf4j.Logger;
048    import org.slf4j.LoggerFactory;
049    
050    /**
051     * Default implementation of the Camel JMX service agent
052     */
053    public class DefaultManagementAgent extends ServiceSupport implements ManagementAgent, CamelContextAware {
054    
055        public static final String DEFAULT_DOMAIN = "org.apache.camel";
056        public static final String DEFAULT_HOST = "localhost";
057        public static final int DEFAULT_REGISTRY_PORT = 1099;
058        public static final int DEFAULT_CONNECTION_PORT = -1;
059        public static final String DEFAULT_SERVICE_URL_PATH = "/jmxrmi/camel";
060        private static final transient Logger LOG = LoggerFactory.getLogger(DefaultManagementAgent.class);
061    
062        private CamelContext camelContext;
063        private MBeanServer server;
064        // need a name -> actual name mapping as some servers changes the names (such as WebSphere)
065        private final Map<ObjectName, ObjectName> mbeansRegistered = new HashMap<ObjectName, ObjectName>();
066        private JMXConnectorServer cs;
067        private Registry registry;
068    
069        private Integer registryPort;
070        private Integer connectorPort;
071        private String mBeanServerDefaultDomain;
072        private String mBeanObjectDomainName;
073        private String serviceUrlPath;
074        private Boolean usePlatformMBeanServer = true;
075        private Boolean createConnector;
076        private Boolean onlyRegisterProcessorWithCustomId;
077        private Boolean registerAlways;
078        private Boolean registerNewRoutes = true;
079    
080        public DefaultManagementAgent() {
081        }
082    
083        public DefaultManagementAgent(CamelContext camelContext) {
084            this.camelContext = camelContext;
085        }
086    
087        protected void finalizeSettings() {
088            // TODO: System properties ought to take precedence, over configured options
089    
090            if (registryPort == null) {
091                registryPort = Integer.getInteger(JmxSystemPropertyKeys.REGISTRY_PORT, DEFAULT_REGISTRY_PORT);
092            }
093            if (connectorPort == null) {
094                connectorPort = Integer.getInteger(JmxSystemPropertyKeys.CONNECTOR_PORT, DEFAULT_CONNECTION_PORT);
095            }
096            if (mBeanServerDefaultDomain == null) {
097                mBeanServerDefaultDomain = System.getProperty(JmxSystemPropertyKeys.DOMAIN, DEFAULT_DOMAIN);
098            }
099            if (mBeanObjectDomainName == null) {
100                mBeanObjectDomainName = System.getProperty(JmxSystemPropertyKeys.MBEAN_DOMAIN, DEFAULT_DOMAIN);
101            }
102            if (serviceUrlPath == null) {
103                serviceUrlPath = System.getProperty(JmxSystemPropertyKeys.SERVICE_URL_PATH, DEFAULT_SERVICE_URL_PATH);
104            }
105            if (createConnector == null) {
106                createConnector = Boolean.getBoolean(JmxSystemPropertyKeys.CREATE_CONNECTOR);
107            }
108            if (onlyRegisterProcessorWithCustomId == null) {
109                onlyRegisterProcessorWithCustomId = Boolean.getBoolean(JmxSystemPropertyKeys.ONLY_REGISTER_PROCESSOR_WITH_CUSTOM_ID);
110            }
111            // "Use platform mbean server" is true by default
112            if (System.getProperty(JmxSystemPropertyKeys.USE_PLATFORM_MBS) != null) {
113                usePlatformMBeanServer = Boolean.getBoolean(JmxSystemPropertyKeys.USE_PLATFORM_MBS);
114            }
115    
116            if (System.getProperty(JmxSystemPropertyKeys.REGISTER_ALWAYS) != null) {
117                registerAlways = Boolean.getBoolean(JmxSystemPropertyKeys.REGISTER_ALWAYS);
118            }
119            if (System.getProperty(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES) != null) {
120                registerNewRoutes = Boolean.getBoolean(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES);
121            }
122        }
123    
124        public void setRegistryPort(Integer port) {
125            registryPort = port;
126        }
127    
128        public Integer getRegistryPort() {
129            return registryPort;
130        }
131    
132        public void setConnectorPort(Integer port) {
133            connectorPort = port;
134        }
135    
136        public Integer getConnectorPort() {
137            return connectorPort;
138        }
139    
140        public void setMBeanServerDefaultDomain(String domain) {
141            mBeanServerDefaultDomain = domain;
142        }
143    
144        public String getMBeanServerDefaultDomain() {
145            return mBeanServerDefaultDomain;
146        }
147    
148        public void setMBeanObjectDomainName(String domainName) {
149            mBeanObjectDomainName = domainName;
150        }
151    
152        public String getMBeanObjectDomainName() {
153            return mBeanObjectDomainName;
154        }
155    
156        public void setServiceUrlPath(String url) {
157            serviceUrlPath = url;
158        }
159    
160        public String getServiceUrlPath() {
161            return serviceUrlPath;
162        }
163    
164        public void setCreateConnector(Boolean flag) {
165            createConnector = flag;
166        }
167    
168        public Boolean getCreateConnector() {
169            return createConnector;
170        }
171    
172        public void setUsePlatformMBeanServer(Boolean flag) {
173            usePlatformMBeanServer = flag;
174        }
175    
176        public Boolean getUsePlatformMBeanServer() {
177            return usePlatformMBeanServer;
178        }
179    
180        public Boolean getOnlyRegisterProcessorWithCustomId() {
181            return onlyRegisterProcessorWithCustomId;
182        }
183    
184        public void setOnlyRegisterProcessorWithCustomId(Boolean onlyRegisterProcessorWithCustomId) {
185            this.onlyRegisterProcessorWithCustomId = onlyRegisterProcessorWithCustomId;
186        }
187    
188        public void setMBeanServer(MBeanServer mbeanServer) {
189            server = mbeanServer;
190        }
191    
192        public MBeanServer getMBeanServer() {
193            return server;
194        }
195    
196        public Boolean getRegisterAlways() {
197            return registerAlways != null && registerAlways;
198        }
199    
200        public void setRegisterAlways(Boolean registerAlways) {
201            this.registerAlways = registerAlways;
202        }
203    
204        public Boolean getRegisterNewRoutes() {
205            return registerNewRoutes != null && registerNewRoutes;
206        }
207    
208        public void setRegisterNewRoutes(Boolean registerNewRoutes) {
209            this.registerNewRoutes = registerNewRoutes;
210        }
211    
212        public CamelContext getCamelContext() {
213            return camelContext;
214        }
215    
216        public void setCamelContext(CamelContext camelContext) {
217            this.camelContext = camelContext;
218        }
219    
220        public void register(Object obj, ObjectName name) throws JMException {
221            register(obj, name, false);
222        }
223    
224        public void register(Object obj, ObjectName name, boolean forceRegistration) throws JMException {
225            try {
226                registerMBeanWithServer(obj, name, forceRegistration);
227            } catch (NotCompliantMBeanException e) {
228                // If this is not a "normal" MBean, then try to deploy it using JMX annotations
229                ManagementMBeanAssembler assembler = camelContext.getManagementMBeanAssembler();
230                ObjectHelper.notNull(assembler, "ManagementMBeanAssembler", camelContext);
231                Object mbean = assembler.assemble(server, obj, name);
232                if (mbean != null) {
233                    // and register the mbean
234                    registerMBeanWithServer(mbean, name, forceRegistration);
235                }
236            }
237        }
238    
239        public void unregister(ObjectName name) throws JMException {
240            if (isRegistered(name)) {
241                server.unregisterMBean(mbeansRegistered.get(name));
242                LOG.debug("Unregistered MBean with ObjectName: {}", name);
243            }
244            mbeansRegistered.remove(name);
245        }
246    
247        public boolean isRegistered(ObjectName name) {
248            return (mbeansRegistered.containsKey(name) 
249                    && server.isRegistered(mbeansRegistered.get(name))) 
250                    || server.isRegistered(name);
251        }
252    
253        protected void doStart() throws Exception {
254            ObjectHelper.notNull(camelContext, "CamelContext");
255    
256            // create mbean server if is has not be injected.
257            if (server == null) {
258                finalizeSettings();
259                createMBeanServer();
260            }
261    
262            LOG.debug("Starting JMX agent on server: {}", getMBeanServer());
263        }
264    
265        protected void doStop() throws Exception {
266            // close JMX Connector, if it was created
267            if (cs != null) {
268                try {
269                    cs.stop();
270                    LOG.debug("Stopped JMX Connector");
271                } catch (IOException e) {
272                    LOG.debug("Error occurred during stopping JMXConnectorService: "
273                            + cs + ". This exception will be ignored.");
274                }
275                cs = null;
276            }
277    
278            // Unexport JMX RMI registry, if it was created
279            if (registry != null) {
280                try {
281                    UnicastRemoteObject.unexportObject(registry, true);
282                    LOG.debug("Unexported JMX RMI Registry");
283                } catch (NoSuchObjectException e) {
284                    LOG.debug("Error occurred while unexporting JMX RMI registry. This exception will be ignored.");
285                }
286            }
287    
288            if (mbeansRegistered.isEmpty()) {
289                return;
290            }
291    
292            // Using the array to hold the busMBeans to avoid the CurrentModificationException
293            ObjectName[] mBeans = mbeansRegistered.keySet().toArray(new ObjectName[mbeansRegistered.size()]);
294            int caught = 0;
295            for (ObjectName name : mBeans) {
296                try {
297                    unregister(name);
298                } catch (Exception e) {
299                    LOG.info("Exception unregistering MBean with name " + name, e);
300                    caught++;
301                }
302            }
303            if (caught > 0) {
304                LOG.warn("A number of " + caught
305                         + " exceptions caught while unregistering MBeans during stop operation."
306                         + " See INFO log for details.");
307            }
308        }
309    
310        private void registerMBeanWithServer(Object obj, ObjectName name, boolean forceRegistration)
311            throws JMException {
312    
313            // have we already registered the bean, there can be shared instances in the camel routes
314            boolean exists = isRegistered(name);
315            if (exists) {
316                if (forceRegistration) {
317                    LOG.info("ForceRegistration enabled, unregistering existing MBean with ObjectName: {}", name);
318                    server.unregisterMBean(name);
319                } else {
320                    // okay ignore we do not want to force it and it could be a shared instance
321                    LOG.debug("MBean already registered with ObjectName: {}", name);
322                }
323            }
324    
325            // register bean if by force or not exists
326            ObjectInstance instance = null;
327            if (forceRegistration || !exists) {
328                LOG.trace("Registering MBean with ObjectName: {}", name);
329                instance = server.registerMBean(obj, name);
330            }
331    
332            // need to use the name returned from the server as some JEE servers may modify the name
333            if (instance != null) {
334                ObjectName registeredName = instance.getObjectName();
335                LOG.debug("Registered MBean with ObjectName: {}", registeredName);
336                mbeansRegistered.put(name, registeredName);
337            }
338        }
339    
340        protected void createMBeanServer() {
341            String hostName;
342            boolean canAccessSystemProps = true;
343            try {
344                // we'll do it this way mostly to determine if we should lookup the hostName
345                SecurityManager sm = System.getSecurityManager();
346                if (sm != null) {
347                    sm.checkPropertiesAccess();
348                }
349            } catch (SecurityException se) {
350                canAccessSystemProps = false;
351            }
352    
353            if (canAccessSystemProps) {
354                try {
355                    hostName = InetAddress.getLocalHost().getHostName();
356                } catch (UnknownHostException uhe) {
357                    LOG.info("Cannot determine localhost name. Using default: " + DEFAULT_REGISTRY_PORT, uhe);
358                    hostName = DEFAULT_HOST;
359                }
360            } else {
361                hostName = DEFAULT_HOST;
362            }
363    
364            server = findOrCreateMBeanServer();
365    
366            try {
367                // Create the connector if we need
368                if (createConnector) {
369                    createJmxConnector(hostName);
370                }
371            } catch (IOException ioe) {
372                LOG.warn("Could not create and start JMX connector.", ioe);
373            }
374        }
375        
376        protected MBeanServer findOrCreateMBeanServer() {
377    
378            // return platform mbean server if the option is specified.
379            if (usePlatformMBeanServer) {
380                return ManagementFactory.getPlatformMBeanServer();
381            }
382    
383            // look for the first mbean server that has match default domain name
384            List<MBeanServer> servers = MBeanServerFactory.findMBeanServer(null);
385    
386            for (MBeanServer server : servers) {
387                LOG.debug("Found MBeanServer with default domain {}", server.getDefaultDomain());
388    
389                if (mBeanServerDefaultDomain.equals(server.getDefaultDomain())) {
390                    return server;
391                }
392            }
393    
394            // create a mbean server with the given default domain name
395            return MBeanServerFactory.createMBeanServer(mBeanServerDefaultDomain);
396        }
397    
398        protected void createJmxConnector(String host) throws IOException {
399            ObjectHelper.notEmpty(serviceUrlPath, "serviceUrlPath");
400            ObjectHelper.notNull(registryPort, "registryPort");
401    
402            try {
403                registry = LocateRegistry.createRegistry(registryPort);
404                LOG.debug("Created JMXConnector RMI registry on port {}", registryPort);
405            } catch (RemoteException ex) {
406                // The registry may had been created, we could get the registry instead
407            }
408    
409            // must start with leading slash
410            String path = serviceUrlPath.startsWith("/") ? serviceUrlPath : "/" + serviceUrlPath;
411            // Create an RMI connector and start it
412            final JMXServiceURL url;
413            if (connectorPort > 0) {
414                url = new JMXServiceURL("service:jmx:rmi://" + host + ":" + connectorPort + "/jndi/rmi://" + host
415                                        + ":" + registryPort + path);
416            } else {
417                url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + registryPort + path);
418            }
419    
420            cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, server);
421    
422            // use async thread for starting the JMX Connector
423            // (no need to use a thread pool or enlist in JMX as this thread is terminated when the JMX connector has been started)
424            Thread thread = new Thread(new Runnable() {
425                public void run() {
426                    try {
427                        LOG.debug("Staring JMX Connector thread to listen at: {}", url);
428                        cs.start();
429                        LOG.info("JMX Connector thread started and listening at: {}", url);
430                    } catch (IOException ioe) {
431                        LOG.warn("Could not start JMXConnector thread at: " + url + ". JMX Connector not in use.", ioe);
432                    }
433                }
434            });
435            thread.setDaemon(true);
436            String threadName = camelContext.getExecutorServiceManager().resolveThreadName("JMXConnector: " + url);
437            thread.setName(threadName);
438            thread.start();
439        }
440    
441    }