001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import org.apache.activemq.broker.Broker; 020import org.apache.activemq.broker.region.MessageReference; 021import org.apache.activemq.broker.region.Queue; 022import org.apache.activemq.command.Message; 023import org.apache.activemq.usage.SystemUsage; 024import org.slf4j.Logger; 025import org.slf4j.LoggerFactory; 026 027/** 028 * Store based Cursor for Queues 029 */ 030public class StoreQueueCursor extends AbstractPendingMessageCursor { 031 032 private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class); 033 private final Broker broker; 034 private int pendingCount; 035 private final Queue queue; 036 private PendingMessageCursor nonPersistent; 037 private final QueueStorePrefetch persistent; 038 private boolean started; 039 private PendingMessageCursor currentCursor; 040 041 /** 042 * Construct 043 * @param broker 044 * @param queue 045 */ 046 public StoreQueueCursor(Broker broker,Queue queue) { 047 super((queue != null ? queue.isPrioritizedMessages():false)); 048 this.broker=broker; 049 this.queue = queue; 050 this.persistent = new QueueStorePrefetch(queue, broker); 051 currentCursor = persistent; 052 } 053 054 public synchronized void start() throws Exception { 055 started = true; 056 super.start(); 057 if (nonPersistent == null) { 058 if (broker.getBrokerService().isPersistent()) { 059 nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages); 060 }else { 061 nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages); 062 } 063 nonPersistent.setMaxBatchSize(getMaxBatchSize()); 064 nonPersistent.setSystemUsage(systemUsage); 065 nonPersistent.setEnableAudit(isEnableAudit()); 066 nonPersistent.setMaxAuditDepth(getMaxAuditDepth()); 067 nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit()); 068 } 069 nonPersistent.setMessageAudit(getMessageAudit()); 070 nonPersistent.start(); 071 persistent.setMessageAudit(getMessageAudit()); 072 persistent.start(); 073 pendingCount = persistent.size() + nonPersistent.size(); 074 } 075 076 public synchronized void stop() throws Exception { 077 started = false; 078 if (nonPersistent != null) { 079// nonPersistent.clear(); 080// nonPersistent.stop(); 081// nonPersistent.gc(); 082 nonPersistent.destroy(); 083 } 084 persistent.stop(); 085 persistent.gc(); 086 super.stop(); 087 pendingCount = 0; 088 } 089 090 public synchronized boolean addMessageLast(MessageReference node) throws Exception { 091 boolean result = true; 092 if (node != null) { 093 Message msg = node.getMessage(); 094 if (started) { 095 pendingCount++; 096 if (!msg.isPersistent()) { 097 nonPersistent.addMessageLast(node); 098 } 099 } 100 if (msg.isPersistent()) { 101 result = persistent.addMessageLast(node); 102 } 103 } 104 return result; 105 } 106 107 public synchronized void addMessageFirst(MessageReference node) throws Exception { 108 if (node != null) { 109 Message msg = node.getMessage(); 110 if (started) { 111 pendingCount++; 112 if (!msg.isPersistent()) { 113 nonPersistent.addMessageFirst(node); 114 } 115 } 116 if (msg.isPersistent()) { 117 persistent.addMessageFirst(node); 118 } 119 } 120 } 121 122 public synchronized void clear() { 123 pendingCount = 0; 124 } 125 126 public synchronized boolean hasNext() { 127 try { 128 getNextCursor(); 129 } catch (Exception e) { 130 LOG.error("Failed to get current cursor ", e); 131 throw new RuntimeException(e); 132 } 133 return currentCursor != null ? currentCursor.hasNext() : false; 134 } 135 136 public synchronized MessageReference next() { 137 MessageReference result = currentCursor != null ? currentCursor.next() : null; 138 return result; 139 } 140 141 public synchronized void remove() { 142 if (currentCursor != null) { 143 currentCursor.remove(); 144 } 145 pendingCount--; 146 } 147 148 public synchronized void remove(MessageReference node) { 149 if (!node.isPersistent()) { 150 nonPersistent.remove(node); 151 } else { 152 persistent.remove(node); 153 } 154 pendingCount--; 155 } 156 157 public synchronized void reset() { 158 nonPersistent.reset(); 159 persistent.reset(); 160 pendingCount = persistent.size() + nonPersistent.size(); 161 } 162 163 public void release() { 164 nonPersistent.release(); 165 persistent.release(); 166 } 167 168 169 public synchronized int size() { 170 if (pendingCount < 0) { 171 pendingCount = persistent.size() + nonPersistent.size(); 172 } 173 return pendingCount; 174 } 175 176 public synchronized boolean isEmpty() { 177 // if negative, more messages arrived in store since last reset so non empty 178 return pendingCount == 0; 179 } 180 181 /** 182 * Informs the Broker if the subscription needs to intervention to recover 183 * it's state e.g. DurableTopicSubscriber may do 184 * 185 * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor 186 * @return true if recovery required 187 */ 188 public boolean isRecoveryRequired() { 189 return false; 190 } 191 192 /** 193 * @return the nonPersistent Cursor 194 */ 195 public PendingMessageCursor getNonPersistent() { 196 return this.nonPersistent; 197 } 198 199 /** 200 * @param nonPersistent cursor to set 201 */ 202 public void setNonPersistent(PendingMessageCursor nonPersistent) { 203 this.nonPersistent = nonPersistent; 204 } 205 206 public void setMaxBatchSize(int maxBatchSize) { 207 persistent.setMaxBatchSize(maxBatchSize); 208 if (nonPersistent != null) { 209 nonPersistent.setMaxBatchSize(maxBatchSize); 210 } 211 super.setMaxBatchSize(maxBatchSize); 212 } 213 214 215 public void setMaxProducersToAudit(int maxProducersToAudit) { 216 super.setMaxProducersToAudit(maxProducersToAudit); 217 if (persistent != null) { 218 persistent.setMaxProducersToAudit(maxProducersToAudit); 219 } 220 if (nonPersistent != null) { 221 nonPersistent.setMaxProducersToAudit(maxProducersToAudit); 222 } 223 } 224 225 public void setMaxAuditDepth(int maxAuditDepth) { 226 super.setMaxAuditDepth(maxAuditDepth); 227 if (persistent != null) { 228 persistent.setMaxAuditDepth(maxAuditDepth); 229 } 230 if (nonPersistent != null) { 231 nonPersistent.setMaxAuditDepth(maxAuditDepth); 232 } 233 } 234 235 public void setEnableAudit(boolean enableAudit) { 236 super.setEnableAudit(enableAudit); 237 if (persistent != null) { 238 persistent.setEnableAudit(enableAudit); 239 } 240 if (nonPersistent != null) { 241 nonPersistent.setEnableAudit(enableAudit); 242 } 243 } 244 245 @Override 246 public void setUseCache(boolean useCache) { 247 super.setUseCache(useCache); 248 if (persistent != null) { 249 persistent.setUseCache(useCache); 250 } 251 if (nonPersistent != null) { 252 nonPersistent.setUseCache(useCache); 253 } 254 } 255 256 @Override 257 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 258 super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 259 if (persistent != null) { 260 persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 261 } 262 if (nonPersistent != null) { 263 nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 264 } 265 } 266 267 268 269 public synchronized void gc() { 270 if (persistent != null) { 271 persistent.gc(); 272 } 273 if (nonPersistent != null) { 274 nonPersistent.gc(); 275 } 276 pendingCount = persistent.size() + nonPersistent.size(); 277 } 278 279 public void setSystemUsage(SystemUsage usageManager) { 280 super.setSystemUsage(usageManager); 281 if (persistent != null) { 282 persistent.setSystemUsage(usageManager); 283 } 284 if (nonPersistent != null) { 285 nonPersistent.setSystemUsage(usageManager); 286 } 287 } 288 289 protected synchronized PendingMessageCursor getNextCursor() throws Exception { 290 if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) { 291 currentCursor = currentCursor == persistent ? nonPersistent : persistent; 292 // sanity check 293 if (currentCursor.isEmpty()) { 294 currentCursor = currentCursor == persistent ? nonPersistent : persistent; 295 } 296 } 297 return currentCursor; 298 } 299 300 @Override 301 public boolean isCacheEnabled() { 302 boolean cacheEnabled = isUseCache(); 303 if (cacheEnabled) { 304 if (persistent != null) { 305 cacheEnabled &= persistent.isCacheEnabled(); 306 } 307 if (nonPersistent != null) { 308 cacheEnabled &= nonPersistent.isCacheEnabled(); 309 } 310 setCacheEnabled(cacheEnabled); 311 } 312 return cacheEnabled; 313 } 314 315 @Override 316 public void rebase() { 317 persistent.rebase(); 318 reset(); 319 } 320 321}