001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.util.*;
021import java.util.Map.Entry;
022import java.util.concurrent.ConcurrentHashMap;
023import java.util.concurrent.CopyOnWriteArraySet;
024import java.util.concurrent.ExecutorService;
025import java.util.concurrent.ThreadPoolExecutor;
026
027import javax.jms.IllegalStateException;
028import javax.management.InstanceNotFoundException;
029import javax.management.MalformedObjectNameException;
030import javax.management.ObjectName;
031import javax.management.openmbean.CompositeData;
032import javax.management.openmbean.CompositeDataSupport;
033import javax.management.openmbean.CompositeType;
034import javax.management.openmbean.OpenDataException;
035import javax.management.openmbean.TabularData;
036import javax.management.openmbean.TabularDataSupport;
037import javax.management.openmbean.TabularType;
038
039import org.apache.activemq.broker.Broker;
040import org.apache.activemq.broker.BrokerService;
041import org.apache.activemq.broker.ConnectionContext;
042import org.apache.activemq.broker.ProducerBrokerExchange;
043import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
044import org.apache.activemq.broker.region.Destination;
045import org.apache.activemq.broker.region.DestinationFactory;
046import org.apache.activemq.broker.region.DestinationInterceptor;
047import org.apache.activemq.broker.region.DurableTopicSubscription;
048import org.apache.activemq.broker.region.MessageReference;
049import org.apache.activemq.broker.region.NullMessageReference;
050import org.apache.activemq.broker.region.Queue;
051import org.apache.activemq.broker.region.Region;
052import org.apache.activemq.broker.region.RegionBroker;
053import org.apache.activemq.broker.region.Subscription;
054import org.apache.activemq.broker.region.Topic;
055import org.apache.activemq.broker.region.TopicRegion;
056import org.apache.activemq.broker.region.TopicSubscription;
057import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
058import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
059import org.apache.activemq.command.ActiveMQDestination;
060import org.apache.activemq.command.ActiveMQMessage;
061import org.apache.activemq.command.ActiveMQTopic;
062import org.apache.activemq.command.ConnectionInfo;
063import org.apache.activemq.command.ConsumerInfo;
064import org.apache.activemq.command.Message;
065import org.apache.activemq.command.MessageAck;
066import org.apache.activemq.command.MessageId;
067import org.apache.activemq.command.ProducerInfo;
068import org.apache.activemq.command.SubscriptionInfo;
069import org.apache.activemq.thread.Scheduler;
070import org.apache.activemq.thread.TaskRunnerFactory;
071import org.apache.activemq.transaction.XATransaction;
072import org.apache.activemq.usage.SystemUsage;
073import org.apache.activemq.util.ServiceStopper;
074import org.apache.activemq.util.SubscriptionKey;
075import org.slf4j.Logger;
076import org.slf4j.LoggerFactory;
077
078public class ManagedRegionBroker extends RegionBroker {
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
    // Management context through which every MBean below is registered and unregistered.
    private final ManagementContext managementContext;
    // ObjectName of the owning broker; handed to BrokerMBeanSupport when building child MBean names.
    private final ObjectName brokerObjectName;
    // Destination views keyed by MBean name, one map per destination kind
    // (see registerDestination for how a destination is routed to a map).
    private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
    private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
    // Subscription views keyed by MBean name, one map per subscription kind
    // (see registerSubscription / addInactiveSubscription).
    private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
    // Producer views keyed by MBean name, one map per destination kind; producers whose
    // ProducerInfo has no destination are kept in dynamicDestinationProducers.
    private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
    // MBean name recorded by addInactiveSubscription for each subscription key.
    private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
    // MBean name under which each Subscription was registered by registerSubscription.
    private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
    // Names of MBeans whose registration returned a non-null result; doStop() unregisters whatever is left.
    private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
    /* This is the first broker in the broker interceptor chain. */
    private Broker contextBroker;

    // Executor passed to AsyncAnnotatedMBean.registerMBean; null when mbeanTimeout is not positive.
    private final ExecutorService asyncInvokeService;
    // BrokerService.getMbeanInvocationTimeout() as read in the constructor.
    private final long mbeanTimeout;
105
106    public ManagedRegionBroker(BrokerService brokerService, ManagementContext context, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
107                               DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
108        super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor,scheduler,executor);
109        this.managementContext = context;
110        this.brokerObjectName = brokerObjectName;
111        this.mbeanTimeout = brokerService.getMbeanInvocationTimeout();
112        this.asyncInvokeService = mbeanTimeout > 0 ? executor : null;;
113    }
114
115    @Override
116    public void start() throws Exception {
117        super.start();
118        // build all existing durable subscriptions
119        buildExistingSubscriptions();
120    }
121
122    @Override
123    protected void doStop(ServiceStopper stopper) {
124        super.doStop(stopper);
125        // lets remove any mbeans not yet removed
126        for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
127            ObjectName name = iter.next();
128            try {
129                managementContext.unregisterMBean(name);
130            } catch (InstanceNotFoundException e) {
131                LOG.warn("The MBean {} is no longer registered with JMX", name);
132            } catch (Exception e) {
133                stopper.onException(this, e);
134            }
135        }
136        registeredMBeans.clear();
137    }
138
139    @Override
140    protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
141        return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
142    }
143
144    @Override
145    protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
146        return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
147    }
148
149    @Override
150    protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
151        return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
152    }
153
154    @Override
155    protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
156        return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
157    }
158
159    public void register(ActiveMQDestination destName, Destination destination) {
160        // TODO refactor to allow views for custom destinations
161        try {
162            ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
163            DestinationView view;
164            if (destination instanceof Queue) {
165                view = new QueueView(this, (Queue)destination);
166            } else if (destination instanceof Topic) {
167                view = new TopicView(this, (Topic)destination);
168            } else {
169                view = null;
170                LOG.warn("JMX View is not supported for custom destination {}", destination);
171            }
172            if (view != null) {
173                registerDestination(objectName, destName, view);
174            }
175        } catch (Exception e) {
176            LOG.error("Failed to register destination {}", destName, e);
177        }
178    }
179
180    public void unregister(ActiveMQDestination destName) {
181        try {
182            ObjectName objectName = BrokerMBeanSupport.createDestinationName(brokerObjectName, destName);
183            unregisterDestination(objectName);
184        } catch (Exception e) {
185            LOG.error("Failed to unregister {}", destName, e);
186        }
187    }
188
189    public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
190        String connectionClientId = context.getClientId();
191
192        SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
193        try {
194            ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, sub.getConsumerInfo());
195            SubscriptionView view;
196            if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
197                // add offline subscribers to inactive list
198                SubscriptionInfo info = new SubscriptionInfo();
199                info.setClientId(context.getClientId());
200                info.setSubscriptionName(sub.getConsumerInfo().getSubscriptionName());
201                info.setDestination(sub.getConsumerInfo().getDestination());
202                info.setSelector(sub.getSelector());
203                addInactiveSubscription(key, info, sub);
204            } else {
205                String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
206                if (sub.getConsumerInfo().isDurable()) {
207                    view = new DurableSubscriptionView(this, brokerService, context.getClientId(), userName, sub);
208                } else {
209                    if (sub instanceof TopicSubscription) {
210                        view = new TopicSubscriptionView(context.getClientId(), userName, (TopicSubscription) sub);
211                    } else {
212                        view = new SubscriptionView(context.getClientId(), userName, sub);
213                    }
214                }
215                registerSubscription(objectName, sub.getConsumerInfo(), key, view);
216            }
217            subscriptionMap.put(sub, objectName);
218            return objectName;
219        } catch (Exception e) {
220            LOG.error("Failed to register subscription {}", sub, e);
221            return null;
222        }
223    }
224
225    @Override
226    public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
227        super.addConnection(context, info);
228        this.contextBroker.getBrokerService().incrementCurrentConnections();
229        this.contextBroker.getBrokerService().incrementTotalConnections();
230    }
231
232    @Override
233    public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
234        super.removeConnection(context, info, error);
235        this.contextBroker.getBrokerService().decrementCurrentConnections();
236    }
237
238    @Override
239    public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
240        Subscription sub = super.addConsumer(context, info);
241        SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
242        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
243        if (inactiveName != null) {
244            // if it was inactive, register it
245            registerSubscription(context, sub);
246        }
247        return sub;
248    }
249
250    @Override
251    public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
252        for (Subscription sub : subscriptionMap.keySet()) {
253            if (sub.getConsumerInfo().equals(info)) {
254               // unregister all consumer subs
255               unregisterSubscription(subscriptionMap.get(sub), true);
256            }
257        }
258        super.removeConsumer(context, info);
259    }
260
261    @Override
262    public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
263        super.addProducer(context, info);
264        String connectionClientId = context.getClientId();
265        ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
266        String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
267        ProducerView view = new ProducerView(info, connectionClientId, userName, this);
268        registerProducer(objectName, info.getDestination(), view);
269    }
270
271    @Override
272    public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
273        ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, context.getClientId(), info);
274        unregisterProducer(objectName);
275        super.removeProducer(context, info);
276    }
277
278    @Override
279    public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
280        if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
281            ProducerInfo info = exchange.getProducerState().getInfo();
282            if (info.getDestination() == null && info.getProducerId() != null) {
283                ObjectName objectName = BrokerMBeanSupport.createProducerName(brokerObjectName, exchange.getConnectionContext().getClientId(), info);
284                ProducerView view = this.dynamicDestinationProducers.get(objectName);
285                if (view != null) {
286                    ActiveMQDestination dest = message.getDestination();
287                    if (dest != null) {
288                        view.setLastUsedDestinationName(dest);
289                    }
290                }
291            }
292         }
293        super.send(exchange, message);
294    }
295
296    public void unregisterSubscription(Subscription sub) {
297        ObjectName name = subscriptionMap.remove(sub);
298        if (name != null) {
299            try {
300                SubscriptionKey subscriptionKey = new SubscriptionKey(sub.getContext().getClientId(), sub.getConsumerInfo().getSubscriptionName());
301                ObjectName inactiveName = subscriptionKeys.remove(subscriptionKey);
302                if (inactiveName != null) {
303                    inactiveDurableTopicSubscribers.remove(inactiveName);
304                    managementContext.unregisterMBean(inactiveName);
305                }
306            } catch (Exception e) {
307                LOG.error("Failed to unregister subscription {}", sub, e);
308            }
309        }
310    }
311
312    protected void registerDestination(ObjectName key, ActiveMQDestination dest, DestinationView view) throws Exception {
313        if (dest.isQueue()) {
314            if (dest.isTemporary()) {
315                temporaryQueues.put(key, view);
316            } else {
317                queues.put(key, view);
318            }
319        } else {
320            if (dest.isTemporary()) {
321                temporaryTopics.put(key, view);
322            } else {
323                topics.put(key, view);
324            }
325        }
326        try {
327            if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) {
328                registeredMBeans.add(key);
329            }
330        } catch (Throwable e) {
331            LOG.warn("Failed to register MBean {}", key);
332            LOG.debug("Failure reason: ", e);
333        }
334    }
335
336    protected void unregisterDestination(ObjectName key) throws Exception {
337
338        DestinationView view = removeAndRemember(topics, key, null);
339        view = removeAndRemember(queues, key, view);
340        view = removeAndRemember(temporaryQueues, key, view);
341        view = removeAndRemember(temporaryTopics, key, view);
342        if (registeredMBeans.remove(key)) {
343            try {
344                managementContext.unregisterMBean(key);
345            } catch (Throwable e) {
346                LOG.warn("Failed to unregister MBean {}", key);
347                LOG.debug("Failure reason: ", e);
348            }
349        }
350        if (view != null) {
351            key = view.getSlowConsumerStrategy();
352            if (key!= null && registeredMBeans.remove(key)) {
353                try {
354                    managementContext.unregisterMBean(key);
355                } catch (Throwable e) {
356                    LOG.warn("Failed to unregister slow consumer strategy MBean {}", key);
357                    LOG.debug("Failure reason: ", e);
358                }
359            }
360        }
361    }
362
363    protected void registerProducer(ObjectName key, ActiveMQDestination dest, ProducerView view) throws Exception {
364
365        if (dest != null) {
366            if (dest.isQueue()) {
367                if (dest.isTemporary()) {
368                    temporaryQueueProducers.put(key, view);
369                } else {
370                    queueProducers.put(key, view);
371                }
372            } else {
373                if (dest.isTemporary()) {
374                    temporaryTopicProducers.put(key, view);
375                } else {
376                    topicProducers.put(key, view);
377                }
378            }
379        } else {
380            dynamicDestinationProducers.put(key, view);
381        }
382
383        try {
384            if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) {
385                registeredMBeans.add(key);
386            }
387        } catch (Throwable e) {
388            LOG.warn("Failed to register MBean {}", key);
389            LOG.debug("Failure reason: ", e);
390        }
391    }
392
393    protected void unregisterProducer(ObjectName key) throws Exception {
394        queueProducers.remove(key);
395        topicProducers.remove(key);
396        temporaryQueueProducers.remove(key);
397        temporaryTopicProducers.remove(key);
398        dynamicDestinationProducers.remove(key);
399        if (registeredMBeans.remove(key)) {
400            try {
401                managementContext.unregisterMBean(key);
402            } catch (Throwable e) {
403                LOG.warn("Failed to unregister MBean {}", key);
404                LOG.debug("Failure reason: ", e);
405            }
406        }
407    }
408
409    private DestinationView removeAndRemember(Map<ObjectName, DestinationView> map, ObjectName key, DestinationView view) {
410        DestinationView candidate = map.remove(key);
411        if (candidate != null && view == null) {
412            view = candidate;
413        }
414        return candidate != null ? candidate : view;
415    }
416
417    protected void registerSubscription(ObjectName key, ConsumerInfo info, SubscriptionKey subscriptionKey, SubscriptionView view) throws Exception {
418        ActiveMQDestination dest = info.getDestination();
419        if (dest.isQueue()) {
420            if (dest.isTemporary()) {
421                temporaryQueueSubscribers.put(key, view);
422            } else {
423                queueSubscribers.put(key, view);
424            }
425        } else {
426            if (dest.isTemporary()) {
427                temporaryTopicSubscribers.put(key, view);
428            } else {
429                if (info.isDurable()) {
430                    durableTopicSubscribers.put(key, view);
431                    // unregister any inactive durable subs
432                    try {
433                        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
434                        if (inactiveName != null) {
435                            inactiveDurableTopicSubscribers.remove(inactiveName);
436                            registeredMBeans.remove(inactiveName);
437                            managementContext.unregisterMBean(inactiveName);
438                        }
439                    } catch (Throwable e) {
440                        LOG.error("Unable to unregister inactive durable subscriber {}", subscriptionKey, e);
441                    }
442                } else {
443                    topicSubscribers.put(key, view);
444                }
445            }
446        }
447
448        try {
449            if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) {
450                registeredMBeans.add(key);
451            }
452        } catch (Throwable e) {
453            LOG.warn("Failed to register MBean {}", key);
454            LOG.debug("Failure reason: ", e);
455        }
456    }
457
458    protected void unregisterSubscription(ObjectName key, boolean addToInactive) throws Exception {
459        queueSubscribers.remove(key);
460        topicSubscribers.remove(key);
461        temporaryQueueSubscribers.remove(key);
462        temporaryTopicSubscribers.remove(key);
463        if (registeredMBeans.remove(key)) {
464            try {
465                managementContext.unregisterMBean(key);
466            } catch (Throwable e) {
467                LOG.warn("Failed to unregister MBean {}", key);
468                LOG.debug("Failure reason: ", e);
469            }
470        }
471        DurableSubscriptionView view = (DurableSubscriptionView)durableTopicSubscribers.remove(key);
472        if (view != null) {
473            // need to put this back in the inactive list
474            SubscriptionKey subscriptionKey = new SubscriptionKey(view.getClientId(), view.getSubscriptionName());
475            if (addToInactive) {
476                SubscriptionInfo info = new SubscriptionInfo();
477                info.setClientId(subscriptionKey.getClientId());
478                info.setSubscriptionName(subscriptionKey.getSubscriptionName());
479                info.setDestination(new ActiveMQTopic(view.getDestinationName()));
480                info.setSelector(view.getSelector());
481                addInactiveSubscription(subscriptionKey, info, (brokerService.isKeepDurableSubsActive() ? view.subscription : null));
482            }
483        }
484    }
485
486    protected void buildExistingSubscriptions() throws Exception {
487        Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
488        Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
489        if (destinations != null) {
490            for (ActiveMQDestination dest : destinations) {
491                if (dest.isTopic()) {
492                    SubscriptionInfo[] infos = destinationFactory.getAllDurableSubscriptions((ActiveMQTopic)dest);
493                    if (infos != null) {
494                        for (int i = 0; i < infos.length; i++) {
495                            SubscriptionInfo info = infos[i];
496                            SubscriptionKey key = new SubscriptionKey(info);
497                            if (!alreadyKnown(key)) {
498                                LOG.debug("Restoring durable subscription MBean {}", info);
499                                subscriptions.put(key, info);
500                            }
501                        }
502                    }
503                }
504            }
505        }
506
507        for (Map.Entry<SubscriptionKey, SubscriptionInfo> entry : subscriptions.entrySet()) {
508            addInactiveSubscription(entry.getKey(), entry.getValue(), null);
509        }
510    }
511
512    private boolean alreadyKnown(SubscriptionKey key) {
513        boolean known = false;
514        known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
515        LOG.trace("Sub with key: {}, {} already registered", key, (known ? "": "not"));
516        return known;
517    }
518
519    protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
520        try {
521            ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
522            ObjectName objectName = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo);
523            SubscriptionView view = new InactiveDurableSubscriptionView(this, brokerService, key.getClientId(), info, subscription);
524
525            try {
526                if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName) != null) {
527                    registeredMBeans.add(objectName);
528                }
529            } catch (Throwable e) {
530                LOG.warn("Failed to register MBean {}", key);
531                LOG.debug("Failure reason: ", e);
532            }
533
534            inactiveDurableTopicSubscribers.put(objectName, view);
535            subscriptionKeys.put(key, objectName);
536        } catch (Exception e) {
537            LOG.error("Failed to register subscription {}", info, e);
538        }
539    }
540
541    public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
542        Message[] messages = getSubscriberMessages(view);
543        CompositeData c[] = new CompositeData[messages.length];
544        for (int i = 0; i < c.length; i++) {
545            try {
546                c[i] = OpenTypeSupport.convert(messages[i]);
547            } catch (Throwable e) {
548                LOG.error("Failed to browse: {}", view, e);
549            }
550        }
551        return c;
552    }
553
554    public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
555        OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
556        Message[] messages = getSubscriberMessages(view);
557        CompositeType ct = factory.getCompositeType();
558        TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
559        TabularDataSupport rc = new TabularDataSupport(tt);
560        for (int i = 0; i < messages.length; i++) {
561            rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
562        }
563        return rc;
564    }
565
566    public void remove(SubscriptionView view, String messageId)  throws Exception {
567        ActiveMQDestination destination = getTopicDestination(view);
568        if (destination != null) {
569            final Destination topic = getTopicRegion().getDestinationMap().get(destination);
570            final MessageAck messageAck = new MessageAck();
571            messageAck.setMessageID(new MessageId(messageId));
572            messageAck.setDestination(destination);
573
574            topic.getMessageStore().removeMessage(brokerService.getAdminConnectionContext(), messageAck);
575
576            // if sub is active, remove from cursor
577            if (view.subscription instanceof DurableTopicSubscription) {
578                final DurableTopicSubscription durableTopicSubscription = (DurableTopicSubscription) view.subscription;
579                final MessageReference messageReference = new NullMessageReference();
580                messageReference.getMessage().setMessageId(messageAck.getFirstMessageId());
581                durableTopicSubscription.getPending().remove(messageReference);
582            }
583
584        } else {
585            throw new IllegalStateException("can't determine topic for sub:" + view);
586        }
587    }
588
589    protected Message[] getSubscriberMessages(SubscriptionView view) {
590        ActiveMQDestination destination = getTopicDestination(view);
591        if (destination != null) {
592            Destination topic = getTopicRegion().getDestinationMap().get(destination);
593            return topic.browse();
594
595        } else {
596            LOG.warn("can't determine topic to browse for sub:" + view);
597            return new Message[]{};
598        }
599    }
600
601    private ActiveMQDestination getTopicDestination(SubscriptionView view) {
602        ActiveMQDestination destination = null;
603        if (view.subscription instanceof DurableTopicSubscription) {
604            destination = new ActiveMQTopic(view.getDestinationName());
605        } else if (view instanceof InactiveDurableSubscriptionView) {
606            destination = ((InactiveDurableSubscriptionView)view).subscriptionInfo.getDestination();
607        }
608        return destination;
609    }
610
611    private ObjectName[] onlyNonSuppressed (Set<ObjectName> set){
612        List<ObjectName> nonSuppressed = new ArrayList<ObjectName>();
613        for(ObjectName key : set){
614            if (managementContext.isAllowedToRegister(key)){
615                nonSuppressed.add(key);
616            }
617        }
618        return nonSuppressed.toArray(new ObjectName[nonSuppressed.size()]);
619    }
620
621    protected ObjectName[] getTopics() {
622        Set<ObjectName> set = topics.keySet();
623        return set.toArray(new ObjectName[set.size()]);
624    }
625
626    protected ObjectName[] getTopicsNonSuppressed() {
627        return onlyNonSuppressed(topics.keySet());
628    }
629
630    protected ObjectName[] getQueues() {
631        Set<ObjectName> set = queues.keySet();
632        return set.toArray(new ObjectName[set.size()]);
633    }
634
635    protected ObjectName[] getQueuesNonSuppressed() {
636        return onlyNonSuppressed(queues.keySet());
637    }
638
639    protected ObjectName[] getTemporaryTopics() {
640        Set<ObjectName> set = temporaryTopics.keySet();
641        return set.toArray(new ObjectName[set.size()]);
642    }
643
644    protected ObjectName[] getTemporaryTopicsNonSuppressed() {
645        return onlyNonSuppressed(temporaryTopics.keySet());
646    }
647
648    protected ObjectName[] getTemporaryQueues() {
649        Set<ObjectName> set = temporaryQueues.keySet();
650        return set.toArray(new ObjectName[set.size()]);
651    }
652
653    protected ObjectName[] getTemporaryQueuesNonSuppressed() {
654        return onlyNonSuppressed(temporaryQueues.keySet());
655    }
656
657    protected ObjectName[] getTopicSubscribers() {
658        Set<ObjectName> set = topicSubscribers.keySet();
659        return set.toArray(new ObjectName[set.size()]);
660    }
661
662    protected ObjectName[] getTopicSubscribersNonSuppressed() {
663        return onlyNonSuppressed(topicSubscribers.keySet());
664    }
665
666    protected ObjectName[] getDurableTopicSubscribers() {
667        Set<ObjectName> set = durableTopicSubscribers.keySet();
668        return set.toArray(new ObjectName[set.size()]);
669    }
670
671    protected ObjectName[] getDurableTopicSubscribersNonSuppressed() {
672        return onlyNonSuppressed(durableTopicSubscribers.keySet());
673    }
674
675    protected ObjectName[] getQueueSubscribers() {
676        Set<ObjectName> set = queueSubscribers.keySet();
677        return set.toArray(new ObjectName[set.size()]);
678    }
679
680    protected ObjectName[] getQueueSubscribersNonSuppressed() {
681        return onlyNonSuppressed(queueSubscribers.keySet());
682    }
683
684    protected ObjectName[] getTemporaryTopicSubscribers() {
685        Set<ObjectName> set = temporaryTopicSubscribers.keySet();
686        return set.toArray(new ObjectName[set.size()]);
687    }
688
689    protected ObjectName[] getTemporaryTopicSubscribersNonSuppressed() {
690        return onlyNonSuppressed(temporaryTopicSubscribers.keySet());
691    }
692
693    protected ObjectName[] getTemporaryQueueSubscribers() {
694        Set<ObjectName> set = temporaryQueueSubscribers.keySet();
695        return set.toArray(new ObjectName[set.size()]);
696    }
697
698    protected ObjectName[] getTemporaryQueueSubscribersNonSuppressed() {
699        return onlyNonSuppressed(temporaryQueueSubscribers.keySet());
700    }
701
702    protected ObjectName[] getInactiveDurableTopicSubscribers() {
703        Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
704        return set.toArray(new ObjectName[set.size()]);
705    }
706
707    protected ObjectName[] getInactiveDurableTopicSubscribersNonSuppressed() {
708        return onlyNonSuppressed(inactiveDurableTopicSubscribers.keySet());
709    }
710
711    protected ObjectName[] getTopicProducers() {
712        Set<ObjectName> set = topicProducers.keySet();
713        return set.toArray(new ObjectName[set.size()]);
714    }
715
716    protected ObjectName[] getTopicProducersNonSuppressed() {
717        return onlyNonSuppressed(topicProducers.keySet());
718    }
719
720    protected ObjectName[] getQueueProducers() {
721        Set<ObjectName> set = queueProducers.keySet();
722        return set.toArray(new ObjectName[set.size()]);
723    }
724
725    protected ObjectName[] getQueueProducersNonSuppressed() {
726        return onlyNonSuppressed(queueProducers.keySet());
727    }
728
729    protected ObjectName[] getTemporaryTopicProducers() {
730        Set<ObjectName> set = temporaryTopicProducers.keySet();
731        return set.toArray(new ObjectName[set.size()]);
732    }
733
734    protected ObjectName[] getTemporaryTopicProducersNonSuppressed() {
735        return onlyNonSuppressed(temporaryTopicProducers.keySet());
736    }
737
738    protected ObjectName[] getTemporaryQueueProducers() {
739        Set<ObjectName> set = temporaryQueueProducers.keySet();
740        return set.toArray(new ObjectName[set.size()]);
741    }
742
743    protected ObjectName[] getTemporaryQueueProducersNonSuppressed() {
744        return onlyNonSuppressed(temporaryQueueProducers.keySet());
745    }
746
747    protected ObjectName[] getDynamicDestinationProducers() {
748        Set<ObjectName> set = dynamicDestinationProducers.keySet();
749        return set.toArray(new ObjectName[set.size()]);
750    }
751
752    protected ObjectName[] getDynamicDestinationProducersNonSuppressed() {
753        return onlyNonSuppressed(dynamicDestinationProducers.keySet());
754    }
755
    /**
     * Returns the broker held in the {@code contextBroker} field.
     *
     * @return the current {@code contextBroker} value
     */
    public Broker getContextBroker() {
        return contextBroker;
    }
