/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.advisory;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.DestinationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A helper class which keeps track of the Destinations available in a broker and allows you to listen to them
 * being created or deleted.
 * <p>
 * Once {@link #start() started}, the instance consumes from the four advisory destinations declared on
 * {@link AdvisorySupport} (queue, topic, temporary queue and temporary topic) and records every
 * {@link DestinationInfo} it receives in the matching set. An optional {@link DestinationListener} is
 * notified of each event after the sets have been updated.
 */
public class DestinationSource implements MessageListener {
    // Log under this class; the category previously pointed at ConsumerEventSource by mistake.
    private static final Logger LOG = LoggerFactory.getLogger(DestinationSource.class);

    private final AtomicBoolean started = new AtomicBoolean(false);
    private final Connection connection;
    private Session session;
    private MessageConsumer queueConsumer;
    private MessageConsumer topicConsumer;
    private MessageConsumer tempTopicConsumer;
    private MessageConsumer tempQueueConsumer;
    private final Set<ActiveMQQueue> queues = new CopyOnWriteArraySet<ActiveMQQueue>();
    private final Set<ActiveMQTopic> topics = new CopyOnWriteArraySet<ActiveMQTopic>();
    private final Set<ActiveMQTempQueue> temporaryQueues = new CopyOnWriteArraySet<ActiveMQTempQueue>();
    private final Set<ActiveMQTempTopic> temporaryTopics = new CopyOnWriteArraySet<ActiveMQTempTopic>();
    // Written through setDestinationListener and read from onMessage, so keep the reference visible
    // across threads.
    private volatile DestinationListener listener;

    /**
     * Creates a source bound to the given connection. Nothing is subscribed until {@link #start()} is called.
     *
     * @param connection the connection used to create the advisory session
     * @throws JMSException declared for API compatibility; this constructor only stores the connection
     */
    public DestinationSource(Connection connection) throws JMSException {
        this.connection = connection;
    }

    /**
     * Returns the listener currently registered for destination events.
     *
     * @return the registered listener, or {@code null} if none has been set
     */
    public DestinationListener getListener() {
        return listener;
    }

    /**
     * Registers the listener to be notified of destination events, replacing any previous one.
     *
     * @param listener the listener to notify, or {@code null} to stop notifications
     */
    public void setDestinationListener(DestinationListener listener) {
        this.listener = listener;
    }

    /**
     * Returns the current queues available on the broker
     *
     * @return the live internal set, updated as advisory messages arrive
     */
    public Set<ActiveMQQueue> getQueues() {
        return queues;
    }

    /**
     * Returns the current topics on the broker
     *
     * @return the live internal set, updated as advisory messages arrive
     */
    public Set<ActiveMQTopic> getTopics() {
        return topics;
    }

    /**
     * Returns the current temporary queues available on the broker
     *
     * @return the live internal set, updated as advisory messages arrive
     */
    public Set<ActiveMQTempQueue> getTemporaryQueues() {
        return temporaryQueues;
    }

    /**
     * Returns the current temporary topics available on the broker
     *
     * @return the live internal set, updated as advisory messages arrive
     */
    public Set<ActiveMQTempTopic> getTemporaryTopics() {
        return temporaryTopics;
    }

    /**
     * Starts listening for destination advisories. Calling this on an already started source is a no-op.
     *
     * @throws JMSException if the session or any of the advisory consumers cannot be created
     */
    public void start() throws JMSException {
        if (started.compareAndSet(false, true)) {
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            queueConsumer = session.createConsumer(AdvisorySupport.QUEUE_ADVISORY_TOPIC);
            queueConsumer.setMessageListener(this);

            topicConsumer = session.createConsumer(AdvisorySupport.TOPIC_ADVISORY_TOPIC);
            topicConsumer.setMessageListener(this);

            tempQueueConsumer = session.createConsumer(AdvisorySupport.TEMP_QUEUE_ADVISORY_TOPIC);
            tempQueueConsumer.setMessageListener(this);

            tempTopicConsumer = session.createConsumer(AdvisorySupport.TEMP_TOPIC_ADVISORY_TOPIC);
            tempTopicConsumer.setMessageListener(this);
        }
    }

    /**
     * Stops listening for destination advisories by closing the session created in {@link #start()}.
     * Calling this on a source that is not started is a no-op.
     *
     * @throws JMSException if closing the session fails
     */
    public void stop() throws JMSException {
        if (started.compareAndSet(true, false)) {
            if (session != null) {
                session.close();
            }
        }
    }

    /**
     * Handles an incoming advisory message. Messages carrying a {@link DestinationInfo} are turned into a
     * {@link DestinationEvent} and passed to {@link #fireDestinationEvent(DestinationEvent)}; anything
     * else is logged at warn level and ignored.
     *
     * @param message the message delivered by one of the advisory consumers
     */
    @Override
    public void onMessage(Message message) {
        if (message instanceof ActiveMQMessage) {
            ActiveMQMessage activeMessage = (ActiveMQMessage) message;
            Object command = activeMessage.getDataStructure();
            if (command instanceof DestinationInfo) {
                DestinationInfo destinationInfo = (DestinationInfo) command;
                DestinationEvent event = new DestinationEvent(this, destinationInfo);
                fireDestinationEvent(event);
            } else {
                LOG.warn("Unknown dataStructure: {}", command);
            }
        } else {
            LOG.warn("Unknown message type: {}. Message ignored", message);
        }
    }

    /**
     * Applies the event to the tracked destination sets and then notifies the registered listener, if any.
     * The listener is notified even when the destination type is not recognised.
     *
     * @param event the destination event to apply; an add operation inserts the destination into the
     *              matching set, any other operation removes it
     */
    protected void fireDestinationEvent(DestinationEvent event) {
        // now lets update the data structures
        ActiveMQDestination destination = event.getDestination();
        boolean add = event.isAddOperation();
        if (destination instanceof ActiveMQQueue) {
            ActiveMQQueue queue = (ActiveMQQueue) destination;
            if (add) {
                queues.add(queue);
            } else {
                queues.remove(queue);
            }
        } else if (destination instanceof ActiveMQTopic) {
            ActiveMQTopic topic = (ActiveMQTopic) destination;
            if (add) {
                topics.add(topic);
            } else {
                topics.remove(topic);
            }
        } else if (destination instanceof ActiveMQTempQueue) {
            ActiveMQTempQueue queue = (ActiveMQTempQueue) destination;
            if (add) {
                temporaryQueues.add(queue);
            } else {
                temporaryQueues.remove(queue);
            }
        } else if (destination instanceof ActiveMQTempTopic) {
            ActiveMQTempTopic topic = (ActiveMQTempTopic) destination;
            if (add) {
                temporaryTopics.add(topic);
            } else {
                temporaryTopics.remove(topic);
            }
        } else {
            LOG.warn("Unknown destination type: {}", destination);
        }

        // Read the field once so a concurrent setDestinationListener(null) cannot slip in between the
        // null check and the call.
        DestinationListener current = listener;
        if (current != null) {
            current.onDestinationEvent(event);
        }
    }
}