/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.File;
020import java.io.IOException;
021import java.net.URI;
022import java.util.*;
023import java.util.concurrent.atomic.AtomicInteger;
024
025import javax.management.MalformedObjectNameException;
026import javax.management.ObjectName;
027import javax.management.openmbean.CompositeData;
028import javax.management.openmbean.OpenDataException;
029
030import org.apache.activemq.ActiveMQConnectionMetaData;
031import org.apache.activemq.broker.BrokerService;
032import org.apache.activemq.broker.ConnectionContext;
033import org.apache.activemq.broker.TransportConnector;
034import org.apache.activemq.broker.region.Subscription;
035import org.apache.activemq.command.*;
036import org.apache.activemq.network.NetworkConnector;
037import org.apache.activemq.util.BrokerSupport;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041public class BrokerView implements BrokerViewMBean {
042
043    private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class);
044
045    ManagedRegionBroker broker;
046
047    private final BrokerService brokerService;
048    private final AtomicInteger sessionIdCounter = new AtomicInteger(0);
049    private ObjectName jmsJobScheduler;
050
051    public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception {
052        this.brokerService = brokerService;
053        this.broker = managedBroker;
054    }
055
056    public ManagedRegionBroker getBroker() {
057        return broker;
058    }
059
060    public void setBroker(ManagedRegionBroker broker) {
061        this.broker = broker;
062    }
063
064    @Override
065    public String getBrokerId() {
066        return safeGetBroker().getBrokerId().toString();
067    }
068
069    @Override
070    public String getBrokerName() {
071        return safeGetBroker().getBrokerName();
072    }
073
074    @Override
075    public String getBrokerVersion() {
076        return ActiveMQConnectionMetaData.PROVIDER_VERSION;
077    }
078
079    @Override
080    public String getUptime() {
081        return brokerService.getUptime();
082    }
083
084    @Override
085    public long getUptimeMillis() {
086        return brokerService.getUptimeMillis();
087    }
088
089    @Override
090    public int getCurrentConnectionsCount() {
091        return brokerService.getCurrentConnections();
092    }
093
094    @Override
095    public long getTotalConnectionsCount() {
096        return brokerService.getTotalConnections();
097    }
098
099    @Override
100    public void gc() throws Exception {
101        brokerService.getBroker().gc();
102        try {
103            brokerService.getPersistenceAdapter().checkpoint(true);
104        } catch (IOException e) {
105            LOG.error("Failed to checkpoint persistence adapter on gc request", e);
106        }
107    }
108
109    @Override
110    public void start() throws Exception {
111        brokerService.start();
112    }
113
114    @Override
115    public void stop() throws Exception {
116        brokerService.stop();
117    }
118
119    @Override
120    public void restart() throws Exception {
121        if (brokerService.isRestartAllowed()) {
122            brokerService.requestRestart();
123            brokerService.stop();
124        } else {
125            throw new Exception("Restart is not allowed");
126        }
127    }
128
129    @Override
130    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
131        brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval);
132    }
133
134    @Override
135    public long getTotalEnqueueCount() {
136        return safeGetBroker().getDestinationStatistics().getEnqueues().getCount();
137    }
138
139    @Override
140    public long getTotalDequeueCount() {
141        return safeGetBroker().getDestinationStatistics().getDequeues().getCount();
142    }
143
144    @Override
145    public long getTotalConsumerCount() {
146        return safeGetBroker().getDestinationStatistics().getConsumers().getCount();
147    }
148
149    @Override
150    public long getTotalProducerCount() {
151        return safeGetBroker().getDestinationStatistics().getProducers().getCount();
152    }
153
154    @Override
155    public long getTotalMessageCount() {
156        return safeGetBroker().getDestinationStatistics().getMessages().getCount();
157    }
158
159    /**
160     * @return the average size of a message (bytes)
161     */
162    @Override
163    public long getAverageMessageSize() {
164        // we are okay with the size without decimals so cast to long
165        return (long) safeGetBroker().getDestinationStatistics().getMessageSize().getAverageSize();
166    }
167
168    /**
169     * @return the max size of a message (bytes)
170     */
171    @Override
172    public long getMaxMessageSize() {
173        return safeGetBroker().getDestinationStatistics().getMessageSize().getMaxSize();
174    }
175
176    /**
177     * @return the min size of a message (bytes)
178     */
179    @Override
180    public long getMinMessageSize() {
181        return safeGetBroker().getDestinationStatistics().getMessageSize().getMinSize();
182    }
183
184    public long getTotalMessagesCached() {
185        return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount();
186    }
187
188    @Override
189    public int getMemoryPercentUsage() {
190        return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage();
191    }
192
193    @Override
194    public long getMemoryLimit() {
195        return brokerService.getSystemUsage().getMemoryUsage().getLimit();
196    }
197
198    @Override
199    public void setMemoryLimit(long limit) {
200        brokerService.getSystemUsage().getMemoryUsage().setLimit(limit);
201    }
202
203    @Override
204    public long getStoreLimit() {
205        return brokerService.getSystemUsage().getStoreUsage().getLimit();
206    }
207
208    @Override
209    public int getStorePercentUsage() {
210        return brokerService.getSystemUsage().getStoreUsage().getPercentUsage();
211    }
212
213    @Override
214    public long getTempLimit() {
215        return brokerService.getSystemUsage().getTempUsage().getLimit();
216    }
217
218    @Override
219    public int getTempPercentUsage() {
220        return brokerService.getSystemUsage().getTempUsage().getPercentUsage();
221    }
222
223    @Override
224    public long getJobSchedulerStoreLimit() {
225        return brokerService.getSystemUsage().getJobSchedulerUsage().getLimit();
226    }
227
228    @Override
229    public int getJobSchedulerStorePercentUsage() {
230        return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage();
231    }
232
233    @Override
234    public void setStoreLimit(long limit) {
235        brokerService.getSystemUsage().getStoreUsage().setLimit(limit);
236    }
237
238    @Override
239    public void setTempLimit(long limit) {
240        brokerService.getSystemUsage().getTempUsage().setLimit(limit);
241    }
242
243    @Override
244    public void setJobSchedulerStoreLimit(long limit) {
245        brokerService.getSystemUsage().getJobSchedulerUsage().setLimit(limit);
246    }
247
248    @Override
249    public void resetStatistics() {
250        safeGetBroker().getDestinationStatistics().reset();
251    }
252
253    @Override
254    public void enableStatistics() {
255        safeGetBroker().getDestinationStatistics().setEnabled(true);
256    }
257
258    @Override
259    public void disableStatistics() {
260        safeGetBroker().getDestinationStatistics().setEnabled(false);
261    }
262
263    @Override
264    public boolean isStatisticsEnabled() {
265        return safeGetBroker().getDestinationStatistics().isEnabled();
266    }
267
268    @Override
269    public boolean isPersistent() {
270        return brokerService.isPersistent();
271    }
272
273    @Override
274    public void terminateJVM(int exitCode) {
275        System.exit(exitCode);
276    }
277
278    @Override
279    public ObjectName[] getTopics() {
280        return safeGetBroker().getTopics();
281    }
282
283    @Override
284    public ObjectName[] getQueues() {
285        return safeGetBroker().getQueues();
286    }
287
288    @Override
289    public String queryQueues(String filter, int page, int pageSize) throws IOException {
290        return DestinationsViewFilter.create(filter)
291                .setDestinations(safeGetBroker().getQueueViews())
292                .filter(page, pageSize);
293    }
294
295    @Override
296    public String queryTopics(String filter, int page, int pageSize) throws IOException {
297        return DestinationsViewFilter.create(filter)
298                .setDestinations(safeGetBroker().getTopicViews())
299                .filter(page, pageSize);
300    }
301
302    public CompositeData[] browseQueue(String queueName) throws OpenDataException, MalformedObjectNameException {
303       return safeGetBroker().getQueueView(queueName).browse();
304    }
305
306    @Override
307    public ObjectName[] getTemporaryTopics() {
308        return safeGetBroker().getTemporaryTopics();
309    }
310
311    @Override
312    public ObjectName[] getTemporaryQueues() {
313        return safeGetBroker().getTemporaryQueues();
314    }
315
316    @Override
317    public ObjectName[] getTopicSubscribers() {
318        return safeGetBroker().getTopicSubscribers();
319    }
320
321    @Override
322    public ObjectName[] getDurableTopicSubscribers() {
323        return safeGetBroker().getDurableTopicSubscribers();
324    }
325
326    @Override
327    public ObjectName[] getQueueSubscribers() {
328        return safeGetBroker().getQueueSubscribers();
329    }
330
331    @Override
332    public ObjectName[] getTemporaryTopicSubscribers() {
333        return safeGetBroker().getTemporaryTopicSubscribers();
334    }
335
336    @Override
337    public ObjectName[] getTemporaryQueueSubscribers() {
338        return safeGetBroker().getTemporaryQueueSubscribers();
339    }
340
341    @Override
342    public ObjectName[] getInactiveDurableTopicSubscribers() {
343        return safeGetBroker().getInactiveDurableTopicSubscribers();
344    }
345
346    @Override
347    public ObjectName[] getTopicProducers() {
348        return safeGetBroker().getTopicProducers();
349    }
350
351    @Override
352    public ObjectName[] getQueueProducers() {
353        return safeGetBroker().getQueueProducers();
354    }
355
356    @Override
357    public ObjectName[] getTemporaryTopicProducers() {
358        return safeGetBroker().getTemporaryTopicProducers();
359    }
360
361    @Override
362    public ObjectName[] getTemporaryQueueProducers() {
363        return safeGetBroker().getTemporaryQueueProducers();
364    }
365
366    @Override
367    public ObjectName[] getDynamicDestinationProducers() {
368        return safeGetBroker().getDynamicDestinationProducers();
369    }
370
371    @Override
372    public String addConnector(String discoveryAddress) throws Exception {
373        TransportConnector connector = brokerService.addConnector(discoveryAddress);
374        if (connector == null) {
375            throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
376        }
377        brokerService.startTransportConnector(connector);
378        return connector.getName();
379    }
380
381    @Override
382    public String addNetworkConnector(String discoveryAddress) throws Exception {
383        NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress);
384        if (connector == null) {
385            throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress);
386        }
387        brokerService.registerNetworkConnectorMBean(connector);
388        connector.start();
389        return connector.getName();
390    }
391
392    @Override
393    public boolean removeConnector(String connectorName) throws Exception {
394        TransportConnector connector = brokerService.getConnectorByName(connectorName);
395        if (connector == null) {
396            throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
397        }
398        connector.stop();
399        return brokerService.removeConnector(connector);
400    }
401
402    @Override
403    public boolean removeNetworkConnector(String connectorName) throws Exception {
404        NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName);
405        if (connector == null) {
406            throw new NoSuchElementException("Not connector matched the given name: " + connectorName);
407        }
408        connector.stop();
409        return brokerService.removeNetworkConnector(connector);
410    }
411
412    @Override
413    public void addTopic(String name) throws Exception {
414        safeGetBroker().getContextBroker()
415            .addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), true);
416    }
417
418    @Override
419    public void addQueue(String name) throws Exception {
420        safeGetBroker().getContextBroker()
421            .addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), true);
422    }
423
424    @Override
425    public void removeTopic(String name) throws Exception {
426        safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000);
427    }
428
429    @Override
430    public void removeQueue(String name) throws Exception {
431        safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000);
432    }
433
434    @Override
435    public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception {
436        ConnectionContext context = getConnectionContext();
437        context.setBroker(safeGetBroker());
438        context.setClientId(clientId);
439        ConsumerInfo info = new ConsumerInfo();
440        ConsumerId consumerId = new ConsumerId();
441        consumerId.setConnectionId(clientId);
442        consumerId.setSessionId(sessionIdCounter.incrementAndGet());
443        consumerId.setValue(0);
444        info.setConsumerId(consumerId);
445        info.setDestination(new ActiveMQTopic(topicName));
446        info.setSubscriptionName(subscriberName);
447        info.setSelector(selector);
448        Subscription subscription = safeGetBroker().addConsumer(context, info);
449        safeGetBroker().removeConsumer(context, info);
450        if (subscription != null) {
451            return subscription.getObjectName();
452        }
453        return null;
454    }
455
456    @Override
457    public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception {
458        RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
459        info.setClientId(clientId);
460        info.setSubscriptionName(subscriberName);
461        ConnectionContext context = getConnectionContext();
462        context.setBroker(safeGetBroker());
463        context.setClientId(clientId);
464        brokerService.getBroker().removeSubscription(context, info);
465    }
466
467    @Override
468    public void reloadLog4jProperties() throws Throwable {
469        Log4JConfigView.doReloadLog4jProperties();
470    }
471
472    @Override
473    public Map<String, String> getTransportConnectors() {
474        Map<String, String> answer = new HashMap<String, String>();
475        try {
476            for (TransportConnector connector : brokerService.getTransportConnectors()) {
477                answer.put(connector.getName(), connector.getConnectUri().toString());
478            }
479        } catch (Exception e) {
480            LOG.debug("Failed to read URI to build transport connectors map", e);
481        }
482        return answer;
483    }
484
485    @Override
486    public String getTransportConnectorByType(String type) {
487        return brokerService.getTransportConnectorURIsAsMap().get(type);
488    }
489
490    @Override
491    public String getVMURL() {
492        URI answer = brokerService.getVmConnectorURI();
493        return answer != null ? answer.toString() : "";
494    }
495
496    @Override
497    public String getDataDirectory() {
498        File file = brokerService.getDataDirectoryFile();
499        try {
500            return file != null ? file.getCanonicalPath() : "";
501        } catch (IOException e) {
502            return "";
503        }
504    }
505
506    @Override
507    public ObjectName getJMSJobScheduler() {
508        return this.jmsJobScheduler;
509    }
510
511    public void setJMSJobScheduler(ObjectName name) {
512        this.jmsJobScheduler = name;
513    }
514
515    @Override
516    public boolean isSlave() {
517        return brokerService.isSlave();
518    }
519
520    private ManagedRegionBroker safeGetBroker() {
521        if (broker == null) {
522            throw new IllegalStateException("Broker is not yet started.");
523        }
524
525        return broker;
526    }
527
528    private ConnectionContext getConnectionContext() {
529        ConnectionContext context;
530        if (broker == null) {
531            context = new ConnectionContext();
532        } else {
533            ConnectionContext sharedContext = BrokerSupport.getConnectionContext(broker.getContextBroker());
534            // Make a local copy of the sharedContext. We do this because we do
535            // not want to set a clientId on the
536            // global sharedContext. Taking a copy of the sharedContext is a
537            // good way to make sure that we are not
538            // messing up the shared context
539            context = sharedContext.copy();
540        }
541
542        return context;
543    }
544}