001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.web;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.Collection;
022import java.util.HashMap;
023import java.util.HashSet;
024import java.util.List;
025import java.util.Map;
026import java.util.Set;
027
028import javax.management.MBeanServerConnection;
029import javax.management.MBeanServerInvocationHandler;
030import javax.management.MalformedObjectNameException;
031import javax.management.ObjectName;
032import javax.management.QueryExp;
033import javax.management.remote.JMXConnector;
034import javax.management.remote.JMXConnectorFactory;
035import javax.management.remote.JMXServiceURL;
036
037import org.apache.activemq.broker.jmx.BrokerViewMBean;
038import org.apache.activemq.broker.jmx.ManagementContext;
039import org.apache.activemq.broker.jmx.QueueViewMBean;
040import org.apache.activemq.command.ActiveMQDestination;
041import org.apache.activemq.web.config.WebConsoleConfiguration;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045/**
046 * A {@link BrokerFacade} which uses a JMX-Connection to communicate with a
047 * broker
048 */
049public class RemoteJMXBrokerFacade extends BrokerFacadeSupport {
050
051    private static final transient Logger LOG = LoggerFactory.getLogger(RemoteJMXBrokerFacade.class);
052
053    private String brokerName;
054    private JMXConnector connector;
055    private WebConsoleConfiguration configuration;
056
057    public void setBrokerName(String brokerName) {
058        this.brokerName = brokerName;
059    }
060
061    public WebConsoleConfiguration getConfiguration() {
062        return configuration;
063    }
064
065    public void setConfiguration(WebConsoleConfiguration configuration) {
066        this.configuration = configuration;
067    }
068
069    /**
070     * Shutdown this facade aka close any open connection.
071     */
072    public void shutdown() {
073        closeConnection();
074    }
075
076    @Override
077    public BrokerViewMBean getBrokerAdmin() throws Exception {
078        MBeanServerConnection connection = getMBeanServerConnection();
079
080        Set<ObjectName> brokers = findBrokers(connection);
081        if (brokers.size() == 0) {
082            throw new IOException("No broker could be found in the JMX.");
083        }
084        ObjectName name = brokers.iterator().next();
085        BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(connection, name, BrokerViewMBean.class, true);
086        return mbean;
087    }
088
089    @Override
090    public String getBrokerName() throws Exception, MalformedObjectNameException {
091        return getBrokerAdmin().getBrokerName();
092    }
093
094    protected MBeanServerConnection getMBeanServerConnection() throws Exception {
095        JMXConnector connector = this.connector;
096        if (isConnectionActive(connector)) {
097            return connector.getMBeanServerConnection();
098        }
099
100        synchronized (this) {
101            closeConnection();
102            LOG.debug("Creating a new JMX-Connection to the broker");
103            this.connector = createConnection();
104            return this.connector.getMBeanServerConnection();
105        }
106    }
107
108    protected boolean isConnectionActive(JMXConnector connector) {
109        if (connector == null) {
110            return false;
111        }
112
113        try {
114            MBeanServerConnection connection = connector.getMBeanServerConnection();
115            int brokerCount = findBrokers(connection).size();
116            return brokerCount > 0;
117        } catch (Exception e) {
118            return false;
119        }
120    }
121
122    protected JMXConnector createConnection() {
123
124        Map<String, Object> env = new HashMap<String, Object>();
125        if (this.configuration.getJmxUser() != null) {
126            env.put("jmx.remote.credentials", new String[] { this.configuration.getJmxUser(), this.configuration.getJmxPassword() });
127        }
128        Collection<JMXServiceURL> jmxUrls = this.configuration.getJmxUrls();
129
130        Exception exception = null;
131        for (JMXServiceURL url : jmxUrls) {
132            try {
133                JMXConnector connector = JMXConnectorFactory.connect(url, env);
134                connector.connect();
135                MBeanServerConnection connection = connector.getMBeanServerConnection();
136
137                Set<ObjectName> brokers = findBrokers(connection);
138                if (brokers.size() > 0) {
139                    LOG.info("Connected via JMX to the broker at " + url);
140                    return connector;
141                }
142            } catch (Exception e) {
143                // Keep the exception for later
144                exception = e;
145            }
146        }
147        if (exception != null) {
148            if (exception instanceof RuntimeException) {
149                throw (RuntimeException) exception;
150            } else {
151                throw new RuntimeException(exception);
152            }
153        }
154        throw new IllegalStateException("No broker is found at any of the " + jmxUrls.size() + " configured urls");
155    }
156
157    protected synchronized void closeConnection() {
158        if (connector != null) {
159            try {
160                LOG.debug("Closing a connection to a broker (" + connector.getConnectionId() + ")");
161                connector.close();
162            } catch (IOException e) {
163                // Ignore the exception, since it most likly won't matter anymore
164            }
165        }
166    }
167
168    /**
169     * Finds all ActiveMQ-Brokers registered on a certain JMX-Server or, if a
170     * JMX-BrokerName has been set, the broker with that name.
171     *
172     * @param connection
173     *            not <code>null</code>
174     * @return Set with ObjectName-elements
175     * @throws IOException
176     * @throws MalformedObjectNameException
177     */
178    protected Set<ObjectName> findBrokers(MBeanServerConnection connection) throws IOException, MalformedObjectNameException {
179        ObjectName name;
180        if (this.brokerName == null) {
181            name = new ObjectName("org.apache.activemq:type=Broker,brokerName=*");
182        } else {
183            name = new ObjectName("org.apache.activemq:type=Broker,brokerName=" + this.brokerName);
184        }
185
186        Set<ObjectName> brokers = connection.queryNames(name, null);
187        Set<ObjectName> masterBrokers = new HashSet<ObjectName>();
188        for (ObjectName objectName : brokers) {
189            BrokerViewMBean mbean = MBeanServerInvocationHandler.newProxyInstance(connection, objectName, BrokerViewMBean.class, true);
190            if (!mbean.isSlave())
191                masterBrokers.add(objectName);
192        }
193        return masterBrokers;
194    }
195
196    @Override
197    public void purgeQueue(ActiveMQDestination destination) throws Exception {
198        QueueViewMBean queue = getQueue(destination.getPhysicalName());
199        queue.purge();
200    }
201
202    @Override
203    public ManagementContext getManagementContext() {
204        throw new IllegalStateException("not supported");
205    }
206
207    @Override
208    protected <T> Collection<T> getManagedObjects(ObjectName[] names, Class<T> type) {
209        MBeanServerConnection connection;
210        try {
211            connection = getMBeanServerConnection();
212        } catch (Exception e) {
213            throw new RuntimeException(e);
214        }
215
216        List<T> answer = new ArrayList<T>();
217        if (connection != null) {
218            for (int i = 0; i < names.length; i++) {
219                ObjectName name = names[i];
220                T value = MBeanServerInvocationHandler.newProxyInstance(connection, name, type, true);
221                if (value != null) {
222                    answer.add(value);
223                }
224            }
225        }
226        return answer;
227    }
228
229    @Override
230    public Set queryNames(ObjectName name, QueryExp query) throws Exception {
231        return getMBeanServerConnection().queryNames(name, query);
232    }
233
234    @Override
235    public Object newProxyInstance(ObjectName objectName, Class interfaceClass, boolean notificationBroadcaster) throws Exception {
236        return MBeanServerInvocationHandler.newProxyInstance(getMBeanServerConnection(), objectName, interfaceClass, notificationBroadcaster);
237    }
238}