001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.jmx; 018 019import java.io.IOException; 020import java.net.URISyntaxException; 021import java.util.ArrayList; 022import java.util.Collections; 023import java.util.HashMap; 024import java.util.Iterator; 025import java.util.List; 026import java.util.Map; 027 028import javax.jms.Connection; 029import javax.jms.InvalidSelectorException; 030import javax.jms.MessageProducer; 031import javax.jms.Session; 032import javax.management.MalformedObjectNameException; 033import javax.management.ObjectName; 034import javax.management.openmbean.CompositeData; 035import javax.management.openmbean.CompositeDataSupport; 036import javax.management.openmbean.CompositeType; 037import javax.management.openmbean.OpenDataException; 038import javax.management.openmbean.TabularData; 039import javax.management.openmbean.TabularDataSupport; 040import javax.management.openmbean.TabularType; 041import org.apache.activemq.ActiveMQConnectionFactory; 042import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; 043import org.apache.activemq.broker.region.Destination; 044import org.apache.activemq.broker.region.Subscription; 045import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy; 046import org.apache.activemq.broker.region.policy.SlowConsumerStrategy; 047import org.apache.activemq.command.ActiveMQDestination; 048import org.apache.activemq.command.ActiveMQMessage; 049import org.apache.activemq.command.ActiveMQTextMessage; 050import org.apache.activemq.command.Message; 051import org.apache.activemq.filter.BooleanExpression; 052import org.apache.activemq.filter.MessageEvaluationContext; 053import org.apache.activemq.selector.SelectorParser; 054import org.apache.activemq.util.URISupport; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058public class DestinationView implements DestinationViewMBean { 059 private static final Logger LOG = LoggerFactory.getLogger(DestinationViewMBean.class); 060 protected final Destination destination; 061 protected final ManagedRegionBroker broker; 062 063 public DestinationView(ManagedRegionBroker broker, Destination destination) { 064 this.broker = broker; 065 this.destination = destination; 066 } 067 068 public void gc() { 069 destination.gc(); 070 } 071 072 @Override 073 public String getName() { 074 return destination.getName(); 075 } 076 077 @Override 078 public void resetStatistics() { 079 destination.getDestinationStatistics().reset(); 080 } 081 082 @Override 083 public long getEnqueueCount() { 084 return destination.getDestinationStatistics().getEnqueues().getCount(); 085 } 086 087 @Override 088 public long getDequeueCount() { 089 return destination.getDestinationStatistics().getDequeues().getCount(); 090 } 091 092 @Override 093 public long getForwardCount() { 094 return destination.getDestinationStatistics().getForwards().getCount(); 095 } 096 097 @Override 098 public long getDispatchCount() { 099 return destination.getDestinationStatistics().getDispatched().getCount(); 100 } 101 102 @Override 103 public long getInFlightCount() { 104 return destination.getDestinationStatistics().getInflight().getCount(); 105 } 106 107 @Override 108 public long getExpiredCount() { 109 return destination.getDestinationStatistics().getExpired().getCount(); 110 } 111 112 @Override 113 public long getConsumerCount() { 114 return destination.getDestinationStatistics().getConsumers().getCount(); 115 } 116 117 @Override 118 public long getQueueSize() { 119 return destination.getDestinationStatistics().getMessages().getCount(); 120 } 121 122 public long getMessagesCached() { 123 return destination.getDestinationStatistics().getMessagesCached().getCount(); 124 } 125 126 @Override 127 public int getMemoryPercentUsage() { 128 return destination.getMemoryUsage().getPercentUsage(); 129 } 130 131 @Override 132 public long getMemoryUsageByteCount() { 133 return destination.getMemoryUsage().getUsage(); 134 } 135 136 @Override 137 public long getMemoryLimit() { 138 return destination.getMemoryUsage().getLimit(); 139 } 140 141 @Override 142 public void setMemoryLimit(long limit) { 143 destination.getMemoryUsage().setLimit(limit); 144 } 145 146 @Override 147 public double getAverageEnqueueTime() { 148 return destination.getDestinationStatistics().getProcessTime().getAverageTime(); 149 } 150 151 @Override 152 public long getMaxEnqueueTime() { 153 return destination.getDestinationStatistics().getProcessTime().getMaxTime(); 154 } 155 156 @Override 157 public long getMinEnqueueTime() { 158 return destination.getDestinationStatistics().getProcessTime().getMinTime(); 159 } 160 161 /** 162 * @return the average size of a message (bytes) 163 */ 164 public long getAverageMessageSize() { 165 // we are okay with the size without decimals so cast to long 166 return (long) destination.getDestinationStatistics().getMessageSize().getAverageSize(); 167 } 168 169 /** 170 * @return the max size of a message (bytes) 171 */ 172 public long getMaxMessageSize() { 173 return destination.getDestinationStatistics().getMessageSize().getMaxSize(); 174 } 175 176 /** 177 * @return the min size of a message (bytes) 178 */ 179 public long getMinMessageSize() { 180 return destination.getDestinationStatistics().getMessageSize().getMinSize(); 181 } 182 183 184 @Override 185 public boolean isPrioritizedMessages() { 186 return destination.isPrioritizedMessages(); 187 } 188 189 @Override 190 public CompositeData[] browse() throws OpenDataException { 191 try { 192 return browse(null); 193 } catch (InvalidSelectorException e) { 194 // should not happen. 195 throw new RuntimeException(e); 196 } 197 } 198 199 @Override 200 public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException { 201 Message[] messages = destination.browse(); 202 ArrayList<CompositeData> c = new ArrayList<CompositeData>(); 203 204 MessageEvaluationContext ctx = new MessageEvaluationContext(); 205 ctx.setDestination(destination.getActiveMQDestination()); 206 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 207 208 for (int i = 0; i < messages.length; i++) { 209 try { 210 211 if (selectorExpression == null) { 212 c.add(OpenTypeSupport.convert(messages[i])); 213 } else { 214 ctx.setMessageReference(messages[i]); 215 if (selectorExpression.matches(ctx)) { 216 c.add(OpenTypeSupport.convert(messages[i])); 217 } 218 } 219 220 } catch (Throwable e) { 221 // TODO DELETE ME 222 System.out.println(e); 223 e.printStackTrace(); 224 // TODO DELETE ME 225 LOG.warn("exception browsing destination", e); 226 } 227 } 228 229 CompositeData rc[] = new CompositeData[c.size()]; 230 c.toArray(rc); 231 return rc; 232 } 233 234 /** 235 * Browses the current destination returning a list of messages 236 */ 237 @Override 238 public List<Object> browseMessages() throws InvalidSelectorException { 239 return browseMessages(null); 240 } 241 242 /** 243 * Browses the current destination with the given selector returning a list 244 * of messages 245 */ 246 @Override 247 public List<Object> browseMessages(String selector) throws InvalidSelectorException { 248 Message[] messages = destination.browse(); 249 ArrayList<Object> answer = new ArrayList<Object>(); 250 251 MessageEvaluationContext ctx = new MessageEvaluationContext(); 252 ctx.setDestination(destination.getActiveMQDestination()); 253 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 254 255 for (int i = 0; i < messages.length; i++) { 256 try { 257 Message message = messages[i]; 258 message.setReadOnlyBody(true); 259 if (selectorExpression == null) { 260 answer.add(message); 261 } else { 262 ctx.setMessageReference(message); 263 if (selectorExpression.matches(ctx)) { 264 answer.add(message); 265 } 266 } 267 268 } catch (Throwable e) { 269 LOG.warn("exception browsing destination", e); 270 } 271 } 272 return answer; 273 } 274 275 @Override 276 public TabularData browseAsTable() throws OpenDataException { 277 try { 278 return browseAsTable(null); 279 } catch (InvalidSelectorException e) { 280 throw new RuntimeException(e); 281 } 282 } 283 284 @Override 285 public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException { 286 OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class); 287 Message[] messages = destination.browse(); 288 CompositeType ct = factory.getCompositeType(); 289 TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] { "JMSMessageID" }); 290 TabularDataSupport rc = new TabularDataSupport(tt); 291 292 MessageEvaluationContext ctx = new MessageEvaluationContext(); 293 ctx.setDestination(destination.getActiveMQDestination()); 294 BooleanExpression selectorExpression = selector == null ? null : SelectorParser.parse(selector); 295 296 for (int i = 0; i < messages.length; i++) { 297 try { 298 if (selectorExpression == null) { 299 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); 300 } else { 301 ctx.setMessageReference(messages[i]); 302 if (selectorExpression.matches(ctx)) { 303 rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i]))); 304 } 305 } 306 } catch (Throwable e) { 307 LOG.warn("exception browsing destination", e); 308 } 309 } 310 311 return rc; 312 } 313 314 @Override 315 public String sendTextMessageWithProperties(String properties) throws Exception { 316 String[] kvs = properties.split(","); 317 Map<String, String> props = new HashMap<String, String>(); 318 for (String kv : kvs) { 319 String[] it = kv.split("="); 320 if (it.length == 2) { 321 props.put(it[0],it[1]); 322 } 323 } 324 return sendTextMessage(props, props.remove("body"), props.remove("username"), props.remove("password")); 325 } 326 327 @Override 328 public String sendTextMessage(String body) throws Exception { 329 return sendTextMessage(Collections.EMPTY_MAP, body); 330 } 331 332 @Override 333 public String sendTextMessage(Map headers, String body) throws Exception { 334 return sendTextMessage(headers, body, null, null); 335 } 336 337 @Override 338 public String sendTextMessage(String body, String user, @Sensitive String password) throws Exception { 339 return sendTextMessage(Collections.EMPTY_MAP, body, user, password); 340 } 341 342 @Override 343 public String sendTextMessage(Map<String, String> headers, String body, String userName, @Sensitive String password) throws Exception { 344 345 String brokerUrl = "vm://" + broker.getBrokerName(); 346 ActiveMQDestination dest = destination.getActiveMQDestination(); 347 348 ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUrl); 349 Connection connection = null; 350 try { 351 352 connection = cf.createConnection(userName, password); 353 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 354 MessageProducer producer = session.createProducer(dest); 355 ActiveMQTextMessage msg = (ActiveMQTextMessage) session.createTextMessage(body); 356 357 for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) { 358 Map.Entry entry = (Map.Entry) iter.next(); 359 msg.setObjectProperty((String) entry.getKey(), entry.getValue()); 360 } 361 362 producer.setDeliveryMode(msg.getJMSDeliveryMode()); 363 producer.setPriority(msg.getPriority()); 364 long ttl = 0; 365 if (msg.getExpiration() != 0) { 366 ttl = msg.getExpiration() - System.currentTimeMillis(); 367 } else { 368 String timeToLive = headers.get("timeToLive"); 369 if (timeToLive != null) { 370 ttl = Integer.valueOf(timeToLive); 371 } 372 } 373 producer.setTimeToLive(ttl > 0 ? ttl : 0); 374 producer.send(msg); 375 376 return msg.getJMSMessageID(); 377 378 } finally { 379 connection.close(); 380 } 381 382 } 383 384 @Override 385 public int getMaxAuditDepth() { 386 return destination.getMaxAuditDepth(); 387 } 388 389 @Override 390 public int getMaxProducersToAudit() { 391 return destination.getMaxProducersToAudit(); 392 } 393 394 public boolean isEnableAudit() { 395 return destination.isEnableAudit(); 396 } 397 398 public void setEnableAudit(boolean enableAudit) { 399 destination.setEnableAudit(enableAudit); 400 } 401 402 @Override 403 public void setMaxAuditDepth(int maxAuditDepth) { 404 destination.setMaxAuditDepth(maxAuditDepth); 405 } 406 407 @Override 408 public void setMaxProducersToAudit(int maxProducersToAudit) { 409 destination.setMaxProducersToAudit(maxProducersToAudit); 410 } 411 412 @Override 413 public float getMemoryUsagePortion() { 414 return destination.getMemoryUsage().getUsagePortion(); 415 } 416 417 @Override 418 public long getProducerCount() { 419 return destination.getDestinationStatistics().getProducers().getCount(); 420 } 421 422 @Override 423 public boolean isProducerFlowControl() { 424 return destination.isProducerFlowControl(); 425 } 426 427 @Override 428 public void setMemoryUsagePortion(float value) { 429 destination.getMemoryUsage().setUsagePortion(value); 430 } 431 432 @Override 433 public void setProducerFlowControl(boolean producerFlowControl) { 434 destination.setProducerFlowControl(producerFlowControl); 435 } 436 437 @Override 438 public boolean isAlwaysRetroactive() { 439 return destination.isAlwaysRetroactive(); 440 } 441 442 @Override 443 public void setAlwaysRetroactive(boolean alwaysRetroactive) { 444 destination.setAlwaysRetroactive(alwaysRetroactive); 445 } 446 447 /** 448 * Set's the interval at which warnings about producers being blocked by 449 * resource usage will be triggered. Values of 0 or less will disable 450 * warnings 451 * 452 * @param blockedProducerWarningInterval the interval at which warning about 453 * blocked producers will be triggered. 454 */ 455 @Override 456 public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) { 457 destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval); 458 } 459 460 /** 461 * 462 * @return the interval at which warning about blocked producers will be 463 * triggered. 464 */ 465 @Override 466 public long getBlockedProducerWarningInterval() { 467 return destination.getBlockedProducerWarningInterval(); 468 } 469 470 @Override 471 public int getMaxPageSize() { 472 return destination.getMaxPageSize(); 473 } 474 475 @Override 476 public void setMaxPageSize(int pageSize) { 477 destination.setMaxPageSize(pageSize); 478 } 479 480 @Override 481 public boolean isUseCache() { 482 return destination.isUseCache(); 483 } 484 485 @Override 486 public void setUseCache(boolean value) { 487 destination.setUseCache(value); 488 } 489 490 @Override 491 public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException { 492 List<Subscription> subscriptions = destination.getConsumers(); 493 ObjectName[] answer = new ObjectName[subscriptions.size()]; 494 ObjectName brokerObjectName = broker.getBrokerService().getBrokerObjectName(); 495 int index = 0; 496 for (Subscription subscription : subscriptions) { 497 String connectionClientId = subscription.getContext().getClientId(); 498 answer[index++] = BrokerMBeanSupport.createSubscriptionName(brokerObjectName, connectionClientId, subscription.getConsumerInfo()); 499 } 500 return answer; 501 } 502 503 @Override 504 public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException { 505 ObjectName result = null; 506 SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy(); 507 if (strategy != null && strategy instanceof AbortSlowConsumerStrategy) { 508 result = broker.registerSlowConsumerStrategy((AbortSlowConsumerStrategy)strategy); 509 } 510 return result; 511 } 512 513 @Override 514 public String getOptions() { 515 Map<String, String> options = destination.getActiveMQDestination().getOptions(); 516 String optionsString = ""; 517 try { 518 if (options != null) { 519 optionsString = URISupport.createQueryString(options); 520 } 521 } catch (URISyntaxException ignored) {} 522 return optionsString; 523 } 524 525 @Override 526 public boolean isDLQ() { 527 return destination.getActiveMQDestination().isDLQ(); 528 } 529 530 @Override 531 public void setDLQ(boolean val) { 532 destination.getActiveMQDestination().setDLQ(val); 533 } 534 535 @Override 536 public long getBlockedSends() { 537 return destination.getDestinationStatistics().getBlockedSends().getCount(); 538 } 539 540 @Override 541 public double getAverageBlockedTime() { 542 return destination.getDestinationStatistics().getBlockedTime().getAverageTime(); 543 } 544 545 @Override 546 public long getTotalBlockedTime() { 547 return destination.getDestinationStatistics().getBlockedTime().getTotalTime(); 548 } 549 550}