001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.io.File; 020import java.io.IOException; 021import java.net.URI; 022import java.util.*; 023import java.util.concurrent.atomic.AtomicInteger; 024 025import javax.management.MalformedObjectNameException; 026import javax.management.ObjectName; 027import javax.management.openmbean.CompositeData; 028import javax.management.openmbean.OpenDataException; 029 030import org.apache.activemq.ActiveMQConnectionMetaData; 031import org.apache.activemq.broker.BrokerService; 032import org.apache.activemq.broker.ConnectionContext; 033import org.apache.activemq.broker.TransportConnector; 034import org.apache.activemq.broker.region.Subscription; 035import org.apache.activemq.command.*; 036import org.apache.activemq.network.NetworkConnector; 037import org.apache.activemq.util.BrokerSupport; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041public class BrokerView implements BrokerViewMBean { 042 043 private static final Logger LOG = LoggerFactory.getLogger(BrokerView.class); 044 045 ManagedRegionBroker broker; 046 047 private final BrokerService brokerService; 048 private final AtomicInteger sessionIdCounter = new AtomicInteger(0); 049 private ObjectName jmsJobScheduler; 050 051 public BrokerView(BrokerService brokerService, ManagedRegionBroker managedBroker) throws Exception { 052 this.brokerService = brokerService; 053 this.broker = managedBroker; 054 } 055 056 public ManagedRegionBroker getBroker() { 057 return broker; 058 } 059 060 public void setBroker(ManagedRegionBroker broker) { 061 this.broker = broker; 062 } 063 064 @Override 065 public String getBrokerId() { 066 return safeGetBroker().getBrokerId().toString(); 067 } 068 069 @Override 070 public String getBrokerName() { 071 return safeGetBroker().getBrokerName(); 072 } 073 074 @Override 075 public String getBrokerVersion() { 076 return ActiveMQConnectionMetaData.PROVIDER_VERSION; 077 } 078 079 @Override 080 public String getUptime() { 081 return brokerService.getUptime(); 082 } 083 084 @Override 085 public long getUptimeMillis() { 086 return brokerService.getUptimeMillis(); 087 } 088 089 @Override 090 public int getCurrentConnectionsCount() { 091 return brokerService.getCurrentConnections(); 092 } 093 094 @Override 095 public long getTotalConnectionsCount() { 096 return brokerService.getTotalConnections(); 097 } 098 099 @Override 100 public void gc() throws Exception { 101 brokerService.getBroker().gc(); 102 try { 103 brokerService.getPersistenceAdapter().checkpoint(true); 104 } catch (IOException e) { 105 LOG.error("Failed to checkpoint persistence adapter on gc request", e); 106 } 107 } 108 109 @Override 110 public void start() throws Exception { 111 brokerService.start(); 112 } 113 114 @Override 115 public void stop() throws Exception { 116 brokerService.stop(); 117 } 118 119 @Override 120 public void restart() throws Exception { 121 if (brokerService.isRestartAllowed()) { 122 brokerService.requestRestart(); 123 brokerService.stop(); 124 } else { 125 throw new Exception("Restart is not allowed"); 126 } 127 } 128 129 @Override 130 public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception { 131 brokerService.stopGracefully(connectorName, queueName, timeout, pollInterval); 132 } 133 134 @Override 135 public long getTotalEnqueueCount() { 136 return safeGetBroker().getDestinationStatistics().getEnqueues().getCount(); 137 } 138 139 @Override 140 public long getTotalDequeueCount() { 141 return safeGetBroker().getDestinationStatistics().getDequeues().getCount(); 142 } 143 144 @Override 145 public long getTotalConsumerCount() { 146 return safeGetBroker().getDestinationStatistics().getConsumers().getCount(); 147 } 148 149 @Override 150 public long getTotalProducerCount() { 151 return safeGetBroker().getDestinationStatistics().getProducers().getCount(); 152 } 153 154 @Override 155 public long getTotalMessageCount() { 156 return safeGetBroker().getDestinationStatistics().getMessages().getCount(); 157 } 158 159 /** 160 * @return the average size of a message (bytes) 161 */ 162 @Override 163 public long getAverageMessageSize() { 164 // we are okay with the size without decimals so cast to long 165 return (long) safeGetBroker().getDestinationStatistics().getMessageSize().getAverageSize(); 166 } 167 168 /** 169 * @return the max size of a message (bytes) 170 */ 171 @Override 172 public long getMaxMessageSize() { 173 return safeGetBroker().getDestinationStatistics().getMessageSize().getMaxSize(); 174 } 175 176 /** 177 * @return the min size of a message (bytes) 178 */ 179 @Override 180 public long getMinMessageSize() { 181 return safeGetBroker().getDestinationStatistics().getMessageSize().getMinSize(); 182 } 183 184 public long getTotalMessagesCached() { 185 return safeGetBroker().getDestinationStatistics().getMessagesCached().getCount(); 186 } 187 188 @Override 189 public int getMemoryPercentUsage() { 190 return brokerService.getSystemUsage().getMemoryUsage().getPercentUsage(); 191 } 192 193 @Override 194 public long getMemoryLimit() { 195 return brokerService.getSystemUsage().getMemoryUsage().getLimit(); 196 } 197 198 @Override 199 public void setMemoryLimit(long limit) { 200 brokerService.getSystemUsage().getMemoryUsage().setLimit(limit); 201 } 202 203 @Override 204 public long getStoreLimit() { 205 return brokerService.getSystemUsage().getStoreUsage().getLimit(); 206 } 207 208 @Override 209 public int getStorePercentUsage() { 210 return brokerService.getSystemUsage().getStoreUsage().getPercentUsage(); 211 } 212 213 @Override 214 public long getTempLimit() { 215 return brokerService.getSystemUsage().getTempUsage().getLimit(); 216 } 217 218 @Override 219 public int getTempPercentUsage() { 220 return brokerService.getSystemUsage().getTempUsage().getPercentUsage(); 221 } 222 223 @Override 224 public long getJobSchedulerStoreLimit() { 225 return brokerService.getSystemUsage().getJobSchedulerUsage().getLimit(); 226 } 227 228 @Override 229 public int getJobSchedulerStorePercentUsage() { 230 return brokerService.getSystemUsage().getJobSchedulerUsage().getPercentUsage(); 231 } 232 233 @Override 234 public void setStoreLimit(long limit) { 235 brokerService.getSystemUsage().getStoreUsage().setLimit(limit); 236 } 237 238 @Override 239 public void setTempLimit(long limit) { 240 brokerService.getSystemUsage().getTempUsage().setLimit(limit); 241 } 242 243 @Override 244 public void setJobSchedulerStoreLimit(long limit) { 245 brokerService.getSystemUsage().getJobSchedulerUsage().setLimit(limit); 246 } 247 248 @Override 249 public void resetStatistics() { 250 safeGetBroker().getDestinationStatistics().reset(); 251 } 252 253 @Override 254 public void enableStatistics() { 255 safeGetBroker().getDestinationStatistics().setEnabled(true); 256 } 257 258 @Override 259 public void disableStatistics() { 260 safeGetBroker().getDestinationStatistics().setEnabled(false); 261 } 262 263 @Override 264 public boolean isStatisticsEnabled() { 265 return safeGetBroker().getDestinationStatistics().isEnabled(); 266 } 267 268 @Override 269 public boolean isPersistent() { 270 return brokerService.isPersistent(); 271 } 272 273 @Override 274 public void terminateJVM(int exitCode) { 275 System.exit(exitCode); 276 } 277 278 @Override 279 public ObjectName[] getTopics() { 280 return safeGetBroker().getTopics(); 281 } 282 283 @Override 284 public ObjectName[] getQueues() { 285 return safeGetBroker().getQueues(); 286 } 287 288 @Override 289 public String queryQueues(String filter, int page, int pageSize) throws IOException { 290 return DestinationsViewFilter.create(filter) 291 .setDestinations(safeGetBroker().getQueueViews()) 292 .filter(page, pageSize); 293 } 294 295 @Override 296 public String queryTopics(String filter, int page, int pageSize) throws IOException { 297 return DestinationsViewFilter.create(filter) 298 .setDestinations(safeGetBroker().getTopicViews()) 299 .filter(page, pageSize); 300 } 301 302 public CompositeData[] browseQueue(String queueName) throws OpenDataException, MalformedObjectNameException { 303 return safeGetBroker().getQueueView(queueName).browse(); 304 } 305 306 @Override 307 public ObjectName[] getTemporaryTopics() { 308 return safeGetBroker().getTemporaryTopics(); 309 } 310 311 @Override 312 public ObjectName[] getTemporaryQueues() { 313 return safeGetBroker().getTemporaryQueues(); 314 } 315 316 @Override 317 public ObjectName[] getTopicSubscribers() { 318 return safeGetBroker().getTopicSubscribers(); 319 } 320 321 @Override 322 public ObjectName[] getDurableTopicSubscribers() { 323 return safeGetBroker().getDurableTopicSubscribers(); 324 } 325 326 @Override 327 public ObjectName[] getQueueSubscribers() { 328 return safeGetBroker().getQueueSubscribers(); 329 } 330 331 @Override 332 public ObjectName[] getTemporaryTopicSubscribers() { 333 return safeGetBroker().getTemporaryTopicSubscribers(); 334 } 335 336 @Override 337 public ObjectName[] getTemporaryQueueSubscribers() { 338 return safeGetBroker().getTemporaryQueueSubscribers(); 339 } 340 341 @Override 342 public ObjectName[] getInactiveDurableTopicSubscribers() { 343 return safeGetBroker().getInactiveDurableTopicSubscribers(); 344 } 345 346 @Override 347 public ObjectName[] getTopicProducers() { 348 return safeGetBroker().getTopicProducers(); 349 } 350 351 @Override 352 public ObjectName[] getQueueProducers() { 353 return safeGetBroker().getQueueProducers(); 354 } 355 356 @Override 357 public ObjectName[] getTemporaryTopicProducers() { 358 return safeGetBroker().getTemporaryTopicProducers(); 359 } 360 361 @Override 362 public ObjectName[] getTemporaryQueueProducers() { 363 return safeGetBroker().getTemporaryQueueProducers(); 364 } 365 366 @Override 367 public ObjectName[] getDynamicDestinationProducers() { 368 return safeGetBroker().getDynamicDestinationProducers(); 369 } 370 371 @Override 372 public String addConnector(String discoveryAddress) throws Exception { 373 TransportConnector connector = brokerService.addConnector(discoveryAddress); 374 if (connector == null) { 375 throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress); 376 } 377 brokerService.startTransportConnector(connector); 378 return connector.getName(); 379 } 380 381 @Override 382 public String addNetworkConnector(String discoveryAddress) throws Exception { 383 NetworkConnector connector = brokerService.addNetworkConnector(discoveryAddress); 384 if (connector == null) { 385 throw new NoSuchElementException("Not connector matched the given name: " + discoveryAddress); 386 } 387 brokerService.registerNetworkConnectorMBean(connector); 388 connector.start(); 389 return connector.getName(); 390 } 391 392 @Override 393 public boolean removeConnector(String connectorName) throws Exception { 394 TransportConnector connector = brokerService.getConnectorByName(connectorName); 395 if (connector == null) { 396 throw new NoSuchElementException("Not connector matched the given name: " + connectorName); 397 } 398 connector.stop(); 399 return brokerService.removeConnector(connector); 400 } 401 402 @Override 403 public boolean removeNetworkConnector(String connectorName) throws Exception { 404 NetworkConnector connector = brokerService.getNetworkConnectorByName(connectorName); 405 if (connector == null) { 406 throw new NoSuchElementException("Not connector matched the given name: " + connectorName); 407 } 408 connector.stop(); 409 return brokerService.removeNetworkConnector(connector); 410 } 411 412 @Override 413 public void addTopic(String name) throws Exception { 414 safeGetBroker().getContextBroker() 415 .addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), true); 416 } 417 418 @Override 419 public void addQueue(String name) throws Exception { 420 safeGetBroker().getContextBroker() 421 .addDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), true); 422 } 423 424 @Override 425 public void removeTopic(String name) throws Exception { 426 safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQTopic(name), 1000); 427 } 428 429 @Override 430 public void removeQueue(String name) throws Exception { 431 safeGetBroker().getContextBroker().removeDestination(BrokerSupport.getConnectionContext(safeGetBroker().getContextBroker()), new ActiveMQQueue(name), 1000); 432 } 433 434 @Override 435 public ObjectName createDurableSubscriber(String clientId, String subscriberName, String topicName, String selector) throws Exception { 436 ConnectionContext context = getConnectionContext(); 437 context.setBroker(safeGetBroker()); 438 context.setClientId(clientId); 439 ConsumerInfo info = new ConsumerInfo(); 440 ConsumerId consumerId = new ConsumerId(); 441 consumerId.setConnectionId(clientId); 442 consumerId.setSessionId(sessionIdCounter.incrementAndGet()); 443 consumerId.setValue(0); 444 info.setConsumerId(consumerId); 445 info.setDestination(new ActiveMQTopic(topicName)); 446 info.setSubscriptionName(subscriberName); 447 info.setSelector(selector); 448 Subscription subscription = safeGetBroker().addConsumer(context, info); 449 safeGetBroker().removeConsumer(context, info); 450 if (subscription != null) { 451 return subscription.getObjectName(); 452 } 453 return null; 454 } 455 456 @Override 457 public void destroyDurableSubscriber(String clientId, String subscriberName) throws Exception { 458 RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); 459 info.setClientId(clientId); 460 info.setSubscriptionName(subscriberName); 461 ConnectionContext context = getConnectionContext(); 462 context.setBroker(safeGetBroker()); 463 context.setClientId(clientId); 464 brokerService.getBroker().removeSubscription(context, info); 465 } 466 467 @Override 468 public void reloadLog4jProperties() throws Throwable { 469 Log4JConfigView.doReloadLog4jProperties(); 470 } 471 472 @Override 473 public Map<String, String> getTransportConnectors() { 474 Map<String, String> answer = new HashMap<String, String>(); 475 try { 476 for (TransportConnector connector : brokerService.getTransportConnectors()) { 477 answer.put(connector.getName(), connector.getConnectUri().toString()); 478 } 479 } catch (Exception e) { 480 LOG.debug("Failed to read URI to build transport connectors map", e); 481 } 482 return answer; 483 } 484 485 @Override 486 public String getTransportConnectorByType(String type) { 487 return brokerService.getTransportConnectorURIsAsMap().get(type); 488 } 489 490 @Override 491 public String getVMURL() { 492 URI answer = brokerService.getVmConnectorURI(); 493 return answer != null ? answer.toString() : ""; 494 } 495 496 @Override 497 public String getDataDirectory() { 498 File file = brokerService.getDataDirectoryFile(); 499 try { 500 return file != null ? file.getCanonicalPath() : ""; 501 } catch (IOException e) { 502 return ""; 503 } 504 } 505 506 @Override 507 public ObjectName getJMSJobScheduler() { 508 return this.jmsJobScheduler; 509 } 510 511 public void setJMSJobScheduler(ObjectName name) { 512 this.jmsJobScheduler = name; 513 } 514 515 @Override 516 public boolean isSlave() { 517 return brokerService.isSlave(); 518 } 519 520 private ManagedRegionBroker safeGetBroker() { 521 if (broker == null) { 522 throw new IllegalStateException("Broker is not yet started."); 523 } 524 525 return broker; 526 } 527 528 private ConnectionContext getConnectionContext() { 529 ConnectionContext context; 530 if (broker == null) { 531 context = new ConnectionContext(); 532 } else { 533 ConnectionContext sharedContext = BrokerSupport.getConnectionContext(broker.getContextBroker()); 534 // Make a local copy of the sharedContext. We do this because we do 535 // not want to set a clientId on the 536 // global sharedContext. Taking a copy of the sharedContext is a 537 // good way to make sure that we are not 538 // messing up the shared context 539 context = sharedContext.copy(); 540 } 541 542 return context; 543 } 544}