/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.jmx;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import javax.jms.IllegalStateException;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.NullMessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.XATransaction;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link RegionBroker} that additionally registers JMX MBeans, through the
 * supplied {@link ManagementContext}, for the destinations, subscriptions,
 * producers, slow consumer strategies and recovered XA transactions it is
 * told about.
 * <p>
 * Every {@link ObjectName} registered by this class is remembered in
 * {@code registeredMBeans} so that anything still registered can be removed
 * in {@link #doStop(ServiceStopper)}.
 */
public class ManagedRegionBroker extends RegionBroker {
    private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
    private final ManagementContext managementContext;
    private final ObjectName brokerObjectName;
    private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
    private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
    private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
    /* This is the first broker in the broker interceptor chain. */
    private Broker contextBroker;

    private final ExecutorService asyncInvokeService;
    private final long mbeanTimeout;

    /**
     * Creates the broker and captures the JMX settings used for later MBean
     * registration.
     * <p>
     * The MBean invocation timeout is read from {@code brokerService}; the
     * given {@code executor} is only kept for MBean registration when that
     * timeout is greater than zero, otherwise {@code null} is passed on to
     * {@link AsyncAnnotatedMBean}.
     *
     * @throws IOException if the super class constructor fails
     */
    public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
                               DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor, Scheduler scheduler, ThreadPoolExecutor executor) throws IOException {
        super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor, scheduler, executor);
        this.managementContext = context;
        this.brokerObjectName = brokerObjectName;
        this.mbeanTimeout = brokerService.getMbeanInvocationTimeout();
        this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;
    }

    /**
     * Starts the underlying region broker and then registers inactive
     * subscription MBeans for the durable subscriptions reported by the
     * destination factory.
     */
    @Override
    public void start() throws Exception {
        super.start();
        // build all existing durable subscriptions
        buildExistingSubscriptions();
    }

    /**
     * Stops the underlying region broker and unregisters every MBean this
     * broker still tracks. A missing MBean is only logged; any other failure
     * is reported to the {@code stopper}.
     */
    @Override
    protected void doStop(ServiceStopper stopper) {
        super.doStop(stopper);
        // lets remove any mbeans not yet removed
        for (ObjectName name : registeredMBeans) {
            try {
                managementContext.unregisterMBean(name);
            } catch (InstanceNotFoundException e) {
                LOG.warn("The MBean {} is no longer registered with JMX", name);
            } catch (Exception e) {
                stopper.onException(this, e);
            }
        }
        registeredMBeans.clear();
    }

    @Override
    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
    }

    @Override
    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
    }

    @Override
    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
    }

    @Override
    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
        return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
    }

    /**
     * Registers a destination MBean for a {@link Queue} or {@link Topic}.
     * Other destination types are not supported and only produce a warning.
     * Failures are logged, never propagated.
     */
    public void register(ActiveMQDestination destName, Destination destination) {
        // TODO refactor to allow views for custom destinations
        try {
            ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
            DestinationView view;
            if (destination instanceof Queue) {
                view = new QueueView(this, (Queue) destination);
            } else if (destination instanceof Topic) {
                view = new TopicView(this, (Topic) destination);
            } else {
                view = null;
                LOG.warn("JMX View is not supported for custom destination {}", destination);
            }
            if (view != null) {
                registerDestination(objectName, destName, view);
            }
        } catch (Exception e) {
            LOG.error("Failed to register destination {}", destName, e);
        }
    }

    /**
     * Unregisters the MBean of the given destination. Failures are logged,
     * never propagated.
     */
    public void unregister(ActiveMQDestination destName) {
        try {
            ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
            unregisterDestination(objectName);
        } catch (Exception e) {
            LOG.error("Failed to unregister {}", destName, e);
        }
    }

    /**
     * Registers an MBean for the given subscription and remembers the
     * subscription to object name mapping.
     * <p>
     * A subscription whose consumer id carries the connection id
     * {@code "OFFLINE"} is registered as an inactive durable subscription;
     * everything else gets a durable, topic or plain subscription view.
     *
     * @return the object name used for the subscription, or {@code null} if
     *         registration failed (the failure is logged)
     */
    public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
        String connectionClientId = context.getClientId();
        ConsumerInfo consumerInfo = sub.getConsumerInfo();

        SubscriptionKey key = new SubscriptionKey(connectionClientId, consumerInfo.getSubscriptionName());
        try {
            ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, consumerInfo);
            // constant-first comparison so a missing connection id cannot cause a NullPointerException
            if ("OFFLINE".equals(consumerInfo.getConsumerId().getConnectionId())) {
                // add offline subscribers to inactive list
                SubscriptionInfo info = new SubscriptionInfo();
                info.setClientId(connectionClientId);
                info.setSubscriptionName(consumerInfo.getSubscriptionName());
                info.setDestination(consumerInfo.getDestination());
                info.setSelector(sub.getSelector());
                addInactiveSubscription(key, info, sub);
            } else {
                String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
                SubscriptionView view;
                if (consumerInfo.isDurable()) {
                    view = new DurableSubscriptionView(this, brokerService, connectionClientId, userName, sub);
                } else if (sub instanceof TopicSubscription) {
                    view = new TopicSubscriptionView(connectionClientId, userName, (TopicSubscription) sub);
                } else {
                    view = new SubscriptionView(connectionClientId, userName, sub);
                }
                registerSubscription(objectName, consumerInfo, key, view);
            }
            subscriptionMap.put(sub, objectName);
            return objectName;
        } catch (Exception e) {
            LOG.error("Failed to register subscription {}", sub, e);
            return null;
        }
    }

    /**
     * Adds the connection and bumps the current and total connection
     * counters on the context broker's {@link BrokerService}.
     */
    @Override
    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
        super.addConnection(context, info);
        this.contextBroker.getBrokerService().incrementCurrentConnections();
        this.contextBroker.getBrokerService().incrementTotalConnections();
    }

    /**
     * Removes the connection and decrements the current connection counter
     * on the context broker's {@link BrokerService}.
     */
    @Override
    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
        super.removeConnection(context, info, error);
        this.contextBroker.getBrokerService().decrementCurrentConnections();
    }

    /**
     * Adds the consumer and, if an object name is already recorded under the
     * subscription's key, registers an MBean for the returned subscription.
     */
    @Override
    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        Subscription sub = super.addConsumer(context, info);
        SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
        if (inactiveName != null) {
            // if it was inactive, register it
            registerSubscription(context, sub);
        }
        return sub;
    }

    /**
     * Unregisters the MBeans of all tracked subscriptions that belong to the
     * given consumer (durable ones are moved back to the inactive list) and
     * then removes the consumer.
     */
    @Override
    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
        // Iterate entries rather than keys: a second lookup by key could return
        // null if the mapping is removed concurrently.
        for (Entry<Subscription, ObjectName> entry : subscriptionMap.entrySet()) {
            if (entry.getKey().getConsumerInfo().equals(info)) {
                // unregister all consumer subs
                unregisterSubscription(entry.getValue(), true);
            }
        }
        super.removeConsumer(context, info);
    }

    /**
     * Adds the producer and registers a {@link ProducerView} MBean for it.
     */
    @Override
    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        super.addProducer(context, info);
        String connectionClientId = context.getClientId();
        ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, connectionClientId, info);
        String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
        ProducerView view = new ProducerView(info, connectionClientId, userName, this);
        registerProducer(objectName, info.getDestination(), view);
    }

    /**
     * Unregisters the producer's MBean and then removes the producer.
     */
    @Override
    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
        ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
        unregisterProducer(objectName);
        super.removeProducer(context, info);
    }

    /**
     * Sends the message. For producers that were created without a
     * destination, the destination of the message is first recorded on the
     * producer's view as the last used destination.
     */
    @Override
    public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
        if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
            ProducerInfo info = exchange.getProducerState().getInfo();
            if (info.getDestination() == null && info.getProducerId() != null) {
                ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, exchange.getConnectionContext().getClientId(), info);
                ProducerView view = this.dynamicDestinationProducers.get(objectName);
                if (view != null) {
                    ActiveMQDestination dest = message.getDestination();
                    if (dest != null) {
                        view.setLastUsedDestinationName(dest);
                    }
                }
            }
        }
        super.send(exchange, message);
    }

    /**
     * Forgets the given subscription and unregisters the inactive
     * subscription MBean recorded under its key, if any. Failures are
     * logged, never propagated.
     */
    public void unregisterSubscription(Subscription sub) {
        ObjectName name = subscriptionMap.remove(sub);
        if (name != null) {
            try {
                SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
                ObjectName inactiveName = subscriptionKeys.remove(subscriptionKey);
                if (inactiveName != null) {
                    inactiveDurableTopicSubscribers.remove(inactiveName);
                    // stop tracking the name as well, otherwise it lingers in the
                    // set and doStop() tries to unregister it a second time
                    registeredMBeans.remove(inactiveName);
                    managementContext.unregisterMBean(inactiveName);
                }
            } catch (Exception e) {
                LOG.error("Failed to unregister subscription {}", sub, e);
            }
        }
    }

    /**
     * Stores the view in the map matching the destination type (queue/topic,
     * temporary or not) and registers it as an MBean. A registration failure
     * is logged and leaves the view in the map.
     */
    protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
        if (dest.isQueue()) {
            if (dest.isTemporary()) {
                temporaryQueues.put(key, view);
            } else {
                queues.put(key, view);
            }
        } else {
            if (dest.isTemporary()) {
                temporaryTopics.put(key, view);
            } else {
                topics.put(key, view);
            }
        }
        try {
            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
            registeredMBeans.add(key);
        } catch (Throwable e) {
            LOG.warn("Failed to register MBean {}", key);
            LOG.debug("Failure reason: ", e);
        }
    }

    /**
     * Removes the destination view from all destination maps, unregisters
     * its MBean and, if the view reports a slow consumer strategy object
     * name, unregisters that MBean too.
     */
    protected void unregisterDestination(ObjectName key) throws Exception {

        DestinationView view = removeAndRemember(topics, key, null);
        view = removeAndRemember(queues, key, view);
        view = removeAndRemember(temporaryQueues, key, view);
        view = removeAndRemember(temporaryTopics, key, view);
        unregisterTrackedMBean(key);
        if (view != null) {
            ObjectName strategyName = view.getSlowConsumerStrategy();
            if (strategyName != null && registeredMBeans.remove(strategyName)) {
                try {
                    managementContext.unregisterMBean(strategyName);
                } catch (Throwable e) {
                    LOG.warn("Failed to unregister slow consumer strategy MBean {}", strategyName);
                    LOG.debug("Failure reason: ", e);
                }
            }
        }
    }

    /**
     * Stores the view in the map matching the producer's destination type
     * (or in the dynamic destination map when {@code dest} is {@code null})
     * and registers it as an MBean. A registration failure is logged and
     * leaves the view in the map.
     */
    protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {

        if (dest != null) {
            if (dest.isQueue()) {
                if (dest.isTemporary()) {
                    temporaryQueueProducers.put(key, view);
                } else {
                    queueProducers.put(key, view);
                }
            } else {
                if (dest.isTemporary()) {
                    temporaryTopicProducers.put(key, view);
                } else {
                    topicProducers.put(key, view);
                }
            }
        } else {
            dynamicDestinationProducers.put(key, view);
        }

        try {
            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
            registeredMBeans.add(key);
        } catch (Throwable e) {
            LOG.warn("Failed to register MBean {}", key);
            LOG.debug("Failure reason: ", e);
        }
    }

    /**
     * Removes the producer view from all producer maps and unregisters its
     * MBean if it is still tracked.
     */
    protected void unregisterProducer(ObjectName key) throws Exception {
        queueProducers.remove(key);
        topicProducers.remove(key);
        temporaryQueueProducers.remove(key);
        temporaryTopicProducers.remove(key);
        dynamicDestinationProducers.remove(key);
        unregisterTrackedMBean(key);
    }

    /**
     * Removes {@code key} from {@code map}.
     *
     * @return the removed view if there was one, otherwise the {@code view}
     *         passed in (which may be {@code null})
     */
    private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
        DestinationView candidate = map.remove(key);
        return candidate != null ? candidate : view;
    }

    /**
     * Unregisters the MBean with the given name, but only if this broker is
     * still tracking it. Failures are logged, never propagated.
     */
    private void unregisterTrackedMBean(ObjectName key) {
        if (registeredMBeans.remove(key)) {
            try {
                managementContext.unregisterMBean(key);
            } catch (Throwable e) {
                LOG.warn("Failed to unregister MBean {}", key);
                LOG.debug("Failure reason: ", e);
            }
        }
    }

    /**
     * Stores the subscription view in the map matching the consumer's
     * destination type and registers it as an MBean. For a durable
     * subscription on a non-temporary topic, the inactive subscription MBean
     * recorded under {@code subscriptionKey} is unregistered first.
     */
    protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
        ActiveMQDestination dest = info.getDestination();
        if (dest.isQueue()) {
            if (dest.isTemporary()) {
                temporaryQueueSubscribers.put(key, view);
            } else {
                queueSubscribers.put(key, view);
            }
        } else {
            if (dest.isTemporary()) {
                temporaryTopicSubscribers.put(key, view);
            } else {
                if (info.isDurable()) {
                    durableTopicSubscribers.put(key, view);
                    // unregister any inactive durable subs
                    try {
                        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
                        if (inactiveName != null) {
                            inactiveDurableTopicSubscribers.remove(inactiveName);
                            registeredMBeans.remove(inactiveName);
                            managementContext.unregisterMBean(inactiveName);
                        }
                    } catch (Throwable e) {
                        LOG.error("Unable to unregister inactive durable subscriber {}", subscriptionKey, e);
                    }
                } else {
                    topicSubscribers.put(key, view);
                }
            }
        }

        try {
            AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
            registeredMBeans.add(key);
        } catch (Throwable e) {
            LOG.warn("Failed to register MBean {}", key);
            LOG.debug("Failure reason: ", e);
        }
    }

    /**
     * Removes the subscription view from the subscriber maps and unregisters
     * its MBean.
     *
     * @param key           object name of the subscription MBean
     * @param addToInactive when {@code true} and the subscription was a
     *                      durable topic subscription, an inactive
     *                      subscription MBean is registered in its place
     */
    protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
        queueSubscribers.remove(key);
        topicSubscribers.remove(key);
        temporaryQueueSubscribers.remove(key);
        temporaryTopicSubscribers.remove(key);
        unregisterTrackedMBean(key);
        DurableSubscriptionView view = (DurableSubscriptionView) durableTopicSubscribers.remove(key);
        if (view != null && addToInactive) {
            // need to put this back in the inactive list
            SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
            SubscriptionInfo info = new SubscriptionInfo();
            info.setClientId(subscriptionKey.getClientId());
            info.setSubscriptionName(subscriptionKey.getSubscriptionName());
            info.setDestination(new ActiveMQTopic(view.getDestinationName()));
            info.setSelector(view.getSelector());
            addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
        }
    }

    /**
     * Collects the durable subscriptions of every topic reported by the
     * destination factory, skipping those the topic region already knows,
     * and registers an inactive subscription MBean for each of them.
     */
    protected void buildExistingSubscriptions() throws Exception {
        Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
        if (destinations != null) {
            for (ActiveMQDestination dest : destinations) {
                if (!dest.isTopic()) {
                    continue;
                }
                SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic) dest);
                if (infos == null) {
                    continue;
                }
                for (SubscriptionInfo info : infos) {
                    SubscriptionKey key = new SubscriptionKey(info);
                    if (!alreadyKnown(key)) {
                        LOG.debug("Restoring durable subscription MBean {}", info);
                        subscriptions.put(key, info);
                    }
                }
            }
        }

        for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
            addInactiveSubscription(entry.getKey(), entry.getValue(), null);
        }
    }

    /**
     * @return whether the topic region reports a durable subscription for
     *         the given key
     */
    private boolean alreadyKnown(SubscriptionKey key) {
        boolean known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
        LOG.trace("Sub with key: {}, {} already registered", key, (known ? "": "not"));
        return known;
    }

    /**
     * Registers an {@link InactiveDurableSubscriptionView} MBean and records
     * it under {@code key}.
     * <p>
     * When {@code subscription} is {@code null} the consumer info used to
     * build the object name is created by the topic region from
     * {@code info}. The view and key are recorded even when the MBean
     * registration itself fails. Failures are logged, never propagated.
     */
    protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
        try {
            ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion) getTopicRegion()).createInactiveConsumerInfo(info);
            ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo);
            SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription);

            try {
                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
                registeredMBeans.add(objectName);
            } catch (Throwable e) {
                LOG.warn("Failed to register MBean {}", key);
                LOG.debug("Failure reason: ", e);
            }

            inactiveDurableTopicSubscribers.put(objectName, view);
            subscriptionKeys.put(key, objectName);
        } catch (Exception e) {
            LOG.error("Failed to register subscription {}", info, e);
        }
    }

    /**
     * Browses the messages of the subscription's topic as composite data.
     * An element whose conversion fails is logged and left {@code null} in
     * the returned array.
     */
    public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
        Message[] messages = getSubscriberMessages(view);
        CompositeData[] c = new CompositeData[messages.length];
        for (int i = 0; i < c.length; i++) {
            try {
                c[i] = OpenTypeSupport.convert(messages[i]);
            } catch (Throwable e) {
                LOG.error("Failed to browse: {}", view, e);
            }
        }
        return c;
    }

    /**
     * Browses the messages of the subscription's topic as tabular data
     * indexed by {@code JMSMessageID}.
     */
    public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
        Message[] messages = getSubscriberMessages(view);
        CompositeType ct = factory.getCompositeType();
        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
        TabularDataSupport rc = new TabularDataSupport(tt);
        for (int i = 0; i < messages.length; i++) {
            rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
        }
        return rc;
    }

    /**
     * Removes the message with the given id from the message store of the
     * subscription's topic and, for an active durable subscription, from its
     * pending cursor.
     *
     * @throws IllegalStateException if the topic of the subscription cannot
     *                               be determined or is not present in the
     *                               topic region
     */
    public void remove(SubscriptionView view, String messageId) throws Exception {
        ActiveMQDestination destination = getTopicDestination(view);
        if (destination == null) {
            throw new IllegalStateException("can't determine topic for sub:" + view);
        }

        final Destination topic = getTopicRegion().getDestinationMap().get(destination);
        if (topic == null) {
            // the destination map has no entry for this topic; fail with a clear
            // message instead of a NullPointerException below
            throw new IllegalStateException("can't find topic " + destination + " for sub:" + view);
        }

        final MessageAck messageAck = new MessageAck();
        messageAck.setMessageID(new MessageId(messageId));
        messageAck.setDestination(destination);

        topic.getMessageStore().removeMessage(brokerService.getAdminConnectionContext(), messageAck);

        // if sub is active, remove from cursor
        if (view.subscription instanceof DurableTopicSubscription) {
            final DurableTopicSubscription durableTopicSubscription = (DurableTopicSubscription) view.subscription;
            final MessageReference messageReference = new NullMessageReference();
            messageReference.getMessage().setMessageId(messageAck.getFirstMessageId());
            durableTopicSubscription.getPending().remove(messageReference);
        }
    }

    /**
     * @return the result of browsing the subscription's topic, or an empty
     *         array (after a warning) when the topic cannot be determined or
     *         is not present in the topic region
     */
    protected Message[] getSubscriberMessages(SubscriptionView view) {
        ActiveMQDestination destination = getTopicDestination(view);
        if (destination == null) {
            LOG.warn("can't determine topic to browse for sub:{}", view);
            return new Message[]{};
        }

        Destination topic = getTopicRegion().getDestinationMap().get(destination);
        if (topic == null) {
            // no entry in the destination map: nothing to browse
            LOG.warn("can't find topic {} to browse for sub:{}", destination, view);
            return new Message[]{};
        }
        return topic.browse();
    }

    /**
     * @return the topic of an active durable subscription or of an inactive
     *         durable subscription view; {@code null} for any other view
     */
    private ActiveMQDestination getTopicDestination(SubscriptionView view) {
        ActiveMQDestination destination = null;
        if (view.subscription instanceof DurableTopicSubscription) {
            destination = new ActiveMQTopic(view.getDestinationName());
        } else if (view instanceof InactiveDurableSubscriptionView) {
            destination = ((InactiveDurableSubscriptionView) view).subscriptionInfo.getDestination();
        }
        return destination;
    }

    /**
     * Copies the keys of the given map into a new array.
     * <p>
     * A zero-length seed array is used on purpose: pre-sizing from
     * {@code size()} can leave trailing {@code null} elements when the map
     * shrinks concurrently between the two calls.
     */
    private static ObjectName[] toObjectNameArray(Map<ObjectName, ?> map) {
        return map.keySet().toArray(new ObjectName[0]);
    }

    protected ObjectName[] getTopics() {
        return toObjectNameArray(topics);
    }

    protected ObjectName[] getQueues() {
        return toObjectNameArray(queues);
    }

    protected ObjectName[] getTemporaryTopics() {
        return toObjectNameArray(temporaryTopics);
    }

    protected ObjectName[] getTemporaryQueues() {
        return toObjectNameArray(temporaryQueues);
    }

    protected ObjectName[] getTopicSubscribers() {
        return toObjectNameArray(topicSubscribers);
    }

    protected ObjectName[] getDurableTopicSubscribers() {
        return toObjectNameArray(durableTopicSubscribers);
    }

    protected ObjectName[] getQueueSubscribers() {
        return toObjectNameArray(queueSubscribers);
    }

    protected ObjectName[] getTemporaryTopicSubscribers() {
        return toObjectNameArray(temporaryTopicSubscribers);
    }

    protected ObjectName[] getTemporaryQueueSubscribers() {
        return toObjectNameArray(temporaryQueueSubscribers);
    }

    protected ObjectName[] getInactiveDurableTopicSubscribers() {
        return toObjectNameArray(inactiveDurableTopicSubscribers);
    }

    protected ObjectName[] getTopicProducers() {
        return toObjectNameArray(topicProducers);
    }

    protected ObjectName[] getQueueProducers() {
        return toObjectNameArray(queueProducers);
    }

    protected ObjectName[] getTemporaryTopicProducers() {
        return toObjectNameArray(temporaryTopicProducers);
    }

    protected ObjectName[] getTemporaryQueueProducers() {
        return toObjectNameArray(temporaryQueueProducers);
    }

    protected ObjectName[] getDynamicDestinationProducers() {
        return toObjectNameArray(dynamicDestinationProducers);
    }

    public Broker getContextBroker() {
        return contextBroker;
    }

    public void setContextBroker(Broker contextBroker) {
        this.contextBroker = contextBroker;
    }

    /**
     * Registers a view MBean for the given slow consumer strategy unless an
     * MBean with the same object name is already tracked. Failures are
     * logged, never propagated.
     *
     * @return the object name for the strategy, or {@code null} if the name
     *         could not be created
     */
    public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
        ObjectName objectName = null;
        try {
            objectName = BrokerMBeanSupport.createAbortSlowConsumerStrategyName(brokerObjectName, strategy);
            if (!registeredMBeans.contains(objectName)) {

                AbortSlowConsumerStrategyView view = null;
                if (strategy instanceof AbortSlowAckConsumerStrategy) {
                    view = new AbortSlowAckConsumerStrategyView(this, (AbortSlowAckConsumerStrategy) strategy);
                } else {
                    view = new AbortSlowConsumerStrategyView(this, strategy);
                }

                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
                registeredMBeans.add(objectName);
            }
        } catch (Exception e) {
            LOG.warn("Failed to register MBean {}", strategy);
            LOG.debug("Failure reason: ", e);
        }
        return objectName;
    }

    /**
     * Registers a {@link RecoveredXATransactionView} MBean for the given
     * transaction unless one with the same object name is already tracked.
     * Failures are logged, never propagated.
     */
    public void registerRecoveredTransactionMBean(XATransaction transaction) {
        try {
            ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
            if (!registeredMBeans.contains(objectName)) {
                RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
                AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
                registeredMBeans.add(objectName);
            }
        } catch (Exception e) {
            LOG.warn("Failed to register prepared transaction MBean {}", transaction);
            LOG.debug("Failure reason: ", e);
        }
    }

    /**
     * Unregisters the MBean of the given transaction if it is still tracked.
     * Failures are logged, never propagated.
     */
    public void unregister(XATransaction transaction) {
        try {
            ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
            unregisterTrackedMBean(objectName);
        } catch (Exception e) {
            LOG.warn("Failed to create object name to unregister {}", transaction, e);
        }
    }

    public ObjectName getSubscriberObjectName(Subscription key) {
        return subscriptionMap.get(key);
    }

    /**
     * @return the first tracked subscription mapped to the given object
     *         name, or {@code null} if there is none
     */
    public Subscription getSubscriber(ObjectName key) {
        for (Entry<Subscription, ObjectName> entry : subscriptionMap.entrySet()) {
            if (entry.getValue().equals(key)) {
                return entry.getKey();
            }
        }
        return null;
    }

    /**
     * @return the live internal map of queue views (not a copy)
     */
    public Map<ObjectName, DestinationView> getQueueViews() {
        return queues;
    }

    /**
     * @return the live internal map of topic views (not a copy)
     */
    public Map<ObjectName, DestinationView> getTopicViews() {
        return topics;
    }

    /**
     * @return the view registered for the queue with the given name, or
     *         {@code null} if there is none
     */
    public DestinationView getQueueView(String queueName) throws MalformedObjectNameException {
        ObjectName objName = BrokerMBeanSupport.createDestinationName(brokerObjectName.toString(), "Queue", queueName);
        return queues.get(objName);
    }
}