001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.advisory;
018
019import java.util.ArrayList;
020
021import javax.jms.Destination;
022import javax.jms.JMSException;
023
024import org.apache.activemq.ActiveMQMessageTransformation;
025import org.apache.activemq.command.ActiveMQDestination;
026import org.apache.activemq.command.ActiveMQTopic;
027
028public final class AdvisorySupport {
029    public static final String ADVISORY_TOPIC_PREFIX = "ActiveMQ.Advisory.";
030    public static final ActiveMQTopic CONNECTION_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX
031            + "Connection");
032    public static final ActiveMQTopic QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Queue");
033    public static final ActiveMQTopic TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "Topic");
034    public static final ActiveMQTopic TEMP_QUEUE_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempQueue");
035    public static final ActiveMQTopic TEMP_TOPIC_ADVISORY_TOPIC = new ActiveMQTopic(ADVISORY_TOPIC_PREFIX + "TempTopic");
036    public static final String PRODUCER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Producer.";
037    public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue.";
038    public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic.";
039    public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer.";
040    public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue.";
041    public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic.";
042    public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic.";
043    public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue.";
044    public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
045    public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
046    public static final String SLOW_CONSUMER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "SlowConsumer.";
047    public static final String FAST_PRODUCER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FastProducer.";
048    public static final String MESSAGE_DISCAREDED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDiscarded.";
049    public static final String FULL_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "FULL.";
050    public static final String MESSAGE_DELIVERED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDelivered.";
051    public static final String MESSAGE_CONSUMED_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageConsumed.";
052    public static final String MESSAGE_DLQ_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MessageDLQd.";
053    public static final String MASTER_BROKER_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "MasterBroker";
054    public static final String NETWORK_BRIDGE_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NetworkBridge";
055    public static final String NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX = NETWORK_BRIDGE_TOPIC_PREFIX + ".ForwardFailure";
056    public static final String AGENT_TOPIC = "ActiveMQ.Agent";
057    public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
058    public static final String MSG_PROPERTY_ORIGIN_BROKER_ID = "originBrokerId";
059    public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME = "originBrokerName";
060    public static final String MSG_PROPERTY_ORIGIN_BROKER_URL = "originBrokerURL";
061    public static final String MSG_PROPERTY_USAGE_NAME = "usageName";
062    public static final String MSG_PROPERTY_USAGE_COUNT = "usageCount";
063
064    public static final String MSG_PROPERTY_CONSUMER_ID = "consumerId";
065    public static final String MSG_PROPERTY_PRODUCER_ID = "producerId";
066    public static final String MSG_PROPERTY_MESSAGE_ID = "orignalMessageId";
067    public static final String MSG_PROPERTY_DESTINATION = "orignalDestination";
068    public static final String MSG_PROPERTY_CONSUMER_COUNT = "consumerCount";
069    public static final String MSG_PROPERTY_DISCARDED_COUNT = "discardedCount";
070
071    public static final ActiveMQTopic ALL_DESTINATIONS_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
072            TOPIC_ADVISORY_TOPIC.getPhysicalName() + "," + QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," +
073                    TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
074    public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(
075            TEMP_QUEUE_ADVISORY_TOPIC.getPhysicalName() + "," + TEMP_TOPIC_ADVISORY_TOPIC.getPhysicalName());
076    private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);
077
078    private AdvisorySupport() {
079    }
080
081    public static ActiveMQTopic getConnectionAdvisoryTopic() {
082        return CONNECTION_ADVISORY_TOPIC;
083    }
084
085    public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(Destination destination) throws JMSException {
086        return getAllDestinationAdvisoryTopics(ActiveMQMessageTransformation.transformDestination(destination));
087    }
088
089    public static ActiveMQTopic[] getAllDestinationAdvisoryTopics(ActiveMQDestination destination) throws JMSException {
090        ArrayList<ActiveMQTopic> result = new ArrayList<ActiveMQTopic>();
091
092        result.add(getConsumerAdvisoryTopic(destination));
093        result.add(getProducerAdvisoryTopic(destination));
094        result.add(getExpiredMessageTopic(destination));
095        result.add(getNoConsumersAdvisoryTopic(destination));
096        result.add(getSlowConsumerAdvisoryTopic(destination));
097        result.add(getFastProducerAdvisoryTopic(destination));
098        result.add(getMessageDiscardedAdvisoryTopic(destination));
099        result.add(getMessageDeliveredAdvisoryTopic(destination));
100        result.add(getMessageConsumedAdvisoryTopic(destination));
101        result.add(getMessageDLQdAdvisoryTopic(destination));
102        result.add(getFullAdvisoryTopic(destination));
103
104        return result.toArray(new ActiveMQTopic[0]);
105    }
106
107    public static ActiveMQTopic getConsumerAdvisoryTopic(Destination destination) throws JMSException {
108        return getConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
109    }
110
111    public static ActiveMQTopic getConsumerAdvisoryTopic(ActiveMQDestination destination) {
112        String prefix;
113        if (destination.isQueue()) {
114            prefix = QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX;
115        } else {
116            prefix = TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX;
117        }
118        return getAdvisoryTopic(destination, prefix, true);
119    }
120
121    public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException {
122        return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
123    }
124
125    public static ActiveMQTopic getProducerAdvisoryTopic(ActiveMQDestination destination) {
126        String prefix;
127        if (destination.isQueue()) {
128            prefix = QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX;
129        } else {
130            prefix = TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX;
131        }
132        return getAdvisoryTopic(destination, prefix, false);
133    }
134
135    private static ActiveMQTopic getAdvisoryTopic(ActiveMQDestination destination, String prefix, boolean consumerTopics) {
136        return new ActiveMQTopic(prefix + destination.getPhysicalName().replaceAll(",", "&sbquo;"));
137    }
138
139    public static ActiveMQTopic getExpiredMessageTopic(Destination destination) throws JMSException {
140        return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
141    }
142
143    public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
144        if (destination.isQueue()) {
145            return getExpiredQueueMessageAdvisoryTopic(destination);
146        }
147        return getExpiredTopicMessageAdvisoryTopic(destination);
148    }
149
150    public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
151        String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
152        return new ActiveMQTopic(name);
153    }
154
155    public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(Destination destination) throws JMSException {
156        return getExpiredQueueMessageAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
157    }
158
159    public static ActiveMQTopic getExpiredQueueMessageAdvisoryTopic(ActiveMQDestination destination) {
160        String name = EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + destination.getPhysicalName();
161        return new ActiveMQTopic(name);
162    }
163
164    public static ActiveMQTopic getNoConsumersAdvisoryTopic(Destination destination) throws JMSException {
165        return getExpiredMessageTopic(ActiveMQMessageTransformation.transformDestination(destination));
166    }
167
168    public static ActiveMQTopic getNoConsumersAdvisoryTopic(ActiveMQDestination destination) {
169        if (destination.isQueue()) {
170            return getNoQueueConsumersAdvisoryTopic(destination);
171        }
172        return getNoTopicConsumersAdvisoryTopic(destination);
173    }
174
175    public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(Destination destination) throws JMSException {
176        return getNoTopicConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
177    }
178
179    public static ActiveMQTopic getNoTopicConsumersAdvisoryTopic(ActiveMQDestination destination) {
180        String name = NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
181        return new ActiveMQTopic(name);
182    }
183
184    public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(Destination destination) throws JMSException {
185        return getNoQueueConsumersAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
186    }
187
188    public static ActiveMQTopic getNoQueueConsumersAdvisoryTopic(ActiveMQDestination destination) {
189        String name = NO_QUEUE_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
190        return new ActiveMQTopic(name);
191    }
192
193    public static ActiveMQTopic getSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
194        return getSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
195    }
196
197    public static ActiveMQTopic getSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
198        String name = SLOW_CONSUMER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
199                + destination.getPhysicalName();
200        return new ActiveMQTopic(name);
201    }
202
203    public static ActiveMQTopic getFastProducerAdvisoryTopic(Destination destination) throws JMSException {
204        return getFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
205    }
206
207    public static ActiveMQTopic getFastProducerAdvisoryTopic(ActiveMQDestination destination) {
208        String name = FAST_PRODUCER_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
209                + destination.getPhysicalName();
210        return new ActiveMQTopic(name);
211    }
212
213    public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
214        return getMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
215    }
216
217    public static ActiveMQTopic getMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
218        String name = MESSAGE_DISCAREDED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
219                + destination.getPhysicalName();
220        return new ActiveMQTopic(name);
221    }
222
223    public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
224        return getMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
225    }
226
227    public static ActiveMQTopic getMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
228        String name = MESSAGE_DELIVERED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
229                + destination.getPhysicalName();
230        return new ActiveMQTopic(name);
231    }
232
233    public static ActiveMQTopic getMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
234        return getMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
235    }
236
237    public static ActiveMQTopic getMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
238        String name = MESSAGE_CONSUMED_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
239                + destination.getPhysicalName();
240        return new ActiveMQTopic(name);
241    }
242
243    public static ActiveMQTopic getMessageDLQdAdvisoryTopic(ActiveMQDestination destination) {
244        String name = MESSAGE_DLQ_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
245                + destination.getPhysicalName();
246        return new ActiveMQTopic(name);
247    }
248
249    public static ActiveMQTopic getMasterBrokerAdvisoryTopic() {
250        return new ActiveMQTopic(MASTER_BROKER_TOPIC_PREFIX);
251    }
252
253    public static ActiveMQTopic getNetworkBridgeAdvisoryTopic() {
254        return new ActiveMQTopic(NETWORK_BRIDGE_TOPIC_PREFIX);
255    }
256
257    public static ActiveMQTopic getFullAdvisoryTopic(Destination destination) throws JMSException {
258        return getFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
259    }
260
261    public static ActiveMQTopic getFullAdvisoryTopic(ActiveMQDestination destination) {
262        String name = FULL_TOPIC_PREFIX + destination.getDestinationTypeAsString() + "."
263                + destination.getPhysicalName();
264        return new ActiveMQTopic(name);
265    }
266
267    public static ActiveMQTopic getDestinationAdvisoryTopic(Destination destination) throws JMSException {
268        return getDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
269    }
270
271    public static ActiveMQTopic getDestinationAdvisoryTopic(ActiveMQDestination destination) {
272        switch (destination.getDestinationType()) {
273            case ActiveMQDestination.QUEUE_TYPE:
274                return QUEUE_ADVISORY_TOPIC;
275            case ActiveMQDestination.TOPIC_TYPE:
276                return TOPIC_ADVISORY_TOPIC;
277            case ActiveMQDestination.TEMP_QUEUE_TYPE:
278                return TEMP_QUEUE_ADVISORY_TOPIC;
279            case ActiveMQDestination.TEMP_TOPIC_TYPE:
280                return TEMP_TOPIC_ADVISORY_TOPIC;
281            default:
282                throw new RuntimeException("Unknown destination type: " + destination.getDestinationType());
283        }
284    }
285
286    public static boolean isDestinationAdvisoryTopic(Destination destination) throws JMSException {
287        return isDestinationAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
288    }
289
290    public static boolean isTempDestinationAdvisoryTopic(ActiveMQDestination destination) {
291        if (destination.isComposite()) {
292            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
293            for (int i = 0; i < compositeDestinations.length; i++) {
294                if (!isTempDestinationAdvisoryTopic(compositeDestinations[i])) {
295                    return false;
296                }
297            }
298            return true;
299        } else {
300            return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC);
301        }
302    }
303
304    public static boolean isDestinationAdvisoryTopic(ActiveMQDestination destination) {
305        if (destination.isComposite()) {
306            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
307            for (int i = 0; i < compositeDestinations.length; i++) {
308                if (isDestinationAdvisoryTopic(compositeDestinations[i])) {
309                    return true;
310                }
311            }
312            return false;
313        } else {
314            return destination.equals(TEMP_QUEUE_ADVISORY_TOPIC) || destination.equals(TEMP_TOPIC_ADVISORY_TOPIC)
315                    || destination.equals(QUEUE_ADVISORY_TOPIC) || destination.equals(TOPIC_ADVISORY_TOPIC);
316        }
317    }
318
319    public static boolean isAdvisoryTopic(Destination destination) throws JMSException {
320        return isAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
321    }
322
323    public static boolean isAdvisoryTopic(ActiveMQDestination destination) {
324        if (destination != null) {
325            if (destination.isComposite()) {
326                ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
327                for (int i = 0; i < compositeDestinations.length; i++) {
328                    if (isAdvisoryTopic(compositeDestinations[i])) {
329                        return true;
330                    }
331                }
332                return false;
333            } else {
334                return destination.isTopic() && destination.getPhysicalName().startsWith(ADVISORY_TOPIC_PREFIX);
335            }
336        }
337        return false;
338    }
339
340    public static boolean isConnectionAdvisoryTopic(Destination destination) throws JMSException {
341        return isConnectionAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
342    }
343
344    public static boolean isConnectionAdvisoryTopic(ActiveMQDestination destination) {
345        if (destination.isComposite()) {
346            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
347            for (int i = 0; i < compositeDestinations.length; i++) {
348                if (isConnectionAdvisoryTopic(compositeDestinations[i])) {
349                    return true;
350                }
351            }
352            return false;
353        } else {
354            return destination.equals(CONNECTION_ADVISORY_TOPIC);
355        }
356    }
357
358    public static boolean isProducerAdvisoryTopic(Destination destination) throws JMSException {
359        return isProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
360    }
361
362    public static boolean isProducerAdvisoryTopic(ActiveMQDestination destination) {
363        if (destination.isComposite()) {
364            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
365            for (int i = 0; i < compositeDestinations.length; i++) {
366                if (isProducerAdvisoryTopic(compositeDestinations[i])) {
367                    return true;
368                }
369            }
370            return false;
371        } else {
372            return destination.isTopic() && destination.getPhysicalName().startsWith(PRODUCER_ADVISORY_TOPIC_PREFIX);
373        }
374    }
375
376    public static boolean isConsumerAdvisoryTopic(Destination destination) throws JMSException {
377        return isConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
378    }
379
380    public static boolean isConsumerAdvisoryTopic(ActiveMQDestination destination) {
381        if (destination.isComposite()) {
382            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
383            for (int i = 0; i < compositeDestinations.length; i++) {
384                if (isConsumerAdvisoryTopic(compositeDestinations[i])) {
385                    return true;
386                }
387            }
388            return false;
389        } else {
390            return destination.isTopic() && destination.getPhysicalName().startsWith(CONSUMER_ADVISORY_TOPIC_PREFIX);
391        }
392    }
393
394    public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException {
395        return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
396    }
397
398    public static boolean isSlowConsumerAdvisoryTopic(ActiveMQDestination destination) {
399        if (destination.isComposite()) {
400            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
401            for (int i = 0; i < compositeDestinations.length; i++) {
402                if (isSlowConsumerAdvisoryTopic(compositeDestinations[i])) {
403                    return true;
404                }
405            }
406            return false;
407        } else {
408            return destination.isTopic() && destination.getPhysicalName().startsWith(SLOW_CONSUMER_TOPIC_PREFIX);
409        }
410    }
411
412    public static boolean isFastProducerAdvisoryTopic(Destination destination) throws JMSException {
413        return isFastProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
414    }
415
416    public static boolean isFastProducerAdvisoryTopic(ActiveMQDestination destination) {
417        if (destination.isComposite()) {
418            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
419            for (int i = 0; i < compositeDestinations.length; i++) {
420                if (isFastProducerAdvisoryTopic(compositeDestinations[i])) {
421                    return true;
422                }
423            }
424            return false;
425        } else {
426            return destination.isTopic() && destination.getPhysicalName().startsWith(FAST_PRODUCER_TOPIC_PREFIX);
427        }
428    }
429
430    public static boolean isMessageConsumedAdvisoryTopic(Destination destination) throws JMSException {
431        return isMessageConsumedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
432    }
433
434    public static boolean isMessageConsumedAdvisoryTopic(ActiveMQDestination destination) {
435        if (destination.isComposite()) {
436            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
437            for (int i = 0; i < compositeDestinations.length; i++) {
438                if (isMessageConsumedAdvisoryTopic(compositeDestinations[i])) {
439                    return true;
440                }
441            }
442            return false;
443        } else {
444            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_CONSUMED_TOPIC_PREFIX);
445        }
446    }
447
448    public static boolean isMasterBrokerAdvisoryTopic(Destination destination) throws JMSException {
449        return isMasterBrokerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
450    }
451
452    public static boolean isMasterBrokerAdvisoryTopic(ActiveMQDestination destination) {
453        if (destination.isComposite()) {
454            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
455            for (int i = 0; i < compositeDestinations.length; i++) {
456                if (isMasterBrokerAdvisoryTopic(compositeDestinations[i])) {
457                    return true;
458                }
459            }
460            return false;
461        } else {
462            return destination.isTopic() && destination.getPhysicalName().startsWith(MASTER_BROKER_TOPIC_PREFIX);
463        }
464    }
465
466    public static boolean isMessageDeliveredAdvisoryTopic(Destination destination) throws JMSException {
467        return isMessageDeliveredAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
468    }
469
470    public static boolean isMessageDeliveredAdvisoryTopic(ActiveMQDestination destination) {
471        if (destination.isComposite()) {
472            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
473            for (int i = 0; i < compositeDestinations.length; i++) {
474                if (isMessageDeliveredAdvisoryTopic(compositeDestinations[i])) {
475                    return true;
476                }
477            }
478            return false;
479        } else {
480            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DELIVERED_TOPIC_PREFIX);
481        }
482    }
483
484    public static boolean isMessageDiscardedAdvisoryTopic(Destination destination) throws JMSException {
485        return isMessageDiscardedAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
486    }
487
488    public static boolean isMessageDiscardedAdvisoryTopic(ActiveMQDestination destination) {
489        if (destination.isComposite()) {
490            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
491            for (int i = 0; i < compositeDestinations.length; i++) {
492                if (isMessageDiscardedAdvisoryTopic(compositeDestinations[i])) {
493                    return true;
494                }
495            }
496            return false;
497        } else {
498            return destination.isTopic() && destination.getPhysicalName().startsWith(MESSAGE_DISCAREDED_TOPIC_PREFIX);
499        }
500    }
501
502    public static boolean isFullAdvisoryTopic(Destination destination) throws JMSException {
503        return isFullAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
504    }
505
506    public static boolean isFullAdvisoryTopic(ActiveMQDestination destination) {
507        if (destination.isComposite()) {
508            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
509            for (int i = 0; i < compositeDestinations.length; i++) {
510                if (isFullAdvisoryTopic(compositeDestinations[i])) {
511                    return true;
512                }
513            }
514            return false;
515        } else {
516            return destination.isTopic() && destination.getPhysicalName().startsWith(FULL_TOPIC_PREFIX);
517        }
518    }
519
520    public static boolean isNetworkBridgeAdvisoryTopic(Destination destination) throws JMSException {
521        return isNetworkBridgeAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination));
522    }
523
524    public static boolean isNetworkBridgeAdvisoryTopic(ActiveMQDestination destination) {
525        if (destination.isComposite()) {
526            ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations();
527            for (int i = 0; i < compositeDestinations.length; i++) {
528                if (isNetworkBridgeAdvisoryTopic(compositeDestinations[i])) {
529                    return true;
530                }
531            }
532            return false;
533        } else {
534            return destination.isTopic() && destination.getPhysicalName().startsWith(NETWORK_BRIDGE_TOPIC_PREFIX);
535        }
536    }
537
538    /**
539     * Returns the agent topic which is used to send commands to the broker
540     */
541    public static Destination getAgentDestination() {
542        return AGENT_TOPIC_DESTINATION;
543    }
544
545    public static ActiveMQTopic getNetworkBridgeForwardFailureAdvisoryTopic() {
546        return new ActiveMQTopic(NETWORK_BRIDGE_FORWARD_FAILURE_TOPIC_PREFIX);
547    }
548}