001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq; 018 019import java.io.Serializable; 020 021import javax.jms.JMSException; 022import javax.jms.Message; 023 024import org.apache.activemq.broker.region.MessageReference; 025import org.apache.activemq.command.MessageId; 026import org.apache.activemq.command.ProducerId; 027import org.apache.activemq.util.BitArrayBin; 028import org.apache.activemq.util.IdGenerator; 029import org.apache.activemq.util.LRUCache; 030 031/** 032 * Provides basic audit functions for Messages without sync 033 * 034 * 035 */ 036public class ActiveMQMessageAuditNoSync implements Serializable { 037 038 private static final long serialVersionUID = 1L; 039 040 public static final int DEFAULT_WINDOW_SIZE = 2048; 041 public static final int MAXIMUM_PRODUCER_COUNT = 64; 042 private int auditDepth; 043 private int maximumNumberOfProducersToTrack; 044 private final LRUCache<String, BitArrayBin> map; 045 private transient boolean modified = true; 046 047 /** 048 * Default Constructor windowSize = 2048, maximumNumberOfProducersToTrack = 64 049 */ 050 public ActiveMQMessageAuditNoSync() { 051 this(DEFAULT_WINDOW_SIZE, MAXIMUM_PRODUCER_COUNT); 052 } 053 054 /** 055 * Construct a MessageAudit 056 * 057 * @param auditDepth range of ids to track 058 * @param maximumNumberOfProducersToTrack number of producers expected in the system 059 */ 060 public ActiveMQMessageAuditNoSync(int auditDepth, final int maximumNumberOfProducersToTrack) { 061 this.auditDepth = auditDepth; 062 this.maximumNumberOfProducersToTrack=maximumNumberOfProducersToTrack; 063 this.map = new LRUCache<String, BitArrayBin>(0, maximumNumberOfProducersToTrack, 0.75f, true); 064 } 065 066 /** 067 * @return the auditDepth 068 */ 069 public int getAuditDepth() { 070 return auditDepth; 071 } 072 073 /** 074 * @param auditDepth the auditDepth to set 075 */ 076 public void setAuditDepth(int auditDepth) { 077 this.auditDepth = auditDepth; 078 this.modified = true; 079 } 080 081 /** 082 * @return the maximumNumberOfProducersToTrack 083 */ 084 public int getMaximumNumberOfProducersToTrack() { 085 return maximumNumberOfProducersToTrack; 086 } 087 088 /** 089 * @param maximumNumberOfProducersToTrack the maximumNumberOfProducersToTrack to set 090 */ 091 public void setMaximumNumberOfProducersToTrack(int maximumNumberOfProducersToTrack) { 092 093 if (maximumNumberOfProducersToTrack < this.maximumNumberOfProducersToTrack){ 094 LRUCache<String, BitArrayBin> newMap = new LRUCache<String, BitArrayBin>(0,maximumNumberOfProducersToTrack,0.75f,true); 095 /** 096 * As putAll will access the entries in the right order, 097 * this shouldn't result in wrong cache entries being removed 098 */ 099 newMap.putAll(this.map); 100 this.map.clear(); 101 this.map.putAll(newMap); 102 } 103 this.map.setMaxCacheSize(maximumNumberOfProducersToTrack); 104 this.maximumNumberOfProducersToTrack = maximumNumberOfProducersToTrack; 105 this.modified = true; 106 } 107 108 /** 109 * Checks if this message has been seen before 110 * 111 * @param message 112 * @return true if the message is a duplicate 113 * @throws JMSException 114 */ 115 public boolean isDuplicate(Message message) throws JMSException { 116 return isDuplicate(message.getJMSMessageID()); 117 } 118 119 /** 120 * checks whether this messageId has been seen before and adds this 121 * messageId to the list 122 * 123 * @param id 124 * @return true if the message is a duplicate 125 */ 126 public boolean isDuplicate(String id) { 127 boolean answer = false; 128 String seed = IdGenerator.getSeedFromId(id); 129 if (seed != null) { 130 BitArrayBin bab = map.get(seed); 131 if (bab == null) { 132 bab = new BitArrayBin(auditDepth); 133 map.put(seed, bab); 134 modified = true; 135 } 136 long index = IdGenerator.getSequenceFromId(id); 137 if (index >= 0) { 138 answer = bab.setBit(index, true); 139 modified = true; 140 } 141 } 142 return answer; 143 } 144 145 /** 146 * Checks if this message has been seen before 147 * 148 * @param message 149 * @return true if the message is a duplicate 150 */ 151 public boolean isDuplicate(final MessageReference message) { 152 MessageId id = message.getMessageId(); 153 return isDuplicate(id); 154 } 155 156 /** 157 * Checks if this messageId has been seen before 158 * 159 * @param id 160 * @return true if the message is a duplicate 161 */ 162 public boolean isDuplicate(final MessageId id) { 163 boolean answer = false; 164 165 if (id != null) { 166 ProducerId pid = id.getProducerId(); 167 if (pid != null) { 168 BitArrayBin bab = map.get(pid.toString()); 169 if (bab == null) { 170 bab = new BitArrayBin(auditDepth); 171 map.put(pid.toString(), bab); 172 modified = true; 173 } 174 answer = bab.setBit(id.getProducerSequenceId(), true); 175 } 176 } 177 return answer; 178 } 179 180 /** 181 * mark this message as being received 182 * 183 * @param message 184 */ 185 public void rollback(final MessageReference message) { 186 MessageId id = message.getMessageId(); 187 rollback(id); 188 } 189 190 /** 191 * mark this message as being received 192 * 193 * @param id 194 */ 195 public void rollback(final MessageId id) { 196 if (id != null) { 197 ProducerId pid = id.getProducerId(); 198 if (pid != null) { 199 BitArrayBin bab = map.get(pid.toString()); 200 if (bab != null) { 201 bab.setBit(id.getProducerSequenceId(), false); 202 modified = true; 203 } 204 } 205 } 206 } 207 208 public void rollback(final String id) { 209 String seed = IdGenerator.getSeedFromId(id); 210 if (seed != null) { 211 BitArrayBin bab = map.get(seed); 212 if (bab != null) { 213 long index = IdGenerator.getSequenceFromId(id); 214 bab.setBit(index, false); 215 modified = true; 216 } 217 } 218 } 219 220 /** 221 * Check the message is in order 222 * @param msg 223 * @return 224 * @throws JMSException 225 */ 226 public boolean isInOrder(Message msg) throws JMSException { 227 return isInOrder(msg.getJMSMessageID()); 228 } 229 230 /** 231 * Check the message id is in order 232 * @param id 233 * @return 234 */ 235 public boolean isInOrder(final String id) { 236 boolean answer = true; 237 238 if (id != null) { 239 String seed = IdGenerator.getSeedFromId(id); 240 if (seed != null) { 241 BitArrayBin bab = map.get(seed); 242 if (bab != null) { 243 long index = IdGenerator.getSequenceFromId(id); 244 answer = bab.isInOrder(index); 245 modified = true; 246 } 247 } 248 } 249 return answer; 250 } 251 252 /** 253 * Check the MessageId is in order 254 * @param message 255 * @return 256 */ 257 public boolean isInOrder(final MessageReference message) { 258 return isInOrder(message.getMessageId()); 259 } 260 261 /** 262 * Check the MessageId is in order 263 * @param id 264 * @return 265 */ 266 public boolean isInOrder(final MessageId id) { 267 boolean answer = false; 268 269 if (id != null) { 270 ProducerId pid = id.getProducerId(); 271 if (pid != null) { 272 BitArrayBin bab = map.get(pid.toString()); 273 if (bab == null) { 274 bab = new BitArrayBin(auditDepth); 275 map.put(pid.toString(), bab); 276 modified = true; 277 } 278 answer = bab.isInOrder(id.getProducerSequenceId()); 279 280 } 281 } 282 return answer; 283 } 284 285 public long getLastSeqId(ProducerId id) { 286 long result = -1; 287 BitArrayBin bab = map.get(id.toString()); 288 if (bab != null) { 289 result = bab.getLastSetIndex(); 290 } 291 return result; 292 } 293 294 public void clear() { 295 map.clear(); 296 } 297 298 /** 299 * Returns if the Audit has been modified since last check, this method does not 300 * reset the modified flag. If the caller needs to reset the flag in order to avoid 301 * serializing an unchanged Audit then its up the them to reset it themselves. 302 * 303 * @return true if the Audit has been modified. 304 */ 305 public boolean isModified() { 306 return this.modified; 307 } 308 309 public void setModified(boolean modified) { 310 this.modified = modified; 311 } 312 313 /** 314 * Reads and returns the current modified state of the Audit, once called the state is 315 * reset to false. This method is useful for code the needs to know if it should write 316 * out the Audit or otherwise execute some logic based on the Audit having changed since 317 * last check. 318 * 319 * @return true if the Audit has been modified since last check. 320 */ 321 public boolean modified() { 322 if (this.modified) { 323 this.modified = false; 324 return true; 325 } 326 327 return false; 328 } 329}