001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq;
018
019import java.io.Serializable;
020
021import javax.jms.JMSException;
022import javax.jms.Message;
023
024import org.apache.activemq.broker.region.MessageReference;
025import org.apache.activemq.command.MessageId;
026import org.apache.activemq.command.ProducerId;
027import org.apache.activemq.util.BitArrayBin;
028import org.apache.activemq.util.IdGenerator;
029import org.apache.activemq.util.LRUCache;
030
031/**
032 * Provides basic audit functions for Messages without sync
033 *
034 *
035 */
036public class ActiveMQMessageAuditNoSync implements Serializable {
037
038    private static final long serialVersionUID = 1L;
039
040    public static final int DEFAULT_WINDOW_SIZE = 2048;
041    public static final int MAXIMUM_PRODUCER_COUNT = 64;
042    private int auditDepth;
043    private int maximumNumberOfProducersToTrack;
044    private final LRUCache<String, BitArrayBin> map;
045    private transient boolean modified = true;
046
047    /**
048     * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = 64
049     */
050    public ActiveMQMessageAuditNoSync() {
051        this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT);
052    }
053
054    /**
055     * Construct a MessageAudit
056     *
057     * @param auditDepth range of ids to track
058     * @param maximumNumberOfProducersToTrack number of producers expected in the system
059     */
060    public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) {
061        this.auditDepth = auditDepth;
062        this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack;
063        this.map = new LRUCache<String, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true);
064    }
065
066    /**
067     * @return the auditDepth
068     */
069    public int getAuditDepth() {
070        return auditDepth;
071    }
072
073    /**
074     * @param auditDepth the auditDepth to set
075     */
076    public void setAuditDepth(int auditDepth) {
077        this.auditDepth = auditDepth;
078        this.modified = true;
079    }
080
081    /**
082     * @return the maximumNumberOfProducersToTrack
083     */
084    public int getMaximumNumberOfProducersToTrack() {
085        return maximumNumberOfProducersToTrack;
086    }
087
088    /**
089     * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set
090     */
091    public void setMaximumNumberOfProducersToTrack(int maximumNumberOfProducersToTrack) {
092
093        if (maximumNumberOfProducersToTrack < this.maximumNumberOfProducersToTrack){
094            LRUCache<String, BitArrayBin> newMap = new LRUCache<String, BitArrayBin>(0,maximumNumberOfProducersToTrack,0.75f,true);
095            /**
096             * As putAll will access the entries in the right order,
097             * this shouldn't result in wrong cache entries being removed
098             */
099            newMap.putAll(this.map);
100            this.map.clear();
101            this.map.putAll(newMap);
102        }
103        this.map.setMaxCacheSize(maximumNumberOfProducersToTrack);
104        this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack;
105        this.modified = true;
106    }
107
108    /**
109     * Checks if this message has been seen before
110     *
111     * @param message
112     * @return true if the message is a duplicate
113     * @throws JMSException
114     */
115    public boolean isDuplicate(Message message) throws JMSException {
116        return isDuplicate(message.getJMSMessageID());
117    }
118
119    /**
120     * checks whether this messageId has been seen before and adds this
121     * messageId to the list
122     *
123     * @param id
124     * @return true if the message is a duplicate
125     */
126    public boolean isDuplicate(String id) {
127        boolean answer = false;
128        String seed = IdGenerator.getSeedFromId(id);
129        if (seed != null) {
130            BitArrayBin bab = map.get(seed);
131            if (bab == null) {
132                bab = new BitArrayBin(auditDepth);
133                map.put(seed, bab);
134                modified = true;
135            }
136            long index = IdGenerator.getSequenceFromId(id);
137            if (index >= 0) {
138                answer = bab.setBit(index, true);
139                modified = true;
140            }
141        }
142        return answer;
143    }
144
145    /**
146     * Checks if this message has been seen before
147     *
148     * @param message
149     * @return true if the message is a duplicate
150     */
151    public boolean isDuplicate(final MessageReference message) {
152        MessageId id = message.getMessageId();
153        return isDuplicate(id);
154    }
155
156    /**
157     * Checks if this messageId has been seen before
158     *
159     * @param id
160     * @return true if the message is a duplicate
161     */
162    public boolean isDuplicate(final MessageId id) {
163        boolean answer = false;
164
165        if (id != null) {
166            ProducerId pid = id.getProducerId();
167            if (pid != null) {
168                BitArrayBin bab = map.get(pid.toString());
169                if (bab == null) {
170                    bab = new BitArrayBin(auditDepth);
171                    map.put(pid.toString(), bab);
172                    modified = true;
173                }
174                answer = bab.setBit(id.getProducerSequenceId(), true);
175            }
176        }
177        return answer;
178    }
179
180    /**
181     * mark this message as being received
182     *
183     * @param message
184     */
185    public void rollback(final MessageReference message) {
186        MessageId id = message.getMessageId();
187        rollback(id);
188    }
189
190    /**
191     * mark this message as being received
192     *
193     * @param id
194     */
195    public void rollback(final  MessageId id) {
196        if (id != null) {
197            ProducerId pid = id.getProducerId();
198            if (pid != null) {
199                BitArrayBin bab = map.get(pid.toString());
200                if (bab != null) {
201                    bab.setBit(id.getProducerSequenceId(), false);
202                    modified = true;
203                }
204            }
205        }
206    }
207
208    public void rollback(final String id) {
209        String seed = IdGenerator.getSeedFromId(id);
210        if (seed != null) {
211            BitArrayBin bab = map.get(seed);
212            if (bab != null) {
213                long index = IdGenerator.getSequenceFromId(id);
214                bab.setBit(index, false);
215                modified = true;
216            }
217        }
218    }
219
220    /**
221     * Check the message is in order
222     * @param msg
223     * @return
224     * @throws JMSException
225     */
226    public boolean isInOrder(Message msg) throws JMSException {
227        return isInOrder(msg.getJMSMessageID());
228    }
229
230    /**
231     * Check the message id is in order
232     * @param id
233     * @return
234     */
235    public boolean isInOrder(final String id) {
236        boolean answer = true;
237
238        if (id != null) {
239            String seed = IdGenerator.getSeedFromId(id);
240            if (seed != null) {
241                BitArrayBin bab = map.get(seed);
242                if (bab != null) {
243                    long index = IdGenerator.getSequenceFromId(id);
244                    answer = bab.isInOrder(index);
245                    modified = true;
246                }
247            }
248        }
249        return answer;
250    }
251
252    /**
253     * Check the MessageId is in order
254     * @param message
255     * @return
256     */
257    public boolean isInOrder(final MessageReference message) {
258        return isInOrder(message.getMessageId());
259    }
260
261    /**
262     * Check the MessageId is in order
263     * @param id
264     * @return
265     */
266    public boolean isInOrder(final MessageId id) {
267        boolean answer = false;
268
269        if (id != null) {
270            ProducerId pid = id.getProducerId();
271            if (pid != null) {
272                BitArrayBin bab = map.get(pid.toString());
273                if (bab == null) {
274                    bab = new BitArrayBin(auditDepth);
275                    map.put(pid.toString(), bab);
276                    modified = true;
277                }
278                answer = bab.isInOrder(id.getProducerSequenceId());
279
280            }
281        }
282        return answer;
283    }
284
285    public long getLastSeqId(ProducerId id) {
286        long result = -1;
287        BitArrayBin bab = map.get(id.toString());
288        if (bab != null) {
289            result = bab.getLastSetIndex();
290        }
291        return result;
292    }
293
294    public void clear() {
295        map.clear();
296    }
297
298    /**
299     * Returns if the Audit has been modified since last check, this method does not
300     * reset the modified flag.  If the caller needs to reset the flag in order to avoid
301     * serializing an unchanged Audit then its up the them to reset it themselves.
302     *
303     * @return true if the Audit has been modified.
304     */
305    public boolean isModified() {
306        return this.modified;
307    }
308
309    public void setModified(boolean modified) {
310        this.modified = modified;
311    }
312
313    /**
314     * Reads and returns the current modified state of the Audit, once called the state is
315     * reset to false.  This method is useful for code the needs to know if it should write
316     * out the Audit or otherwise execute some logic based on the Audit having changed since
317     * last check.
318     *
319     * @return true if the Audit has been modified since last check.
320     */
321    public boolean modified() {
322        if (this.modified) {
323            this.modified = false;
324            return true;
325        }
326
327        return false;
328    }
329}