001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.io.IOException; 020import java.net.URISyntaxException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027 028import javax.jms.Connection; 029import javax.jms.InvalidSelectorException; 030import javax.jms.MessageProducer; 031import javax.jms.Session; 032import javax.management.MalformedObjectNameException; 033import javax.management.ObjectName; 034import javax.management.openmbean.CompositeData; 035import javax.management.openmbean.CompositeDataSupport; 036import javax.management.openmbean.CompositeType; 037import javax.management.openmbean.OpenDataException; 038import javax.management.openmbean.TabularData; 039import javax.management.openmbean.TabularDataSupport; 040import javax.management.openmbean.TabularType; 041import org.apache.activemq.ActiveMQConnectionFactory; 042import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; 043import org.apache.activemq.broker.region.Destination; 044import org.apache.activemq.broker.region.Subscription; 045import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; 046import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 047import org.apache.activemq.command.ActiveMQDestination; 048import org.apache.activemq.command.ActiveMQMessage; 049import org.apache.activemq.command.ActiveMQTextMessage; 050import org.apache.activemq.command.Message; 051import org.apache.activemq.filter.BooleanExpression; 052import org.apache.activemq.filter.NonCachedMessageEvaluationContext; 053import org.apache.activemq.selector.SelectorParser; 054import org.apache.activemq.util.URISupport; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058public class DestinationView implements DestinationViewMBean { 059 private static final Logger LOG = LoggerFactory.getLogger(DestinationViewMBean.class); 060 protected final Destination destination; 061 protected final ManagedRegionBroker broker; 062 063 public DestinationView(ManagedRegionBroker broker, Destination destination) { 064 this.broker = broker; 065 this.destination = destination; 066 } 067 068 public void gc() { 069 destination.gc(); 070 } 071 072 @Override 073 public String getName() { 074 return destination.getName(); 075 } 076 077 @Override 078 public void resetStatistics() { 079 destination.getDestinationStatistics().reset(); 080 } 081 082 @Override 083 public long getEnqueueCount() { 084 return destination.getDestinationStatistics().getEnqueues().getCount(); 085 } 086 087 @Override 088 public long getDequeueCount() { 089 return destination.getDestinationStatistics().getDequeues().getCount(); 090 } 091 092 @Override 093 public long getForwardCount() { 094 return destination.getDestinationStatistics().getForwards().getCount(); 095 } 096 097 @Override 098 public long getDispatchCount() { 099 return destination.getDestinationStatistics().getDispatched().getCount(); 100 } 101 102 @Override 103 public long getInFlightCount() { 104 return destination.getDestinationStatistics().getInflight().getCount(); 105 } 106 107 @Override 108 public long getExpiredCount() { 109 return destination.getDestinationStatistics().getExpired().getCount(); 110 } 111 112 @Override 113 public long getConsumerCount() { 114 return destination.getDestinationStatistics().getConsumers().getCount(); 115 } 116 117 @Override 118 public long getQueueSize() { 119 return destination.getDestinationStatistics().getMessages().getCount(); 120 } 121 122 public long getMessagesCached() { 123 return destination.getDestinationStatistics().getMessagesCached().getCount(); 124 } 125 126 @Override 127 public int getMemoryPercentUsage() { 128 return destination.getMemoryUsage().getPercentUsage(); 129 } 130 131 @Override 132 public long getMemoryUsageByteCount() { 133 return destination.getMemoryUsage().getUsage(); 134 } 135 136 @Override 137 public long getMemoryLimit() { 138 return destination.getMemoryUsage().getLimit(); 139 } 140 141 @Override 142 public void setMemoryLimit(long limit) { 143 destination.getMemoryUsage().setLimit(limit); 144 } 145 146 @Override 147 public double getAverageEnqueueTime() { 148 return destination.getDestinationStatistics().getProcessTime().getAverageTime(); 149 } 150 151 @Override 152 public long getMaxEnqueueTime() { 153 return destination.getDestinationStatistics().getProcessTime().getMaxTime(); 154 } 155 156 @Override 157 public long getMinEnqueueTime() { 158 return destination.getDestinationStatistics().getProcessTime().getMinTime(); 159 } 160 161 /** 162 * @return the average size of a message (bytes) 163 */ 164 @Override 165 public long getAverageMessageSize() { 166 // we are okay with the size without decimals so cast to long 167 return (long) destination.getDestinationStatistics().getMessageSize().getAverageSize(); 168 } 169 170 /** 171 * @return the max size of a message (bytes) 172 */ 173 @Override 174 public long getMaxMessageSize() { 175 return destination.getDestinationStatistics().getMessageSize().getMaxSize(); 176 } 177 178 /** 179 * @return the min size of a message (bytes) 180 */ 181 @Override 182 public long getMinMessageSize() { 183 return destination.getDestinationStatistics().getMessageSize().getMinSize(); 184 } 185 186 187 @Override 188 public boolean isPrioritizedMessages() { 189 return destination.isPrioritizedMessages(); 190 } 191 192 @Override 193 public CompositeData[] browse() throws OpenDataException { 194 try { 195 return browse(null); 196 } catch (InvalidSelectorException e) { 197 // should not happen. 198 throw new RuntimeException(e); 199 } 200 } 201 202 @Override 203 public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException { 204 Message[] messages = destination.browse(); 205 ArrayList<CompositeData> c = new ArrayList<CompositeData>(); 206 207 NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext(); 208 ctx.setDestination(destination.getActiveMQDestination()); 209 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 210 211 for (int i = 0; i < messages.length; i++) { 212 try { 213 214 if (selectorExpression == null) { 215 c.add(OpenTypeSupport.convert(messages[i])); 216 } else { 217 ctx.setMessageReference(messages[i]); 218 if (selectorExpression.matches(ctx)) { 219 c.add(OpenTypeSupport.convert(messages[i])); 220 } 221 } 222 223 } catch (Throwable e) { 224 LOG.warn("exception browsing destination", e); 225 } 226 } 227 228 CompositeData rc[] = new CompositeData[c.size()]; 229 c.toArray(rc); 230 return rc; 231 } 232 233 /** 234 * Browses the current destination returning a list of messages 235 */ 236 @Override 237 public List<Object> browseMessages() throws InvalidSelectorException { 238 return browseMessages(null); 239 } 240 241 /** 242 * Browses the current destination with the given selector returning a list 243 * of messages 244 */ 245 @Override 246 public List<Object> browseMessages(String selector) throws InvalidSelectorException { 247 Message[] messages = destination.browse(); 248 ArrayList<Object> answer = new ArrayList<Object>(); 249 250 NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext(); 251 ctx.setDestination(destination.getActiveMQDestination()); 252 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 253 254 for (int i = 0; i < messages.length; i++) { 255 try { 256 Message message = messages[i]; 257 message.setReadOnlyBody(true); 258 if (selectorExpression == null) { 259 answer.add(message); 260 } else { 261 ctx.setMessageReference(message); 262 if (selectorExpression.matches(ctx)) { 263 answer.add(message); 264 } 265 } 266 267 } catch (Throwable e) { 268 LOG.warn("exception browsing destination", e); 269 } 270 } 271 return answer; 272 } 273 274 @Override 275 public TabularData browseAsTable() throws OpenDataException { 276 try { 277 return browseAsTable(null); 278 } catch (InvalidSelectorException e) { 279 throw new RuntimeException(e); 280 } 281 } 282 283 @Override 284 public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException { 285 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); 286 Message[] messages = destination.browse(); 287 CompositeType ct = factory.getCompositeType(); 288 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" }); 289 TabularDataSupport rc = new TabularDataSupport(tt); 290 291 NonCachedMessageEvaluationContext ctx = new NonCachedMessageEvaluationContext(); 292 ctx.setDestination(destination.getActiveMQDestination()); 293 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 294 295 for (int i = 0; i < messages.length; i++) { 296 try { 297 if (selectorExpression == null) { 298 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); 299 } else { 300 ctx.setMessageReference(messages[i]); 301 if (selectorExpression.matches(ctx)) { 302 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); 303 } 304 } 305 } catch (Throwable e) { 306 LOG.warn("exception browsing destination", e); 307 } 308 } 309 310 return rc; 311 } 312 313 @Override 314 public String sendTextMessageWithProperties(String properties) throws Exception { 315 String[] kvs = properties.split(","); 316 Map<String, String> props = new HashMap<String, String>(); 317 for (String kv : kvs) { 318 String[] it = kv.split("="); 319 if (it.length == 2) { 320 props.put(it[0],it[1]); 321 } 322 } 323 return sendTextMessage(props, props.remove("body"), props.remove("username"), props.remove("password")); 324 } 325 326 @Override 327 public String sendTextMessage(String body) throws Exception { 328 return sendTextMessage(Collections.EMPTY_MAP, body); 329 } 330 331 @Override 332 public String sendTextMessage(Map headers, String body) throws Exception { 333 return sendTextMessage(headers, body, null, null); 334 } 335 336 @Override 337 public String sendTextMessage(String body, String user, @Sensitive String password) throws Exception { 338 return sendTextMessage(Collections.EMPTY_MAP, body, user, password); 339 } 340 341 @Override 342 public String sendTextMessage(Map<String, String> headers, String body, String userName, @Sensitive String password) throws Exception { 343 344 String brokerUrl = "vm://" + broker.getBrokerName(); 345 ActiveMQDestination dest = destination.getActiveMQDestination(); 346 347 ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl); 348 Connection connection = null; 349 try { 350 351 connection = cf.createConnection(userName, password); 352 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 353 MessageProducer producer = session.createProducer(dest); 354 ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body); 355 356 for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) { 357 Map.Entry entry = (Map.Entry) iter.next(); 358 msg.setObjectProperty((String) entry.getKey(), entry.getValue()); 359 } 360 361 producer.setDeliveryMode(msg.getJMSDeliveryMode()); 362 producer.setPriority(msg.getPriority()); 363 long ttl = 0; 364 if (msg.getExpiration() != 0) { 365 ttl = msg.getExpiration() - System.currentTimeMillis(); 366 } else { 367 String timeToLive = headers.get("timeToLive"); 368 if (timeToLive != null) { 369 ttl = Integer.valueOf(timeToLive); 370 } 371 } 372 producer.setTimeToLive(ttl > 0 ? ttl : 0); 373 producer.send(msg); 374 375 return msg.getJMSMessageID(); 376 377 } finally { 378 connection.close(); 379 } 380 381 } 382 383 @Override 384 public int getMaxAuditDepth() { 385 return destination.getMaxAuditDepth(); 386 } 387 388 @Override 389 public int getMaxProducersToAudit() { 390 return destination.getMaxProducersToAudit(); 391 } 392 393 public boolean isEnableAudit() { 394 return destination.isEnableAudit(); 395 } 396 397 public void setEnableAudit(boolean enableAudit) { 398 destination.setEnableAudit(enableAudit); 399 } 400 401 @Override 402 public void setMaxAuditDepth(int maxAuditDepth) { 403 destination.setMaxAuditDepth(maxAuditDepth); 404 } 405 406 @Override 407 public void setMaxProducersToAudit(int maxProducersToAudit) { 408 destination.setMaxProducersToAudit(maxProducersToAudit); 409 } 410 411 @Override 412 public float getMemoryUsagePortion() { 413 return destination.getMemoryUsage().getUsagePortion(); 414 } 415 416 @Override 417 public long getProducerCount() { 418 return destination.getDestinationStatistics().getProducers().getCount(); 419 } 420 421 @Override 422 public boolean isProducerFlowControl() { 423 return destination.isProducerFlowControl(); 424 } 425 426 @Override 427 public void setMemoryUsagePortion(float value) { 428 destination.getMemoryUsage().setUsagePortion(value); 429 } 430 431 @Override 432 public void setProducerFlowControl(boolean producerFlowControl) { 433 destination.setProducerFlowControl(producerFlowControl); 434 } 435 436 @Override 437 public boolean isAlwaysRetroactive() { 438 return destination.isAlwaysRetroactive(); 439 } 440 441 @Override 442 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 443 destination.setAlwaysRetroactive(alwaysRetroactive); 444 } 445 446 /** 447 * Set's the interval at which warnings about producers being blocked by 448 * resource usage will be triggered. Values of 0 or less will disable 449 * warnings 450 * 451 * @param blockedProducerWarningInterval the interval at which warning about 452 * blocked producers will be triggered. 453 */ 454 @Override 455 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 456 destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval); 457 } 458 459 /** 460 * 461 * @return the interval at which warning about blocked producers will be 462 * triggered. 463 */ 464 @Override 465 public long getBlockedProducerWarningInterval() { 466 return destination.getBlockedProducerWarningInterval(); 467 } 468 469 @Override 470 public int getMaxPageSize() { 471 return destination.getMaxPageSize(); 472 } 473 474 @Override 475 public void setMaxPageSize(int pageSize) { 476 destination.setMaxPageSize(pageSize); 477 } 478 479 @Override 480 public boolean isUseCache() { 481 return destination.isUseCache(); 482 } 483 484 @Override 485 public void setUseCache(boolean value) { 486 destination.setUseCache(value); 487 } 488 489 @Override 490 public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException { 491 List<Subscription> subscriptions = destination.getConsumers(); 492 ObjectName[] answer = new ObjectName[subscriptions.size()]; 493 ObjectName brokerObjectName = broker.getBrokerService().getBrokerObjectName(); 494 int index = 0; 495 for (Subscription subscription : subscriptions) { 496 String connectionClientId = subscription.getContext().getClientId(); 497 answer[index++] = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, subscription.getConsumerInfo()); 498 } 499 return answer; 500 } 501 502 @Override 503 public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException { 504 ObjectName result = null; 505 SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy(); 506 if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) { 507 result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy); 508 } 509 return result; 510 } 511 512 @Override 513 public String getOptions() { 514 Map<String, String> options = destination.getActiveMQDestination().getOptions(); 515 String optionsString = ""; 516 try { 517 if (options != null) { 518 optionsString = URISupport.createQueryString(options); 519 } 520 } catch (URISyntaxException ignored) {} 521 return optionsString; 522 } 523 524 @Override 525 public boolean isDLQ() { 526 return destination.getActiveMQDestination().isDLQ(); 527 } 528 529 @Override 530 public void setDLQ(boolean val) { 531 destination.getActiveMQDestination().setDLQ(val); 532 } 533 534 @Override 535 public long getBlockedSends() { 536 return destination.getDestinationStatistics().getBlockedSends().getCount(); 537 } 538 539 @Override 540 public double getAverageBlockedTime() { 541 return destination.getDestinationStatistics().getBlockedTime().getAverageTime(); 542 } 543 544 @Override 545 public long getTotalBlockedTime() { 546 return destination.getDestinationStatistics().getBlockedTime().getTotalTime(); 547 } 548 549}