001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.group;
018
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.memory.LRUMap;
024
025/**
026 * Uses hash-code buckets to associate consumers with sets of message group IDs.
027 * 
028 * 
029 */
030public class MessageGroupHashBucket implements MessageGroupMap {
031
032    private final int bucketCount;
033    private final ConsumerId[] consumers;
034    private final LRUMap<String,String>cache;
035
036    public MessageGroupHashBucket(int bucketCount, int cachedSize) {
037        this.bucketCount = bucketCount;
038        this.consumers = new ConsumerId[bucketCount];
039        this.cache=new LRUMap<String,String>(cachedSize);
040    }
041
042    public synchronized void put(String groupId, ConsumerId consumerId) {
043        int bucket = getBucketNumber(groupId);
044        consumers[bucket] = consumerId;
045        if (consumerId != null){
046          cache.put(groupId,consumerId.toString());
047        }
048    }
049
050    public synchronized ConsumerId get(String groupId) {
051        int bucket = getBucketNumber(groupId);
052        //excersise cache
053        cache.get(groupId);
054        return consumers[bucket];
055    }
056
057    public synchronized ConsumerId removeGroup(String groupId) {
058        int bucket = getBucketNumber(groupId);
059        ConsumerId answer = consumers[bucket];
060        consumers[bucket] = null;
061        cache.remove(groupId);
062        return answer;
063    }
064
065    public synchronized MessageGroupSet removeConsumer(ConsumerId consumerId) {
066        MessageGroupSet answer = null;
067        for (int i = 0; i < consumers.length; i++) {
068            ConsumerId owner = consumers[i];
069            if (owner != null && owner.equals(consumerId)) {
070                answer = createMessageGroupSet(i, answer);
071                consumers[i] = null;
072            }
073        }
074        if (answer == null) {
075            // make an empty set
076            answer = EmptyMessageGroupSet.INSTANCE;
077        }
078        return answer;
079    }
080
081    public synchronized void removeAll(){
082        for (int i =0; i < consumers.length; i++){
083            consumers[i] = null;
084        }
085    }
086
087    @Override
088    public Map<String, String> getGroups() {
089        return cache;
090    }
091
092    @Override
093    public String getType() {
094        return "bucket";
095    }
096
097    public void setDestination(Destination destination) {}
098
099    public int getBucketCount(){
100        return bucketCount;
101    }
102
103
104    public String toString() {
105        int count = 0;
106        for (int i = 0; i < consumers.length; i++) {
107            if (consumers[i] != null) {
108                count++;
109            }
110        }
111        return "active message group buckets: " + count;
112    }
113
114    protected MessageGroupSet createMessageGroupSet(int bucketNumber, final MessageGroupSet parent) {
115        final MessageGroupSet answer = createMessageGroupSet(bucketNumber);
116        if (parent == null) {
117            return answer;
118        } else {
119            // union the two sets together
120            return new MessageGroupSet() {
121                public boolean contains(String groupID) {
122                    return parent.contains(groupID) || answer.contains(groupID);
123                }
124            };
125        }
126    }
127
128    protected MessageGroupSet createMessageGroupSet(final int bucketNumber) {
129        return new MessageGroupSet() {
130            public boolean contains(String groupID) {
131                int bucket = getBucketNumber(groupID);
132                return bucket == bucketNumber;
133            }
134        };
135    }
136
137    protected int getBucketNumber(String groupId) {
138        int bucket = groupId.hashCode() % bucketCount;
139        // bucket could be negative
140        if (bucket < 0) {
141            bucket *= -1;
142        }
143        return bucket;
144    }
145}