001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.management;
018
019import java.io.IOException;
020import java.lang.management.ManagementFactory;
021import java.net.InetAddress;
022import java.net.UnknownHostException;
023import java.rmi.NoSuchObjectException;
024import java.rmi.RemoteException;
025import java.rmi.registry.LocateRegistry;
026import java.rmi.registry.Registry;
027import java.rmi.server.UnicastRemoteObject;
028import java.util.LinkedHashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033import javax.management.JMException;
034import javax.management.MBeanServer;
035import javax.management.MBeanServerFactory;
036import javax.management.MBeanServerInvocationHandler;
037import javax.management.NotCompliantMBeanException;
038import javax.management.ObjectInstance;
039import javax.management.ObjectName;
040import javax.management.remote.JMXConnectorServer;
041import javax.management.remote.JMXConnectorServerFactory;
042import javax.management.remote.JMXServiceURL;
043
044import org.apache.camel.CamelContext;
045import org.apache.camel.CamelContextAware;
046import org.apache.camel.ManagementStatisticsLevel;
047import org.apache.camel.spi.ManagementAgent;
048import org.apache.camel.spi.ManagementMBeanAssembler;
049import org.apache.camel.support.ServiceSupport;
050import org.apache.camel.util.InetAddressUtil;
051import org.apache.camel.util.ObjectHelper;
052import org.slf4j.Logger;
053import org.slf4j.LoggerFactory;
054
055/**
056 * Default implementation of the Camel JMX service agent
057 */
058public class DefaultManagementAgent extends ServiceSupport implements ManagementAgent, CamelContextAware {
059
060    public static final String DEFAULT_DOMAIN = "org.apache.camel";
061    public static final String DEFAULT_HOST = "localhost";
062    public static final int DEFAULT_REGISTRY_PORT = 1099;
063    public static final int DEFAULT_CONNECTION_PORT = -1;
064    public static final String DEFAULT_SERVICE_URL_PATH = "/jmxrmi/camel";
065    private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementAgent.class);
066
067    private CamelContext camelContext;
068    private MBeanServer server;
069    // need a name -> actual name mapping as some servers changes the names (such as WebSphere)
070    private final ConcurrentMap<ObjectName, ObjectName> mbeansRegistered = new ConcurrentHashMap<ObjectName, ObjectName>();
071    private JMXConnectorServer cs;
072    private Registry registry;
073
074    private Integer registryPort = DEFAULT_REGISTRY_PORT;
075    private Integer connectorPort = DEFAULT_CONNECTION_PORT;
076    private String mBeanServerDefaultDomain = DEFAULT_DOMAIN;
077    private String mBeanObjectDomainName = DEFAULT_DOMAIN;
078    private String serviceUrlPath = DEFAULT_SERVICE_URL_PATH;
079    private Boolean usePlatformMBeanServer = true;
080    private Boolean createConnector = false;
081    private Boolean onlyRegisterProcessorWithCustomId = false;
082    private Boolean loadStatisticsEnabled = false;
083    private Boolean endpointRuntimeStatisticsEnabled = true;
084    private Boolean registerAlways = false;
085    private Boolean registerNewRoutes = true;
086    private Boolean mask = true;
087    private Boolean includeHostName = false;
088    private Boolean useHostIPAddress = false;
089    private String managementNamePattern = "#name#";
090    private ManagementStatisticsLevel statisticsLevel = ManagementStatisticsLevel.Default;
091
092    public DefaultManagementAgent() {
093    }
094
095    public DefaultManagementAgent(CamelContext camelContext) {
096        this.camelContext = camelContext;
097    }
098
099    protected void finalizeSettings() throws Exception {
100        // JVM system properties take precedence over any configuration
101        Map<String, Object> values = new LinkedHashMap<String, Object>();
102
103        if (System.getProperty(JmxSystemPropertyKeys.REGISTRY_PORT) != null) {
104            registryPort = Integer.getInteger(JmxSystemPropertyKeys.REGISTRY_PORT);
105            values.put(JmxSystemPropertyKeys.REGISTRY_PORT, registryPort);
106        }
107        if (System.getProperty(JmxSystemPropertyKeys.CONNECTOR_PORT) != null) {
108            connectorPort = Integer.getInteger(JmxSystemPropertyKeys.CONNECTOR_PORT);
109            values.put(JmxSystemPropertyKeys.CONNECTOR_PORT, connectorPort);
110        }
111        if (System.getProperty(JmxSystemPropertyKeys.DOMAIN) != null) {
112            mBeanServerDefaultDomain = System.getProperty(JmxSystemPropertyKeys.DOMAIN);
113            values.put(JmxSystemPropertyKeys.DOMAIN, mBeanServerDefaultDomain);
114        }
115        if (System.getProperty(JmxSystemPropertyKeys.MBEAN_DOMAIN) != null) {
116            mBeanObjectDomainName = System.getProperty(JmxSystemPropertyKeys.MBEAN_DOMAIN);
117            values.put(JmxSystemPropertyKeys.MBEAN_DOMAIN, mBeanObjectDomainName);
118        }
119        if (System.getProperty(JmxSystemPropertyKeys.SERVICE_URL_PATH) != null) {
120            serviceUrlPath = System.getProperty(JmxSystemPropertyKeys.SERVICE_URL_PATH);
121            values.put(JmxSystemPropertyKeys.SERVICE_URL_PATH, serviceUrlPath);
122        }
123        if (System.getProperty(JmxSystemPropertyKeys.CREATE_CONNECTOR) != null) {
124            createConnector = Boolean.getBoolean(JmxSystemPropertyKeys.CREATE_CONNECTOR);
125            values.put(JmxSystemPropertyKeys.CREATE_CONNECTOR, createConnector);
126        }
127        if (System.getProperty(JmxSystemPropertyKeys.ONLY_REGISTER_PROCESSOR_WITH_CUSTOM_ID) != null) {
128            onlyRegisterProcessorWithCustomId = Boolean.getBoolean(JmxSystemPropertyKeys.ONLY_REGISTER_PROCESSOR_WITH_CUSTOM_ID);
129            values.put(JmxSystemPropertyKeys.ONLY_REGISTER_PROCESSOR_WITH_CUSTOM_ID, onlyRegisterProcessorWithCustomId);
130        }
131        if (System.getProperty(JmxSystemPropertyKeys.USE_PLATFORM_MBS) != null) {
132            usePlatformMBeanServer = Boolean.getBoolean(JmxSystemPropertyKeys.USE_PLATFORM_MBS);
133            values.put(JmxSystemPropertyKeys.USE_PLATFORM_MBS, usePlatformMBeanServer);
134        }
135        if (System.getProperty(JmxSystemPropertyKeys.REGISTER_ALWAYS) != null) {
136            registerAlways = Boolean.getBoolean(JmxSystemPropertyKeys.REGISTER_ALWAYS);
137            values.put(JmxSystemPropertyKeys.REGISTER_ALWAYS, registerAlways);
138        }
139        if (System.getProperty(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES) != null) {
140            registerNewRoutes = Boolean.getBoolean(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES);
141            values.put(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES, registerNewRoutes);
142        }
143        if (System.getProperty(JmxSystemPropertyKeys.MASK) != null) {
144            mask = Boolean.getBoolean(JmxSystemPropertyKeys.MASK);
145            values.put(JmxSystemPropertyKeys.MASK, mask);
146        }
147        if (System.getProperty(JmxSystemPropertyKeys.INCLUDE_HOST_NAME) != null) {
148            includeHostName = Boolean.getBoolean(JmxSystemPropertyKeys.INCLUDE_HOST_NAME);
149            values.put(JmxSystemPropertyKeys.INCLUDE_HOST_NAME, includeHostName);
150        }
151        if (System.getProperty(JmxSystemPropertyKeys.CREATE_CONNECTOR) != null) {
152            createConnector = Boolean.getBoolean(JmxSystemPropertyKeys.CREATE_CONNECTOR);
153            values.put(JmxSystemPropertyKeys.CREATE_CONNECTOR, createConnector);
154        }
155        if (System.getProperty(JmxSystemPropertyKeys.LOAD_STATISTICS_ENABLED) != null) {
156            loadStatisticsEnabled = Boolean.getBoolean(JmxSystemPropertyKeys.LOAD_STATISTICS_ENABLED);
157            values.put(JmxSystemPropertyKeys.LOAD_STATISTICS_ENABLED, loadStatisticsEnabled);
158        }
159        if (System.getProperty(JmxSystemPropertyKeys.ENDPOINT_RUNTIME_STATISTICS_ENABLED) != null) {
160            endpointRuntimeStatisticsEnabled = Boolean.getBoolean(JmxSystemPropertyKeys.ENDPOINT_RUNTIME_STATISTICS_ENABLED);
161            values.put(JmxSystemPropertyKeys.ENDPOINT_RUNTIME_STATISTICS_ENABLED, endpointRuntimeStatisticsEnabled);
162        }
163        if (System.getProperty(JmxSystemPropertyKeys.STATISTICS_LEVEL) != null) {
164            statisticsLevel = camelContext.getTypeConverter().mandatoryConvertTo(ManagementStatisticsLevel.class, System.getProperty(JmxSystemPropertyKeys.STATISTICS_LEVEL));
165            values.put(JmxSystemPropertyKeys.STATISTICS_LEVEL, statisticsLevel);
166        }
167        if (System.getProperty(JmxSystemPropertyKeys.MANAGEMENT_NAME_PATTERN) != null) {
168            managementNamePattern = System.getProperty(JmxSystemPropertyKeys.MANAGEMENT_NAME_PATTERN);
169            values.put(JmxSystemPropertyKeys.MANAGEMENT_NAME_PATTERN, managementNamePattern);
170        }
171        if (System.getProperty(JmxSystemPropertyKeys.USE_HOST_IP_ADDRESS) != null) {
172            useHostIPAddress = Boolean.getBoolean(JmxSystemPropertyKeys.USE_HOST_IP_ADDRESS);
173            values.put(JmxSystemPropertyKeys.USE_HOST_IP_ADDRESS, useHostIPAddress);
174        }
175
176        if (!values.isEmpty()) {
177            LOG.info("ManagementAgent detected JVM system properties: {}", values);
178        }
179    }
180
181    public void setRegistryPort(Integer port) {
182        registryPort = port;
183    }
184
185    public Integer getRegistryPort() {
186        return registryPort;
187    }
188
189    public void setConnectorPort(Integer port) {
190        connectorPort = port;
191    }
192
193    public Integer getConnectorPort() {
194        return connectorPort;
195    }
196
197    public void setMBeanServerDefaultDomain(String domain) {
198        mBeanServerDefaultDomain = domain;
199    }
200
201    public String getMBeanServerDefaultDomain() {
202        return mBeanServerDefaultDomain;
203    }
204
205    public void setMBeanObjectDomainName(String domainName) {
206        mBeanObjectDomainName = domainName;
207    }
208
209    public String getMBeanObjectDomainName() {
210        return mBeanObjectDomainName;
211    }
212
213    public void setServiceUrlPath(String url) {
214        serviceUrlPath = url;
215    }
216
217    public String getServiceUrlPath() {
218        return serviceUrlPath;
219    }
220
221    public void setCreateConnector(Boolean flag) {
222        createConnector = flag;
223    }
224
225    public Boolean getCreateConnector() {
226        return createConnector;
227    }
228
229    public void setUsePlatformMBeanServer(Boolean flag) {
230        usePlatformMBeanServer = flag;
231    }
232
233    public Boolean getUsePlatformMBeanServer() {
234        return usePlatformMBeanServer;
235    }
236
237    public Boolean getOnlyRegisterProcessorWithCustomId() {
238        return onlyRegisterProcessorWithCustomId;
239    }
240
241    public void setOnlyRegisterProcessorWithCustomId(Boolean onlyRegisterProcessorWithCustomId) {
242        this.onlyRegisterProcessorWithCustomId = onlyRegisterProcessorWithCustomId;
243    }
244
245    public void setMBeanServer(MBeanServer mbeanServer) {
246        server = mbeanServer;
247    }
248
249    public MBeanServer getMBeanServer() {
250        return server;
251    }
252
253    public Boolean getRegisterAlways() {
254        return registerAlways != null && registerAlways;
255    }
256
257    public void setRegisterAlways(Boolean registerAlways) {
258        this.registerAlways = registerAlways;
259    }
260
261    public Boolean getRegisterNewRoutes() {
262        return registerNewRoutes != null && registerNewRoutes;
263    }
264
265    public void setRegisterNewRoutes(Boolean registerNewRoutes) {
266        this.registerNewRoutes = registerNewRoutes;
267    }
268
269    public Boolean getMask() {
270        return mask != null && mask;
271    }
272
273    public void setMask(Boolean mask) {
274        this.mask = mask;
275    }
276
277    public Boolean getIncludeHostName() {
278        return includeHostName != null && includeHostName;
279    }
280
281    public void setIncludeHostName(Boolean includeHostName) {
282        this.includeHostName = includeHostName;
283    }
284
285    public Boolean getUseHostIPAddress() {
286        return useHostIPAddress != null && useHostIPAddress;
287    }
288
289    public void setUseHostIPAddress(Boolean useHostIPAddress) {
290        this.useHostIPAddress = useHostIPAddress;
291    }
292
293    public String getManagementNamePattern() {
294        return managementNamePattern;
295    }
296
297    public void setManagementNamePattern(String managementNamePattern) {
298        this.managementNamePattern = managementNamePattern;
299    }
300
301    public Boolean getLoadStatisticsEnabled() {
302        return loadStatisticsEnabled;
303    }
304
305    public void setLoadStatisticsEnabled(Boolean loadStatisticsEnabled) {
306        this.loadStatisticsEnabled = loadStatisticsEnabled;
307    }
308
309    public Boolean getEndpointRuntimeStatisticsEnabled() {
310        return endpointRuntimeStatisticsEnabled;
311    }
312
313    public void setEndpointRuntimeStatisticsEnabled(Boolean endpointRuntimeStatisticsEnabled) {
314        this.endpointRuntimeStatisticsEnabled = endpointRuntimeStatisticsEnabled;
315    }
316
317    public ManagementStatisticsLevel getStatisticsLevel() {
318        return statisticsLevel;
319    }
320
321    public void setStatisticsLevel(ManagementStatisticsLevel statisticsLevel) {
322        this.statisticsLevel = statisticsLevel;
323    }
324
325    public CamelContext getCamelContext() {
326        return camelContext;
327    }
328
329    public void setCamelContext(CamelContext camelContext) {
330        this.camelContext = camelContext;
331    }
332
333    public void register(Object obj, ObjectName name) throws JMException {
334        register(obj, name, false);
335    }
336
337    public void register(Object obj, ObjectName name, boolean forceRegistration) throws JMException {
338        try {
339            registerMBeanWithServer(obj, name, forceRegistration);
340        } catch (NotCompliantMBeanException e) {
341            // If this is not a "normal" MBean, then try to deploy it using JMX annotations
342            ManagementMBeanAssembler assembler = camelContext.getManagementMBeanAssembler();
343            ObjectHelper.notNull(assembler, "ManagementMBeanAssembler", camelContext);
344            Object mbean = assembler.assemble(server, obj, name);
345            if (mbean != null) {
346                // and register the mbean
347                registerMBeanWithServer(mbean, name, forceRegistration);
348            }
349        }
350    }
351
352    public void unregister(ObjectName name) throws JMException {
353        if (isRegistered(name)) {
354            ObjectName on = mbeansRegistered.remove(name);
355            server.unregisterMBean(on);
356            LOG.debug("Unregistered MBean with ObjectName: {}", name);
357        } else {
358            mbeansRegistered.remove(name);
359        }
360    }
361
362    public boolean isRegistered(ObjectName name) {
363        if (server == null) {
364            return false;
365        }
366        ObjectName on = mbeansRegistered.get(name);
367        return (on != null && server.isRegistered(on))
368                || server.isRegistered(name);
369    }
370
371    public <T> T newProxyClient(ObjectName name, Class<T> mbean) {
372        if (isRegistered(name)) {
373            ObjectName on = mbeansRegistered.get(name);
374            return MBeanServerInvocationHandler.newProxyInstance(server, on != null ? on : name, mbean, false);
375        } else {
376            return null;
377        }
378    }
379
380    protected void doStart() throws Exception {
381        ObjectHelper.notNull(camelContext, "CamelContext");
382
383        // create mbean server if is has not be injected.
384        if (server == null) {
385            finalizeSettings();
386            createMBeanServer();
387        }
388
389        LOG.debug("Starting JMX agent on server: {}", getMBeanServer());
390    }
391
392    protected void doStop() throws Exception {
393        // close JMX Connector, if it was created
394        if (cs != null) {
395            try {
396                cs.stop();
397                LOG.debug("Stopped JMX Connector");
398            } catch (IOException e) {
399                LOG.debug("Error occurred during stopping JMXConnectorService: "
400                        + cs + ". This exception will be ignored.");
401            }
402            cs = null;
403        }
404
405        // Unexport JMX RMI registry, if it was created
406        if (registry != null) {
407            try {
408                UnicastRemoteObject.unexportObject(registry, true);
409                LOG.debug("Unexported JMX RMI Registry");
410            } catch (NoSuchObjectException e) {
411                LOG.debug("Error occurred while unexporting JMX RMI registry. This exception will be ignored.");
412            }
413        }
414
415        if (mbeansRegistered.isEmpty()) {
416            return;
417        }
418
419        // Using the array to hold the busMBeans to avoid the CurrentModificationException
420        ObjectName[] mBeans = mbeansRegistered.keySet().toArray(new ObjectName[mbeansRegistered.size()]);
421        int caught = 0;
422        for (ObjectName name : mBeans) {
423            try {
424                unregister(name);
425            } catch (Exception e) {
426                LOG.info("Exception unregistering MBean with name " + name, e);
427                caught++;
428            }
429        }
430        if (caught > 0) {
431            LOG.warn("A number of " + caught
432                     + " exceptions caught while unregistering MBeans during stop operation."
433                     + " See INFO log for details.");
434        }
435    }
436
437    private void registerMBeanWithServer(Object obj, ObjectName name, boolean forceRegistration)
438        throws JMException {
439
440        // have we already registered the bean, there can be shared instances in the camel routes
441        boolean exists = isRegistered(name);
442        if (exists) {
443            if (forceRegistration) {
444                LOG.info("ForceRegistration enabled, unregistering existing MBean with ObjectName: {}", name);
445                server.unregisterMBean(name);
446            } else {
447                // okay ignore we do not want to force it and it could be a shared instance
448                LOG.debug("MBean already registered with ObjectName: {}", name);
449            }
450        }
451
452        // register bean if by force or not exists
453        ObjectInstance instance = null;
454        if (forceRegistration || !exists) {
455            LOG.trace("Registering MBean with ObjectName: {}", name);
456            instance = server.registerMBean(obj, name);
457        }
458
459        // need to use the name returned from the server as some JEE servers may modify the name
460        if (instance != null) {
461            ObjectName registeredName = instance.getObjectName();
462            LOG.debug("Registered MBean with ObjectName: {}", registeredName);
463            mbeansRegistered.put(name, registeredName);
464        }
465    }
466
467    protected void createMBeanServer() {
468        String hostName;
469        boolean canAccessSystemProps = true;
470        try {
471            // we'll do it this way mostly to determine if we should lookup the hostName
472            SecurityManager sm = System.getSecurityManager();
473            if (sm != null) {
474                sm.checkPropertiesAccess();
475            }
476        } catch (SecurityException se) {
477            canAccessSystemProps = false;
478        }
479
480        if (canAccessSystemProps) {
481            try {
482                if (useHostIPAddress) {
483                    hostName = InetAddress.getLocalHost().getHostAddress();
484                } else {
485                    hostName = InetAddressUtil.getLocalHostName();
486                }
487            } catch (UnknownHostException uhe) {
488                LOG.info("Cannot determine localhost name or address. Using default: " + DEFAULT_REGISTRY_PORT, uhe);
489                hostName = DEFAULT_HOST;
490            }
491        } else {
492            hostName = DEFAULT_HOST;
493        }
494
495        server = findOrCreateMBeanServer();
496
497        try {
498            // Create the connector if we need
499            if (createConnector) {
500                createJmxConnector(hostName);
501            }
502        } catch (IOException ioe) {
503            LOG.warn("Could not create and start JMX connector.", ioe);
504        }
505    }
506    
507    protected MBeanServer findOrCreateMBeanServer() {
508
509        // return platform mbean server if the option is specified.
510        if (usePlatformMBeanServer) {
511            return ManagementFactory.getPlatformMBeanServer();
512        }
513
514        // look for the first mbean server that has match default domain name
515        List<MBeanServer> servers = MBeanServerFactory.findMBeanServer(null);
516
517        for (MBeanServer server : servers) {
518            LOG.debug("Found MBeanServer with default domain {}", server.getDefaultDomain());
519
520            if (mBeanServerDefaultDomain.equals(server.getDefaultDomain())) {
521                return server;
522            }
523        }
524
525        // create a mbean server with the given default domain name
526        return MBeanServerFactory.createMBeanServer(mBeanServerDefaultDomain);
527    }
528
529    protected void createJmxConnector(String host) throws IOException {
530        ObjectHelper.notEmpty(serviceUrlPath, "serviceUrlPath");
531        ObjectHelper.notNull(registryPort, "registryPort");
532
533        try {
534            registry = LocateRegistry.createRegistry(registryPort);
535            LOG.debug("Created JMXConnector RMI registry on port {}", registryPort);
536        } catch (RemoteException ex) {
537            // The registry may had been created, we could get the registry instead
538        }
539
540        // must start with leading slash
541        String path = serviceUrlPath.startsWith("/") ? serviceUrlPath : "/" + serviceUrlPath;
542        // Create an RMI connector and start it
543        final JMXServiceURL url;
544        if (connectorPort > 0) {
545            url = new JMXServiceURL("service:jmx:rmi://" + host + ":" + connectorPort + "/jndi/rmi://" + host
546                                    + ":" + registryPort + path);
547        } else {
548            url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + registryPort + path);
549        }
550
551        cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, server);
552
553        // use async thread for starting the JMX Connector
554        // (no need to use a thread pool or enlist in JMX as this thread is terminated when the JMX connector has been started)
555        String threadName = camelContext.getExecutorServiceManager().resolveThreadName("JMXConnector: " + url);
556        Thread thread = getCamelContext().getExecutorServiceManager().newThread(threadName, new Runnable() {
557            public void run() {
558                try {
559                    LOG.debug("Staring JMX Connector thread to listen at: {}", url);
560                    cs.start();
561                    LOG.info("JMX Connector thread started and listening at: {}", url);
562                } catch (IOException ioe) {
563                    LOG.warn("Could not start JMXConnector thread at: " + url + ". JMX Connector not in use.", ioe);
564                }
565            }
566        });
567        thread.start();
568    }
569
570}