001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import org.apache.activemq.broker.Broker;
020import org.apache.activemq.broker.region.MessageReference;
021import org.apache.activemq.broker.region.Queue;
022import org.apache.activemq.command.Message;
023import org.apache.activemq.command.MessageId;
024import org.apache.activemq.usage.SystemUsage;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028/**
029 * Store based Cursor for Queues
030 */
031public class StoreQueueCursor extends AbstractPendingMessageCursor {
032
033    private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class);
034    private final Broker broker;
035    private int pendingCount;
036    private final Queue queue;
037    private PendingMessageCursor nonPersistent;
038    private final QueueStorePrefetch persistent;
039    private boolean started;
040    private PendingMessageCursor currentCursor;
041
042    /**
043     * Construct
044     * @param broker
045     * @param queue
046     */
047    public StoreQueueCursor(Broker broker,Queue queue) {
048        super((queue != null ? queue.isPrioritizedMessages():false));
049        this.broker=broker;
050        this.queue = queue;
051        this.persistent = new QueueStorePrefetch(queue, broker);
052        currentCursor = persistent;
053    }
054
055    public synchronized void start() throws Exception {
056        started = true;
057        super.start();
058        if (nonPersistent == null) {
059            if (broker.getBrokerService().isPersistent()) {
060                nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages);
061            }else {
062                nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages);
063            }
064            nonPersistent.setMaxBatchSize(getMaxBatchSize());
065            nonPersistent.setSystemUsage(systemUsage);
066            nonPersistent.setEnableAudit(isEnableAudit());
067            nonPersistent.setMaxAuditDepth(getMaxAuditDepth());
068            nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit());
069        }
070        nonPersistent.setMessageAudit(getMessageAudit());
071        nonPersistent.start();
072        persistent.setMessageAudit(getMessageAudit());
073        persistent.start();
074        pendingCount = persistent.size() + nonPersistent.size();
075    }
076
077    public synchronized void stop() throws Exception {
078        started = false;
079        if (nonPersistent != null) {
080          nonPersistent.destroy();
081        }
082        persistent.stop();
083        persistent.gc();
084        super.stop();
085        pendingCount = 0;
086    }
087
088    public synchronized boolean addMessageLast(MessageReference node) throws Exception {
089        boolean result = true;
090        if (node != null) {
091            Message msg = node.getMessage();
092            if (started) {
093                pendingCount++;
094                if (!msg.isPersistent()) {
095                    nonPersistent.addMessageLast(node);
096                }
097            }
098            if (msg.isPersistent()) {
099                result = persistent.addMessageLast(node);
100            }
101        }
102        return result;
103    }
104
105    public synchronized void addMessageFirst(MessageReference node) throws Exception {
106        if (node != null) {
107            Message msg = node.getMessage();
108            if (started) {
109                pendingCount++;
110                if (!msg.isPersistent()) {
111                    nonPersistent.addMessageFirst(node);
112                }
113            }
114            if (msg.isPersistent()) {
115                persistent.addMessageFirst(node);
116            }
117        }
118    }
119
120    public synchronized void clear() {
121        pendingCount = 0;
122    }
123
124    public synchronized boolean hasNext() {
125        try {
126            getNextCursor();
127        } catch (Exception e) {
128            LOG.error("Failed to get current cursor ", e);
129            throw new RuntimeException(e);
130       }
131       return currentCursor != null ? currentCursor.hasNext() : false;
132    }
133
134    public synchronized MessageReference next() {
135        MessageReference result = currentCursor != null ? currentCursor.next() : null;
136        return result;
137    }
138
139    public synchronized void remove() {
140        if (currentCursor != null) {
141            currentCursor.remove();
142        }
143        pendingCount--;
144    }
145
146    public synchronized void remove(MessageReference node) {
147        if (!node.isPersistent()) {
148            nonPersistent.remove(node);
149        } else {
150            persistent.remove(node);
151        }
152        pendingCount--;
153    }
154
155    public synchronized void reset() {
156        nonPersistent.reset();
157        persistent.reset();
158        pendingCount = persistent.size() + nonPersistent.size();
159    }
160
161    public void release() {
162        nonPersistent.release();
163        persistent.release();
164    }
165
166
167    public synchronized int size() {
168        if (pendingCount < 0) {
169            pendingCount = persistent.size() + nonPersistent.size();
170        }
171        return pendingCount;
172    }
173
174    public synchronized boolean isEmpty() {
175        // if negative, more messages arrived in store since last reset so non empty
176        return pendingCount == 0;
177    }
178
179    /**
180     * Informs the Broker if the subscription needs to intervention to recover
181     * it's state e.g. DurableTopicSubscriber may do
182     *
183     * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor
184     * @return true if recovery required
185     */
186    public boolean isRecoveryRequired() {
187        return false;
188    }
189
190    /**
191     * @return the nonPersistent Cursor
192     */
193    public PendingMessageCursor getNonPersistent() {
194        return this.nonPersistent;
195    }
196
197    /**
198     * @param nonPersistent cursor to set
199     */
200    public void setNonPersistent(PendingMessageCursor nonPersistent) {
201        this.nonPersistent = nonPersistent;
202    }
203
204    /**
205     * @return the persistent Cursor
206     */
207    public PendingMessageCursor getPersistent() { return  this.persistent; }
208
209    @Override
210    public void setMaxBatchSize(int maxBatchSize) {
211        persistent.setMaxBatchSize(maxBatchSize);
212        if (nonPersistent != null) {
213            nonPersistent.setMaxBatchSize(maxBatchSize);
214        }
215        super.setMaxBatchSize(maxBatchSize);
216    }
217
218
219    public void setMaxProducersToAudit(int maxProducersToAudit) {
220        super.setMaxProducersToAudit(maxProducersToAudit);
221        if (persistent != null) {
222            persistent.setMaxProducersToAudit(maxProducersToAudit);
223        }
224        if (nonPersistent != null) {
225            nonPersistent.setMaxProducersToAudit(maxProducersToAudit);
226        }
227    }
228
229    public void setMaxAuditDepth(int maxAuditDepth) {
230        super.setMaxAuditDepth(maxAuditDepth);
231        if (persistent != null) {
232            persistent.setMaxAuditDepth(maxAuditDepth);
233        }
234        if (nonPersistent != null) {
235            nonPersistent.setMaxAuditDepth(maxAuditDepth);
236        }
237    }
238
239    public void setEnableAudit(boolean enableAudit) {
240        super.setEnableAudit(enableAudit);
241        if (persistent != null) {
242            persistent.setEnableAudit(enableAudit);
243        }
244        if (nonPersistent != null) {
245            nonPersistent.setEnableAudit(enableAudit);
246        }
247    }
248
249    @Override
250    public void rollback(MessageId id) {
251        nonPersistent.rollback(id);
252        persistent.rollback(id);
253    }
254
255    @Override
256    public void setUseCache(boolean useCache) {
257        super.setUseCache(useCache);
258        if (persistent != null) {
259            persistent.setUseCache(useCache);
260        }
261        if (nonPersistent != null) {
262            nonPersistent.setUseCache(useCache);
263        }
264    }
265
266    @Override
267    public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) {
268        super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
269        if (persistent != null) {
270            persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
271        }
272        if (nonPersistent != null) {
273            nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark);
274        }
275    }
276
277
278
279    public synchronized void gc() {
280        if (persistent != null) {
281            persistent.gc();
282        }
283        if (nonPersistent != null) {
284            nonPersistent.gc();
285        }
286        pendingCount = persistent.size() + nonPersistent.size();
287    }
288
289    public void setSystemUsage(SystemUsage usageManager) {
290        super.setSystemUsage(usageManager);
291        if (persistent != null) {
292            persistent.setSystemUsage(usageManager);
293        }
294        if (nonPersistent != null) {
295            nonPersistent.setSystemUsage(usageManager);
296        }
297    }
298
299    protected synchronized PendingMessageCursor getNextCursor() throws Exception {
300        if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) {
301            currentCursor = currentCursor == persistent ? nonPersistent : persistent;
302            // sanity check
303            if (currentCursor.isEmpty()) {
304                currentCursor = currentCursor == persistent ? nonPersistent : persistent;
305            }
306        }
307        return currentCursor;
308    }
309
310    @Override
311    public boolean isCacheEnabled() {
312        boolean cacheEnabled = isUseCache();
313        if (cacheEnabled) {
314            if (persistent != null) {
315                cacheEnabled &= persistent.isCacheEnabled();
316            }
317            if (nonPersistent != null) {
318                cacheEnabled &= nonPersistent.isCacheEnabled();
319            }
320            setCacheEnabled(cacheEnabled);
321        }
322        return cacheEnabled;
323    }
324
325    @Override
326    public void rebase() {
327        persistent.rebase();
328        reset();
329    }
330
331}