001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.camel.management;
018
019import java.io.IOException;
020import java.lang.management.ManagementFactory;
021import java.net.InetAddress;
022import java.net.UnknownHostException;
023import java.rmi.NoSuchObjectException;
024import java.rmi.RemoteException;
025import java.rmi.registry.LocateRegistry;
026import java.rmi.registry.Registry;
027import java.rmi.server.UnicastRemoteObject;
028import java.util.LinkedHashMap;
029import java.util.List;
030import java.util.Map;
031import java.util.concurrent.ConcurrentHashMap;
032import java.util.concurrent.ConcurrentMap;
033import javax.management.JMException;
034import javax.management.MBeanServer;
035import javax.management.MBeanServerFactory;
036import javax.management.MBeanServerInvocationHandler;
037import javax.management.NotCompliantMBeanException;
038import javax.management.ObjectInstance;
039import javax.management.ObjectName;
040import javax.management.remote.JMXConnectorServer;
041import javax.management.remote.JMXConnectorServerFactory;
042import javax.management.remote.JMXServiceURL;
043
044import org.apache.camel.CamelContext;
045import org.apache.camel.CamelContextAware;
046import org.apache.camel.ManagementStatisticsLevel;
047import org.apache.camel.spi.ManagementAgent;
048import org.apache.camel.spi.ManagementMBeanAssembler;
049import org.apache.camel.support.ServiceSupport;
050import org.apache.camel.util.ObjectHelper;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054/**
055 * Default implementation of the Camel JMX service agent
056 */
057public class DefaultManagementAgent extends ServiceSupport implements ManagementAgent, CamelContextAware {
058
059    public static final String DEFAULT_DOMAIN = "org.apache.camel";
060    public static final String DEFAULT_HOST = "localhost";
061    public static final int DEFAULT_REGISTRY_PORT = 1099;
062    public static final int DEFAULT_CONNECTION_PORT = -1;
063    public static final String DEFAULT_SERVICE_URL_PATH = "/jmxrmi/camel";
064    private static final Logger LOG = LoggerFactory.getLogger(DefaultManagementAgent.class);
065
066    private CamelContext camelContext;
067    private MBeanServer server;
068    // need a name -> actual name mapping as some servers changes the names (such as WebSphere)
069    private final ConcurrentMap<ObjectName, ObjectName> mbeansRegistered = new ConcurrentHashMap<ObjectName, ObjectName>();
070    private JMXConnectorServer cs;
071    private Registry registry;
072
073    private Integer registryPort = DEFAULT_REGISTRY_PORT;
074    private Integer connectorPort = DEFAULT_CONNECTION_PORT;
075    private String mBeanServerDefaultDomain = DEFAULT_DOMAIN;
076    private String mBeanObjectDomainName = DEFAULT_DOMAIN;
077    private String serviceUrlPath = DEFAULT_SERVICE_URL_PATH;
078    private Boolean usePlatformMBeanServer = true;
079    private Boolean createConnector = false;
080    private Boolean onlyRegisterProcessorWithCustomId = false;
081    private Boolean loadStatisticsEnabled = false;
082    private Boolean endpointRuntimeStatisticsEnabled = true;
083    private Boolean registerAlways = false;
084    private Boolean registerNewRoutes = true;
085    private Boolean mask = true;
086    private Boolean includeHostName = false;
087    private Boolean useHostIPAddress = false;
088    private String managementNamePattern = "#name#";
089    private ManagementStatisticsLevel statisticsLevel = ManagementStatisticsLevel.Default;
090
091    public DefaultManagementAgent() {
092    }
093
094    public DefaultManagementAgent(CamelContext camelContext) {
095        this.camelContext = camelContext;
096    }
097
098    protected void finalizeSettings() throws Exception {
099        // JVM system properties take precedence over any configuration
100        Map<String, Object> values = new LinkedHashMap<String, Object>();
101
102        if (System.getProperty(JmxSystemPropertyKeys.REGISTRY_PORT) != null) {
103            registryPort = Integer.getInteger(JmxSystemPropertyKeys.REGISTRY_PORT);
104            values.put(JmxSystemPropertyKeys.REGISTRY_PORT, registryPort);
105        }
106        if (System.getProperty(JmxSystemPropertyKeys.CONNECTOR_PORT) != null) {
107            connectorPort = Integer.getInteger(JmxSystemPropertyKeys.CONNECTOR_PORT);
108            values.put(JmxSystemPropertyKeys.CONNECTOR_PORT, connectorPort);
109        }
110        if (System.getProperty(JmxSystemPropertyKeys.DOMAIN) != null) {
111            mBeanServerDefaultDomain = System.getProperty(JmxSystemPropertyKeys.DOMAIN);
112            values.put(JmxSystemPropertyKeys.DOMAIN, mBeanServerDefaultDomain);
113        }
114        if (System.getProperty(JmxSystemPropertyKeys.MBEAN_DOMAIN) != null) {
115            mBeanObjectDomainName = System.getProperty(JmxSystemPropertyKeys.MBEAN_DOMAIN);
116            values.put(JmxSystemPropertyKeys.MBEAN_DOMAIN, mBeanObjectDomainName);
117        }
118        if (System.getProperty(JmxSystemPropertyKeys.SERVICE_URL_PATH) != null) {
119            serviceUrlPath = System.getProperty(JmxSystemPropertyKeys.SERVICE_URL_PATH);
120            values.put(JmxSystemPropertyKeys.SERVICE_URL_PATH, serviceUrlPath);
121        }
122        if (System.getProperty(JmxSystemPropertyKeys.CREATE_CONNECTOR) != null) {
123            createConnector = Boolean.getBoolean(JmxSystemPropertyKeys.CREATE_CONNECTOR);
124            values.put(JmxSystemPropertyKeys.CREATE_CONNECTOR, createConnector);
125        }
126        if (System.getProperty(JmxSystemPropertyKeys.ONLY_REGISTER_PROCESSOR_WITH_CUSTOM_ID) != null) {
127            onlyRegisterProcessorWithCustomId = Boolean.getBoolean(JmxSystemPropertyKeys.ONLY_REGISTER_PROCESSOR_WITH_CUSTOM_ID);
128            values.put(JmxSystemPropertyKeys.ONLY_REGISTER_PROCESSOR_WITH_CUSTOM_ID, onlyRegisterProcessorWithCustomId);
129        }
130        if (System.getProperty(JmxSystemPropertyKeys.USE_PLATFORM_MBS) != null) {
131            usePlatformMBeanServer = Boolean.getBoolean(JmxSystemPropertyKeys.USE_PLATFORM_MBS);
132            values.put(JmxSystemPropertyKeys.USE_PLATFORM_MBS, usePlatformMBeanServer);
133        }
134        if (System.getProperty(JmxSystemPropertyKeys.REGISTER_ALWAYS) != null) {
135            registerAlways = Boolean.getBoolean(JmxSystemPropertyKeys.REGISTER_ALWAYS);
136            values.put(JmxSystemPropertyKeys.REGISTER_ALWAYS, registerAlways);
137        }
138        if (System.getProperty(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES) != null) {
139            registerNewRoutes = Boolean.getBoolean(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES);
140            values.put(JmxSystemPropertyKeys.REGISTER_NEW_ROUTES, registerNewRoutes);
141        }
142        if (System.getProperty(JmxSystemPropertyKeys.MASK) != null) {
143            mask = Boolean.getBoolean(JmxSystemPropertyKeys.MASK);
144            values.put(JmxSystemPropertyKeys.MASK, mask);
145        }
146        if (System.getProperty(JmxSystemPropertyKeys.INCLUDE_HOST_NAME) != null) {
147            includeHostName = Boolean.getBoolean(JmxSystemPropertyKeys.INCLUDE_HOST_NAME);
148            values.put(JmxSystemPropertyKeys.INCLUDE_HOST_NAME, includeHostName);
149        }
150        if (System.getProperty(JmxSystemPropertyKeys.CREATE_CONNECTOR) != null) {
151            createConnector = Boolean.getBoolean(JmxSystemPropertyKeys.CREATE_CONNECTOR);
152            values.put(JmxSystemPropertyKeys.CREATE_CONNECTOR, createConnector);
153        }
154        if (System.getProperty(JmxSystemPropertyKeys.LOAD_STATISTICS_ENABLED) != null) {
155            loadStatisticsEnabled = Boolean.getBoolean(JmxSystemPropertyKeys.LOAD_STATISTICS_ENABLED);
156            values.put(JmxSystemPropertyKeys.LOAD_STATISTICS_ENABLED, loadStatisticsEnabled);
157        }
158        if (System.getProperty(JmxSystemPropertyKeys.ENDPOINT_RUNTIME_STATISTICS_ENABLED) != null) {
159            endpointRuntimeStatisticsEnabled = Boolean.getBoolean(JmxSystemPropertyKeys.ENDPOINT_RUNTIME_STATISTICS_ENABLED);
160            values.put(JmxSystemPropertyKeys.ENDPOINT_RUNTIME_STATISTICS_ENABLED, endpointRuntimeStatisticsEnabled);
161        }
162        if (System.getProperty(JmxSystemPropertyKeys.STATISTICS_LEVEL) != null) {
163            statisticsLevel = camelContext.getTypeConverter().mandatoryConvertTo(ManagementStatisticsLevel.class, System.getProperty(JmxSystemPropertyKeys.STATISTICS_LEVEL));
164            values.put(JmxSystemPropertyKeys.STATISTICS_LEVEL, statisticsLevel);
165        }
166        if (System.getProperty(JmxSystemPropertyKeys.MANAGEMENT_NAME_PATTERN) != null) {
167            managementNamePattern = System.getProperty(JmxSystemPropertyKeys.MANAGEMENT_NAME_PATTERN);
168            values.put(JmxSystemPropertyKeys.MANAGEMENT_NAME_PATTERN, managementNamePattern);
169        }
170        if (System.getProperty(JmxSystemPropertyKeys.USE_HOST_IP_ADDRESS) != null) {
171            useHostIPAddress = Boolean.getBoolean(JmxSystemPropertyKeys.USE_HOST_IP_ADDRESS);
172            values.put(JmxSystemPropertyKeys.USE_HOST_IP_ADDRESS, useHostIPAddress);
173        }
174
175        if (!values.isEmpty()) {
176            LOG.info("ManagementAgent detected JVM system properties: {}", values);
177        }
178    }
179
180    public void setRegistryPort(Integer port) {
181        registryPort = port;
182    }
183
184    public Integer getRegistryPort() {
185        return registryPort;
186    }
187
188    public void setConnectorPort(Integer port) {
189        connectorPort = port;
190    }
191
192    public Integer getConnectorPort() {
193        return connectorPort;
194    }
195
196    public void setMBeanServerDefaultDomain(String domain) {
197        mBeanServerDefaultDomain = domain;
198    }
199
200    public String getMBeanServerDefaultDomain() {
201        return mBeanServerDefaultDomain;
202    }
203
204    public void setMBeanObjectDomainName(String domainName) {
205        mBeanObjectDomainName = domainName;
206    }
207
208    public String getMBeanObjectDomainName() {
209        return mBeanObjectDomainName;
210    }
211
212    public void setServiceUrlPath(String url) {
213        serviceUrlPath = url;
214    }
215
216    public String getServiceUrlPath() {
217        return serviceUrlPath;
218    }
219
220    public void setCreateConnector(Boolean flag) {
221        createConnector = flag;
222    }
223
224    public Boolean getCreateConnector() {
225        return createConnector;
226    }
227
228    public void setUsePlatformMBeanServer(Boolean flag) {
229        usePlatformMBeanServer = flag;
230    }
231
232    public Boolean getUsePlatformMBeanServer() {
233        return usePlatformMBeanServer;
234    }
235
236    public Boolean getOnlyRegisterProcessorWithCustomId() {
237        return onlyRegisterProcessorWithCustomId;
238    }
239
240    public void setOnlyRegisterProcessorWithCustomId(Boolean onlyRegisterProcessorWithCustomId) {
241        this.onlyRegisterProcessorWithCustomId = onlyRegisterProcessorWithCustomId;
242    }
243
244    public void setMBeanServer(MBeanServer mbeanServer) {
245        server = mbeanServer;
246    }
247
248    public MBeanServer getMBeanServer() {
249        return server;
250    }
251
252    public Boolean getRegisterAlways() {
253        return registerAlways != null && registerAlways;
254    }
255
256    public void setRegisterAlways(Boolean registerAlways) {
257        this.registerAlways = registerAlways;
258    }
259
260    public Boolean getRegisterNewRoutes() {
261        return registerNewRoutes != null && registerNewRoutes;
262    }
263
264    public void setRegisterNewRoutes(Boolean registerNewRoutes) {
265        this.registerNewRoutes = registerNewRoutes;
266    }
267
268    public Boolean getMask() {
269        return mask != null && mask;
270    }
271
272    public void setMask(Boolean mask) {
273        this.mask = mask;
274    }
275
276    public Boolean getIncludeHostName() {
277        return includeHostName != null && includeHostName;
278    }
279
280    public void setIncludeHostName(Boolean includeHostName) {
281        this.includeHostName = includeHostName;
282    }
283
284    public Boolean getUseHostIPAddress() {
285        return useHostIPAddress != null && useHostIPAddress;
286    }
287
288    public void setUseHostIPAddress(Boolean useHostIPAddress) {
289        this.useHostIPAddress = useHostIPAddress;
290    }
291
292    public String getManagementNamePattern() {
293        return managementNamePattern;
294    }
295
296    public void setManagementNamePattern(String managementNamePattern) {
297        this.managementNamePattern = managementNamePattern;
298    }
299
300    public Boolean getLoadStatisticsEnabled() {
301        return loadStatisticsEnabled;
302    }
303
304    public void setLoadStatisticsEnabled(Boolean loadStatisticsEnabled) {
305        this.loadStatisticsEnabled = loadStatisticsEnabled;
306    }
307
308    public Boolean getEndpointRuntimeStatisticsEnabled() {
309        return endpointRuntimeStatisticsEnabled;
310    }
311
312    public void setEndpointRuntimeStatisticsEnabled(Boolean endpointRuntimeStatisticsEnabled) {
313        this.endpointRuntimeStatisticsEnabled = endpointRuntimeStatisticsEnabled;
314    }
315
316    public ManagementStatisticsLevel getStatisticsLevel() {
317        return statisticsLevel;
318    }
319
320    public void setStatisticsLevel(ManagementStatisticsLevel statisticsLevel) {
321        this.statisticsLevel = statisticsLevel;
322    }
323
324    public CamelContext getCamelContext() {
325        return camelContext;
326    }
327
328    public void setCamelContext(CamelContext camelContext) {
329        this.camelContext = camelContext;
330    }
331
332    public void register(Object obj, ObjectName name) throws JMException {
333        register(obj, name, false);
334    }
335
336    public void register(Object obj, ObjectName name, boolean forceRegistration) throws JMException {
337        try {
338            registerMBeanWithServer(obj, name, forceRegistration);
339        } catch (NotCompliantMBeanException e) {
340            // If this is not a "normal" MBean, then try to deploy it using JMX annotations
341            ManagementMBeanAssembler assembler = camelContext.getManagementMBeanAssembler();
342            ObjectHelper.notNull(assembler, "ManagementMBeanAssembler", camelContext);
343            Object mbean = assembler.assemble(server, obj, name);
344            if (mbean != null) {
345                // and register the mbean
346                registerMBeanWithServer(mbean, name, forceRegistration);
347            }
348        }
349    }
350
351    public void unregister(ObjectName name) throws JMException {
352        if (isRegistered(name)) {
353            ObjectName on = mbeansRegistered.remove(name);
354            server.unregisterMBean(on);
355            LOG.debug("Unregistered MBean with ObjectName: {}", name);
356        } else {
357            mbeansRegistered.remove(name);
358        }
359    }
360
361    public boolean isRegistered(ObjectName name) {
362        if (server == null) {
363            return false;
364        }
365        ObjectName on = mbeansRegistered.get(name);
366        return (on != null && server.isRegistered(on))
367                || server.isRegistered(name);
368    }
369
370    public <T> T newProxyClient(ObjectName name, Class<T> mbean) {
371        if (isRegistered(name)) {
372            ObjectName on = mbeansRegistered.get(name);
373            return MBeanServerInvocationHandler.newProxyInstance(server, on != null ? on : name, mbean, false);
374        } else {
375            return null;
376        }
377    }
378
379    protected void doStart() throws Exception {
380        ObjectHelper.notNull(camelContext, "CamelContext");
381
382        // create mbean server if is has not be injected.
383        if (server == null) {
384            finalizeSettings();
385            createMBeanServer();
386        }
387
388        LOG.debug("Starting JMX agent on server: {}", getMBeanServer());
389    }
390
391    protected void doStop() throws Exception {
392        // close JMX Connector, if it was created
393        if (cs != null) {
394            try {
395                cs.stop();
396                LOG.debug("Stopped JMX Connector");
397            } catch (IOException e) {
398                LOG.debug("Error occurred during stopping JMXConnectorService: "
399                        + cs + ". This exception will be ignored.");
400            }
401            cs = null;
402        }
403
404        // Unexport JMX RMI registry, if it was created
405        if (registry != null) {
406            try {
407                UnicastRemoteObject.unexportObject(registry, true);
408                LOG.debug("Unexported JMX RMI Registry");
409            } catch (NoSuchObjectException e) {
410                LOG.debug("Error occurred while unexporting JMX RMI registry. This exception will be ignored.");
411            }
412        }
413
414        if (mbeansRegistered.isEmpty()) {
415            return;
416        }
417
418        // Using the array to hold the busMBeans to avoid the CurrentModificationException
419        ObjectName[] mBeans = mbeansRegistered.keySet().toArray(new ObjectName[mbeansRegistered.size()]);
420        int caught = 0;
421        for (ObjectName name : mBeans) {
422            try {
423                unregister(name);
424            } catch (Exception e) {
425                LOG.info("Exception unregistering MBean with name " + name, e);
426                caught++;
427            }
428        }
429        if (caught > 0) {
430            LOG.warn("A number of " + caught
431                     + " exceptions caught while unregistering MBeans during stop operation."
432                     + " See INFO log for details.");
433        }
434    }
435
436    private void registerMBeanWithServer(Object obj, ObjectName name, boolean forceRegistration)
437        throws JMException {
438
439        // have we already registered the bean, there can be shared instances in the camel routes
440        boolean exists = isRegistered(name);
441        if (exists) {
442            if (forceRegistration) {
443                LOG.info("ForceRegistration enabled, unregistering existing MBean with ObjectName: {}", name);
444                server.unregisterMBean(name);
445            } else {
446                // okay ignore we do not want to force it and it could be a shared instance
447                LOG.debug("MBean already registered with ObjectName: {}", name);
448            }
449        }
450
451        // register bean if by force or not exists
452        ObjectInstance instance = null;
453        if (forceRegistration || !exists) {
454            LOG.trace("Registering MBean with ObjectName: {}", name);
455            instance = server.registerMBean(obj, name);
456        }
457
458        // need to use the name returned from the server as some JEE servers may modify the name
459        if (instance != null) {
460            ObjectName registeredName = instance.getObjectName();
461            LOG.debug("Registered MBean with ObjectName: {}", registeredName);
462            mbeansRegistered.put(name, registeredName);
463        }
464    }
465
466    protected void createMBeanServer() {
467        String hostName;
468        boolean canAccessSystemProps = true;
469        try {
470            // we'll do it this way mostly to determine if we should lookup the hostName
471            SecurityManager sm = System.getSecurityManager();
472            if (sm != null) {
473                sm.checkPropertiesAccess();
474            }
475        } catch (SecurityException se) {
476            canAccessSystemProps = false;
477        }
478
479        if (canAccessSystemProps) {
480            try {
481                if (useHostIPAddress) {
482                    hostName = InetAddress.getLocalHost().getHostAddress();
483                } else {
484                    hostName = InetAddress.getLocalHost().getHostName();
485                }
486            } catch (UnknownHostException uhe) {
487                LOG.info("Cannot determine localhost name or address. Using default: " + DEFAULT_REGISTRY_PORT, uhe);
488                hostName = DEFAULT_HOST;
489            }
490        } else {
491            hostName = DEFAULT_HOST;
492        }
493
494        server = findOrCreateMBeanServer();
495
496        try {
497            // Create the connector if we need
498            if (createConnector) {
499                createJmxConnector(hostName);
500            }
501        } catch (IOException ioe) {
502            LOG.warn("Could not create and start JMX connector.", ioe);
503        }
504    }
505    
506    protected MBeanServer findOrCreateMBeanServer() {
507
508        // return platform mbean server if the option is specified.
509        if (usePlatformMBeanServer) {
510            return ManagementFactory.getPlatformMBeanServer();
511        }
512
513        // look for the first mbean server that has match default domain name
514        List<MBeanServer> servers = MBeanServerFactory.findMBeanServer(null);
515
516        for (MBeanServer server : servers) {
517            LOG.debug("Found MBeanServer with default domain {}", server.getDefaultDomain());
518
519            if (mBeanServerDefaultDomain.equals(server.getDefaultDomain())) {
520                return server;
521            }
522        }
523
524        // create a mbean server with the given default domain name
525        return MBeanServerFactory.createMBeanServer(mBeanServerDefaultDomain);
526    }
527
528    protected void createJmxConnector(String host) throws IOException {
529        ObjectHelper.notEmpty(serviceUrlPath, "serviceUrlPath");
530        ObjectHelper.notNull(registryPort, "registryPort");
531
532        try {
533            registry = LocateRegistry.createRegistry(registryPort);
534            LOG.debug("Created JMXConnector RMI registry on port {}", registryPort);
535        } catch (RemoteException ex) {
536            // The registry may had been created, we could get the registry instead
537        }
538
539        // must start with leading slash
540        String path = serviceUrlPath.startsWith("/") ? serviceUrlPath : "/" + serviceUrlPath;
541        // Create an RMI connector and start it
542        final JMXServiceURL url;
543        if (connectorPort > 0) {
544            url = new JMXServiceURL("service:jmx:rmi://" + host + ":" + connectorPort + "/jndi/rmi://" + host
545                                    + ":" + registryPort + path);
546        } else {
547            url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + registryPort + path);
548        }
549
550        cs = JMXConnectorServerFactory.newJMXConnectorServer(url, null, server);
551
552        // use async thread for starting the JMX Connector
553        // (no need to use a thread pool or enlist in JMX as this thread is terminated when the JMX connector has been started)
554        String threadName = camelContext.getExecutorServiceManager().resolveThreadName("JMXConnector: " + url);
555        Thread thread = getCamelContext().getExecutorServiceManager().newThread(threadName, new Runnable() {
556            public void run() {
557                try {
558                    LOG.debug("Staring JMX Connector thread to listen at: {}", url);
559                    cs.start();
560                    LOG.info("JMX Connector thread started and listening at: {}", url);
561                } catch (IOException ioe) {
562                    LOG.warn("Could not start JMXConnector thread at: " + url + ". JMX Connector not in use.", ioe);
563                }
564            }
565        });
566        thread.start();
567    }
568
569}