759
    /**
     * Replaces the broker held in the {@code contextBroker} field.
     *
     * @param contextBroker the broker to store; no validation is performed here
     */
    public void setContextBroker(Broker contextBroker) {
        this.contextBroker = contextBroker;
    }
763
764    public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
765        ObjectName objectName = null;
766        try {
767            objectName = BrokerMBeanSupport.createAbortSlowConsumerStrategyName(brokerObjectName, strategy);
768            if (!registeredMBeans.contains(objectName))  {
769
770                AbortSlowConsumerStrategyView view = null;
771                if (strategy instanceof AbortSlowAckConsumerStrategy) {
772                    view = new AbortSlowAckConsumerStrategyView(this, (AbortSlowAckConsumerStrategy) strategy);
773                } else {
774                    view = new AbortSlowConsumerStrategyView(this, strategy);
775                }
776
777                if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName) != null) {
778                    registeredMBeans.add(objectName);
779                }
780            }
781        } catch (Exception e) {
782            LOG.warn("Failed to register MBean {}", strategy);
783            LOG.debug("Failure reason: ", e);
784        }
785        return objectName;
786    }
787
788    public void registerRecoveredTransactionMBean(XATransaction transaction) {
789        try {
790            ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
791            if (!registeredMBeans.contains(objectName))  {
792                RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
793                if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName) != null) {
794                    registeredMBeans.add(objectName);
795                }
796            }
797        } catch (Exception e) {
798            LOG.warn("Failed to register prepared transaction MBean {}", transaction);
799            LOG.debug("Failure reason: ", e);
800        }
801    }
802
803    public void unregister(XATransaction transaction) {
804        try {
805            ObjectName objectName = BrokerMBeanSupport.createXATransactionName(brokerObjectName, transaction);
806            if (registeredMBeans.remove(objectName)) {
807                try {
808                    managementContext.unregisterMBean(objectName);
809                } catch (Throwable e) {
810                    LOG.warn("Failed to unregister MBean {}", objectName);
811                    LOG.debug("Failure reason: ", e);
812                }
813            }
814        } catch (Exception e) {
815            LOG.warn("Failed to create object name to unregister {}", transaction, e);
816        }
817    }
818
    /**
     * Looks up the object name stored for a subscription in {@code subscriptionMap}.
     *
     * @param key the subscription to look up
     * @return whatever {@code subscriptionMap.get(key)} yields for the subscription
     */
    public ObjectName getSubscriberObjectName(Subscription key) {
        return subscriptionMap.get(key);
    }
