/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.jms.JMSException;

import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.usage.SystemUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link QueueSubscription} used for browsing a queue.
 * <p>
 * The subscription tracks how many queues still reference it
 * ({@link #queueRefs}) and whether its destinations have been added
 * ({@link #destinationsAdded}). Once both conditions allow it, the browse is
 * flagged as done and {@link QueueMessageReference#NULL_MESSAGE} is added to
 * the subscription (presumably as an end-of-browse marker for the consumer;
 * confirm against the dispatch code in the superclass).
 */
public class QueueBrowserSubscription extends QueueSubscription {

    protected static final Logger LOG = LoggerFactory.getLogger(QueueBrowserSubscription.class);

    /** Number of outstanding queue references; see {@link #incrementQueueRef()}. */
    int queueRefs;
    /** Set once the browse has been completed by {@link #checkDone()}. */
    boolean browseDone;
    /** Set by {@link #destinationsAdded()}. */
    boolean destinationsAdded;

    /** Message ids already reported through {@link #isDuplicate(MessageId)}. */
    private final Map<MessageId, Object> audit = new HashMap<MessageId, Object>();
    /** Limit consulted by {@link #atMax()}; values of zero or less disable the limit. */
    private long maxMessages;

    /**
     * Creates the subscription by delegating all arguments to the superclass.
     *
     * @param broker the broker passed to the superclass
     * @param usageManager the usage manager passed to the superclass
     * @param context the connection context passed to the superclass
     * @param info the consumer description passed to the superclass
     * @throws JMSException if the superclass constructor fails
     */
    public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context,
            ConsumerInfo info) throws JMSException {
        super(broker, usageManager, context, info);
    }

    /**
     * A reference can be dispatched as long as it has not been acked.
     *
     * @param node the reference to check; it is cast to {@link QueueMessageReference}
     * @return {@code true} if the reference is not acked
     */
    @Override
    protected boolean canDispatch(MessageReference node) {
        final QueueMessageReference queueReference = (QueueMessageReference) node;
        return !queueReference.isAcked();
    }

    /**
     * @return a one-line summary containing the consumer id, the destination and
     *         dispatched counts, the prefetch extension and the pending queue size
     */
    @Override
    public synchronized String toString() {
        return "QueueBrowserSubscription:"
                + " consumer=" + info.getConsumerId()
                + ", destinations=" + destinations.size()
                + ", dispatched=" + dispatched.size()
                + ", delivered=" + this.prefetchExtension
                + ", pending=" + getPendingQueueSize();
    }

    /**
     * Records that the destinations have been added and then checks whether the
     * browse can be completed.
     *
     * @throws Exception if completing the browse fails
     */
    public synchronized void destinationsAdded() throws Exception {
        destinationsAdded = true;
        checkDone();
    }

    /**
     * Reports whether the given message id has been passed to this method before.
     * An id seen for the first time is remembered in the audit map.
     *
     * @param messageId the id to check
     * @return {@code false} the first time an id is seen, {@code true} afterwards
     */
    public boolean isDuplicate(MessageId messageId) {
        if (audit.containsKey(messageId)) {
            return true;
        }

        // First sighting: remember the id so later calls report a duplicate.
        audit.put(messageId, Boolean.TRUE);
        return false;
    }

    /**
     * Completes the browse at most once: when it is not yet done, no queue
     * references are outstanding and the destinations have been added, the
     * done flag is set and {@link QueueMessageReference#NULL_MESSAGE} is added.
     *
     * @throws Exception if adding the reference fails
     */
    private void checkDone() throws Exception {
        if (browseDone || queueRefs != 0 || !destinationsAdded) {
            return;
        }
        browseDone = true;
        add(QueueMessageReference.NULL_MESSAGE);
    }

    /**
     * Nothing matches once the browse is done; before that the decision is left
     * to the superclass.
     */
    @Override
    public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
        if (browseDone) {
            return false;
        }
        return super.matches(node, context);
    }

    /**
     * Since we are a browser we don't really remove the message from the queue:
     * the acknowledgement is only forwarded to the superclass when the consumer
     * is a network subscription, and is otherwise ignored.
     */
    @Override
    protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n)
            throws IOException {
        if (!info.isNetworkSubscription()) {
            return;
        }
        super.acknowledge(context, ack, n);
    }

    /**
     * Registers one more queue reference.
     */
    public synchronized void incrementQueueRef() {
        queueRefs++;
    }

    /**
     * Releases one queue reference (the counter is only decremented while it is
     * positive) and then checks whether the browse can be completed.
     *
     * @throws Exception if completing the browse fails
     */
    public synchronized void decrementQueueRef() throws Exception {
        if (queueRefs > 0) {
            queueRefs -= 1;
        }
        checkDone();
    }

    /**
     * Removes the destination through the superclass and discards its result.
     *
     * @return always a new, empty list
     */
    @Override
    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
        super.remove(context, destination);
        // A browser has no unacked messages that would need to be redelivered,
        // so whatever the superclass returned is dropped.
        final List<MessageReference> nothingToRedeliver = new ArrayList<MessageReference>();
        return nothingToRedeliver;
    }

    /**
     * @return {@code true} when a positive maximum is configured and the enqueue
     *         counter has reached it
     */
    public boolean atMax() {
        if (maxMessages <= 0) {
            return false;
        }
        return getEnqueueCounter() >= maxMessages;
    }

    /**
     * @param max the limit checked by {@link #atMax()}; zero or less disables it
     */
    public void setMaxMessages(long max) {
        maxMessages = max;
    }
}