001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import java.io.IOException; 020import java.util.LinkedList; 021import java.util.List; 022import org.apache.activemq.ActiveMQMessageAudit; 023import org.apache.activemq.Service; 024import org.apache.activemq.broker.ConnectionContext; 025import org.apache.activemq.broker.region.Destination; 026import org.apache.activemq.broker.region.MessageReference; 027import org.apache.activemq.command.MessageId; 028import org.apache.activemq.usage.SystemUsage; 029 030/** 031 * Interface to pending message (messages awaiting disptach to a consumer) 032 * cursor 033 * 034 * 035 */ 036public interface PendingMessageCursor extends Service { 037 038 /** 039 * Add a destination 040 * 041 * @param context 042 * @param destination 043 * @throws Exception 044 */ 045 void add(ConnectionContext context, Destination destination) throws Exception; 046 047 /** 048 * remove a destination 049 * 050 * @param context 051 * @param destination 052 * @throws Exception 053 */ 054 List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception; 055 056 /** 057 * @return true if there are no pending messages 058 */ 059 boolean isEmpty(); 060 061 /** 062 * check if a Destination is Empty for this cursor 063 * 064 * @param destination 065 * @return true id the Destination is empty 066 */ 067 boolean isEmpty(Destination destination); 068 069 /** 070 * reset the cursor 071 */ 072 void reset(); 073 074 /** 075 * hint to the cursor to release any locks it might have grabbed after a 076 * reset 077 */ 078 void release(); 079 080 /** 081 * add message to await dispatch 082 * 083 * @param node 084 * @return boolean true if successful, false if cursor traps a duplicate 085 * @throws IOException 086 * @throws Exception 087 */ 088 boolean addMessageLast(MessageReference node) throws Exception; 089 090 /** 091 * add message to await dispatch - if it can 092 * 093 * @param node 094 * @param maxWaitTime 095 * @return true if successful 096 * @throws IOException 097 * @throws Exception 098 */ 099 boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception; 100 101 /** 102 * add message to await dispatch 103 * 104 * @param node 105 * @throws Exception 106 */ 107 void addMessageFirst(MessageReference node) throws Exception; 108 109 /** 110 * Add a message recovered from a retroactive policy 111 * 112 * @param node 113 * @throws Exception 114 */ 115 void addRecoveredMessage(MessageReference node) throws Exception; 116 117 /** 118 * @return true if there pending messages to dispatch 119 */ 120 boolean hasNext(); 121 122 /** 123 * @return the next pending message with its reference count increment 124 */ 125 MessageReference next(); 126 127 /** 128 * remove the message at the cursor position 129 */ 130 void remove(); 131 132 /** 133 * @return the number of pending messages 134 */ 135 int size(); 136 137 /** 138 * clear all pending messages 139 */ 140 void clear(); 141 142 /** 143 * Informs the Broker if the subscription needs to intervention to recover 144 * it's state e.g. DurableTopicSubscriber may do 145 * 146 * @return true if recovery required 147 */ 148 boolean isRecoveryRequired(); 149 150 /** 151 * @return the maximum batch size 152 */ 153 int getMaxBatchSize(); 154 155 /** 156 * Set the max batch size 157 * 158 * @param maxBatchSize 159 */ 160 void setMaxBatchSize(int maxBatchSize); 161 162 /** 163 * Give the cursor a hint that we are about to remove messages from memory 164 * only 165 */ 166 void resetForGC(); 167 168 /** 169 * remove a node 170 * 171 * @param node 172 */ 173 void remove(MessageReference node); 174 175 /** 176 * free up any internal buffers 177 */ 178 void gc(); 179 180 /** 181 * Set the UsageManager 182 * 183 * @param systemUsage 184 * @see org.apache.activemq.usage.SystemUsage 185 */ 186 void setSystemUsage(SystemUsage systemUsage); 187 188 /** 189 * @return the usageManager 190 */ 191 SystemUsage getSystemUsage(); 192 193 /** 194 * @return the memoryUsageHighWaterMark 195 */ 196 int getMemoryUsageHighWaterMark(); 197 198 /** 199 * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set 200 */ 201 void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark); 202 203 /** 204 * @return true if the cursor is full 205 */ 206 boolean isFull(); 207 208 /** 209 * @return true if the cursor has space to page messages into 210 */ 211 public boolean hasSpace(); 212 213 /** 214 * @return true if the cursor has buffered messages ready to deliver 215 */ 216 boolean hasMessagesBufferedToDeliver(); 217 218 /** 219 * destroy the cursor 220 * 221 * @throws Exception 222 */ 223 void destroy() throws Exception; 224 225 /** 226 * Page in a restricted number of messages and increment the reference count 227 * 228 * @param maxItems 229 * @return a list of paged in messages 230 */ 231 LinkedList<MessageReference> pageInList(int maxItems); 232 233 /** 234 * set the maximum number of producers to track at one time 235 * @param value 236 */ 237 void setMaxProducersToAudit(int value); 238 239 /** 240 * @return the maximum number of producers to audit 241 */ 242 int getMaxProducersToAudit(); 243 244 /** 245 * Set the maximum depth of message ids to track 246 * @param depth 247 */ 248 void setMaxAuditDepth(int depth); 249 250 /** 251 * @return the audit depth 252 */ 253 int getMaxAuditDepth(); 254 255 /** 256 * @return the enableAudit 257 */ 258 public boolean isEnableAudit(); 259 /** 260 * @param enableAudit the enableAudit to set 261 */ 262 public void setEnableAudit(boolean enableAudit); 263 264 /** 265 * @return true if the underlying state of this cursor 266 * disappears when the broker shuts down 267 */ 268 public boolean isTransient(); 269 270 271 /** 272 * set the audit 273 * @param audit 274 */ 275 public void setMessageAudit(ActiveMQMessageAudit audit); 276 277 278 /** 279 * @return the audit - could be null 280 */ 281 public ActiveMQMessageAudit getMessageAudit(); 282 283 /** 284 * use a cache to improve performance 285 * @param useCache 286 */ 287 public void setUseCache(boolean useCache); 288 289 /** 290 * @return true if a cache may be used 291 */ 292 public boolean isUseCache(); 293 294 /** 295 * remove from auditing the message id 296 * @param id 297 */ 298 public void rollback(MessageId id); 299 300 /** 301 * @return true if cache is being used 302 */ 303 public boolean isCacheEnabled(); 304 305 public void rebase(); 306 307}