001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.util;
018
019import java.io.Serializable;
020import java.util.LinkedList;
021
022/**
023 * Holder for many bitArrays - used for message audit
024 *
025 *
026 */
027public class BitArrayBin implements Serializable {
028
029    private static final long serialVersionUID = 1L;
030    private final LinkedList<BitArray> list;
031    private int maxNumberOfArrays;
032    private int firstIndex = -1;  // leave 'int' for old serialization compatibility and introduce new 'long' field
033    private long lastInOrderBit=-1;
034    private long longFirstIndex=-1;
035    /**
036     * Create a BitArrayBin to a certain window size (number of messages to
037     * keep)
038     *
039     * @param windowSize
040     */
041    public BitArrayBin(int windowSize) {
042        maxNumberOfArrays = ((windowSize + 1) / BitArray.LONG_SIZE) + 1;
043        maxNumberOfArrays = Math.max(maxNumberOfArrays, 1);
044        list = new LinkedList<BitArray>();
045        for (int i = 0; i < maxNumberOfArrays; i++) {
046            list.add(null);
047        }
048    }
049
050    /**
051     * Set a bit
052     *
053     * @param index
054     * @param value
055     * @return true if set
056     */
057    public boolean setBit(long index, boolean value) {
058        boolean answer = false;
059        BitArray ba = getBitArray(index);
060        if (ba != null) {
061            int offset = getOffset(index);
062            if (offset >= 0) {
063                answer = ba.set(offset, value);
064            }
065        }
066        return answer;
067    }
068
069    /**
070     * Test if in order
071     * @param index
072     * @return true if next message is in order
073     */
074    public boolean isInOrder(long index) {
075        boolean result = false;
076        if (lastInOrderBit == -1) {
077            result = true;
078        } else {
079            result = lastInOrderBit + 1 == index;
080        }
081        lastInOrderBit = index;
082        return result;
083
084    }
085
086    /**
087     * Get the boolean value at the index
088     *
089     * @param index
090     * @return true/false
091     */
092    public boolean getBit(long index) {
093        boolean answer = index >= longFirstIndex;
094        BitArray ba = getBitArray(index);
095        if (ba != null) {
096            int offset = getOffset(index);
097            if (offset >= 0) {
098                answer = ba.get(offset);
099                return answer;
100            }
101        } else {
102            // gone passed range for previous bins so assume set
103            answer = true;
104        }
105        return answer;
106    }
107
108    /**
109     * Get the BitArray for the index
110     *
111     * @param index
112     * @return BitArray
113     */
114    private BitArray getBitArray(long index) {
115        int bin = getBin(index);
116        BitArray answer = null;
117        if (bin >= 0) {
118            if (bin >= maxNumberOfArrays) {
119                int overShoot = bin - maxNumberOfArrays + 1;
120                while (overShoot > 0) {
121                    list.removeFirst();
122                    longFirstIndex += BitArray.LONG_SIZE;
123                    list.add(new BitArray());
124                    overShoot--;
125                }
126
127                bin = maxNumberOfArrays - 1;
128            }
129            answer = list.get(bin);
130            if (answer == null) {
131                answer = new BitArray();
132                list.set(bin, answer);
133            }
134        }
135        return answer;
136    }
137
138    /**
139     * Get the index of the bin from the total index
140     *
141     * @param index
142     * @return the index of the bin
143     */
144    private int getBin(long index) {
145        int answer = 0;
146        if (longFirstIndex < 0) {
147            longFirstIndex = (int) (index - (index % BitArray.LONG_SIZE));
148        } else if (longFirstIndex >= 0) {
149            answer = (int)((index - longFirstIndex) / BitArray.LONG_SIZE);
150        }
151        return answer;
152    }
153
154    /**
155     * Get the offset into a bin from the total index
156     *
157     * @param index
158     * @return the relative offset into a bin
159     */
160    private int getOffset(long index) {
161        int answer = 0;
162        if (longFirstIndex >= 0) {
163            answer = (int)((index - longFirstIndex) - (BitArray.LONG_SIZE * getBin(index)));
164        }
165        return answer;
166    }
167
168    public long getLastSetIndex() {
169        long result = -1;
170
171        if (longFirstIndex >=0) {
172            result = longFirstIndex;
173            BitArray last = null;
174            for (int lastBitArrayIndex = maxNumberOfArrays -1; lastBitArrayIndex >= 0; lastBitArrayIndex--) {
175                last = list.get(lastBitArrayIndex);
176                if (last != null) {
177                    result += last.length() -1;
178                    result += lastBitArrayIndex * BitArray.LONG_SIZE;
179                    break;
180                }
181            }
182        }
183        return result;
184    }
185}