/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.advisory;

import java.util.ArrayList;

import javax.jms.Destination;
import javax.jms.JMSException;

import org.apache.activemq.ActiveMQMessageTransformation;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;

/**
 * Constants and static helpers for building advisory topic names and for
 * testing whether a destination is one of the advisory topics.
 * <p>
 * Every advisory topic name starts with {@link #ADVISORY_TOPIC_PREFIX}. The
 * class is a pure utility holder and cannot be instantiated.
 */
public final class AdvisorySupport {
    public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
    public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
            + "Connection");
    public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
    public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
    public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
    public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
    public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
    public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
    public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
    public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
    public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
    public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
    public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
    public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
    public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
    public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
    public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
    public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastProducer.";
    // The constant name is misspelled ("DISCAREDED"); it is public, so it is left unchanged.
    public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
    public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
    public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
    public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
    public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
    public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
    public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
    public static final String NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX = NETWORK_BRIDGE_TOPIC_PREFIX + ".ForwardFailure";
    public static final String AGENT_TOPIC = "ActiveMQ.Agent";
    // The constant name is misspelled ("ADIVSORY"); it is public, so it is left unchanged.
    public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
    public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
    public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME = "originBrokerName";
    public static final String MSG_PROPERTY_ORIGIN_BROKER_URL = "originBrokerURL";
    public static final String MSG_PROPERTY_USAGE_NAME = "usageName";
    public static final String MSG_PROPERTY_USAGE_COUNT = "usageCount";

    public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId";
    public static final String MSG_PROPERTY_PRODUCER_ID = "producerId";
    // The two values below are spelled "orignal"; they are runtime strings and are left unchanged.
    public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
    public static final String MSG_PROPERTY_DESTINATION = "orignalDestination";
    public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
    public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";

    /** Comma-separated name made of the topic, queue, temp-queue and temp-topic advisory topic names. */
    public static final ActiveMQTopic ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
            TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," + QUEUE_ADVISORY_TOPIC.getPhysicalName() + ","
                    + TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
    /** Comma-separated name made of the temp-queue and temp-topic advisory topic names. */
    public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
            TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
    private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);

    private AdvisorySupport() {
    }

    /**
     * @return the shared {@link #CONNECTION_ADVISORY_TOPIC} instance
     */
    public static ActiveMQTopic getConnectionAdvisoryTopic() {
        return CONNECTION_ADVISORY_TOPIC;
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getAllDestinationAdvisoryTopics(ActiveMQDestination)}.
     *
     * @param destination the destination to build advisory topics for
     * @return the per-destination advisory topics
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(Destination destination) throws JMSException {
        return getAllDestinationAdvisoryTopics(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * Collects the per-destination advisory topics for the given destination:
     * consumer, producer, expired, no-consumers, slow-consumer, fast-producer,
     * discarded, delivered, consumed, DLQ'd and full, in that order.
     *
     * @param destination the destination to build advisory topics for
     * @return a new array holding the eleven advisory topics
     * @throws JMSException declared by the signature; nothing in this method body throws it
     */
    public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(ActiveMQDestination destination) throws JMSException {
        ArrayList<ActiveMQTopic> result = new ArrayList<ActiveMQTopic>();

        result.add(getConsumerAdvisoryTopic(destination));
        result.add(getProducerAdvisoryTopic(destination));
        result.add(getExpiredMessageTopic(destination));
        result.add(getNoConsumersAdvisoryTopic(destination));
        result.add(getSlowConsumerAdvisoryTopic(destination));
        result.add(getFastProducerAdvisoryTopic(destination));
        result.add(getMessageDiscardedAdvisoryTopic(destination));
        result.add(getMessageDeliveredAdvisoryTopic(destination));
        result.add(getMessageConsumedAdvisoryTopic(destination));
        result.add(getMessageDLQdAdvisoryTopic(destination));
        result.add(getFullAdvisoryTopic(destination));

        return result.toArray(new ActiveMQTopic[0]);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getConsumerAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getConsumerAdvisoryTopic(Destination destination) throws JMSException {
        return getConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * Builds the consumer advisory topic for a destination, using the queue
     * prefix when {@code destination.isQueue()} and the topic prefix otherwise.
     *
     * @param destination the destination being watched
     * @return a new advisory topic
     */
    public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
        String prefix = destination.isQueue()
                ? QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX
                : TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX;
        return getAdvisoryTopic(destination, prefix, true);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getProducerAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException {
        return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * Builds the producer advisory topic for a destination, using the queue
     * prefix when {@code destination.isQueue()} and the topic prefix otherwise.
     *
     * @param destination the destination being watched
     * @return a new advisory topic
     */
    public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
        String prefix = destination.isQueue()
                ? QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX
                : TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX;
        return getAdvisoryTopic(destination, prefix, false);
    }

    /**
     * Concatenates {@code prefix} and the destination's physical name, with
     * every comma in the physical name substituted.
     *
     * @param destination the destination whose physical name is appended
     * @param prefix the advisory prefix to prepend
     * @param consumerTopics not read by this method
     * @return a new advisory topic
     */
    private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination destination, String prefix, boolean consumerTopics) {
        // Plain-text replace: neither argument is a regex, so no pattern needs compiling.
        // NOTE(review): the replacement literal is reproduced exactly as it appears in this file;
        // presumably it keeps commas from splitting the name into a composite - confirm.
        return new ActiveMQTopic(prefix + destination.getPhysicalName().replace(",", "‚"));
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getExpiredMessageTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException {
        return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * Returns the expired-message advisory topic: the queue variant when
     * {@code destination.isQueue()}, the topic variant otherwise.
     */
    public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
        if (destination.isQueue()) {
            return getExpiredQueueMessageAdvisoryTopic(destination);
        }
        return getExpiredTopicMessageAdvisoryTopic(destination);
    }

    /**
     * @return a new topic named {@link #EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX} plus the physical name
     */
    public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
        return new ActiveMQTopic(EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName());
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(Destination destination) throws JMSException {
        return getExpiredQueueMessageAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return a new topic named {@link #EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX} plus the physical name
     */
    public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
        return new ActiveMQTopic(EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName());
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getNoConsumersAdvisoryTopic(ActiveMQDestination)}, so both
     * overloads yield a "NoConsumer" advisory topic.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getNoConsumersAdvisoryTopic(Destination destination) throws JMSException {
        return getNoConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * Returns the no-consumers advisory topic: the queue variant when
     * {@code destination.isQueue()}, the topic variant otherwise.
     */
    public static ActiveMQTopic getNoConsumersAdvisoryTopic(ActiveMQDestination destination) {
        if (destination.isQueue()) {
            return getNoQueueConsumersAdvisoryTopic(destination);
        }
        return getNoTopicConsumersAdvisoryTopic(destination);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getNoTopicConsumersAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(Destination destination) throws JMSException {
        return getNoTopicConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return a new topic named {@link #NO_TOPIC_CONSUMERS_TOPIC_PREFIX} plus the physical name
     */
    public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
        return new ActiveMQTopic(NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName());
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getNoQueueConsumersAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(Destination destination) throws JMSException {
        return getNoQueueConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return a new topic named {@link #NO_QUEUE_CONSUMERS_TOPIC_PREFIX} plus the physical name
     */
    public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
        return new ActiveMQTopic(NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName());
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getSlowConsumerAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
        return getSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return a new topic named {@link #SLOW_CONSUMER_TOPIC_PREFIX}, the
     *         destination type string, a dot, and the physical name
     */
    public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
        return new ActiveMQTopic(typedAdvisoryName(SLOW_CONSUMER_TOPIC_PREFIX, destination));
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getFastProducerAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getFastProducerAdvisoryTopic(Destination destination) throws JMSException {
        return getFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return a new topic named {@link #FAST_PRODUCER_TOPIC_PREFIX}, the
     *         destination type string, a dot, and the physical name
     */
    public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
        return new ActiveMQTopic(typedAdvisoryName(FAST_PRODUCER_TOPIC_PREFIX, destination));
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getMessageDiscardedAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
        return getMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return a new topic named {@link #MESSAGE_DISCAREDED_TOPIC_PREFIX}, the
     *         destination type string, a dot, and the physical name
     */
    public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
        return new ActiveMQTopic(typedAdvisoryName(MESSAGE_DISCAREDED_TOPIC_PREFIX, destination));
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getMessageDeliveredAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
        return getMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return a new topic named {@link #MESSAGE_DELIVERED_TOPIC_PREFIX}, the
     *         destination type string, a dot, and the physical name
     */
    public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
        return new ActiveMQTopic(typedAdvisoryName(MESSAGE_DELIVERED_TOPIC_PREFIX, destination));
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getMessageConsumedAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
        return getMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return a new topic named {@link #MESSAGE_CONSUMED_TOPIC_PREFIX}, the
     *         destination type string, a dot, and the physical name
     */
    public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
        return new ActiveMQTopic(typedAdvisoryName(MESSAGE_CONSUMED_TOPIC_PREFIX, destination));
    }

    /**
     * @return a new topic named {@link #MESSAGE_DLQ_TOPIC_PREFIX}, the
     *         destination type string, a dot, and the physical name
     */
    public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) {
        return new ActiveMQTopic(typedAdvisoryName(MESSAGE_DLQ_TOPIC_PREFIX, destination));
    }

    /**
     * @return a new topic named {@link #MASTER_BROKER_TOPIC_PREFIX}
     */
    public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
        return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
    }

    /**
     * @return a new topic named {@link #NETWORK_BRIDGE_TOPIC_PREFIX}
     */
    public static ActiveMQTopic getNetworkBridgeAdvisoryTopic() {
        return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getFullAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getFullAdvisoryTopic(Destination destination) throws JMSException {
        return getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return a new topic named {@link #FULL_TOPIC_PREFIX}, the destination
     *         type string, a dot, and the physical name
     */
    public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
        return new ActiveMQTopic(typedAdvisoryName(FULL_TOPIC_PREFIX, destination));
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #getDestinationAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static ActiveMQTopic getDestinationAdvisoryTopic(Destination destination) throws JMSException {
        return getDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * Maps a destination's type to the matching shared advisory topic constant
     * (queue, topic, temp queue or temp topic).
     *
     * @param destination the destination whose type is inspected
     * @return one of the four destination advisory topic constants
     * @throws RuntimeException if the destination type is none of the four handled types
     */
    public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
        switch (destination.getDestinationType()) {
        case ActiveMQDestination.QUEUE_TYPE:
            return QUEUE_ADVISORY_TOPIC;
        case ActiveMQDestination.TOPIC_TYPE:
            return TOPIC_ADVISORY_TOPIC;
        case ActiveMQDestination.TEMP_QUEUE_TYPE:
            return TEMP_QUEUE_ADVISORY_TOPIC;
        case ActiveMQDestination.TEMP_TOPIC_TYPE:
            return TEMP_TOPIC_ADVISORY_TOPIC;
        default:
            throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
        }
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isDestinationAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isDestinationAdvisoryTopic(Destination destination) throws JMSException {
        return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * Tests for the temp-queue / temp-topic advisory topics. Unlike the other
     * predicates in this class, a composite destination matches only when
     * <em>every</em> component matches.
     *
     * @param destination the destination to test
     * @return {@code true} if the destination (or all of its components) equals
     *         {@link #TEMP_QUEUE_ADVISORY_TOPIC} or {@link #TEMP_TOPIC_ADVISORY_TOPIC}
     */
    public static boolean isTempDestinationAdvisoryTopic(ActiveMQDestination destination) {
        if (destination.isComposite()) {
            for (ActiveMQDestination component : destination.getCompositeDestinations()) {
                if (!isTempDestinationAdvisoryTopic(component)) {
                    return false;
                }
            }
            return true;
        }
        return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC);
    }

    /**
     * Tests for the queue, topic, temp-queue and temp-topic advisory topics.
     * A composite destination matches when any component matches.
     *
     * @param destination the destination to test
     * @return {@code true} if the destination (or any component) equals one of
     *         the four destination advisory topic constants
     */
    public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
        if (destination.isComposite()) {
            for (ActiveMQDestination component : destination.getCompositeDestinations()) {
                if (isDestinationAdvisoryTopic(component)) {
                    return true;
                }
            }
            return false;
        }
        return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC)
                || destination.equals(QUEUE_ADVISORY_TOPIC) || destination.equals(TOPIC_ADVISORY_TOPIC);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isAdvisoryTopic(Destination destination) throws JMSException {
        return isAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * Tests whether a destination is a topic whose name starts with
     * {@link #ADVISORY_TOPIC_PREFIX}. A composite destination matches when any
     * component matches.
     *
     * @param destination the destination to test; {@code null} yields {@code false}
     * @return {@code true} if the destination (or any component) is an advisory topic
     */
    public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
        if (destination == null) {
            return false;
        }
        if (destination.isComposite()) {
            // Recurse through this method (not the shared helper) so components get the null guard too.
            for (ActiveMQDestination component : destination.getCompositeDestinations()) {
                if (isAdvisoryTopic(component)) {
                    return true;
                }
            }
            return false;
        }
        return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isConnectionAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isConnectionAdvisoryTopic(Destination destination) throws JMSException {
        return isConnectionAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return {@code true} if the destination, or any component of a composite
     *         destination, equals {@link #CONNECTION_ADVISORY_TOPIC}
     */
    public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
        if (destination.isComposite()) {
            for (ActiveMQDestination component : destination.getCompositeDestinations()) {
                if (isConnectionAdvisoryTopic(component)) {
                    return true;
                }
            }
            return false;
        }
        return destination.equals(CONNECTION_ADVISORY_TOPIC);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isProducerAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isProducerAdvisoryTopic(Destination destination) throws JMSException {
        return isProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return {@code true} if the destination, or any component of a composite
     *         destination, is a topic named with {@link #PRODUCER_ADVISORY_TOPIC_PREFIX}
     */
    public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
        return isTopicWithPrefix(destination, PRODUCER_ADVISORY_TOPIC_PREFIX);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isConsumerAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isConsumerAdvisoryTopic(Destination destination) throws JMSException {
        return isConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return {@code true} if the destination, or any component of a composite
     *         destination, is a topic named with {@link #CONSUMER_ADVISORY_TOPIC_PREFIX}
     */
    public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
        return isTopicWithPrefix(destination, CONSUMER_ADVISORY_TOPIC_PREFIX);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isSlowConsumerAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
        return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return {@code true} if the destination, or any component of a composite
     *         destination, is a topic named with {@link #SLOW_CONSUMER_TOPIC_PREFIX}
     */
    public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
        return isTopicWithPrefix(destination, SLOW_CONSUMER_TOPIC_PREFIX);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isFastProducerAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isFastProducerAdvisoryTopic(Destination destination) throws JMSException {
        return isFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return {@code true} if the destination, or any component of a composite
     *         destination, is a topic named with {@link #FAST_PRODUCER_TOPIC_PREFIX}
     */
    public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
        return isTopicWithPrefix(destination, FAST_PRODUCER_TOPIC_PREFIX);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isMessageConsumedAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
        return isMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return {@code true} if the destination, or any component of a composite
     *         destination, is a topic named with {@link #MESSAGE_CONSUMED_TOPIC_PREFIX}
     */
    public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
        return isTopicWithPrefix(destination, MESSAGE_CONSUMED_TOPIC_PREFIX);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isMasterBrokerAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
        return isMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return {@code true} if the destination, or any component of a composite
     *         destination, is a topic named with {@link #MASTER_BROKER_TOPIC_PREFIX}
     */
    public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
        return isTopicWithPrefix(destination, MASTER_BROKER_TOPIC_PREFIX);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isMessageDeliveredAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
        return isMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return {@code true} if the destination, or any component of a composite
     *         destination, is a topic named with {@link #MESSAGE_DELIVERED_TOPIC_PREFIX}
     */
    public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
        return isTopicWithPrefix(destination, MESSAGE_DELIVERED_TOPIC_PREFIX);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isMessageDiscardedAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
        return isMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return {@code true} if the destination, or any component of a composite
     *         destination, is a topic named with {@link #MESSAGE_DISCAREDED_TOPIC_PREFIX}
     */
    public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
        return isTopicWithPrefix(destination, MESSAGE_DISCAREDED_TOPIC_PREFIX);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isFullAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isFullAdvisoryTopic(Destination destination) throws JMSException {
        return isFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * @return {@code true} if the destination, or any component of a composite
     *         destination, is a topic named with {@link #FULL_TOPIC_PREFIX}
     */
    public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
        return isTopicWithPrefix(destination, FULL_TOPIC_PREFIX);
    }

    /**
     * Transforms the JMS destination and delegates to
     * {@link #isNetworkBridgeAdvisoryTopic(ActiveMQDestination)}.
     *
     * @throws JMSException if the destination cannot be transformed
     */
    public static boolean isNetworkBridgeAdvisoryTopic(Destination destination) throws JMSException {
        return isNetworkBridgeAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
    }

    /**
     * Because {@link #NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX} itself starts
     * with {@link #NETWORK_BRIDGE_TOPIC_PREFIX}, the forward-failure topic also
     * satisfies this test.
     *
     * @return {@code true} if the destination, or any component of a composite
     *         destination, is a topic named with {@link #NETWORK_BRIDGE_TOPIC_PREFIX}
     */
    public static boolean isNetworkBridgeAdvisoryTopic(ActiveMQDestination destination) {
        return isTopicWithPrefix(destination, NETWORK_BRIDGE_TOPIC_PREFIX);
    }

    /**
     * Returns the agent topic which is used to send commands to the broker
     */
    public static Destination getAgentDestination() {
        return AGENT_TOPIC_DESTINATION;
    }

    /**
     * @return a new topic named {@link #NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX}
     */
    public static ActiveMQTopic getNetworkBridgeForwardFailureAdvisoryTopic() {
        return new ActiveMQTopic(NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX);
    }

    /** Builds {@code prefix + <destination type string> + "." + <physical name>}. */
    private static String typedAdvisoryName(String prefix, ActiveMQDestination destination) {
        return prefix + destination.getDestinationTypeAsString() + "." + destination.getPhysicalName();
    }

    /**
     * Shared implementation of the prefix-based {@code isXxxAdvisoryTopic} predicates:
     * a composite matches when any component matches; a plain destination matches
     * when it is a topic whose physical name starts with {@code prefix}.
     */
    private static boolean isTopicWithPrefix(ActiveMQDestination destination, String prefix) {
        if (destination.isComposite()) {
            for (ActiveMQDestination component : destination.getCompositeDestinations()) {
                if (isTopicWithPrefix(component, prefix)) {
                    return true;
                }
            }
            return false;
        }
        return destination.isTopic() && destination.getPhysicalName().startsWith(prefix);
    }
}