001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.io.IOException; 020import java.util.Set; 021 022import javax.jms.InvalidSelectorException; 023import javax.management.ObjectName; 024 025import org.apache.activemq.broker.BrokerService; 026import org.apache.activemq.broker.ConnectionContext; 027import org.apache.activemq.broker.region.Subscription; 028import org.apache.activemq.command.ActiveMQDestination; 029import org.apache.activemq.command.ActiveMQQueue; 030import org.apache.activemq.command.ActiveMQTopic; 031import org.apache.activemq.command.ConsumerInfo; 032import org.apache.activemq.filter.DestinationFilter; 033import org.apache.activemq.util.IOExceptionSupport; 034 035/** 036 * 037 */ 038public class SubscriptionView implements SubscriptionViewMBean { 039 040 protected final Subscription subscription; 041 protected final String clientId; 042 protected final String userName; 043 044 /** 045 * Constructor 046 * 047 * @param subs 048 */ 049 public SubscriptionView(String clientId, String userName, Subscription subs) { 050 this.clientId = clientId; 051 this.subscription = subs; 052 this.userName = userName; 053 } 054 055 /** 056 * @return the clientId 057 */ 058 @Override 059 public String getClientId() { 060 return clientId; 061 } 062 063 /** 064 * @returns the ObjectName of the Connection that created this subscription 065 */ 066 @Override 067 public ObjectName getConnection() { 068 ObjectName result = null; 069 070 if (clientId != null && subscription != null) { 071 ConnectionContext ctx = subscription.getContext(); 072 if (ctx != null && ctx.getBroker() != null && ctx.getBroker().getBrokerService() != null) { 073 BrokerService service = ctx.getBroker().getBrokerService(); 074 ManagementContext managementCtx = service.getManagementContext(); 075 if (managementCtx != null) { 076 077 try { 078 ObjectName query = createConnectionQuery(managementCtx, service.getBrokerName()); 079 Set<ObjectName> names = managementCtx.queryNames(query, null); 080 if (names.size() == 1) { 081 result = names.iterator().next(); 082 } 083 } catch (Exception e) { 084 } 085 } 086 } 087 } 088 return result; 089 } 090 091 092 093 private ObjectName createConnectionQuery(ManagementContext ctx, String brokerName) throws IOException { 094 try { 095 return BrokerMBeanSupport.createConnectionQuery(ctx.getJmxDomainName(), brokerName, clientId); 096 } catch (Throwable e) { 097 throw IOExceptionSupport.create(e); 098 } 099 } 100 101 /** 102 * @return the id of the Connection the Subscription is on 103 */ 104 @Override 105 public String getConnectionId() { 106 ConsumerInfo info = getConsumerInfo(); 107 if (info != null) { 108 return info.getConsumerId().getConnectionId(); 109 } 110 return "NOTSET"; 111 } 112 113 /** 114 * @return the id of the Session the subscription is on 115 */ 116 @Override 117 public long getSessionId() { 118 ConsumerInfo info = getConsumerInfo(); 119 if (info != null) { 120 return info.getConsumerId().getSessionId(); 121 } 122 return 0; 123 } 124 125 /** 126 * @return the id of the Subscription 127 */ 128 @Deprecated 129 @Override 130 public long getSubcriptionId() { 131 return getSubscriptionId(); 132 } 133 134 /** 135 * @return the id of the Subscription 136 */ 137 @Override 138 public long getSubscriptionId() { 139 ConsumerInfo info = getConsumerInfo(); 140 if (info != null) { 141 return info.getConsumerId().getValue(); 142 } 143 return 0; 144 } 145 146 /** 147 * @return the destination name 148 */ 149 @Override 150 public String getDestinationName() { 151 ConsumerInfo info = getConsumerInfo(); 152 if (info != null) { 153 ActiveMQDestination dest = info.getDestination(); 154 return dest.getPhysicalName(); 155 } 156 return "NOTSET"; 157 } 158 159 @Override 160 public String getSelector() { 161 if (subscription != null) { 162 return subscription.getSelector(); 163 } 164 return null; 165 } 166 167 @Override 168 public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException { 169 if (subscription != null) { 170 subscription.setSelector(selector); 171 } else { 172 throw new UnsupportedOperationException("No subscription object"); 173 } 174 } 175 176 /** 177 * @return true if the destination is a Queue 178 */ 179 @Override 180 public boolean isDestinationQueue() { 181 ConsumerInfo info = getConsumerInfo(); 182 if (info != null) { 183 ActiveMQDestination dest = info.getDestination(); 184 return dest.isQueue(); 185 } 186 return false; 187 } 188 189 /** 190 * @return true of the destination is a Topic 191 */ 192 @Override 193 public boolean isDestinationTopic() { 194 ConsumerInfo info = getConsumerInfo(); 195 if (info != null) { 196 ActiveMQDestination dest = info.getDestination(); 197 return dest.isTopic(); 198 } 199 return false; 200 } 201 202 /** 203 * @return true if the destination is temporary 204 */ 205 @Override 206 public boolean isDestinationTemporary() { 207 ConsumerInfo info = getConsumerInfo(); 208 if (info != null) { 209 ActiveMQDestination dest = info.getDestination(); 210 return dest.isTemporary(); 211 } 212 return false; 213 } 214 215 /** 216 * @return true if the subscriber is active 217 */ 218 @Override 219 public boolean isActive() { 220 return true; 221 } 222 223 @Override 224 public boolean isNetwork() { 225 ConsumerInfo info = getConsumerInfo(); 226 if (info != null) { 227 return info.isNetworkSubscription(); 228 } 229 return false; 230 } 231 232 /** 233 * The subscription should release as may references as it can to help the 234 * garbage collector reclaim memory. 235 */ 236 public void gc() { 237 if (subscription != null) { 238 subscription.gc(); 239 } 240 } 241 242 /** 243 * @return whether or not the subscriber is retroactive or not 244 */ 245 @Override 246 public boolean isRetroactive() { 247 ConsumerInfo info = getConsumerInfo(); 248 return info != null ? info.isRetroactive() : false; 249 } 250 251 /** 252 * @return whether or not the subscriber is an exclusive consumer 253 */ 254 @Override 255 public boolean isExclusive() { 256 ConsumerInfo info = getConsumerInfo(); 257 return info != null ? info.isExclusive() : false; 258 } 259 260 /** 261 * @return whether or not the subscriber is durable (persistent) 262 */ 263 @Override 264 public boolean isDurable() { 265 ConsumerInfo info = getConsumerInfo(); 266 return info != null ? info.isDurable() : false; 267 } 268 269 /** 270 * @return whether or not the subscriber ignores local messages 271 */ 272 @Override 273 public boolean isNoLocal() { 274 ConsumerInfo info = getConsumerInfo(); 275 return info != null ? info.isNoLocal() : false; 276 } 277 278 /** 279 * @return whether or not the subscriber is configured for async dispatch 280 */ 281 @Override 282 public boolean isDispatchAsync() { 283 ConsumerInfo info = getConsumerInfo(); 284 return info != null ? info.isDispatchAsync() : false; 285 } 286 287 /** 288 * @return the maximum number of pending messages allowed in addition to the 289 * prefetch size. If enabled to a non-zero value then this will 290 * perform eviction of messages for slow consumers on non-durable 291 * topics. 292 */ 293 @Override 294 public int getMaximumPendingMessageLimit() { 295 ConsumerInfo info = getConsumerInfo(); 296 return info != null ? info.getMaximumPendingMessageLimit() : 0; 297 } 298 299 /** 300 * @return the consumer priority 301 */ 302 @Override 303 public byte getPriority() { 304 ConsumerInfo info = getConsumerInfo(); 305 return info != null ? info.getPriority() : 0; 306 } 307 308 /** 309 * @return the name of the consumer which is only used for durable 310 * consumers. 311 */ 312 @Deprecated 313 @Override 314 public String getSubcriptionName() { 315 return getSubscriptionName(); 316 } 317 318 /** 319 * @return the name of the consumer which is only used for durable 320 * consumers. 321 */ 322 @Override 323 public String getSubscriptionName() { 324 ConsumerInfo info = getConsumerInfo(); 325 return info != null ? info.getSubscriptionName() : null; 326 } 327 328 /** 329 * @return number of messages pending delivery 330 */ 331 @Override 332 public int getPendingQueueSize() { 333 return subscription != null ? subscription.getPendingQueueSize() : 0; 334 } 335 336 /** 337 * @return number of messages dispatched 338 */ 339 @Override 340 public int getDispatchedQueueSize() { 341 return subscription != null ? subscription.getDispatchedQueueSize() : 0; 342 } 343 344 @Override 345 public int getMessageCountAwaitingAcknowledge() { 346 return getDispatchedQueueSize(); 347 } 348 349 /** 350 * @return number of messages that matched the subscription 351 */ 352 @Override 353 public long getDispatchedCounter() { 354 return subscription != null ? subscription.getDispatchedCounter() : 0; 355 } 356 357 /** 358 * @return number of messages that matched the subscription 359 */ 360 @Override 361 public long getEnqueueCounter() { 362 return subscription != null ? subscription.getEnqueueCounter() : 0; 363 } 364 365 /** 366 * @return number of messages queued by the client 367 */ 368 @Override 369 public long getDequeueCounter() { 370 return subscription != null ? subscription.getDequeueCounter() : 0; 371 } 372 373 protected ConsumerInfo getConsumerInfo() { 374 return subscription != null ? subscription.getConsumerInfo() : null; 375 } 376 377 /** 378 * @return pretty print 379 */ 380 @Override 381 public String toString() { 382 return "SubscriptionView: " + getClientId() + ":" + getConnectionId(); 383 } 384 385 /** 386 */ 387 @Override 388 public int getPrefetchSize() { 389 return subscription != null ? subscription.getPrefetchSize() : 0; 390 } 391 392 @Override 393 public boolean isMatchingQueue(String queueName) { 394 if (isDestinationQueue()) { 395 return matchesDestination(new ActiveMQQueue(queueName)); 396 } 397 return false; 398 } 399 400 @Override 401 public boolean isMatchingTopic(String topicName) { 402 if (isDestinationTopic()) { 403 return matchesDestination(new ActiveMQTopic(topicName)); 404 } 405 return false; 406 } 407 408 /** 409 * Return true if this subscription matches the given destination 410 * 411 * @param destination the destination to compare against 412 * @return true if this subscription matches the given destination 413 */ 414 public boolean matchesDestination(ActiveMQDestination destination) { 415 ActiveMQDestination subscriptionDestination = subscription.getActiveMQDestination(); 416 DestinationFilter filter = DestinationFilter.parseFilter(subscriptionDestination); 417 return filter.matches(destination); 418 } 419 420 @Override 421 public boolean isSlowConsumer() { 422 return subscription.isSlowConsumer(); 423 } 424 425 @Override 426 public String getUserName() { 427 return userName; 428 } 429 430 @Override 431 public void resetStatistics() { 432 if (subscription != null && subscription.getSubscriptionStatistics() != null){ 433 subscription.getSubscriptionStatistics().reset(); 434 } 435 } 436 437 @Override 438 public long getConsumedCount() { 439 return subscription != null ? subscription.getConsumedCount() : 0; 440 } 441}