001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.Collections;
020import java.util.LinkedList;
021import java.util.List;
022import java.util.Set;
023import org.apache.activemq.ActiveMQMessageAudit;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.BaseDestination;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.MessageReference;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.usage.SystemUsage;
032
033/**
034 * Abstract method holder for pending message (messages awaiting disptach to a
035 * consumer) cursor
036 * 
037 * 
038 */
039public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
040    protected int memoryUsageHighWaterMark = 70;
041    protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
042    protected SystemUsage systemUsage;
043    protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
044    protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
045    protected boolean enableAudit=true;
046    protected ActiveMQMessageAudit audit;
047    protected boolean useCache=true;
048    private boolean cacheEnabled=true;
049    private boolean started=false;
050    protected MessageReference last = null;
051    protected final boolean prioritizedMessages;
052    
053    public AbstractPendingMessageCursor(boolean prioritizedMessages) {
054        this.prioritizedMessages=prioritizedMessages;
055    }
056  
057
058    public synchronized void start() throws Exception  {
059        if (!started && enableAudit && audit==null) {
060            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
061        }
062        started=true;
063    }
064
065    public synchronized void stop() throws Exception  {
066        started=false;
067        gc();
068    }
069
070    public void add(ConnectionContext context, Destination destination) throws Exception {
071    }
072
073    @SuppressWarnings("unchecked")
074    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
075        return Collections.EMPTY_LIST;
076    }
077
078    public boolean isRecoveryRequired() {
079        return true;
080    }
081
082    public void addMessageFirst(MessageReference node) throws Exception {
083    }
084
085    public boolean addMessageLast(MessageReference node) throws Exception {
086        return true;
087    }
088    
089    public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
090        return addMessageLast(node);
091    }
092
093    public void addRecoveredMessage(MessageReference node) throws Exception {
094        addMessageLast(node);
095    }
096
097    public void clear() {
098    }
099
100    public boolean hasNext() {
101        return false;
102    }
103
104    public boolean isEmpty() {
105        return false;
106    }
107
108    public boolean isEmpty(Destination destination) {
109        return isEmpty();
110    }
111
112    public MessageReference next() {
113        return null;
114    }
115
116    public void remove() {
117    }
118
119    public void reset() {
120    }
121
122    public int size() {
123        return 0;
124    }
125
126    public int getMaxBatchSize() {
127        return maxBatchSize;
128    }
129
130    public void setMaxBatchSize(int maxBatchSize) {
131        this.maxBatchSize = maxBatchSize;
132    }
133
134    protected void fillBatch() throws Exception {
135    }
136
137    public void resetForGC() {
138        reset();
139    }
140
141    public void remove(MessageReference node) {
142    }
143
144    public void gc() {
145    }
146
147    public void setSystemUsage(SystemUsage usageManager) {
148        this.systemUsage = usageManager;
149    }
150
151    public boolean hasSpace() {
152        // allow isFull to verify parent usage and otherwise enforce local memoryUsageHighWaterMark
153        return systemUsage != null ? (!isParentFull() && systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
154    }
155
156    private boolean isParentFull() {
157        boolean result = false;
158        if (systemUsage != null) {
159            if (systemUsage.getMemoryUsage().getParent() != null) {
160                return systemUsage.getMemoryUsage().getParent().getPercentUsage() >= 100;
161            }
162        }
163        return result;
164    }
165
166    public boolean isFull() {
167        return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
168    }
169
170    public void release() {
171    }
172
173    public boolean hasMessagesBufferedToDeliver() {
174        return false;
175    }
176
177    /**
178     * @return the memoryUsageHighWaterMark
179     */
180    public int getMemoryUsageHighWaterMark() {
181        return memoryUsageHighWaterMark;
182    }
183
184    /**
185     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
186     */
187    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
188        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
189    }
190
191    /**
192     * @return the usageManager
193     */
194    public SystemUsage getSystemUsage() {
195        return this.systemUsage;
196    }
197
198    /**
199     * destroy the cursor
200     * 
201     * @throws Exception
202     */
203    public void destroy() throws Exception {
204        stop();
205    }
206
207    /**
208     * Page in a restricted number of messages
209     * 
210     * @param maxItems maximum number of messages to return
211     * @return a list of paged in messages
212     */
213    public LinkedList<MessageReference> pageInList(int maxItems) {
214        throw new RuntimeException("Not supported");
215    }
216
217    /**
218     * @return the maxProducersToAudit
219     */
220    public int getMaxProducersToAudit() {
221        return maxProducersToAudit;
222    }
223
224    /**
225     * @param maxProducersToAudit the maxProducersToAudit to set
226     */
227    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
228        this.maxProducersToAudit = maxProducersToAudit;
229        if (audit != null) {
230            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
231        }
232    }
233
234    /**
235     * @return the maxAuditDepth
236     */
237    public int getMaxAuditDepth() {
238        return maxAuditDepth;
239    }
240    
241
242    /**
243     * @param maxAuditDepth the maxAuditDepth to set
244     */
245    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
246        this.maxAuditDepth = maxAuditDepth;
247        if (audit != null) {
248            audit.setAuditDepth(maxAuditDepth);
249        }
250    }
251    
252    
253    /**
254     * @return the enableAudit
255     */
256    public boolean isEnableAudit() {
257        return enableAudit;
258    }
259
260    /**
261     * @param enableAudit the enableAudit to set
262     */
263    public synchronized void setEnableAudit(boolean enableAudit) {
264        this.enableAudit = enableAudit;
265        if (enableAudit && started && audit==null) {
266            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
267        }
268    }
269    
270    public boolean isTransient() {
271        return false;
272    }
273    
274       
275    /**
276     * set the audit
277     * @param audit new audit component
278     */
279    public void setMessageAudit(ActiveMQMessageAudit audit) {
280        this.audit=audit;
281    }
282    
283    
284    /**
285     * @return the audit
286     */
287    public ActiveMQMessageAudit getMessageAudit() {
288        return audit;
289    }
290    
291    public boolean isUseCache() {
292        return useCache;
293    }
294
295    public void setUseCache(boolean useCache) {
296        this.useCache = useCache;
297    }
298
299    public synchronized boolean isDuplicate(MessageId messageId) {
300        boolean unique = recordUniqueId(messageId);
301        rollback(messageId);
302        return !unique;
303    }
304    
305    /**
306     * records a message id and checks if it is a duplicate
307     * @param messageId
308     * @return true if id is unique, false otherwise.
309     */
310    public synchronized boolean recordUniqueId(MessageId messageId) {
311        if (!enableAudit || audit==null) {
312            return true;
313        }
314        return !audit.isDuplicate(messageId);
315    }
316    
317    public synchronized void rollback(MessageId id) {
318        if (audit != null) {
319            audit.rollback(id);
320        }
321    }
322    
323    public synchronized boolean isStarted() {
324        return started;
325    }
326    
327    public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
328        boolean result = false;
329        Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
330        if (destinations != null) {
331            for (Destination dest:destinations) {
332                if (dest.isPrioritizedMessages()) {
333                    result = true;
334                    break;
335                }
336            }
337        }
338        return result;
339
340    }
341
342    public synchronized boolean isCacheEnabled() {
343        return cacheEnabled;
344    }
345
346    public synchronized void setCacheEnabled(boolean val) {
347        cacheEnabled = val;
348    }
349
350    public void rebase() {
351    }
352}