001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.memory;
018
019import java.io.IOException;
020import java.util.Collections;
021import java.util.HashMap;
022import java.util.Iterator;
023import java.util.Map;
024import java.util.Map.Entry;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.command.ActiveMQDestination;
027import org.apache.activemq.command.Message;
028import org.apache.activemq.command.MessageAck;
029import org.apache.activemq.command.MessageId;
030import org.apache.activemq.command.SubscriptionInfo;
031import org.apache.activemq.store.MessageRecoveryListener;
032import org.apache.activemq.store.TopicMessageStore;
033import org.apache.activemq.util.LRUCache;
034import org.apache.activemq.util.SubscriptionKey;
035
036/**
037 * 
038 */
039public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
040
041    private Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
042    private Map<SubscriptionKey, MemoryTopicSub> topicSubMap;
043
044    public MemoryTopicMessageStore(ActiveMQDestination destination) {
045        this(destination, new LRUCache<MessageId, Message>(100, 100, 0.75f, false), makeSubscriptionInfoMap());
046    }
047
048    public MemoryTopicMessageStore(ActiveMQDestination destination, Map<MessageId, Message> messageTable, Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase) {
049        super(destination, messageTable);
050        this.subscriberDatabase = subscriberDatabase;
051        this.topicSubMap = makeSubMap();
052    }
053
054    protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
055        return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
056    }
057    
058    protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
059        return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
060    }
061
062    public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
063        super.addMessage(context, message);
064        for (Iterator<MemoryTopicSub> i = topicSubMap.values().iterator(); i.hasNext();) {
065            MemoryTopicSub sub = i.next();
066            sub.addMessage(message.getMessageId(), message);
067        }
068    }
069
070    public synchronized void acknowledge(ConnectionContext context, String clientId, String subscriptionName,
071                                         MessageId messageId, MessageAck ack) throws IOException {
072        SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
073        MemoryTopicSub sub = topicSubMap.get(key);
074        if (sub != null) {
075            sub.removeMessage(messageId);
076        }
077    }
078
079    public synchronized SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
080        return subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
081    }
082
083    public synchronized void addSubscription(SubscriptionInfo info, boolean retroactive) throws IOException {
084        SubscriptionKey key = new SubscriptionKey(info);
085        MemoryTopicSub sub = new MemoryTopicSub();
086        topicSubMap.put(key, sub);
087        if (retroactive) {
088            for (Iterator i = messageTable.entrySet().iterator(); i.hasNext();) {
089                Map.Entry entry = (Entry)i.next();
090                sub.addMessage((MessageId)entry.getKey(), (Message)entry.getValue());
091            }
092        }
093        subscriberDatabase.put(key, info);
094    }
095
096    public synchronized void deleteSubscription(String clientId, String subscriptionName) {
097        org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
098        subscriberDatabase.remove(key);
099        topicSubMap.remove(key);
100    }
101
102    public synchronized void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
103        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
104        if (sub != null) {
105            sub.recoverSubscription(listener);
106        }
107    }
108
109    public synchronized void delete() {
110        super.delete();
111        subscriberDatabase.clear();
112        topicSubMap.clear();
113    }
114
115    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
116        return subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
117    }
118
119    public synchronized int getMessageCount(String clientId, String subscriberName) throws IOException {
120        int result = 0;
121        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriberName));
122        if (sub != null) {
123            result = sub.size();
124        }
125        return result;
126    }
127
128    public synchronized void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
129        MemoryTopicSub sub = this.topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
130        if (sub != null) {
131            sub.recoverNextMessages(maxReturned, listener);
132        }
133    }
134
135    public void resetBatching(String clientId, String subscriptionName) {
136        MemoryTopicSub sub = topicSubMap.get(new SubscriptionKey(clientId, subscriptionName));
137        if (sub != null) {
138            sub.resetBatching();
139        }
140    }
141}