001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.jmx;
018
019import java.io.IOException;
020import java.util.Set;
021
022import javax.jms.InvalidSelectorException;
023import javax.management.ObjectName;
024
025import org.apache.activemq.broker.BrokerService;
026import org.apache.activemq.broker.ConnectionContext;
027import org.apache.activemq.broker.region.Subscription;
028import org.apache.activemq.command.ActiveMQDestination;
029import org.apache.activemq.command.ActiveMQQueue;
030import org.apache.activemq.command.ActiveMQTopic;
031import org.apache.activemq.command.ConsumerInfo;
032import org.apache.activemq.filter.DestinationFilter;
033import org.apache.activemq.util.IOExceptionSupport;
034
035/**
036 *
037 */
038public class SubscriptionView implements SubscriptionViewMBean {
039
040    protected final Subscription subscription;
041    protected final String clientId;
042    protected final String userName;
043
044    /**
045     * Constructor
046     *
047     * @param subs
048     */
049    public SubscriptionView(String clientId, String userName, Subscription subs) {
050        this.clientId = clientId;
051        this.subscription = subs;
052        this.userName = userName;
053    }
054
055    /**
056     * @return the clientId
057     */
058    @Override
059    public String getClientId() {
060        return clientId;
061    }
062
063    /**
064     * @returns the ObjectName of the Connection that created this subscription
065     */
066    @Override
067    public ObjectName getConnection() {
068        ObjectName result = null;
069
070        if (clientId != null && subscription != null) {
071            ConnectionContext ctx = subscription.getContext();
072            if (ctx != null && ctx.getBroker() != null && ctx.getBroker().getBrokerService() != null) {
073                BrokerService service = ctx.getBroker().getBrokerService();
074                ManagementContext managementCtx = service.getManagementContext();
075                if (managementCtx != null) {
076
077                    try {
078                        ObjectName query = createConnectionQuery(managementCtx, service.getBrokerName());
079                        Set<ObjectName> names = managementCtx.queryNames(query, null);
080                        if (names.size() == 1) {
081                            result = names.iterator().next();
082                        }
083                    } catch (Exception e) {
084                    }
085                }
086            }
087        }
088        return result;
089    }
090
091
092
093    private ObjectName createConnectionQuery(ManagementContext ctx, String brokerName) throws IOException {
094        try {
095            return BrokerMBeanSupport.createConnectionQuery(ctx.getJmxDomainName(), brokerName, clientId);
096        } catch (Throwable e) {
097            throw IOExceptionSupport.create(e);
098        }
099    }
100
101    /**
102     * @return the id of the Connection the Subscription is on
103     */
104    @Override
105    public String getConnectionId() {
106        ConsumerInfo info = getConsumerInfo();
107        if (info != null) {
108            return info.getConsumerId().getConnectionId();
109        }
110        return "NOTSET";
111    }
112
113    /**
114     * @return the id of the Session the subscription is on
115     */
116    @Override
117    public long getSessionId() {
118        ConsumerInfo info = getConsumerInfo();
119        if (info != null) {
120            return info.getConsumerId().getSessionId();
121        }
122        return 0;
123    }
124
125    /**
126     * @return the id of the Subscription
127     */
128    @Deprecated
129    @Override
130    public long getSubcriptionId() {
131        return getSubscriptionId();
132    }
133
134    /**
135     * @return the id of the Subscription
136     */
137    @Override
138    public long getSubscriptionId() {
139        ConsumerInfo info = getConsumerInfo();
140        if (info != null) {
141            return info.getConsumerId().getValue();
142        }
143        return 0;
144    }
145
146    /**
147     * @return the destination name
148     */
149    @Override
150    public String getDestinationName() {
151        ConsumerInfo info = getConsumerInfo();
152        if (info != null) {
153            ActiveMQDestination dest = info.getDestination();
154            return dest.getPhysicalName();
155        }
156        return "NOTSET";
157    }
158
159    @Override
160    public String getSelector() {
161        if (subscription != null) {
162            return subscription.getSelector();
163        }
164        return null;
165    }
166
167    @Override
168    public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
169        if (subscription != null) {
170            subscription.setSelector(selector);
171        } else {
172            throw new UnsupportedOperationException("No subscription object");
173        }
174    }
175
176    /**
177     * @return true if the destination is a Queue
178     */
179    @Override
180    public boolean isDestinationQueue() {
181        ConsumerInfo info = getConsumerInfo();
182        if (info != null) {
183            ActiveMQDestination dest = info.getDestination();
184            return dest.isQueue();
185        }
186        return false;
187    }
188
189    /**
190     * @return true of the destination is a Topic
191     */
192    @Override
193    public boolean isDestinationTopic() {
194        ConsumerInfo info = getConsumerInfo();
195        if (info != null) {
196            ActiveMQDestination dest = info.getDestination();
197            return dest.isTopic();
198        }
199        return false;
200    }
201
202    /**
203     * @return true if the destination is temporary
204     */
205    @Override
206    public boolean isDestinationTemporary() {
207        ConsumerInfo info = getConsumerInfo();
208        if (info != null) {
209            ActiveMQDestination dest = info.getDestination();
210            return dest.isTemporary();
211        }
212        return false;
213    }
214
215    /**
216     * @return true if the subscriber is active
217     */
218    @Override
219    public boolean isActive() {
220        return true;
221    }
222
223    @Override
224    public boolean isNetwork() {
225        ConsumerInfo info = getConsumerInfo();
226        if (info != null) {
227            return info.isNetworkSubscription();
228        }
229        return false;
230    }
231
232    /**
233     * The subscription should release as may references as it can to help the
234     * garbage collector reclaim memory.
235     */
236    public void gc() {
237        if (subscription != null) {
238            subscription.gc();
239        }
240    }
241
242    /**
243     * @return whether or not the subscriber is retroactive or not
244     */
245    @Override
246    public boolean isRetroactive() {
247        ConsumerInfo info = getConsumerInfo();
248        return info != null ? info.isRetroactive() : false;
249    }
250
251    /**
252     * @return whether or not the subscriber is an exclusive consumer
253     */
254    @Override
255    public boolean isExclusive() {
256        ConsumerInfo info = getConsumerInfo();
257        return info != null ? info.isExclusive() : false;
258    }
259
260    /**
261     * @return whether or not the subscriber is durable (persistent)
262     */
263    @Override
264    public boolean isDurable() {
265        ConsumerInfo info = getConsumerInfo();
266        return info != null ? info.isDurable() : false;
267    }
268
269    /**
270     * @return whether or not the subscriber ignores local messages
271     */
272    @Override
273    public boolean isNoLocal() {
274        ConsumerInfo info = getConsumerInfo();
275        return info != null ? info.isNoLocal() : false;
276    }
277
278    /**
279     * @return whether or not the subscriber is configured for async dispatch
280     */
281    @Override
282    public boolean isDispatchAsync() {
283        ConsumerInfo info = getConsumerInfo();
284        return info != null ? info.isDispatchAsync() : false;
285    }
286
287    /**
288     * @return the maximum number of pending messages allowed in addition to the
289     *         prefetch size. If enabled to a non-zero value then this will
290     *         perform eviction of messages for slow consumers on non-durable
291     *         topics.
292     */
293    @Override
294    public int getMaximumPendingMessageLimit() {
295        ConsumerInfo info = getConsumerInfo();
296        return info != null ? info.getMaximumPendingMessageLimit() : 0;
297    }
298
299    /**
300     * @return the consumer priority
301     */
302    @Override
303    public byte getPriority() {
304        ConsumerInfo info = getConsumerInfo();
305        return info != null ? info.getPriority() : 0;
306    }
307
308    /**
309     * @return the name of the consumer which is only used for durable
310     *         consumers.
311     */
312    @Deprecated
313    @Override
314    public String getSubcriptionName() {
315        return getSubscriptionName();
316    }
317
318    /**
319     * @return the name of the consumer which is only used for durable
320     *         consumers.
321     */
322    @Override
323    public String getSubscriptionName() {
324        ConsumerInfo info = getConsumerInfo();
325        return info != null ? info.getSubscriptionName() : null;
326    }
327
328    /**
329     * @return number of messages pending delivery
330     */
331    @Override
332    public int getPendingQueueSize() {
333        return subscription != null ? subscription.getPendingQueueSize() : 0;
334    }
335
336    /**
337     * @return number of messages dispatched
338     */
339    @Override
340    public int getDispatchedQueueSize() {
341        return subscription != null ? subscription.getDispatchedQueueSize() : 0;
342    }
343
344    @Override
345    public int getMessageCountAwaitingAcknowledge() {
346        return getDispatchedQueueSize();
347    }
348
349    /**
350     * @return number of messages that matched the subscription
351     */
352    @Override
353    public long getDispatchedCounter() {
354        return subscription != null ? subscription.getDispatchedCounter() : 0;
355    }
356
357    /**
358     * @return number of messages that matched the subscription
359     */
360    @Override
361    public long getEnqueueCounter() {
362        return subscription != null ? subscription.getEnqueueCounter() : 0;
363    }
364
365    /**
366     * @return number of messages queued by the client
367     */
368    @Override
369    public long getDequeueCounter() {
370        return subscription != null ? subscription.getDequeueCounter() : 0;
371    }
372
373    protected ConsumerInfo getConsumerInfo() {
374        return subscription != null ? subscription.getConsumerInfo() : null;
375    }
376
377    /**
378     * @return pretty print
379     */
380    @Override
381    public String toString() {
382        return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
383    }
384
385    /**
386     */
387    @Override
388    public int getPrefetchSize() {
389        return subscription != null ? subscription.getPrefetchSize() : 0;
390    }
391
392    @Override
393    public boolean isMatchingQueue(String queueName) {
394        if (isDestinationQueue()) {
395            return matchesDestination(new ActiveMQQueue(queueName));
396        }
397        return false;
398    }
399
400    @Override
401    public boolean isMatchingTopic(String topicName) {
402        if (isDestinationTopic()) {
403            return matchesDestination(new ActiveMQTopic(topicName));
404        }
405        return false;
406    }
407
408    /**
409     * Return true if this subscription matches the given destination
410     *
411     * @param destination the destination to compare against
412     * @return true if this subscription matches the given destination
413     */
414    public boolean matchesDestination(ActiveMQDestination destination) {
415        ActiveMQDestination subscriptionDestination = subscription.getActiveMQDestination();
416        DestinationFilter filter = DestinationFilter.parseFilter(subscriptionDestination);
417        return filter.matches(destination);
418    }
419
420    @Override
421    public boolean isSlowConsumer() {
422        return subscription.isSlowConsumer();
423    }
424
425    @Override
426    public String getUserName() {
427        return userName;
428    }
429
430    @Override
431    public void resetStatistics() {
432        if (subscription != null && subscription.getSubscriptionStatistics() != null){
433            subscription.getSubscriptionStatistics().reset();
434        }
435    }
436
437    @Override
438    public long getConsumedCount() {
439        return subscription != null ? subscription.getConsumedCount() : 0;
440    }
441}