822
823    public Subscription getSubscriber(ObjectName key) {
824        Subscription sub = null;
825        for (Entry<Subscription, ObjectName> entry: subscriptionMap.entrySet()) {
826            if (entry.getValue().equals(key)) {
827                sub = entry.getKey();
828                break;
829            }
830        }
831        return sub;
832    }
833
    /**
     * Returns the {@code queues} map of object names to destination views.
     *
     * @return the {@code queues} field itself, not a copy
     */
    public Map<ObjectName, DestinationView> getQueueViews() {
        return queues;
    }
837
    /**
     * Returns the {@code topics} map of object names to destination views.
     *
     * @return the {@code topics} field itself, not a copy
     */
    public Map<ObjectName, DestinationView> getTopicViews() {
        return topics;
    }
841
842    public DestinationView getQueueView(String queueName) throws MalformedObjectNameException {
843        ObjectName objName = BrokerMBeanSupport.createDestinationName(brokerObjectName.toString(), "Queue", queueName);
844        return queues.get(objName);
845    }
846
    /**
     * Returns the set of object names tracked in {@code registeredMBeans}.
     *
     * @return the {@code registeredMBeans} field itself, not a copy
     */
    public Set<ObjectName> getRegisteredMbeans() {
        return registeredMBeans;
    }
850}