001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.io.IOException;
020import java.util.LinkedList;
021import java.util.List;
022import org.apache.activemq.ActiveMQMessageAudit;
023import org.apache.activemq.Service;
024import org.apache.activemq.broker.ConnectionContext;
025import org.apache.activemq.broker.region.Destination;
026import org.apache.activemq.broker.region.MessageReference;
027import org.apache.activemq.command.MessageId;
028import org.apache.activemq.usage.SystemUsage;
029
030/**
031 * Interface to pending message (messages awaiting disptach to a consumer)
032 * cursor
033 * 
034 * 
035 */
036public interface PendingMessageCursor extends Service {
037
038    /**
039     * Add a destination
040     * 
041     * @param context
042     * @param destination
043     * @throws Exception
044     */
045    void add(ConnectionContext context, Destination destination) throws Exception;
046
047    /**
048     * remove a destination
049     * 
050     * @param context
051     * @param destination
052     * @throws Exception
053     */
054    List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception;
055
056    /**
057     * @return true if there are no pending messages
058     */
059    boolean isEmpty();
060
061    /**
062     * check if a Destination is Empty for this cursor
063     * 
064     * @param destination
065     * @return true id the Destination is empty
066     */
067    boolean isEmpty(Destination destination);
068
069    /**
070     * reset the cursor
071     */
072    void reset();
073
074    /**
075     * hint to the cursor to release any locks it might have grabbed after a
076     * reset
077     */
078    void release();
079
080    /**
081     * add message to await dispatch
082     * 
083     * @param node
084     * @return boolean true if successful, false if cursor traps a duplicate
085     * @throws IOException
086     * @throws Exception
087     */
088    boolean addMessageLast(MessageReference node) throws Exception;
089
090    /**
091     * add message to await dispatch - if it can
092     * 
093     * @param node
094     * @param maxWaitTime 
095     * @return true if successful
096     * @throws IOException
097     * @throws Exception
098     */
099    boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception;
100
101    /**
102     * add message to await dispatch
103     * 
104     * @param node
105     * @throws Exception
106     */
107    void addMessageFirst(MessageReference node) throws Exception;
108
109    /**
110     * Add a message recovered from a retroactive policy
111     * 
112     * @param node
113     * @throws Exception
114     */
115    void addRecoveredMessage(MessageReference node) throws Exception;
116
117    /**
118     * @return true if there pending messages to dispatch
119     */
120    boolean hasNext();
121
122    /**
123     * @return the next pending message with its reference count increment
124     */
125    MessageReference next();
126
127    /**
128     * remove the message at the cursor position
129     */
130    void remove();
131
132    /**
133     * @return the number of pending messages
134     */
135    int size();
136
137    /**
138     * clear all pending messages
139     */
140    void clear();
141
142    /**
143     * Informs the Broker if the subscription needs to intervention to recover
144     * it's state e.g. DurableTopicSubscriber may do
145     * 
146     * @return true if recovery required
147     */
148    boolean isRecoveryRequired();
149
150    /**
151     * @return the maximum batch size
152     */
153    int getMaxBatchSize();
154
155    /**
156     * Set the max batch size
157     * 
158     * @param maxBatchSize
159     */
160    void setMaxBatchSize(int maxBatchSize);
161
162    /**
163     * Give the cursor a hint that we are about to remove messages from memory
164     * only
165     */
166    void resetForGC();
167
168    /**
169     * remove a node
170     * 
171     * @param node
172     */
173    void remove(MessageReference node);
174
175    /**
176     * free up any internal buffers
177     */
178    void gc();
179
180    /**
181     * Set the UsageManager
182     * 
183     * @param systemUsage
184     * @see org.apache.activemq.usage.SystemUsage
185     */
186    void setSystemUsage(SystemUsage systemUsage);
187
188    /**
189     * @return the usageManager
190     */
191    SystemUsage getSystemUsage();
192
193    /**
194     * @return the memoryUsageHighWaterMark
195     */
196    int getMemoryUsageHighWaterMark();
197
198    /**
199     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
200     */
201    void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark);
202
203    /**
204     * @return true if the cursor is full
205     */
206    boolean isFull();
207    
208    /**
209     * @return true if the cursor has space to page messages into
210     */
211    public boolean hasSpace();
212
213    /**
214     * @return true if the cursor has buffered messages ready to deliver
215     */
216    boolean hasMessagesBufferedToDeliver();
217
218    /**
219     * destroy the cursor
220     * 
221     * @throws Exception
222     */
223    void destroy() throws Exception;
224
225    /**
226     * Page in a restricted number of messages and increment the reference count
227     * 
228     * @param maxItems
229     * @return a list of paged in messages
230     */
231    LinkedList<MessageReference> pageInList(int maxItems);
232    
233    /**
234     * set the maximum number of producers to track at one time
235     * @param value
236     */
237    void setMaxProducersToAudit(int value);
238    
239    /**
240     * @return the maximum number of producers to audit
241     */
242    int getMaxProducersToAudit();
243    
244    /**
245     * Set the maximum depth of message ids to track
246     * @param depth 
247     */
248    void setMaxAuditDepth(int depth);
249    
250    /**
251     * @return the audit depth
252     */
253    int getMaxAuditDepth();
254    
255    /**
256     * @return the enableAudit
257     */
258    public boolean isEnableAudit();
259    /**
260     * @param enableAudit the enableAudit to set
261     */
262    public void setEnableAudit(boolean enableAudit);
263    
264    /**
265     * @return true if the underlying state of this cursor 
266     * disappears when the broker shuts down
267     */
268    public boolean isTransient();
269    
270    
271    /**
272     * set the audit
273     * @param audit
274     */
275    public void setMessageAudit(ActiveMQMessageAudit audit);
276    
277    
278    /**
279     * @return the audit - could be null
280     */
281    public ActiveMQMessageAudit getMessageAudit();
282    
283    /**
284     * use a cache to improve performance
285     * @param useCache
286     */
287    public void setUseCache(boolean useCache);
288    
289    /**
290     * @return true if a cache may be used
291     */
292    public boolean isUseCache();
293    
294    /**
295     * remove from auditing the message id
296     * @param id
297     */
298    public void rollback(MessageId id);
299
300    /**
301     * @return true if cache is being used
302     */
303    public boolean isCacheEnabled();
304
305    public void rebase();
306
307}