001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.apache.activemq; 019 020import java.util.Collections; 021import java.util.LinkedList; 022import java.util.List; 023import java.util.concurrent.CountDownLatch; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import javax.jms.ConnectionConsumer; 027import javax.jms.IllegalStateException; 028import javax.jms.JMSException; 029import javax.jms.ServerSession; 030import javax.jms.ServerSessionPool; 031import javax.jms.Session; 032 033import org.apache.activemq.command.ConsumerId; 034import org.apache.activemq.command.ConsumerInfo; 035import org.apache.activemq.command.MessageDispatch; 036 037/** 038 * For application servers, <CODE>Connection</CODE> objects provide a special 039 * facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The 040 * messages it is to consume are specified by a <CODE>Destination</CODE> and a 041 * message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be 042 * given a <CODE>ServerSessionPool</CODE> to use for processing its messages. 043 * <p/> 044 * <P> 045 * Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a 046 * <CODE>ServerSession</CODE> from its pool, loads it with a single message, 047 * and starts it. As traffic picks up, messages can back up. If this happens, a 048 * <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE> 049 * with more than one message. This reduces the thread context switches and 050 * minimizes resource use at the expense of some serialization of message 051 * processing. 052 * 053 * @see javax.jms.Connection#createConnectionConsumer 054 * @see javax.jms.Connection#createDurableConnectionConsumer 055 * @see javax.jms.QueueConnection#createConnectionConsumer 056 * @see javax.jms.TopicConnection#createConnectionConsumer 057 * @see javax.jms.TopicConnection#createDurableConnectionConsumer 058 */ 059 060public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQDispatcher { 061 062 private ActiveMQConnection connection; 063 private ServerSessionPool sessionPool; 064 private ConsumerInfo consumerInfo; 065 private boolean closed; 066 067 /** 068 * Create a ConnectionConsumer 069 * 070 * @param theConnection 071 * @param theSessionPool 072 * @param theConsumerInfo 073 * @throws JMSException 074 */ 075 protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo) throws JMSException { 076 this.connection = theConnection; 077 this.sessionPool = theSessionPool; 078 this.consumerInfo = theConsumerInfo; 079 080 this.connection.addConnectionConsumer(this); 081 this.connection.addDispatcher(consumerInfo.getConsumerId(), this); 082 this.connection.syncSendPacket(this.consumerInfo); 083 } 084 085 /** 086 * Gets the server session pool associated with this connection consumer. 087 * 088 * @return the server session pool used by this connection consumer 089 * @throws JMSException if the JMS provider fails to get the server session 090 * pool associated with this consumer due to some internal 091 * error. 092 */ 093 094 public ServerSessionPool getServerSessionPool() throws JMSException { 095 if (closed) { 096 throw new IllegalStateException("The Connection Consumer is closed"); 097 } 098 return this.sessionPool; 099 } 100 101 /** 102 * Closes the connection consumer. <p/> 103 * <P> 104 * Since a provider may allocate some resources on behalf of a connection 105 * consumer outside the Java virtual machine, clients should close these 106 * resources when they are not needed. Relying on garbage collection to 107 * eventually reclaim these resources may not be timely enough. 108 * 109 * @throws JMSException 110 */ 111 112 public void close() throws JMSException { 113 if (!closed) { 114 dispose(); 115 this.connection.asyncSendPacket(this.consumerInfo.createRemoveCommand()); 116 } 117 118 } 119 120 public void dispose() { 121 if (!closed) { 122 this.connection.removeDispatcher(consumerInfo.getConsumerId()); 123 this.connection.removeConnectionConsumer(this); 124 closed = true; 125 } 126 } 127 128 public void dispatch(MessageDispatch messageDispatch) { 129 try { 130 messageDispatch.setConsumer(this); 131 132 ServerSession serverSession = sessionPool.getServerSession(); 133 Session s = serverSession.getSession(); 134 ActiveMQSession session = null; 135 136 if (s instanceof ActiveMQSession) { 137 session = (ActiveMQSession)s; 138 } else if (s instanceof ActiveMQTopicSession) { 139 ActiveMQTopicSession topicSession = (ActiveMQTopicSession)s; 140 session = (ActiveMQSession)topicSession.getNext(); 141 } else if (s instanceof ActiveMQQueueSession) { 142 ActiveMQQueueSession queueSession = (ActiveMQQueueSession)s; 143 session = (ActiveMQSession)queueSession.getNext(); 144 } else { 145 connection.onClientInternalException(new JMSException("Session pool provided an invalid session type: " + s.getClass())); 146 return; 147 } 148 149 session.dispatch(messageDispatch); 150 serverSession.start(); 151 } catch (JMSException e) { 152 connection.onAsyncException(e); 153 } 154 } 155 156 public String toString() { 157 return "ActiveMQConnectionConsumer { value=" + consumerInfo.getConsumerId() + " }"; 158 } 159 160 public void clearMessagesInProgress(AtomicInteger transportInterruptionProcessingComplete) { 161 // future: may want to deal with rollback of in progress messages to track re deliveries 162 // before indicating that all is complete. 163 } 164 165 public ConsumerInfo getConsumerInfo() { 166 return consumerInfo; 167 } 168}