001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.Collections;
020import java.util.LinkedList;
021import java.util.List;
022import java.util.Set;
023import org.apache.activemq.ActiveMQMessageAudit;
024import org.apache.activemq.broker.Broker;
025import org.apache.activemq.broker.ConnectionContext;
026import org.apache.activemq.broker.region.BaseDestination;
027import org.apache.activemq.broker.region.Destination;
028import org.apache.activemq.broker.region.MessageReference;
029import org.apache.activemq.broker.region.Subscription;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.usage.SystemUsage;
032
033/**
034 * Abstract method holder for pending message (messages awaiting disptach to a
035 * consumer) cursor
036 * 
037 * 
038 */
039public abstract class AbstractPendingMessageCursor implements PendingMessageCursor {
040    protected int memoryUsageHighWaterMark = 70;
041    protected int maxBatchSize = BaseDestination.MAX_PAGE_SIZE;
042    protected SystemUsage systemUsage;
043    protected int maxProducersToAudit = BaseDestination.MAX_PRODUCERS_TO_AUDIT;
044    protected int maxAuditDepth = BaseDestination.MAX_AUDIT_DEPTH;
045    protected boolean enableAudit=true;
046    protected ActiveMQMessageAudit audit;
047    protected boolean useCache=true;
048    protected boolean cacheEnabled=true;
049    private boolean started=false;
050    protected MessageReference last = null;
051    protected final boolean prioritizedMessages;
052    
053    public AbstractPendingMessageCursor(boolean prioritizedMessages) {
054        this.prioritizedMessages=prioritizedMessages;
055    }
056  
057
058    public synchronized void start() throws Exception  {
059        if (!started && enableAudit && audit==null) {
060            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
061        }
062        started=true;
063    }
064
065    public synchronized void stop() throws Exception  {
066        started=false;
067        gc();
068    }
069
070    public void add(ConnectionContext context, Destination destination) throws Exception {
071    }
072
073    @SuppressWarnings("unchecked")
074    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
075        return Collections.EMPTY_LIST;
076    }
077
078    public boolean isRecoveryRequired() {
079        return true;
080    }
081
082    public void addMessageFirst(MessageReference node) throws Exception {
083    }
084
085    public boolean addMessageLast(MessageReference node) throws Exception {
086        return true;
087    }
088    
089    public boolean tryAddMessageLast(MessageReference node, long maxWaitTime) throws Exception {
090        return addMessageLast(node);
091    }
092
093    public void addRecoveredMessage(MessageReference node) throws Exception {
094        addMessageLast(node);
095    }
096
097    public void clear() {
098    }
099
100    public boolean hasNext() {
101        return false;
102    }
103
104    public boolean isEmpty() {
105        return false;
106    }
107
108    public boolean isEmpty(Destination destination) {
109        return isEmpty();
110    }
111
112    public MessageReference next() {
113        return null;
114    }
115
116    public void remove() {
117    }
118
119    public void reset() {
120    }
121
122    public int size() {
123        return 0;
124    }
125
126    public int getMaxBatchSize() {
127        return maxBatchSize;
128    }
129
130    public void setMaxBatchSize(int maxBatchSize) {
131        this.maxBatchSize = maxBatchSize;
132    }
133
134    protected void fillBatch() throws Exception {
135    }
136
137    public void resetForGC() {
138        reset();
139    }
140
141    public void remove(MessageReference node) {
142    }
143
144    public void gc() {
145    }
146
147    public void setSystemUsage(SystemUsage usageManager) {
148        this.systemUsage = usageManager;
149    }
150
151    public boolean hasSpace() {
152        // allow isFull to verify parent usage and otherwise enforce local memoryUsageHighWaterMark
153        return systemUsage != null ? (!isParentFull() && systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
154    }
155
156    boolean parentHasSpace(int waterMark) {
157        boolean result = true;
158        if (systemUsage != null) {
159            if (systemUsage.getMemoryUsage().getParent() != null) {
160                return systemUsage.getMemoryUsage().getParent().getPercentUsage() <= waterMark;
161            }
162        }
163        return result;
164    }
165
166    private boolean isParentFull() {
167        boolean result = false;
168        if (systemUsage != null) {
169            if (systemUsage.getMemoryUsage().getParent() != null) {
170                return systemUsage.getMemoryUsage().getParent().getPercentUsage() >= 100;
171            }
172        }
173        return result;
174    }
175
176    public boolean isFull() {
177        return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
178    }
179
180    public void release() {
181    }
182
183    public boolean hasMessagesBufferedToDeliver() {
184        return false;
185    }
186
187    /**
188     * @return the memoryUsageHighWaterMark
189     */
190    public int getMemoryUsageHighWaterMark() {
191        return memoryUsageHighWaterMark;
192    }
193
194    /**
195     * @param memoryUsageHighWaterMark the memoryUsageHighWaterMark to set
196     */
197    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
198        this.memoryUsageHighWaterMark = memoryUsageHighWaterMark;
199    }
200
201    /**
202     * @return the usageManager
203     */
204    public SystemUsage getSystemUsage() {
205        return this.systemUsage;
206    }
207
208    /**
209     * destroy the cursor
210     * 
211     * @throws Exception
212     */
213    public void destroy() throws Exception {
214        stop();
215    }
216
217    /**
218     * Page in a restricted number of messages
219     * 
220     * @param maxItems maximum number of messages to return
221     * @return a list of paged in messages
222     */
223    public LinkedList<MessageReference> pageInList(int maxItems) {
224        throw new RuntimeException("Not supported");
225    }
226
227    /**
228     * @return the maxProducersToAudit
229     */
230    public int getMaxProducersToAudit() {
231        return maxProducersToAudit;
232    }
233
234    /**
235     * @param maxProducersToAudit the maxProducersToAudit to set
236     */
237    public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
238        this.maxProducersToAudit = maxProducersToAudit;
239        if (audit != null) {
240            audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
241        }
242    }
243
244    /**
245     * @return the maxAuditDepth
246     */
247    public int getMaxAuditDepth() {
248        return maxAuditDepth;
249    }
250    
251
252    /**
253     * @param maxAuditDepth the maxAuditDepth to set
254     */
255    public synchronized void setMaxAuditDepth(int maxAuditDepth) {
256        this.maxAuditDepth = maxAuditDepth;
257        if (audit != null) {
258            audit.setAuditDepth(maxAuditDepth);
259        }
260    }
261    
262    
263    /**
264     * @return the enableAudit
265     */
266    public boolean isEnableAudit() {
267        return enableAudit;
268    }
269
270    /**
271     * @param enableAudit the enableAudit to set
272     */
273    public synchronized void setEnableAudit(boolean enableAudit) {
274        this.enableAudit = enableAudit;
275        if (enableAudit && started && audit==null) {
276            audit= new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
277        }
278    }
279    
280    public boolean isTransient() {
281        return false;
282    }
283    
284       
285    /**
286     * set the audit
287     * @param audit new audit component
288     */
289    public void setMessageAudit(ActiveMQMessageAudit audit) {
290        this.audit=audit;
291    }
292    
293    
294    /**
295     * @return the audit
296     */
297    public ActiveMQMessageAudit getMessageAudit() {
298        return audit;
299    }
300    
301    public boolean isUseCache() {
302        return useCache;
303    }
304
305    public void setUseCache(boolean useCache) {
306        this.useCache = useCache;
307    }
308
309    public synchronized boolean isDuplicate(MessageId messageId) {
310        boolean unique = recordUniqueId(messageId);
311        rollback(messageId);
312        return !unique;
313    }
314    
315    /**
316     * records a message id and checks if it is a duplicate
317     * @param messageId
318     * @return true if id is unique, false otherwise.
319     */
320    public synchronized boolean recordUniqueId(MessageId messageId) {
321        if (!enableAudit || audit==null) {
322            return true;
323        }
324        return !audit.isDuplicate(messageId);
325    }
326    
327    public synchronized void rollback(MessageId id) {
328        if (audit != null) {
329            audit.rollback(id);
330        }
331    }
332    
333    public synchronized boolean isStarted() {
334        return started;
335    }
336    
337    public static boolean isPrioritizedMessageSubscriber(Broker broker,Subscription sub) {
338        boolean result = false;
339        Set<Destination> destinations = broker.getDestinations(sub.getActiveMQDestination());
340        if (destinations != null) {
341            for (Destination dest:destinations) {
342                if (dest.isPrioritizedMessages()) {
343                    result = true;
344                    break;
345                }
346            }
347        }
348        return result;
349
350    }
351
352    public synchronized boolean isCacheEnabled() {
353        return cacheEnabled;
354    }
355
356    public synchronized void setCacheEnabled(boolean val) {
357        cacheEnabled = val;
358    }
359
360    public void rebase() {
361    }
362}