001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region;
018
019import java.io.IOException;
020import java.util.ArrayList;
021import java.util.HashMap;
022import java.util.List;
023import java.util.Map;
024
025import javax.jms.JMSException;
026
027import org.apache.activemq.broker.Broker;
028import org.apache.activemq.broker.ConnectionContext;
029import org.apache.activemq.command.ConsumerInfo;
030import org.apache.activemq.command.MessageAck;
031import org.apache.activemq.command.MessageId;
032import org.apache.activemq.filter.MessageEvaluationContext;
033import org.apache.activemq.usage.SystemUsage;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037public class QueueBrowserSubscription extends QueueSubscription {
038
039    protected static final Logger LOG = LoggerFactory.getLogger(QueueBrowserSubscription.class);
040
041    int queueRefs;
042    boolean browseDone;
043    boolean destinationsAdded;
044
045    private final Map<MessageId, Object> audit = new HashMap<MessageId, Object>();
046    private long maxMessages;
047
048    public QueueBrowserSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info) throws JMSException {
049        super(broker, usageManager, context, info);
050    }
051
052    @Override
053    protected boolean canDispatch(MessageReference node) {
054        return !((QueueMessageReference) node).isAcked();
055    }
056
057    @Override
058    public synchronized String toString() {
059        return "QueueBrowserSubscription:" + " consumer=" + info.getConsumerId() +
060               ", destinations=" + destinations.size() + ", dispatched=" + dispatched.size() +
061               ", delivered=" + this.prefetchExtension + ", pending=" + getPendingQueueSize();
062    }
063
064    synchronized public void destinationsAdded() throws Exception {
065        destinationsAdded = true;
066        checkDone();
067    }
068
069    public boolean isDuplicate(MessageId messageId) {
070
071        if (!audit.containsKey(messageId)) {
072            audit.put(messageId, Boolean.TRUE);
073            return false;
074        }
075
076        return true;
077    }
078
079    private void checkDone() throws Exception {
080        if (!browseDone && queueRefs == 0 && destinationsAdded) {
081            browseDone = true;
082            add(QueueMessageReference.NULL_MESSAGE);
083        }
084    }
085
086    @Override
087    public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
088        return !browseDone && super.matches(node, context);
089    }
090
091    /**
092     * Since we are a browser we don't really remove the message from the queue.
093     */
094    @Override
095    protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
096        if (info.isNetworkSubscription()) {
097            super.acknowledge(context, ack, n);
098        }
099    }
100
101    synchronized public void incrementQueueRef() {
102        queueRefs++;
103    }
104
105    synchronized public void decrementQueueRef() throws Exception {
106        if (queueRefs > 0) {
107            queueRefs--;
108        }
109        checkDone();
110    }
111
112    @Override
113    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
114        super.remove(context, destination);
115        // there's no unacked messages that needs to be redelivered
116        // in case of browser
117        return new ArrayList<MessageReference>();
118    }
119
120    public boolean atMax() {
121        return maxMessages > 0 && getEnqueueCounter() >= maxMessages;
122    }
123
124    public void setMaxMessages(long max) {
125        maxMessages = max;
126    }
127}