001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.advisory; 018 019import java.util.concurrent.atomic.AtomicBoolean; 020import java.util.concurrent.atomic.AtomicInteger; 021 022import javax.jms.Connection; 023import javax.jms.Destination; 024import javax.jms.JMSException; 025import javax.jms.Message; 026import javax.jms.MessageListener; 027import javax.jms.Session; 028 029import org.apache.activemq.ActiveMQMessageConsumer; 030import org.apache.activemq.Service; 031import org.apache.activemq.command.ActiveMQDestination; 032import org.apache.activemq.command.ActiveMQMessage; 033import org.apache.activemq.command.ActiveMQTopic; 034import org.apache.activemq.command.ConsumerId; 035import org.apache.activemq.command.ConsumerInfo; 036import org.apache.activemq.command.RemoveInfo; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * An object which can be used to listen to the number of active consumers 042 * available on a given destination. 043 * 044 * 045 */ 046public class ConsumerEventSource implements Service, MessageListener { 047 private static final Logger LOG = LoggerFactory.getLogger(ConsumerEventSource.class); 048 049 private final Connection connection; 050 private final ActiveMQDestination destination; 051 private ConsumerListener listener; 052 private AtomicBoolean started = new AtomicBoolean(false); 053 private AtomicInteger consumerCount = new AtomicInteger(); 054 private Session session; 055 private ActiveMQMessageConsumer consumer; 056 057 public ConsumerEventSource(Connection connection, Destination destination) throws JMSException { 058 this.connection = connection; 059 this.destination = ActiveMQDestination.transform(destination); 060 } 061 062 public void setConsumerListener(ConsumerListener listener) { 063 this.listener = listener; 064 } 065 066 public String getConsumerId() { 067 return consumer != null ? consumer.getConsumerId().toString() : "NOT_SET"; 068 } 069 070 public void start() throws Exception { 071 if (started.compareAndSet(false, true)) { 072 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 073 ActiveMQTopic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic(destination); 074 consumer = (ActiveMQMessageConsumer) session.createConsumer(advisoryTopic); 075 consumer.setMessageListener(this); 076 } 077 } 078 079 public void stop() throws Exception { 080 if (started.compareAndSet(true, false)) { 081 if (session != null) { 082 session.close(); 083 } 084 } 085 } 086 087 public void onMessage(Message message) { 088 if (message instanceof ActiveMQMessage) { 089 ActiveMQMessage activeMessage = (ActiveMQMessage)message; 090 Object command = activeMessage.getDataStructure(); 091 int count = 0; 092 if (command instanceof ConsumerInfo) { 093 count = consumerCount.incrementAndGet(); 094 count = extractConsumerCountFromMessage(message, count); 095 fireConsumerEvent(new ConsumerStartedEvent(this, destination, (ConsumerInfo)command, count)); 096 } else if (command instanceof RemoveInfo) { 097 RemoveInfo removeInfo = (RemoveInfo)command; 098 if (removeInfo.isConsumerRemove()) { 099 count = consumerCount.decrementAndGet(); 100 count = extractConsumerCountFromMessage(message, count); 101 fireConsumerEvent(new ConsumerStoppedEvent(this, destination, (ConsumerId)removeInfo.getObjectId(), count)); 102 } 103 } else { 104 LOG.warn("Unknown command: " + command); 105 } 106 } else { 107 LOG.warn("Unknown message type: " + message + ". Message ignored"); 108 } 109 } 110 111 /** 112 * Lets rely by default on the broker telling us what the consumer count is 113 * as it can ensure that we are up to date at all times and have not 114 * received messages out of order etc. 115 */ 116 protected int extractConsumerCountFromMessage(Message message, int count) { 117 try { 118 Object value = message.getObjectProperty("consumerCount"); 119 if (value instanceof Number) { 120 Number n = (Number)value; 121 return n.intValue(); 122 } 123 LOG.warn("No consumerCount header available on the message: " + message); 124 } catch (Exception e) { 125 LOG.warn("Failed to extract consumerCount from message: " + message + ".Reason: " + e, e); 126 } 127 return count; 128 } 129 130 protected void fireConsumerEvent(ConsumerEvent event) { 131 if (listener != null) { 132 listener.onConsumerEvent(event); 133 } 134 } 135 136}