001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.broker.region.cursors; 018 019import org.apache.activemq.broker.Broker; 020import org.apache.activemq.broker.region.MessageReference; 021import org.apache.activemq.broker.region.Queue; 022import org.apache.activemq.command.Message; 023import org.apache.activemq.command.MessageId; 024import org.apache.activemq.usage.SystemUsage; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028/** 029 * Store based Cursor for Queues 030 */ 031public class StoreQueueCursor extends AbstractPendingMessageCursor { 032 033 private static final Logger LOG = LoggerFactory.getLogger(StoreQueueCursor.class); 034 private final Broker broker; 035 private int pendingCount; 036 private final Queue queue; 037 private PendingMessageCursor nonPersistent; 038 private final QueueStorePrefetch persistent; 039 private boolean started; 040 private PendingMessageCursor currentCursor; 041 042 /** 043 * Construct 044 * @param broker 045 * @param queue 046 */ 047 public StoreQueueCursor(Broker broker,Queue queue) { 048 super((queue != null ? queue.isPrioritizedMessages():false)); 049 this.broker=broker; 050 this.queue = queue; 051 this.persistent = new QueueStorePrefetch(queue, broker); 052 currentCursor = persistent; 053 } 054 055 public synchronized void start() throws Exception { 056 started = true; 057 super.start(); 058 if (nonPersistent == null) { 059 if (broker.getBrokerService().isPersistent()) { 060 nonPersistent = new FilePendingMessageCursor(broker,queue.getName(),this.prioritizedMessages); 061 }else { 062 nonPersistent = new VMPendingMessageCursor(this.prioritizedMessages); 063 } 064 nonPersistent.setMaxBatchSize(getMaxBatchSize()); 065 nonPersistent.setSystemUsage(systemUsage); 066 nonPersistent.setEnableAudit(isEnableAudit()); 067 nonPersistent.setMaxAuditDepth(getMaxAuditDepth()); 068 nonPersistent.setMaxProducersToAudit(getMaxProducersToAudit()); 069 } 070 nonPersistent.setMessageAudit(getMessageAudit()); 071 nonPersistent.start(); 072 persistent.setMessageAudit(getMessageAudit()); 073 persistent.start(); 074 pendingCount = persistent.size() + nonPersistent.size(); 075 } 076 077 public synchronized void stop() throws Exception { 078 started = false; 079 if (nonPersistent != null) { 080 nonPersistent.destroy(); 081 } 082 persistent.stop(); 083 persistent.gc(); 084 super.stop(); 085 pendingCount = 0; 086 } 087 088 public synchronized boolean addMessageLast(MessageReference node) throws Exception { 089 boolean result = true; 090 if (node != null) { 091 Message msg = node.getMessage(); 092 if (started) { 093 pendingCount++; 094 if (!msg.isPersistent()) { 095 nonPersistent.addMessageLast(node); 096 } 097 } 098 if (msg.isPersistent()) { 099 result = persistent.addMessageLast(node); 100 } 101 } 102 return result; 103 } 104 105 public synchronized void addMessageFirst(MessageReference node) throws Exception { 106 if (node != null) { 107 Message msg = node.getMessage(); 108 if (started) { 109 pendingCount++; 110 if (!msg.isPersistent()) { 111 nonPersistent.addMessageFirst(node); 112 } 113 } 114 if (msg.isPersistent()) { 115 persistent.addMessageFirst(node); 116 } 117 } 118 } 119 120 public synchronized void clear() { 121 pendingCount = 0; 122 } 123 124 public synchronized boolean hasNext() { 125 try { 126 getNextCursor(); 127 } catch (Exception e) { 128 LOG.error("Failed to get current cursor ", e); 129 throw new RuntimeException(e); 130 } 131 return currentCursor != null ? currentCursor.hasNext() : false; 132 } 133 134 public synchronized MessageReference next() { 135 MessageReference result = currentCursor != null ? currentCursor.next() : null; 136 return result; 137 } 138 139 public synchronized void remove() { 140 if (currentCursor != null) { 141 currentCursor.remove(); 142 } 143 pendingCount--; 144 } 145 146 public synchronized void remove(MessageReference node) { 147 if (!node.isPersistent()) { 148 nonPersistent.remove(node); 149 } else { 150 persistent.remove(node); 151 } 152 pendingCount--; 153 } 154 155 public synchronized void reset() { 156 nonPersistent.reset(); 157 persistent.reset(); 158 pendingCount = persistent.size() + nonPersistent.size(); 159 } 160 161 public void release() { 162 nonPersistent.release(); 163 persistent.release(); 164 } 165 166 167 public synchronized int size() { 168 if (pendingCount < 0) { 169 pendingCount = persistent.size() + nonPersistent.size(); 170 } 171 return pendingCount; 172 } 173 174 public synchronized boolean isEmpty() { 175 // if negative, more messages arrived in store since last reset so non empty 176 return pendingCount == 0; 177 } 178 179 /** 180 * Informs the Broker if the subscription needs to intervention to recover 181 * it's state e.g. DurableTopicSubscriber may do 182 * 183 * @see org.apache.activemq.broker.region.cursors.PendingMessageCursor 184 * @return true if recovery required 185 */ 186 public boolean isRecoveryRequired() { 187 return false; 188 } 189 190 /** 191 * @return the nonPersistent Cursor 192 */ 193 public PendingMessageCursor getNonPersistent() { 194 return this.nonPersistent; 195 } 196 197 /** 198 * @param nonPersistent cursor to set 199 */ 200 public void setNonPersistent(PendingMessageCursor nonPersistent) { 201 this.nonPersistent = nonPersistent; 202 } 203 204 /** 205 * @return the persistent Cursor 206 */ 207 public PendingMessageCursor getPersistent() { return this.persistent; } 208 209 @Override 210 public void setMaxBatchSize(int maxBatchSize) { 211 persistent.setMaxBatchSize(maxBatchSize); 212 if (nonPersistent != null) { 213 nonPersistent.setMaxBatchSize(maxBatchSize); 214 } 215 super.setMaxBatchSize(maxBatchSize); 216 } 217 218 219 public void setMaxProducersToAudit(int maxProducersToAudit) { 220 super.setMaxProducersToAudit(maxProducersToAudit); 221 if (persistent != null) { 222 persistent.setMaxProducersToAudit(maxProducersToAudit); 223 } 224 if (nonPersistent != null) { 225 nonPersistent.setMaxProducersToAudit(maxProducersToAudit); 226 } 227 } 228 229 public void setMaxAuditDepth(int maxAuditDepth) { 230 super.setMaxAuditDepth(maxAuditDepth); 231 if (persistent != null) { 232 persistent.setMaxAuditDepth(maxAuditDepth); 233 } 234 if (nonPersistent != null) { 235 nonPersistent.setMaxAuditDepth(maxAuditDepth); 236 } 237 } 238 239 public void setEnableAudit(boolean enableAudit) { 240 super.setEnableAudit(enableAudit); 241 if (persistent != null) { 242 persistent.setEnableAudit(enableAudit); 243 } 244 if (nonPersistent != null) { 245 nonPersistent.setEnableAudit(enableAudit); 246 } 247 } 248 249 @Override 250 public void rollback(MessageId id) { 251 nonPersistent.rollback(id); 252 persistent.rollback(id); 253 } 254 255 @Override 256 public void setUseCache(boolean useCache) { 257 super.setUseCache(useCache); 258 if (persistent != null) { 259 persistent.setUseCache(useCache); 260 } 261 if (nonPersistent != null) { 262 nonPersistent.setUseCache(useCache); 263 } 264 } 265 266 @Override 267 public void setMemoryUsageHighWaterMark(int memoryUsageHighWaterMark) { 268 super.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 269 if (persistent != null) { 270 persistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 271 } 272 if (nonPersistent != null) { 273 nonPersistent.setMemoryUsageHighWaterMark(memoryUsageHighWaterMark); 274 } 275 } 276 277 278 279 public synchronized void gc() { 280 if (persistent != null) { 281 persistent.gc(); 282 } 283 if (nonPersistent != null) { 284 nonPersistent.gc(); 285 } 286 pendingCount = persistent.size() + nonPersistent.size(); 287 } 288 289 public void setSystemUsage(SystemUsage usageManager) { 290 super.setSystemUsage(usageManager); 291 if (persistent != null) { 292 persistent.setSystemUsage(usageManager); 293 } 294 if (nonPersistent != null) { 295 nonPersistent.setSystemUsage(usageManager); 296 } 297 } 298 299 protected synchronized PendingMessageCursor getNextCursor() throws Exception { 300 if (currentCursor == null || !currentCursor.hasMessagesBufferedToDeliver()) { 301 currentCursor = currentCursor == persistent ? nonPersistent : persistent; 302 // sanity check 303 if (currentCursor.isEmpty()) { 304 currentCursor = currentCursor == persistent ? nonPersistent : persistent; 305 } 306 } 307 return currentCursor; 308 } 309 310 @Override 311 public boolean isCacheEnabled() { 312 boolean cacheEnabled = isUseCache(); 313 if (cacheEnabled) { 314 if (persistent != null) { 315 cacheEnabled &= persistent.isCacheEnabled(); 316 } 317 if (nonPersistent != null) { 318 cacheEnabled &= nonPersistent.isCacheEnabled(); 319 } 320 setCacheEnabled(cacheEnabled); 321 } 322 return cacheEnabled; 323 } 324 325 @Override 326 public void rebase() { 327 persistent.rebase(); 328 reset(); 329 } 330 331}