001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.io.IOException; 020import java.util.*; 021import java.util.Map.Entry; 022import java.util.concurrent.ConcurrentHashMap; 023import java.util.concurrent.CopyOnWriteArraySet; 024import java.util.concurrent.ExecutorService; 025import java.util.concurrent.ThreadPoolExecutor; 026 027import javax.jms.IllegalStateException; 028import javax.management.InstanceNotFoundException; 029import javax.management.MalformedObjectNameException; 030import javax.management.ObjectName; 031import javax.management.openmbean.CompositeData; 032import javax.management.openmbean.CompositeDataSupport; 033import javax.management.openmbean.CompositeType; 034import javax.management.openmbean.OpenDataException; 035import javax.management.openmbean.TabularData; 036import javax.management.openmbean.TabularDataSupport; 037import javax.management.openmbean.TabularType; 038 039import org.apache.activemq.broker.Broker; 040import org.apache.activemq.broker.BrokerService; 041import org.apache.activemq.broker.ConnectionContext; 042import org.apache.activemq.broker.ProducerBrokerExchange; 043import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; 044import org.apache.activemq.broker.region.Destination; 045import org.apache.activemq.broker.region.DestinationFactory; 046import org.apache.activemq.broker.region.DestinationInterceptor; 047import org.apache.activemq.broker.region.DurableTopicSubscription; 048import org.apache.activemq.broker.region.MessageReference; 049import org.apache.activemq.broker.region.NullMessageReference; 050import org.apache.activemq.broker.region.Queue; 051import org.apache.activemq.broker.region.Region; 052import org.apache.activemq.broker.region.RegionBroker; 053import org.apache.activemq.broker.region.Subscription; 054import org.apache.activemq.broker.region.Topic; 055import org.apache.activemq.broker.region.TopicRegion; 056import org.apache.activemq.broker.region.TopicSubscription; 057import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy; 058import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; 059import org.apache.activemq.command.ActiveMQDestination; 060import org.apache.activemq.command.ActiveMQMessage; 061import org.apache.activemq.command.ActiveMQTopic; 062import org.apache.activemq.command.ConnectionInfo; 063import org.apache.activemq.command.ConsumerInfo; 064import org.apache.activemq.command.Message; 065import org.apache.activemq.command.MessageAck; 066import org.apache.activemq.command.MessageId; 067import org.apache.activemq.command.ProducerInfo; 068import org.apache.activemq.command.SubscriptionInfo; 069import org.apache.activemq.thread.Scheduler; 070import org.apache.activemq.thread.TaskRunnerFactory; 071import org.apache.activemq.transaction.XATransaction; 072import org.apache.activemq.usage.SystemUsage; 073import org.apache.activemq.util.ServiceStopper; 074import org.apache.activemq.util.SubscriptionKey; 075import org.slf4j.Logger; 076import org.slf4j.LoggerFactory; 077 078public class ManagedRegionBroker extends RegionBroker { 079 private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class); 080 private final ManagementContext managementContext; 081 private final ObjectName brokerObjectName; 082 private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>(); 083 private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>(); 084 private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>(); 085 private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>(); 086 private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 087 private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 088 private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 089 private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 090 private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 091 private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); 092 private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 093 private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 094 private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 095 private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 096 private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); 097 private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>(); 098 private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>(); 099 private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>(); 100 /* This is the first broker in the broker interceptor chain. */ 101 private Broker contextBroker; 102 103 private final ExecutorService asyncInvokeService; 104 private final long mbeanTimeout; 105 106 public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, 107 DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException { 108 super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor); 109 this.managementContext = context; 110 this.brokerObjectName = brokerObjectName; 111 this.mbeanTimeout = brokerService.getMbeanInvocationTimeout(); 112 this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;; 113 } 114 115 @Override 116 public void start() throws Exception { 117 super.start(); 118 // build all existing durable subscriptions 119 buildExistingSubscriptions(); 120 } 121 122 @Override 123 protected void doStop(ServiceStopper stopper) { 124 super.doStop(stopper); 125 // lets remove any mbeans not yet removed 126 for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) { 127 ObjectName name = iter.next(); 128 try { 129 managementContext.unregisterMBean(name); 130 } catch (InstanceNotFoundException e) { 131 LOG.warn("The MBean {} is no longer registered with JMX", name); 132 } catch (Exception e) { 133 stopper.onException(this, e); 134 } 135 } 136 registeredMBeans.clear(); 137 } 138 139 @Override 140 protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 141 return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 142 } 143 144 @Override 145 protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 146 return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 147 } 148 149 @Override 150 protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 151 return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 152 } 153 154 @Override 155 protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { 156 return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); 157 } 158 159 public void register(ActiveMQDestination destName, Destination destination) { 160 // TODO refactor to allow views for custom destinations 161 try { 162 ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName); 163 DestinationView view; 164 if (destination instanceof Queue) { 165 view = new QueueView(this, (Queue)destination); 166 } else if (destination instanceof Topic) { 167 view = new TopicView(this, (Topic)destination); 168 } else { 169 view = null; 170 LOG.warn("JMX View is not supported for custom destination {}", destination); 171 } 172 if (view != null) { 173 registerDestination(objectName, destName, view); 174 } 175 } catch (Exception e) { 176 LOG.error("Failed to register destination {}", destName, e); 177 } 178 } 179 180 public void unregister(ActiveMQDestination destName) { 181 try { 182 ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName); 183 unregisterDestination(objectName); 184 } catch (Exception e) { 185 LOG.error("Failed to unregister {}", destName, e); 186 } 187 } 188 189 public ObjectName registerSubscription(ConnectionContext context, Subscription sub) { 190 String connectionClientId = context.getClientId(); 191 192 SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName()); 193 try { 194 ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, sub.getConsumerInfo()); 195 SubscriptionView view; 196 if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) { 197 // add offline subscribers to inactive list 198 SubscriptionInfo info = new SubscriptionInfo(); 199 info.setClientId(context.getClientId()); 200 info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName()); 201 info.setDestination(sub.getConsumerInfo().getDestination()); 202 info.setSelector(sub.getSelector()); 203 addInactiveSubscription(key, info, sub); 204 } else { 205 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null; 206 if (sub.getConsumerInfo().isDurable()) { 207 view = new DurableSubscriptionView(this, brokerService, context.getClientId(), userName, sub); 208 } else { 209 if (sub instanceof TopicSubscription) { 210 view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub); 211 } else { 212 view = new SubscriptionView(context.getClientId(), userName, sub); 213 } 214 } 215 registerSubscription(objectName, sub.getConsumerInfo(), key, view); 216 } 217 subscriptionMap.put(sub, objectName); 218 return objectName; 219 } catch (Exception e) { 220 LOG.error("Failed to register subscription {}", sub, e); 221 return null; 222 } 223 } 224 225 @Override 226 public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { 227 super.addConnection(context, info); 228 this.contextBroker.getBrokerService().incrementCurrentConnections(); 229 this.contextBroker.getBrokerService().incrementTotalConnections(); 230 } 231 232 @Override 233 public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception { 234 super.removeConnection(context, info, error); 235 this.contextBroker.getBrokerService().decrementCurrentConnections(); 236 } 237 238 @Override 239 public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 240 Subscription sub = super.addConsumer(context, info); 241 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName()); 242 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey); 243 if (inactiveName != null) { 244 // if it was inactive, register it 245 registerSubscription(context, sub); 246 } 247 return sub; 248 } 249 250 @Override 251 public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { 252 for (Subscription sub : subscriptionMap.keySet()) { 253 if (sub.getConsumerInfo().equals(info)) { 254 // unregister all consumer subs 255 unregisterSubscription(subscriptionMap.get(sub), true); 256 } 257 } 258 super.removeConsumer(context, info); 259 } 260 261 @Override 262 public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { 263 super.addProducer(context, info); 264 String connectionClientId = context.getClientId(); 265 ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info); 266 String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null; 267 ProducerView view = new ProducerView(info, connectionClientId, userName, this); 268 registerProducer(objectName, info.getDestination(), view); 269 } 270 271 @Override 272 public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception { 273 ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info); 274 unregisterProducer(objectName); 275 super.removeProducer(context, info); 276 } 277 278 @Override 279 public void send(ProducerBrokerExchange exchange, Message message) throws Exception { 280 if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) { 281 ProducerInfo info = exchange.getProducerState().getInfo(); 282 if (info.getDestination() == null && info.getProducerId() != null) { 283 ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, exchange.getConnectionContext().getClientId(), info); 284 ProducerView view = this.dynamicDestinationProducers.get(objectName); 285 if (view != null) { 286 ActiveMQDestination dest = message.getDestination(); 287 if (dest != null) { 288 view.setLastUsedDestinationName(dest); 289 } 290 } 291 } 292 } 293 super.send(exchange, message); 294 } 295 296 public void unregisterSubscription(Subscription sub) { 297 ObjectName name = subscriptionMap.remove(sub); 298 if (name != null) { 299 try { 300 SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName()); 301 ObjectName inactiveName = subscriptionKeys.remove(subscriptionKey); 302 if (inactiveName != null) { 303 inactiveDurableTopicSubscribers.remove(inactiveName); 304 managementContext.unregisterMBean(inactiveName); 305 } 306 } catch (Exception e) { 307 LOG.error("Failed to unregister subscription {}", sub, e); 308 } 309 } 310 } 311 312 protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception { 313 if (dest.isQueue()) { 314 if (dest.isTemporary()) { 315 temporaryQueues.put(key, view); 316 } else { 317 queues.put(key, view); 318 } 319 } else { 320 if (dest.isTemporary()) { 321 temporaryTopics.put(key, view); 322 } else { 323 topics.put(key, view); 324 } 325 } 326 try { 327 if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) { 328 registeredMBeans.add(key); 329 } 330 } catch (Throwable e) { 331 LOG.warn("Failed to register MBean {}", key); 332 LOG.debug("Failure reason: ", e); 333 } 334 } 335 336 protected void unregisterDestination(ObjectName key) throws Exception { 337 338 DestinationView view = removeAndRemember(topics, key, null); 339 view = removeAndRemember(queues, key, view); 340 view = removeAndRemember(temporaryQueues, key, view); 341 view = removeAndRemember(temporaryTopics, key, view); 342 if (registeredMBeans.remove(key)) { 343 try { 344 managementContext.unregisterMBean(key); 345 } catch (Throwable e) { 346 LOG.warn("Failed to unregister MBean {}", key); 347 LOG.debug("Failure reason: ", e); 348 } 349 } 350 if (view != null) { 351 key = view.getSlowConsumerStrategy(); 352 if (key!= null && registeredMBeans.remove(key)) { 353 try { 354 managementContext.unregisterMBean(key); 355 } catch (Throwable e) { 356 LOG.warn("Failed to unregister slow consumer strategy MBean {}", key); 357 LOG.debug("Failure reason: ", e); 358 } 359 } 360 } 361 } 362 363 protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception { 364 365 if (dest != null) { 366 if (dest.isQueue()) { 367 if (dest.isTemporary()) { 368 temporaryQueueProducers.put(key, view); 369 } else { 370 queueProducers.put(key, view); 371 } 372 } else { 373 if (dest.isTemporary()) { 374 temporaryTopicProducers.put(key, view); 375 } else { 376 topicProducers.put(key, view); 377 } 378 } 379 } else { 380 dynamicDestinationProducers.put(key, view); 381 } 382 383 try { 384 if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) { 385 registeredMBeans.add(key); 386 } 387 } catch (Throwable e) { 388 LOG.warn("Failed to register MBean {}", key); 389 LOG.debug("Failure reason: ", e); 390 } 391 } 392 393 protected void unregisterProducer(ObjectName key) throws Exception { 394 queueProducers.remove(key); 395 topicProducers.remove(key); 396 temporaryQueueProducers.remove(key); 397 temporaryTopicProducers.remove(key); 398 dynamicDestinationProducers.remove(key); 399 if (registeredMBeans.remove(key)) { 400 try { 401 managementContext.unregisterMBean(key); 402 } catch (Throwable e) { 403 LOG.warn("Failed to unregister MBean {}", key); 404 LOG.debug("Failure reason: ", e); 405 } 406 } 407 } 408 409 private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) { 410 DestinationView candidate = map.remove(key); 411 if (candidate != null && view == null) { 412 view = candidate; 413 } 414 return candidate != null ? candidate : view; 415 } 416 417 protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception { 418 ActiveMQDestination dest = info.getDestination(); 419 if (dest.isQueue()) { 420 if (dest.isTemporary()) { 421 temporaryQueueSubscribers.put(key, view); 422 } else { 423 queueSubscribers.put(key, view); 424 } 425 } else { 426 if (dest.isTemporary()) { 427 temporaryTopicSubscribers.put(key, view); 428 } else { 429 if (info.isDurable()) { 430 durableTopicSubscribers.put(key, view); 431 // unregister any inactive durable subs 432 try { 433 ObjectName inactiveName = subscriptionKeys.get(subscriptionKey); 434 if (inactiveName != null) { 435 inactiveDurableTopicSubscribers.remove(inactiveName); 436 registeredMBeans.remove(inactiveName); 437 managementContext.unregisterMBean(inactiveName); 438 } 439 } catch (Throwable e) { 440 LOG.error("Unable to unregister inactive durable subscriber {}", subscriptionKey, e); 441 } 442 } else { 443 topicSubscribers.put(key, view); 444 } 445 } 446 } 447 448 try { 449 if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) { 450 registeredMBeans.add(key); 451 } 452 } catch (Throwable e) { 453 LOG.warn("Failed to register MBean {}", key); 454 LOG.debug("Failure reason: ", e); 455 } 456 } 457 458 protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception { 459 queueSubscribers.remove(key); 460 topicSubscribers.remove(key); 461 temporaryQueueSubscribers.remove(key); 462 temporaryTopicSubscribers.remove(key); 463 if (registeredMBeans.remove(key)) { 464 try { 465 managementContext.unregisterMBean(key); 466 } catch (Throwable e) { 467 LOG.warn("Failed to unregister MBean {}", key); 468 LOG.debug("Failure reason: ", e); 469 } 470 } 471 DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key); 472 if (view != null) { 473 // need to put this back in the inactive list 474 SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName()); 475 if (addToInactive) { 476 SubscriptionInfo info = new SubscriptionInfo(); 477 info.setClientId(subscriptionKey.getClientId()); 478 info.setSubscriptionName(subscriptionKey.getSubscriptionName()); 479 info.setDestination(new ActiveMQTopic(view.getDestinationName())); 480 info.setSelector(view.getSelector()); 481 addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null)); 482 } 483 } 484 } 485 486 protected void buildExistingSubscriptions() throws Exception { 487 Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>(); 488 Set<ActiveMQDestination> destinations = destinationFactory.getDestinations(); 489 if (destinations != null) { 490 for (ActiveMQDestination dest : destinations) { 491 if (dest.isTopic()) { 492 SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest); 493 if (infos != null) { 494 for (int i = 0; i < infos.length; i++) { 495 SubscriptionInfo info = infos[i]; 496 SubscriptionKey key = new SubscriptionKey(info); 497 if (!alreadyKnown(key)) { 498 LOG.debug("Restoring durable subscription MBean {}", info); 499 subscriptions.put(key, info); 500 } 501 } 502 } 503 } 504 } 505 } 506 507 for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) { 508 addInactiveSubscription(entry.getKey(), entry.getValue(), null); 509 } 510 } 511 512 private boolean alreadyKnown(SubscriptionKey key) { 513 boolean known = false; 514 known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key); 515 LOG.trace("Sub with key: {}, {} already registered", key, (known ? "": "not")); 516 return known; 517 } 518 519 protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) { 520 try { 521 ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info); 522 ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo); 523 SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription); 524 525 try { 526 if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName) != null) { 527 registeredMBeans.add(objectName); 528 } 529 } catch (Throwable e) { 530 LOG.warn("Failed to register MBean {}", key); 531 LOG.debug("Failure reason: ", e); 532 } 533 534 inactiveDurableTopicSubscribers.put(objectName, view); 535 subscriptionKeys.put(key, objectName); 536 } catch (Exception e) { 537 LOG.error("Failed to register subscription {}", info, e); 538 } 539 } 540 541 public CompositeData[] browse(SubscriptionView view) throws OpenDataException { 542 Message[] messages = getSubscriberMessages(view); 543 CompositeData c[] = new CompositeData[messages.length]; 544 for (int i = 0; i < c.length; i++) { 545 try { 546 c[i] = OpenTypeSupport.convert(messages[i]); 547 } catch (Throwable e) { 548 LOG.error("Failed to browse: {}", view, e); 549 } 550 } 551 return c; 552 } 553 554 public TabularData browseAsTable(SubscriptionView view) throws OpenDataException { 555 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); 556 Message[] messages = getSubscriberMessages(view); 557 CompositeType ct = factory.getCompositeType(); 558 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"}); 559 TabularDataSupport rc = new TabularDataSupport(tt); 560 for (int i = 0; i < messages.length; i++) { 561 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); 562 } 563 return rc; 564 } 565 566 public void remove(SubscriptionView view, String messageId) throws Exception { 567 ActiveMQDestination destination = getTopicDestination(view); 568 if (destination != null) { 569 final Destination topic = getTopicRegion().getDestinationMap().get(destination); 570 final MessageAck messageAck = new MessageAck(); 571 messageAck.setMessageID(new MessageId(messageId)); 572 messageAck.setDestination(destination); 573 574 topic.getMessageStore().removeMessage(brokerService.getAdminConnectionContext(), messageAck); 575 576 // if sub is active, remove from cursor 577 if (view.subscription instanceof DurableTopicSubscription) { 578 final DurableTopicSubscription durableTopicSubscription = (DurableTopicSubscription) view.subscription; 579 final MessageReference messageReference = new NullMessageReference(); 580 messageReference.getMessage().setMessageId(messageAck.getFirstMessageId()); 581 durableTopicSubscription.getPending().remove(messageReference); 582 } 583 584 } else { 585 throw new IllegalStateException("can't determine topic for sub:" + view); 586 } 587 } 588 589 protected Message[] getSubscriberMessages(SubscriptionView view) { 590 ActiveMQDestination destination = getTopicDestination(view); 591 if (destination != null) { 592 Destination topic = getTopicRegion().getDestinationMap().get(destination); 593 return topic.browse(); 594 595 } else { 596 LOG.warn("can't determine topic to browse for sub:" + view); 597 return new Message[]{}; 598 } 599 } 600 601 private ActiveMQDestination getTopicDestination(SubscriptionView view) { 602 ActiveMQDestination destination = null; 603 if (view.subscription instanceof DurableTopicSubscription) { 604 destination = new ActiveMQTopic(view.getDestinationName()); 605 } else if (view instanceof InactiveDurableSubscriptionView) { 606 destination = ((InactiveDurableSubscriptionView)view).subscriptionInfo.getDestination(); 607 } 608 return destination; 609 } 610 611 private ObjectName[] onlyNonSuppressed (Set<ObjectName> set){ 612 List<ObjectName> nonSuppressed = new ArrayList<ObjectName>(); 613 for(ObjectName key : set){ 614 if (managementContext.isAllowedToRegister(key)){ 615 nonSuppressed.add(key); 616 } 617 } 618 return nonSuppressed.toArray(new ObjectName[nonSuppressed.size()]); 619 } 620 621 protected ObjectName[] getTopics() { 622 Set<ObjectName> set = topics.keySet(); 623 return set.toArray(new ObjectName[set.size()]); 624 } 625 626 protected ObjectName[] getTopicsNonSuppressed() { 627 return onlyNonSuppressed(topics.keySet()); 628 } 629 630 protected ObjectName[] getQueues() { 631 Set<ObjectName> set = queues.keySet(); 632 return set.toArray(new ObjectName[set.size()]); 633 } 634 635 protected ObjectName[] getQueuesNonSuppressed() { 636 return onlyNonSuppressed(queues.keySet()); 637 } 638 639 protected ObjectName[] getTemporaryTopics() { 640 Set<ObjectName> set = temporaryTopics.keySet(); 641 return set.toArray(new ObjectName[set.size()]); 642 } 643 644 protected ObjectName[] getTemporaryTopicsNonSuppressed() { 645 return onlyNonSuppressed(temporaryTopics.keySet()); 646 } 647 648 protected ObjectName[] getTemporaryQueues() { 649 Set<ObjectName> set = temporaryQueues.keySet(); 650 return set.toArray(new ObjectName[set.size()]); 651 } 652 653 protected ObjectName[] getTemporaryQueuesNonSuppressed() { 654 return onlyNonSuppressed(temporaryQueues.keySet()); 655 } 656 657 protected ObjectName[] getTopicSubscribers() { 658 Set<ObjectName> set = topicSubscribers.keySet(); 659 return set.toArray(new ObjectName[set.size()]); 660 } 661 662 protected ObjectName[] getTopicSubscribersNonSuppressed() { 663 return onlyNonSuppressed(topicSubscribers.keySet()); 664 } 665 666 protected ObjectName[] getDurableTopicSubscribers() { 667 Set<ObjectName> set = durableTopicSubscribers.keySet(); 668 return set.toArray(new ObjectName[set.size()]); 669 } 670 671 protected ObjectName[] getDurableTopicSubscribersNonSuppressed() { 672 return onlyNonSuppressed(durableTopicSubscribers.keySet()); 673 } 674 675 protected ObjectName[] getQueueSubscribers() { 676 Set<ObjectName> set = queueSubscribers.keySet(); 677 return set.toArray(new ObjectName[set.size()]); 678 } 679 680 protected ObjectName[] getQueueSubscribersNonSuppressed() { 681 return onlyNonSuppressed(queueSubscribers.keySet()); 682 } 683 684 protected ObjectName[] getTemporaryTopicSubscribers() { 685 Set<ObjectName> set = temporaryTopicSubscribers.keySet(); 686 return set.toArray(new ObjectName[set.size()]); 687 } 688 689 protected ObjectName[] getTemporaryTopicSubscribersNonSuppressed() { 690 return onlyNonSuppressed(temporaryTopicSubscribers.keySet()); 691 } 692 693 protected ObjectName[] getTemporaryQueueSubscribers() { 694 Set<ObjectName> set = temporaryQueueSubscribers.keySet(); 695 return set.toArray(new ObjectName[set.size()]); 696 } 697 698 protected ObjectName[] getTemporaryQueueSubscribersNonSuppressed() { 699 return onlyNonSuppressed(temporaryQueueSubscribers.keySet()); 700 } 701 702 protected ObjectName[] getInactiveDurableTopicSubscribers() { 703 Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet(); 704 return set.toArray(new ObjectName[set.size()]); 705 } 706 707 protected ObjectName[] getInactiveDurableTopicSubscribersNonSuppressed() { 708 return onlyNonSuppressed(inactiveDurableTopicSubscribers.keySet()); 709 } 710 711 protected ObjectName[] getTopicProducers() { 712 Set<ObjectName> set = topicProducers.keySet(); 713 return set.toArray(new ObjectName[set.size()]); 714 } 715 716 protected ObjectName[] getTopicProducersNonSuppressed() { 717 return onlyNonSuppressed(topicProducers.keySet()); 718 } 719 720 protected ObjectName[] getQueueProducers() { 721 Set<ObjectName> set = queueProducers.keySet(); 722 return set.toArray(new ObjectName[set.size()]); 723 } 724 725 protected ObjectName[] getQueueProducersNonSuppressed() { 726 return onlyNonSuppressed(queueProducers.keySet()); 727 } 728 729 protected ObjectName[] getTemporaryTopicProducers() { 730 Set<ObjectName> set = temporaryTopicProducers.keySet(); 731 return set.toArray(new ObjectName[set.size()]); 732 } 733 734 protected ObjectName[] getTemporaryTopicProducersNonSuppressed() { 735 return onlyNonSuppressed(temporaryTopicProducers.keySet()); 736 } 737 738 protected ObjectName[] getTemporaryQueueProducers() { 739 Set<ObjectName> set = temporaryQueueProducers.keySet(); 740 return set.toArray(new ObjectName[set.size()]); 741 } 742 743 protected ObjectName[] getTemporaryQueueProducersNonSuppressed() { 744 return onlyNonSuppressed(temporaryQueueProducers.keySet()); 745 } 746 747 protected ObjectName[] getDynamicDestinationProducers() { 748 Set<ObjectName> set = dynamicDestinationProducers.keySet(); 749 return set.toArray(new ObjectName[set.size()]); 750 } 751 752 protected ObjectName[] getDynamicDestinationProducersNonSuppressed() { 753 return onlyNonSuppressed(dynamicDestinationProducers.keySet()); 754 } 755 756 public Broker getContextBroker() { 757 return contextBroker; 758 } 759 760 public void setContextBroker(Broker contextBroker) { 761 this.contextBroker = contextBroker; 762 } 763 764 public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException { 765 ObjectName objectName = null; 766 try { 767 objectName = BrokerMBeanSupport.createAbortSlowConsumerStrategyName(brokerObjectName, strategy); 768 if (!registeredMBeans.contains(objectName)) { 769 770 AbortSlowConsumerStrategyView view = null; 771 if (strategy instanceof AbortSlowAckConsumerStrategy) { 772 view = new AbortSlowAckConsumerStrategyView(this, (AbortSlowAckConsumerStrategy) strategy); 773 } else { 774 view = new AbortSlowConsumerStrategyView(this, strategy); 775 } 776 777 if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName) != null) { 778 registeredMBeans.add(objectName); 779 } 780 } 781 } catch (Exception e) { 782 LOG.warn("Failed to register MBean {}", strategy); 783 LOG.debug("Failure reason: ", e); 784 } 785 return objectName; 786 } 787 788 public void registerRecoveredTransactionMBean(XATransaction transaction) { 789 try { 790 ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction); 791 if (!registeredMBeans.contains(objectName)) { 792 RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction); 793 if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName) != null) { 794 registeredMBeans.add(objectName); 795 } 796 } 797 } catch (Exception e) { 798 LOG.warn("Failed to register prepared transaction MBean {}", transaction); 799 LOG.debug("Failure reason: ", e); 800 } 801 } 802 803 public void unregister(XATransaction transaction) { 804 try { 805 ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction); 806 if (registeredMBeans.remove(objectName)) { 807 try { 808 managementContext.unregisterMBean(objectName); 809 } catch (Throwable e) { 810 LOG.warn("Failed to unregister MBean {}", objectName); 811 LOG.debug("Failure reason: ", e); 812 } 813 } 814 } catch (Exception e) { 815 LOG.warn("Failed to create object name to unregister {}", transaction, e); 816 } 817 } 818 819 public ObjectName getSubscriberObjectName(Subscription key) { 820 return subscriptionMap.get(key); 821 } 822 823 public Subscription getSubscriber(ObjectName key) { 824 Subscription sub = null; 825 for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) { 826 if (entry.getValue().equals(key)) { 827 sub = entry.getKey(); 828 break; 829 } 830 } 831 return sub; 832 } 833 834 public Map<ObjectName, DestinationView> getQueueViews() { 835 return queues; 836 } 837 838 public Map<ObjectName, DestinationView> getTopicViews() { 839 return topics; 840 } 841 842 public DestinationView getQueueView(String queueName) throws MalformedObjectNameException { 843 ObjectName objName = BrokerMBeanSupport.createDestinationName(brokerObjectName.toString(), "Queue", queueName); 844 return queues.get(objName); 845 } 846 847 public Set<ObjectName> getRegisteredMbeans() { 848 return registeredMBeans; 849 } 850}