001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import org.apache.activemq.broker.Broker;
020import org.apache.activemq.broker.region.MessageReference;
021import org.apache.activemq.broker.region.Queue;
022import org.apache.activemq.command.Message;
023import org.apache.activemq.usage.SystemUsage;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027/**
028 * Store based Cursor for Queues
029 */
030public class StoreQueueCursor extends AbstractPendingMessageCursor {
031
032    private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class);
033    private final Broker broker;
034    private int pendingCount;
035    private final Queue queue;
036    private PendingMessageCursor nonPersistent;
037    private final QueueStorePrefetch persistent;
038    private boolean started;
039    private PendingMessageCursor currentCursor;
040
041    /**
042     * Construct
043     * @param broker
044     * @param queue
045     */
046    public StoreQueueCursor(Broker broker,Queue queue) {
047        super((queue != null ? queue.isPrioritizedMessages():false));
048        this.broker=broker;
049        this.queue = queue;
050        this.persistent = new QueueStorePrefetch(queue, broker);
051        currentCursor = persistent;
052    }
053
054    public synchronized void start() throws Exception {
055        started = true;
056        super.start();
057        if (nonPersistent == null) {
058            if (broker.getBrokerService().isPersistent()) {
059                nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages);
060            }else {
061                nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
062            }
063            nonPersistent.setMaxBatchSize(getMaxBatchSize());
064            nonPersistent.setSystemUsage(systemUsage);
065            nonPersistent.setEnableAudit(isEnableAudit());
066            nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
067            nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
068        }
069        nonPersistent.setMessageAudit(getMessageAudit());
070        nonPersistent.start();
071        persistent.setMessageAudit(getMessageAudit());
072        persistent.start();
073        pendingCount = persistent.size() + nonPersistent.size();
074    }
075
076    public synchronized void stop() throws Exception {
077        started = false;
078        if (nonPersistent != null) {
079//            nonPersistent.clear();
080//            nonPersistent.stop();
081//            nonPersistent.gc();
082          nonPersistent.destroy();
083        }
084        persistent.stop();
085        persistent.gc();
086        super.stop();
087        pendingCount = 0;
088    }
089
090    public synchronized boolean addMessageLast(MessageReference node) throws Exception {
091        boolean result = true;
092        if (node != null) {
093            Message msg = node.getMessage();
094            if (started) {
095                pendingCount++;
096                if (!msg.isPersistent()) {
097                    nonPersistent.addMessageLast(node);
098                }
099            }
100            if (msg.isPersistent()) {
101                result = persistent.addMessageLast(node);
102            }
103        }
104        return result;
105    }
106
107    public synchronized void addMessageFirst(MessageReference node) throws Exception {
108        if (node != null) {
109            Message msg = node.getMessage();
110            if (started) {
111                pendingCount++;
112                if (!msg.isPersistent()) {
113                    nonPersistent.addMessageFirst(node);
114                }
115            }
116            if (msg.isPersistent()) {
117                persistent.addMessageFirst(node);
118            }
119        }
120    }
121
122    public synchronized void clear() {
123        pendingCount = 0;
124    }
125
126    public synchronized boolean hasNext() {
127        try {
128            getNextCursor();
129        } catch (Exception e) {
130            LOG.error("Failed to get current cursor ", e);
131            throw new RuntimeException(e);
132       }
133       return currentCursor != null ? currentCursor.hasNext() : false;
134    }
135
136    public synchronized MessageReference next() {
137        MessageReference result = currentCursor != null ? currentCursor.next() : null;
138        return result;
139    }
140
141    public synchronized void remove() {
142        if (currentCursor != null) {
143            currentCursor.remove();
144        }
145        pendingCount--;
146    }
147
148    public synchronized void remove(MessageReference node) {
149        if (!node.isPersistent()) {
150            nonPersistent.remove(node);
151        } else {
152            persistent.remove(node);
153        }
154        pendingCount--;
155    }
156
157    public synchronized void reset() {
158        nonPersistent.reset();
159        persistent.reset();
160        pendingCount = persistent.size() + nonPersistent.size();
161    }
162
163    public void release() {
164        nonPersistent.release();
165        persistent.release();
166    }
167
168
169    public synchronized int size() {
170        if (pendingCount < 0) {
171            pendingCount = persistent.size() + nonPersistent.size();
172        }
173        return pendingCount;
174    }
175
176    public synchronized boolean isEmpty() {
177        // if negative, more messages arrived in store since last reset so non empty
178        return pendingCount == 0;
179    }
180
181    /**
182     * Informs the Broker if the subscription needs to intervention to recover
183     * it's state e.g. DurableTopicSubscriber may do
184     *
185     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
186     * @return true if recovery required
187     */
188    public boolean isRecoveryRequired() {
189        return false;
190    }
191
192    /**
193     * @return the nonPersistent Cursor
194     */
195    public PendingMessageCursor getNonPersistent() {
196        return this.nonPersistent;
197    }
198
199    /**
200     * @param nonPersistent cursor to set
201     */
202    public void setNonPersistent(PendingMessageCursor nonPersistent) {
203        this.nonPersistent = nonPersistent;
204    }
205
206    public void setMaxBatchSize(int maxBatchSize) {
207        persistent.setMaxBatchSize(maxBatchSize);
208        if (nonPersistent != null) {
209            nonPersistent.setMaxBatchSize(maxBatchSize);
210        }
211        super.setMaxBatchSize(maxBatchSize);
212    }
213
214
215    public void setMaxProducersToAudit(int maxProducersToAudit) {
216        super.setMaxProducersToAudit(maxProducersToAudit);
217        if (persistent != null) {
218            persistent.setMaxProducersToAudit(maxProducersToAudit);
219        }
220        if (nonPersistent != null) {
221            nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
222        }
223    }
224
225    public void setMaxAuditDepth(int maxAuditDepth) {
226        super.setMaxAuditDepth(maxAuditDepth);
227        if (persistent != null) {
228            persistent.setMaxAuditDepth(maxAuditDepth);
229        }
230        if (nonPersistent != null) {
231            nonPersistent.setMaxAuditDepth(maxAuditDepth);
232        }
233    }
234
235    public void setEnableAudit(boolean enableAudit) {
236        super.setEnableAudit(enableAudit);
237        if (persistent != null) {
238            persistent.setEnableAudit(enableAudit);
239        }
240        if (nonPersistent != null) {
241            nonPersistent.setEnableAudit(enableAudit);
242        }
243    }
244
245    @Override
246    public void setUseCache(boolean useCache) {
247        super.setUseCache(useCache);
248        if (persistent != null) {
249            persistent.setUseCache(useCache);
250        }
251        if (nonPersistent != null) {
252            nonPersistent.setUseCache(useCache);
253        }
254    }
255
256    @Override
257    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
258        super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
259        if (persistent != null) {
260            persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
261        }
262        if (nonPersistent != null) {
263            nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
264        }
265    }
266
267
268
269    public synchronized void gc() {
270        if (persistent != null) {
271            persistent.gc();
272        }
273        if (nonPersistent != null) {
274            nonPersistent.gc();
275        }
276        pendingCount = persistent.size() + nonPersistent.size();
277    }
278
279    public void setSystemUsage(SystemUsage usageManager) {
280        super.setSystemUsage(usageManager);
281        if (persistent != null) {
282            persistent.setSystemUsage(usageManager);
283        }
284        if (nonPersistent != null) {
285            nonPersistent.setSystemUsage(usageManager);
286        }
287    }
288
289    protected synchronized PendingMessageCursor getNextCursor() throws Exception {
290        if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
291            currentCursor = currentCursor == persistent ? nonPersistent : persistent;
292            // sanity check
293            if (currentCursor.isEmpty()) {
294                currentCursor = currentCursor == persistent ? nonPersistent : persistent;
295            }
296        }
297        return currentCursor;
298    }
299
300    @Override
301    public boolean isCacheEnabled() {
302        boolean cacheEnabled = isUseCache();
303        if (cacheEnabled) {
304            if (persistent != null) {
305                cacheEnabled &= persistent.isCacheEnabled();
306            }
307            if (nonPersistent != null) {
308                cacheEnabled &= nonPersistent.isCacheEnabled();
309            }
310            setCacheEnabled(cacheEnabled);
311        }
312        return cacheEnabled;
313    }
314
315    @Override
316    public void rebase() {
317        persistent.rebase();
318        reset();
319    }
320
321}