001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.advisory; 018 019import java.util.concurrent.atomic.AtomicBoolean; 020import java.util.concurrent.atomic.AtomicInteger; 021 022import javax.jms.Connection; 023import javax.jms.Destination; 024import javax.jms.JMSException; 025import javax.jms.Message; 026import javax.jms.MessageConsumer; 027import javax.jms.MessageListener; 028import javax.jms.Session; 029 030import org.apache.activemq.Service; 031import org.apache.activemq.command.ActiveMQDestination; 032import org.apache.activemq.command.ActiveMQMessage; 033import org.apache.activemq.command.ActiveMQTopic; 034import org.apache.activemq.command.ProducerId; 035import org.apache.activemq.command.ProducerInfo; 036import org.apache.activemq.command.RemoveInfo; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * An object which can be used to listen to the number of active consumers 042 * available on a given destination. 043 * 044 * 045 */ 046public class ProducerEventSource implements Service, MessageListener { 047 private static final Logger LOG = LoggerFactory.getLogger(ProducerEventSource.class); 048 049 private final Connection connection; 050 private final ActiveMQDestination destination; 051 private ProducerListener listener; 052 private AtomicBoolean started = new AtomicBoolean(false); 053 private AtomicInteger producerCount = new AtomicInteger(); 054 private Session session; 055 private MessageConsumer consumer; 056 057 public ProducerEventSource(Connection connection, Destination destination) throws JMSException { 058 this.connection = connection; 059 this.destination = ActiveMQDestination.transform(destination); 060 } 061 062 public void setProducerListener(ProducerListener listener) { 063 this.listener = listener; 064 } 065 066 public void start() throws Exception { 067 if (started.compareAndSet(false, true)) { 068 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 069 ActiveMQTopic advisoryTopic = AdvisorySupport.getProducerAdvisoryTopic(destination); 070 consumer = session.createConsumer(advisoryTopic); 071 consumer.setMessageListener(this); 072 } 073 } 074 075 public void stop() throws Exception { 076 if (started.compareAndSet(true, false)) { 077 if (session != null) { 078 session.close(); 079 } 080 } 081 } 082 083 public void onMessage(Message message) { 084 if (message instanceof ActiveMQMessage) { 085 ActiveMQMessage activeMessage = (ActiveMQMessage)message; 086 Object command = activeMessage.getDataStructure(); 087 int count = 0; 088 if (command instanceof ProducerInfo) { 089 count = producerCount.incrementAndGet(); 090 count = extractProducerCountFromMessage(message, count); 091 fireProducerEvent(new ProducerStartedEvent(this, destination, (ProducerInfo)command, count)); 092 } else if (command instanceof RemoveInfo) { 093 RemoveInfo removeInfo = (RemoveInfo)command; 094 if (removeInfo.isProducerRemove()) { 095 count = producerCount.decrementAndGet(); 096 count = extractProducerCountFromMessage(message, count); 097 fireProducerEvent(new ProducerStoppedEvent(this, destination, (ProducerId)removeInfo.getObjectId(), count)); 098 } 099 } else { 100 LOG.warn("Unknown command: " + command); 101 } 102 } else { 103 LOG.warn("Unknown message type: " + message + ". Message ignored"); 104 } 105 } 106 107 protected int extractProducerCountFromMessage(Message message, int count) { 108 try { 109 Object value = message.getObjectProperty("producerCount"); 110 if (value instanceof Number) { 111 Number n = (Number)value; 112 return n.intValue(); 113 } 114 LOG.warn("No producerCount header available on the message: " + message); 115 } catch (Exception e) { 116 LOG.warn("Failed to extract producerCount from message: " + message + ".Reason: " + e, e); 117 } 118 return count; 119 } 120 121 protected void fireProducerEvent(ProducerEvent event) { 122 if (listener != null) { 123 listener.onProducerEvent(event); 124 } 125 } 126 127}