001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.util.Collections; 020import java.util.LinkedList; 021import java.util.List; 022import java.util.Set; 023import org.apache.activemq.ActiveMQMessageAudit; 024import org.apache.activemq.broker.Broker; 025import org.apache.activemq.broker.ConnectionContext; 026import org.apache.activemq.broker.region.BaseDestination; 027import org.apache.activemq.broker.region.Destination; 028import org.apache.activemq.broker.region.MessageReference; 029import org.apache.activemq.broker.region.Subscription; 030import org.apache.activemq.command.MessageId; 031import org.apache.activemq.usage.SystemUsage; 032 033/** 034 * Abstract method holder for pending message (messages awaiting disptach to a 035 * consumer) cursor 036 * 037 * 038 */ 039public abstract class AbstractPendingMessageCursor implements PendingMessageCursor { 040 protected int memoryUsageHighWaterMark = 70; 041 protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE; 042 protected SystemUsage systemUsage; 043 protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT; 044 protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH; 045 protected boolean enableAudit=true; 046 protected ActiveMQMessageAudit audit; 047 protected boolean useCache=true; 048 protected boolean cacheEnabled=true; 049 private boolean started=false; 050 protected MessageReference last = null; 051 protected final boolean prioritizedMessages; 052 053 public AbstractPendingMessageCursor(boolean prioritizedMessages) { 054 this.prioritizedMessages=prioritizedMessages; 055 } 056 057 058 public synchronized void start() throws Exception { 059 if (!started && enableAudit && audit==null) { 060 audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 061 } 062 started=true; 063 } 064 065 public synchronized void stop() throws Exception { 066 started=false; 067 gc(); 068 } 069 070 public void add(ConnectionContext context, Destination destination) throws Exception { 071 } 072 073 @SuppressWarnings("unchecked") 074 public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception { 075 return Collections.EMPTY_LIST; 076 } 077 078 public boolean isRecoveryRequired() { 079 return true; 080 } 081 082 public void addMessageFirst(MessageReference node) throws Exception { 083 } 084 085 public boolean addMessageLast(MessageReference node) throws Exception { 086 return true; 087 } 088 089 public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception { 090 return addMessageLast(node); 091 } 092 093 public void addRecoveredMessage(MessageReference node) throws Exception { 094 addMessageLast(node); 095 } 096 097 public void clear() { 098 } 099 100 public boolean hasNext() { 101 return false; 102 } 103 104 public boolean isEmpty() { 105 return false; 106 } 107 108 public boolean isEmpty(Destination destination) { 109 return isEmpty(); 110 } 111 112 public MessageReference next() { 113 return null; 114 } 115 116 public void remove() { 117 } 118 119 public void reset() { 120 } 121 122 public int size() { 123 return 0; 124 } 125 126 public int getMaxBatchSize() { 127 return maxBatchSize; 128 } 129 130 public void setMaxBatchSize(int maxBatchSize) { 131 this.maxBatchSize = maxBatchSize; 132 } 133 134 protected void fillBatch() throws Exception { 135 } 136 137 public void resetForGC() { 138 reset(); 139 } 140 141 public void remove(MessageReference node) { 142 } 143 144 public void gc() { 145 } 146 147 public void setSystemUsage(SystemUsage usageManager) { 148 this.systemUsage = usageManager; 149 } 150 151 public boolean hasSpace() { 152 // allow isFull to verify parent usage and otherwise enforce local memoryUsageHighWaterMark 153 return systemUsage != null ? (!isParentFull() && systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true; 154 } 155 156 boolean parentHasSpace(int waterMark) { 157 boolean result = true; 158 if (systemUsage != null) { 159 if (systemUsage.getMemoryUsage().getParent() != null) { 160 return systemUsage.getMemoryUsage().getParent().getPercentUsage() <= waterMark; 161 } 162 } 163 return result; 164 } 165 166 private boolean isParentFull() { 167 boolean result = false; 168 if (systemUsage != null) { 169 if (systemUsage.getMemoryUsage().getParent() != null) { 170 return systemUsage.getMemoryUsage().getParent().getPercentUsage() >= 100; 171 } 172 } 173 return result; 174 } 175 176 public boolean isFull() { 177 return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false; 178 } 179 180 public void release() { 181 } 182 183 public boolean hasMessagesBufferedToDeliver() { 184 return false; 185 } 186 187 /** 188 * @return the memoryUsageHighWaterMark 189 */ 190 public int getMemoryUsageHighWaterMark() { 191 return memoryUsageHighWaterMark; 192 } 193 194 /** 195 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 196 */ 197 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 198 this.memoryUsageHighWaterMark = memoryUsageHighWaterMark; 199 } 200 201 /** 202 * @return the usageManager 203 */ 204 public SystemUsage getSystemUsage() { 205 return this.systemUsage; 206 } 207 208 /** 209 * destroy the cursor 210 * 211 * @throws Exception 212 */ 213 public void destroy() throws Exception { 214 stop(); 215 } 216 217 /** 218 * Page in a restricted number of messages 219 * 220 * @param maxItems maximum number of messages to return 221 * @return a list of paged in messages 222 */ 223 public LinkedList<MessageReference> pageInList(int maxItems) { 224 throw new RuntimeException("Not supported"); 225 } 226 227 /** 228 * @return the maxProducersToAudit 229 */ 230 public int getMaxProducersToAudit() { 231 return maxProducersToAudit; 232 } 233 234 /** 235 * @param maxProducersToAudit the maxProducersToAudit to set 236 */ 237 public synchronized void setMaxProducersToAudit(int maxProducersToAudit) { 238 this.maxProducersToAudit = maxProducersToAudit; 239 if (audit != null) { 240 audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit); 241 } 242 } 243 244 /** 245 * @return the maxAuditDepth 246 */ 247 public int getMaxAuditDepth() { 248 return maxAuditDepth; 249 } 250 251 252 /** 253 * @param maxAuditDepth the maxAuditDepth to set 254 */ 255 public synchronized void setMaxAuditDepth(int maxAuditDepth) { 256 this.maxAuditDepth = maxAuditDepth; 257 if (audit != null) { 258 audit.setAuditDepth(maxAuditDepth); 259 } 260 } 261 262 263 /** 264 * @return the enableAudit 265 */ 266 public boolean isEnableAudit() { 267 return enableAudit; 268 } 269 270 /** 271 * @param enableAudit the enableAudit to set 272 */ 273 public synchronized void setEnableAudit(boolean enableAudit) { 274 this.enableAudit = enableAudit; 275 if (enableAudit && started && audit==null) { 276 audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 277 } 278 } 279 280 public boolean isTransient() { 281 return false; 282 } 283 284 285 /** 286 * set the audit 287 * @param audit new audit component 288 */ 289 public void setMessageAudit(ActiveMQMessageAudit audit) { 290 this.audit=audit; 291 } 292 293 294 /** 295 * @return the audit 296 */ 297 public ActiveMQMessageAudit getMessageAudit() { 298 return audit; 299 } 300 301 public boolean isUseCache() { 302 return useCache; 303 } 304 305 public void setUseCache(boolean useCache) { 306 this.useCache = useCache; 307 } 308 309 public synchronized boolean isDuplicate(MessageId messageId) { 310 boolean unique = recordUniqueId(messageId); 311 rollback(messageId); 312 return !unique; 313 } 314 315 /** 316 * records a message id and checks if it is a duplicate 317 * @param messageId 318 * @return true if id is unique, false otherwise. 319 */ 320 public synchronized boolean recordUniqueId(MessageId messageId) { 321 if (!enableAudit || audit==null) { 322 return true; 323 } 324 return !audit.isDuplicate(messageId); 325 } 326 327 public synchronized void rollback(MessageId id) { 328 if (audit != null) { 329 audit.rollback(id); 330 } 331 } 332 333 public synchronized boolean isStarted() { 334 return started; 335 } 336 337 public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) { 338 boolean result = false; 339 Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination()); 340 if (destinations != null) { 341 for (Destination dest:destinations) { 342 if (dest.isPrioritizedMessages()) { 343 result = true; 344 break; 345 } 346 } 347 } 348 return result; 349 350 } 351 352 public synchronized boolean isCacheEnabled() { 353 return cacheEnabled; 354 } 355 356 public synchronized void setCacheEnabled(boolean val) { 357 cacheEnabled = val; 358 } 359 360 public void rebase() { 361 } 362}