001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.util.Enumeration; 020import java.util.concurrent.atomic.AtomicBoolean; 021 022import javax.jms.*; 023import javax.jms.IllegalStateException; 024 025import org.apache.activemq.command.ActiveMQDestination; 026import org.apache.activemq.command.ConsumerId; 027import org.apache.activemq.command.MessageDispatch; 028import org.apache.activemq.selector.SelectorParser; 029 030/** 031 * A client uses a <CODE>QueueBrowser</CODE> object to look at messages on a 032 * queue without removing them. <p/> 033 * <P> 034 * The <CODE>getEnumeration</CODE> method returns a <CODE> 035 * java.util.Enumeration</CODE> 036 * that is used to scan the queue's messages. It may be an enumeration of the 037 * entire content of a queue, or it may contain only the messages matching a 038 * message selector. <p/> 039 * <P> 040 * Messages may be arriving and expiring while the scan is done. The JMS API 041 * does not require the content of an enumeration to be a static snapshot of 042 * queue content. Whether these changes are visible or not depends on the JMS 043 * provider. <p/> 044 * <P> 045 * A <CODE>QueueBrowser</CODE> can be created from either a <CODE>Session 046 * </CODE> 047 * or a <CODE>QueueSession</CODE>. 048 * 049 * @see javax.jms.Session#createBrowser 050 * @see javax.jms.QueueSession#createBrowser 051 * @see javax.jms.QueueBrowser 052 * @see javax.jms.QueueReceiver 053 */ 054 055public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration { 056 057 private final ActiveMQSession session; 058 private final ActiveMQDestination destination; 059 private final String selector; 060 061 private ActiveMQMessageConsumer consumer; 062 private boolean closed; 063 private final ConsumerId consumerId; 064 private final AtomicBoolean browseDone = new AtomicBoolean(true); 065 private final boolean dispatchAsync; 066 private Object semaphore = new Object(); 067 068 /** 069 * Constructor for an ActiveMQQueueBrowser - used internally 070 * @throws JMSException 071 */ 072 protected ActiveMQQueueBrowser(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination destination, String selector, boolean dispatchAsync) throws JMSException { 073 if (destination == null) { 074 throw new InvalidDestinationException("Don't understand null destinations"); 075 } else if (destination.getPhysicalName() == null) { 076 throw new InvalidDestinationException("The destination object was not given a physical name."); 077 } 078 if (selector != null && selector.trim().length() != 0) { 079 // Validate the selector 080 SelectorParser.parse(selector); 081 } 082 083 this.session = session; 084 this.consumerId = consumerId; 085 this.destination = destination; 086 this.selector = selector; 087 this.dispatchAsync = dispatchAsync; 088 } 089 090 /** 091 * @throws JMSException 092 */ 093 private ActiveMQMessageConsumer createConsumer() throws JMSException { 094 browseDone.set(false); 095 ActiveMQPrefetchPolicy prefetchPolicy = session.connection.getPrefetchPolicy(); 096 097 return new ActiveMQMessageConsumer(session, consumerId, destination, null, selector, prefetchPolicy.getQueueBrowserPrefetch(), prefetchPolicy 098 .getMaximumPendingMessageLimit(), false, true, dispatchAsync, null) { 099 public void dispatch(MessageDispatch md) { 100 if (md.getMessage() == null) { 101 browseDone.set(true); 102 } else { 103 super.dispatch(md); 104 } 105 notifyMessageAvailable(); 106 } 107 }; 108 } 109 110 private void destroyConsumer() { 111 if (consumer == null) { 112 return; 113 } 114 try { 115 if (session.getTransacted() && session.getTransactionContext().isInLocalTransaction()) { 116 session.commit(); 117 } 118 consumer.close(); 119 consumer = null; 120 } catch (JMSException e) { 121 e.printStackTrace(); 122 } 123 } 124 125 /** 126 * Gets an enumeration for browsing the current queue messages in the order 127 * they would be received. 128 * 129 * @return an enumeration for browsing the messages 130 * @throws JMSException if the JMS provider fails to get the enumeration for 131 * this browser due to some internal error. 132 */ 133 134 public Enumeration getEnumeration() throws JMSException { 135 checkClosed(); 136 if (consumer == null) { 137 consumer = createConsumer(); 138 } 139 return this; 140 } 141 142 private void checkClosed() throws IllegalStateException { 143 if (closed) { 144 throw new IllegalStateException("The Consumer is closed"); 145 } 146 } 147 148 /** 149 * @return true if more messages to process 150 */ 151 public boolean hasMoreElements() { 152 while (true) { 153 154 synchronized (this) { 155 if (consumer == null) { 156 return false; 157 } 158 } 159 160 if (consumer.getMessageSize() > 0) { 161 return true; 162 } 163 164 if (browseDone.get() || !session.isRunning()) { 165 destroyConsumer(); 166 return false; 167 } 168 169 waitForMessage(); 170 } 171 } 172 173 /** 174 * @return the next message 175 */ 176 public Object nextElement() { 177 while (true) { 178 179 synchronized (this) { 180 if (consumer == null) { 181 return null; 182 } 183 } 184 185 try { 186 javax.jms.Message answer = consumer.receiveNoWait(); 187 if (answer != null) { 188 return answer; 189 } 190 } catch (JMSException e) { 191 this.session.connection.onClientInternalException(e); 192 return null; 193 } 194 195 if (browseDone.get() || !session.isRunning()) { 196 destroyConsumer(); 197 return null; 198 } 199 200 waitForMessage(); 201 } 202 } 203 204 public synchronized void close() throws JMSException { 205 browseDone.set(true); 206 destroyConsumer(); 207 closed = true; 208 } 209 210 /** 211 * Gets the queue associated with this queue browser. 212 * 213 * @return the queue 214 * @throws JMSException if the JMS provider fails to get the queue 215 * associated with this browser due to some internal error. 216 */ 217 218 public Queue getQueue() throws JMSException { 219 return (Queue)destination; 220 } 221 222 public String getMessageSelector() throws JMSException { 223 return selector; 224 } 225 226 // Implementation methods 227 // ------------------------------------------------------------------------- 228 229 /** 230 * Wait on a semaphore for a fixed amount of time for a message to come in. 231 * @throws JMSException 232 */ 233 protected void waitForMessage() { 234 try { 235 consumer.sendPullCommand(-1); 236 synchronized (semaphore) { 237 semaphore.wait(2000); 238 } 239 } catch (InterruptedException e) { 240 Thread.currentThread().interrupt(); 241 } catch (JMSException e) { 242 } 243 244 } 245 246 protected void notifyMessageAvailable() { 247 synchronized (semaphore) { 248 semaphore.notifyAll(); 249 } 250 } 251 252 public String toString() { 253 return "ActiveMQQueueBrowser { value=" + consumerId + " }"; 254 } 255 256}