001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.management;
018
019import java.io.IOException;
020import java.lang.management.ManagementFactory;
021import java.net.InetAddress;
022import java.net.UnknownHostException;
023import java.rmi.NoSuchObjectException;
024import java.rmi.RemoteException;
025import java.rmi.registry.LocateRegistry;
026import java.rmi.registry.Registry;
027import java.rmi.server.UnicastRemoteObject;
028import java.util.LinkedHashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033
034import javax.management.JMException;
035import javax.management.MBeanServer;
036import javax.management.MBeanServerFactory;
037import javax.management.MBeanServerInvocationHandler;
038import javax.management.NotCompliantMBeanException;
039import javax.management.ObjectInstance;
040import javax.management.ObjectName;
041import javax.management.remote.JMXConnectorServer;
042import javax.management.remote.JMXConnectorServerFactory;
043import javax.management.remote.JMXServiceURL;
044
045import org.apache.camel.CamelContext;
046import org.apache.camel.CamelContextAware;
047import org.apache.camel.ManagementStatisticsLevel;
048import org.apache.camel.spi.ManagementAgent;
049import org.apache.camel.spi.ManagementMBeanAssembler;
050import org.apache.camel.support.ServiceSupport;
051import org.apache.camel.util.InetAddressUtil;
052import org.apache.camel.util.ObjectHelper;
053import org.apache.camel.util.ServiceHelper;
054import org.apache.camel.util.StringHelper;
055import org.slf4j.Logger;
056import org.slf4j.LoggerFactory;
057
058/**
059 * Default implementation of the Camel JMX service agent
060 */
061public class DefaultManagementAgent extends ServiceSupport implements ManagementAgent, CamelContextAware {
062
063    public static final String DEFAULT_DOMAIN = "org.apache.camel";
064    public static final String DEFAULT_HOST = "localhost";
065    public static final int DEFAULT_REGISTRY_PORT = 1099;
066    public static final int DEFAULT_CONNECTION_PORT = -1;
067    public static final String DEFAULT_SERVICE_URL_PATH = "/jmxrmi/camel";
068    private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementAgent.class);
069
070    private CamelContext camelContext;
071    private MBeanServer server;
072    private ManagementMBeanAssembler assembler;
073
074    // need a name -> actual name mapping as some servers changes the names (such as WebSphere)
075    private final ConcurrentMap<ObjectName, ObjectName> mbeansRegistered = new ConcurrentHashMap<>();
076    private JMXConnectorServer cs;
077    private Registry registry;
078
079    private Integer registryPort = DEFAULT_REGISTRY_PORT;
080    private Integer connectorPort = DEFAULT_CONNECTION_PORT;
081    private String mBeanServerDefaultDomain = DEFAULT_DOMAIN;
082    private String mBeanObjectDomainName = DEFAULT_DOMAIN;
083    private String serviceUrlPath = DEFAULT_SERVICE_URL_PATH;
084    private Boolean usePlatformMBeanServer = true;
085    private Boolean createConnector = false;
086    private Boolean onlyRegisterProcessorWithCustomId = false;
087    private Boolean loadStatisticsEnabled = false;
088    private Boolean endpointRuntimeStatisticsEnabled;
089    private Boolean registerAlways = false;
090    private Boolean registerNewRoutes = true;
091    private Boolean mask = true;
092    private Boolean includeHostName = false;
093    private Boolean useHostIPAddress = false;
094    private String managementNamePattern = "#name#";
095    private ManagementStatisticsLevel statisticsLevel = ManagementStatisticsLevel.Default;
096
097    public DefaultManagementAgent() {
098    }
099
100    public DefaultManagementAgent(CamelContext camelContext) {
101        this.camelContext = camelContext;
102    }
103
104    protected void finalizeSettings() throws Exception {
105        // JVM system properties take precedence over any configuration
106        Map<String, Object> values = new LinkedHashMap<>();
107
108        if (System.getProperty(JmxSystemPropertyKeys.REGISTRY_PORT) != null) {
109            registryPort = Integer.getInteger(JmxSystemPropertyKeys.REGISTRY_PORT);
110            values.put(JmxSystemPropertyKeys.REGISTRY_PORT, registryPort);
111        }
112        if (System.getProperty(JmxSystemPropertyKeys.CONNECTOR_PORT) != null) {
113            connectorPort = Integer.getInteger(JmxSystemPropertyKeys.CONNECTOR_PORT);
114            values.put(JmxSystemPropertyKeys.CONNECTOR_PORT, connectorPort);
115        }
116        if (System.getProperty(JmxSystemPropertyKeys.DOMAIN) != null) {
117            mBeanServerDefaultDomain = System.getProperty(JmxSystemPropertyKeys.DOMAIN);
118            values.put(JmxSystemPropertyKeys.DOMAIN, mBeanServerDefaultDomain);
119        }
120        if (System.getProperty(JmxSystemPropertyKeys.MBEAN_DOMAIN) != null) {
121            mBeanObjectDomainName = System.getProperty(JmxSystemPropertyKeys.MBEAN_DOMAIN);
122            values.put(JmxSystemPropertyKeys.MBEAN_DOMAIN, mBeanObjectDomainName);
123        }
124        if (System.getProperty(JmxSystemPropertyKeys.SERVICE_URL_PATH) != null) {
125            serviceUrlPath = System.getProperty(JmxSystemPropertyKeys.SERVICE_URL_PATH);
126            values.put(JmxSystemPropertyKeys.SERVICE_URL_PATH, serviceUrlPath);
127        }
128        if (System.getProperty(JmxSystemPropertyKeys.CREATE_CONNECTOR) != null) {
129            createConnector = Boolean.getBoolean(JmxSystemPropertyKeys.CREATE_CONNECTOR);
130            values.put(JmxSystemPropertyKeys.CREATE_CONNECTOR, createConnector);
131        }
132        if (System.getProperty(JmxSystemPropertyKeys.ONLY_REGISTER_PROCESSOR_WITH_CUSTOM_ID) != null) {
133            onlyRegisterProcessorWithCustomId = Boolean.getBoolean(JmxSystemPropertyKeys.ONLY_REGISTER_PROCESSOR_WITH_CUSTOM_ID);
134            values.put(JmxSystemPropertyKeys.ONLY_REGISTER_PROCESSOR_WITH_CUSTOM_ID, onlyRegisterProcessorWithCustomId);
135        }
136        if (System.getProperty(JmxSystemPropertyKeys.USE_PLATFORM_MBS) != null) {
137            usePlatformMBeanServer = Boolean.getBoolean(JmxSystemPropertyKeys.USE_PLATFORM_MBS);
138            values.put(JmxSystemPropertyKeys.USE_PLATFORM_MBS, usePlatformMBeanServer);
139        }
140        if (System.getProperty(JmxSystemPropertyKeys.REGISTER_ALWAYS) != null) {
141            registerAlways = Boolean.getBoolean(JmxSystemPropertyKeys.REGISTER_ALWAYS);
142            values.put(JmxSystemPropertyKeys.REGISTER_ALWAYS, registerAlways);
143        }
144        if (System.getProperty(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES) != null) {
145            registerNewRoutes = Boolean.getBoolean(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES);
146            values.put(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES, registerNewRoutes);
147        }
148        if (System.getProperty(JmxSystemPropertyKeys.MASK) != null) {
149            mask = Boolean.getBoolean(JmxSystemPropertyKeys.MASK);
150            values.put(JmxSystemPropertyKeys.MASK, mask);
151        }
152        if (System.getProperty(JmxSystemPropertyKeys.INCLUDE_HOST_NAME) != null) {
153            includeHostName = Boolean.getBoolean(JmxSystemPropertyKeys.INCLUDE_HOST_NAME);
154            values.put(JmxSystemPropertyKeys.INCLUDE_HOST_NAME, includeHostName);
155        }
156        if (System.getProperty(JmxSystemPropertyKeys.CREATE_CONNECTOR) != null) {
157            createConnector = Boolean.getBoolean(JmxSystemPropertyKeys.CREATE_CONNECTOR);
158            values.put(JmxSystemPropertyKeys.CREATE_CONNECTOR, createConnector);
159        }
160        if (System.getProperty(JmxSystemPropertyKeys.LOAD_STATISTICS_ENABLED) != null) {
161            loadStatisticsEnabled = Boolean.getBoolean(JmxSystemPropertyKeys.LOAD_STATISTICS_ENABLED);
162            values.put(JmxSystemPropertyKeys.LOAD_STATISTICS_ENABLED, loadStatisticsEnabled);
163        }
164        if (System.getProperty(JmxSystemPropertyKeys.ENDPOINT_RUNTIME_STATISTICS_ENABLED) != null) {
165            endpointRuntimeStatisticsEnabled = Boolean.getBoolean(JmxSystemPropertyKeys.ENDPOINT_RUNTIME_STATISTICS_ENABLED);
166            values.put(JmxSystemPropertyKeys.ENDPOINT_RUNTIME_STATISTICS_ENABLED, endpointRuntimeStatisticsEnabled);
167        }
168        if (System.getProperty(JmxSystemPropertyKeys.STATISTICS_LEVEL) != null) {
169            statisticsLevel = camelContext.getTypeConverter().mandatoryConvertTo(ManagementStatisticsLevel.class, System.getProperty(JmxSystemPropertyKeys.STATISTICS_LEVEL));
170            values.put(JmxSystemPropertyKeys.STATISTICS_LEVEL, statisticsLevel);
171        }
172        if (System.getProperty(JmxSystemPropertyKeys.MANAGEMENT_NAME_PATTERN) != null) {
173            managementNamePattern = System.getProperty(JmxSystemPropertyKeys.MANAGEMENT_NAME_PATTERN);
174            values.put(JmxSystemPropertyKeys.MANAGEMENT_NAME_PATTERN, managementNamePattern);
175        }
176        if (System.getProperty(JmxSystemPropertyKeys.USE_HOST_IP_ADDRESS) != null) {
177            useHostIPAddress = Boolean.getBoolean(JmxSystemPropertyKeys.USE_HOST_IP_ADDRESS);
178            values.put(JmxSystemPropertyKeys.USE_HOST_IP_ADDRESS, useHostIPAddress);
179        }
180
181        if (!values.isEmpty()) {
182            LOG.info("ManagementAgent detected JVM system properties: {}", values);
183        }
184    }
185
186    public void setRegistryPort(Integer port) {
187        registryPort = port;
188    }
189
190    public Integer getRegistryPort() {
191        return registryPort;
192    }
193
194    public void setConnectorPort(Integer port) {
195        connectorPort = port;
196    }
197
198    public Integer getConnectorPort() {
199        return connectorPort;
200    }
201
202    public void setMBeanServerDefaultDomain(String domain) {
203        mBeanServerDefaultDomain = domain;
204    }
205
206    public String getMBeanServerDefaultDomain() {
207        return mBeanServerDefaultDomain;
208    }
209
210    public void setMBeanObjectDomainName(String domainName) {
211        mBeanObjectDomainName = domainName;
212    }
213
214    public String getMBeanObjectDomainName() {
215        return mBeanObjectDomainName;
216    }
217
218    public void setServiceUrlPath(String url) {
219        serviceUrlPath = url;
220    }
221
222    public String getServiceUrlPath() {
223        return serviceUrlPath;
224    }
225
226    public void setCreateConnector(Boolean flag) {
227        createConnector = flag;
228    }
229
230    public Boolean getCreateConnector() {
231        return createConnector;
232    }
233
234    public void setUsePlatformMBeanServer(Boolean flag) {
235        usePlatformMBeanServer = flag;
236    }
237
238    public Boolean getUsePlatformMBeanServer() {
239        return usePlatformMBeanServer;
240    }
241
242    public Boolean getOnlyRegisterProcessorWithCustomId() {
243        return onlyRegisterProcessorWithCustomId;
244    }
245
246    public void setOnlyRegisterProcessorWithCustomId(Boolean onlyRegisterProcessorWithCustomId) {
247        this.onlyRegisterProcessorWithCustomId = onlyRegisterProcessorWithCustomId;
248    }
249
250    public void setMBeanServer(MBeanServer mbeanServer) {
251        server = mbeanServer;
252    }
253
254    public MBeanServer getMBeanServer() {
255        return server;
256    }
257
258    public Boolean getRegisterAlways() {
259        return registerAlways != null && registerAlways;
260    }
261
262    public void setRegisterAlways(Boolean registerAlways) {
263        this.registerAlways = registerAlways;
264    }
265
266    public Boolean getRegisterNewRoutes() {
267        return registerNewRoutes != null && registerNewRoutes;
268    }
269
270    public void setRegisterNewRoutes(Boolean registerNewRoutes) {
271        this.registerNewRoutes = registerNewRoutes;
272    }
273
274    public Boolean getMask() {
275        return mask != null && mask;
276    }
277
278    public void setMask(Boolean mask) {
279        this.mask = mask;
280    }
281
282    public Boolean getIncludeHostName() {
283        return includeHostName != null && includeHostName;
284    }
285
286    public void setIncludeHostName(Boolean includeHostName) {
287        this.includeHostName = includeHostName;
288    }
289
290    public Boolean getUseHostIPAddress() {
291        return useHostIPAddress != null && useHostIPAddress;
292    }
293
294    public void setUseHostIPAddress(Boolean useHostIPAddress) {
295        this.useHostIPAddress = useHostIPAddress;
296    }
297
298    public String getManagementNamePattern() {
299        return managementNamePattern;
300    }
301
302    public void setManagementNamePattern(String managementNamePattern) {
303        this.managementNamePattern = managementNamePattern;
304    }
305
306    public Boolean getLoadStatisticsEnabled() {
307        return loadStatisticsEnabled;
308    }
309
310    public void setLoadStatisticsEnabled(Boolean loadStatisticsEnabled) {
311        this.loadStatisticsEnabled = loadStatisticsEnabled;
312    }
313
314    public Boolean getEndpointRuntimeStatisticsEnabled() {
315        return endpointRuntimeStatisticsEnabled;
316    }
317
318    public void setEndpointRuntimeStatisticsEnabled(Boolean endpointRuntimeStatisticsEnabled) {
319        this.endpointRuntimeStatisticsEnabled = endpointRuntimeStatisticsEnabled;
320    }
321
322    public ManagementStatisticsLevel getStatisticsLevel() {
323        return statisticsLevel;
324    }
325
326    public void setStatisticsLevel(ManagementStatisticsLevel statisticsLevel) {
327        this.statisticsLevel = statisticsLevel;
328    }
329
330    public CamelContext getCamelContext() {
331        return camelContext;
332    }
333
334    public void setCamelContext(CamelContext camelContext) {
335        this.camelContext = camelContext;
336    }
337
338    public void register(Object obj, ObjectName name) throws JMException {
339        register(obj, name, false);
340    }
341
342    public void register(Object obj, ObjectName name, boolean forceRegistration) throws JMException {
343        try {
344            registerMBeanWithServer(obj, name, forceRegistration);
345        } catch (NotCompliantMBeanException e) {
346            // If this is not a "normal" MBean, then try to deploy it using JMX annotations
347            ObjectHelper.notNull(assembler, "ManagementMBeanAssembler", camelContext);
348            Object mbean = assembler.assemble(server, obj, name);
349            if (mbean != null) {
350                // and register the mbean
351                registerMBeanWithServer(mbean, name, forceRegistration);
352            }
353        }
354    }
355
356    public void unregister(ObjectName name) throws JMException {
357        if (isRegistered(name)) {
358            ObjectName on = mbeansRegistered.remove(name);
359            server.unregisterMBean(on);
360            LOG.debug("Unregistered MBean with ObjectName: {}", name);
361        } else {
362            mbeansRegistered.remove(name);
363        }
364    }
365
366    public boolean isRegistered(ObjectName name) {
367        if (server == null) {
368            return false;
369        }
370        ObjectName on = mbeansRegistered.get(name);
371        return (on != null && server.isRegistered(on))
372                || server.isRegistered(name);
373    }
374
375    public <T> T newProxyClient(ObjectName name, Class<T> mbean) {
376        if (isRegistered(name)) {
377            ObjectName on = mbeansRegistered.get(name);
378            return MBeanServerInvocationHandler.newProxyInstance(server, on != null ? on : name, mbean, false);
379        } else {
380            return null;
381        }
382    }
383
384    protected void doStart() throws Exception {
385        ObjectHelper.notNull(camelContext, "CamelContext");
386
387        // create mbean server if is has not be injected.
388        if (server == null) {
389            finalizeSettings();
390            createMBeanServer();
391        }
392
393        // ensure assembler is started
394        assembler = camelContext.getManagementMBeanAssembler();
395        ServiceHelper.startService(assembler);
396
397        LOG.debug("Starting JMX agent on server: {}", getMBeanServer());
398    }
399
400    protected void doStop() throws Exception {
401        // close JMX Connector, if it was created
402        if (cs != null) {
403            try {
404                cs.stop();
405                LOG.debug("Stopped JMX Connector");
406            } catch (IOException e) {
407                LOG.debug("Error occurred during stopping JMXConnectorService: "
408                        + cs + ". This exception will be ignored.");
409            }
410            cs = null;
411        }
412
413        // Unexport JMX RMI registry, if it was created
414        if (registry != null) {
415            try {
416                UnicastRemoteObject.unexportObject(registry, true);
417                LOG.debug("Unexported JMX RMI Registry");
418            } catch (NoSuchObjectException e) {
419                LOG.debug("Error occurred while unexporting JMX RMI registry. This exception will be ignored.");
420            }
421        }
422
423        if (mbeansRegistered.isEmpty()) {
424            return;
425        }
426
427        // Using the array to hold the busMBeans to avoid the CurrentModificationException
428        ObjectName[] mBeans = mbeansRegistered.keySet().toArray(new ObjectName[mbeansRegistered.size()]);
429        int caught = 0;
430        for (ObjectName name : mBeans) {
431            try {
432                unregister(name);
433            } catch (Exception e) {
434                LOG.info("Exception unregistering MBean with name {}", name, e);
435                caught++;
436            }
437        }
438        if (caught > 0) {
439            LOG.warn("A number of " + caught
440                     + " exceptions caught while unregistering MBeans during stop operation."
441                     + " See INFO log for details.");
442        }
443
444        ServiceHelper.stopService(assembler);
445    }
446
447    private void registerMBeanWithServer(Object obj, ObjectName name, boolean forceRegistration)
448        throws JMException {
449
450        // have we already registered the bean, there can be shared instances in the camel routes
451        boolean exists = isRegistered(name);
452        if (exists) {
453            if (forceRegistration) {
454                LOG.info("ForceRegistration enabled, unregistering existing MBean with ObjectName: {}", name);
455                server.unregisterMBean(name);
456            } else {
457                // okay ignore we do not want to force it and it could be a shared instance
458                LOG.debug("MBean already registered with ObjectName: {}", name);
459            }
460        }
461
462        // register bean if by force or not exists
463        ObjectInstance instance = null;
464        if (forceRegistration || !exists) {
465            LOG.trace("Registering MBean with ObjectName: {}", name);
466            instance = server.registerMBean(obj, name);
467        }
468
469        // need to use the name returned from the server as some JEE servers may modify the name
470        if (instance != null) {
471            ObjectName registeredName = instance.getObjectName();
472            LOG.debug("Registered MBean with ObjectName: {}", registeredName);
473            mbeansRegistered.put(name, registeredName);
474        }
475    }
476
477    protected void createMBeanServer() {
478        String hostName;
479        boolean canAccessSystemProps = true;
480        try {
481            // we'll do it this way mostly to determine if we should lookup the hostName
482            SecurityManager sm = System.getSecurityManager();
483            if (sm != null) {
484                sm.checkPropertiesAccess();
485            }
486        } catch (SecurityException se) {
487            canAccessSystemProps = false;
488        }
489
490        if (canAccessSystemProps) {
491            try {
492                if (useHostIPAddress) {
493                    hostName = InetAddress.getLocalHost().getHostAddress();
494                } else {
495                    hostName = InetAddressUtil.getLocalHostName();
496                }
497            } catch (UnknownHostException uhe) {
498                LOG.info("Cannot determine localhost name or address. Using default: {}", DEFAULT_REGISTRY_PORT, uhe);
499                hostName = DEFAULT_HOST;
500            }
501        } else {
502            hostName = DEFAULT_HOST;
503        }
504
505        server = findOrCreateMBeanServer();
506
507        try {
508            // Create the connector if we need
509            if (createConnector) {
510                createJmxConnector(hostName);
511            }
512        } catch (IOException ioe) {
513            LOG.warn("Could not create and start JMX connector.", ioe);
514        }
515    }
516    
517    protected MBeanServer findOrCreateMBeanServer() {
518
519        // return platform mbean server if the option is specified.
520        if (usePlatformMBeanServer) {
521            return ManagementFactory.getPlatformMBeanServer();
522        }
523
524        // look for the first mbean server that has match default domain name
525        List<MBeanServer> servers = MBeanServerFactory.findMBeanServer(null);
526
527        for (MBeanServer server : servers) {
528            LOG.debug("Found MBeanServer with default domain {}", server.getDefaultDomain());
529
530            if (mBeanServerDefaultDomain.equals(server.getDefaultDomain())) {
531                return server;
532            }
533        }
534
535        // create a mbean server with the given default domain name
536        return MBeanServerFactory.createMBeanServer(mBeanServerDefaultDomain);
537    }
538
539    protected void createJmxConnector(String host) throws IOException {
540        StringHelper.notEmpty(serviceUrlPath, "serviceUrlPath");
541        ObjectHelper.notNull(registryPort, "registryPort");
542
543        try {
544            registry = LocateRegistry.createRegistry(registryPort);
545            LOG.debug("Created JMXConnector RMI registry on port {}", registryPort);
546        } catch (RemoteException ex) {
547            // The registry may had been created, we could get the registry instead
548        }
549
550        // must start with leading slash
551        String path = serviceUrlPath.startsWith("/") ? serviceUrlPath : "/" + serviceUrlPath;
552        // Create an RMI connector and start it
553        final JMXServiceURL url;
554        if (connectorPort > 0) {
555            url = new JMXServiceURL("service:jmx:rmi://" + host + ":" + connectorPort + "/jndi/rmi://" + host
556                                    + ":" + registryPort + path);
557        } else {
558            url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + registryPort + path);
559        }
560
561        cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, server);
562
563        // use async thread for starting the JMX Connector
564        // (no need to use a thread pool or enlist in JMX as this thread is terminated when the JMX connector has been started)
565        String threadName = camelContext.getExecutorServiceManager().resolveThreadName("JMXConnector: " + url);
566        Thread thread = getCamelContext().getExecutorServiceManager().newThread(threadName, new Runnable() {
567            public void run() {
568                try {
569                    LOG.debug("Staring JMX Connector thread to listen at: {}", url);
570                    cs.start();
571                    LOG.info("JMX Connector thread started and listening at: {}", url);
572                } catch (IOException ioe) {
573                    LOG.warn("Could not start JMXConnector thread at: " + url + ". JMX Connector not in use.", ioe);
574                }
575            }
576        });
577        thread.start();
578    }
579
580}