/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.cursors;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Store based cursor.
 * <p>
 * Keeps an in-memory batch ({@link #batchList}) of message references that is
 * either filled from the store by {@link #doFillBatch()} or, while the cache is
 * enabled, populated directly by {@link #addMessageLast(MessageReference)}.
 * All state-changing entry points are {@code synchronized} on the cursor.
 */
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class);

    /** Index into {@link #lastCachedIds} for the last cached id of a synchronous add. */
    private static final int SYNC_ADD = 0;
    /** Index into {@link #lastCachedIds} for the last cached id of an asynchronous add. */
    private static final int ASYNC_ADD = 1;

    protected final Destination regionDestination;
    protected final PendingList batchList;
    private Iterator<MessageReference> iterator = null;
    protected boolean batchResetNeeded = false;
    protected int size;
    /** Ids of cached async adds whose store outcome has not been folded into {@link #lastCachedIds} yet. */
    private final LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
    final MessageId[] lastCachedIds = new MessageId[2];
    protected boolean hadSpace = false;

    // track for processing outside of store index lock so we can dlq
    final LinkedList<Message> duplicatesFromStore = new LinkedList<>();

    /**
     * Creates the cursor and picks the batch list implementation: a
     * {@link PrioritizedPendingList} when the destination is prioritized,
     * otherwise an {@link OrderedPendingList}.
     *
     * @param destination the owning destination; a {@code null} destination is
     *                    treated as non-prioritized
     */
    protected AbstractStoreCursor(Destination destination) {
        super(destination != null && destination.isPrioritizedMessages());
        this.regionDestination = destination;
        if (this.prioritizedMessages) {
            this.batchList = new PrioritizedPendingList();
        } else {
            this.batchList = new OrderedPendingList();
        }
    }


    /**
     * Starts the cursor if it is not already started: resets the batch, reloads
     * the size from the store and enables the cache only when the store is
     * empty and caching is allowed.
     *
     * @throws Exception if the superclass fails to start
     */
    public final synchronized void start() throws Exception {
        if (!isStarted()) {
            super.start();
            resetBatch();
            resetSize();
            setCacheEnabled(size == 0 && useCache);
        }
    }

    /** Reloads {@link #size} from {@link #getStoreSize()}. */
    protected void resetSize() {
        this.size = getStoreSize();
    }

    /**
     * Repositions the batch at the last cached synchronous add, if there is one.
     *
     * @throws RuntimeException wrapping any failure reported by {@link #setBatch(MessageId)}
     */
    @Override
    public void rebase() {
        MessageId lastAdded = lastCachedIds[SYNC_ADD];
        if (lastAdded != null) {
            try {
                setBatch(lastAdded);
            } catch (Exception e) {
                LOG.error("{} - Failed to set batch on rebase", this, e);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Stops the cursor: resets the batch, stops the superclass and releases
     * everything held in the in-memory batch via {@link #gc()}.
     *
     * @throws Exception if the superclass fails to stop
     */
    public final synchronized void stop() throws Exception {
        resetBatch();
        super.stop();
        gc();
    }


    /**
     * Recovers a message replayed from the store (i.e. not a cached add).
     *
     * @param message the message to recover
     * @return {@code true} if the message was added to the batch
     * @throws Exception if recovery fails
     */
    public final boolean recoverMessage(Message message) throws Exception {
        return recoverMessage(message, false);
    }

    /**
     * Adds a message to the in-memory batch unless its id has already been
     * recorded, in which case it is handled as a duplicate.
     *
     * @param message the message to add
     * @param cached  {@code true} when the message comes from a cached add
     *                rather than from a store replay
     * @return {@code true} if the message was added to the batch, {@code false}
     *         if it was a duplicate
     * @throws Exception if waiting on the store outcome of a duplicate fails
     */
    public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
        boolean recovered = false;
        if (recordUniqueId(message.getMessageId())) {
            if (!cached) {
                // messages replayed from the store still need their destination and memory usage wired up
                message.setRegionDestination(regionDestination);
                if (message.getMemoryUsage() == null) {
                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
                }
            }
            message.incrementReferenceCount();
            batchList.addMessageLast(message);
            clearIterator(true);
            recovered = true;
        } else if (!cached) {
            // a duplicate from the store (!cached) - needs to be removed/acked - otherwise it will get re dispatched on restart
            if (duplicateFromStoreExcepted(message)) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
                }
            } else {
                LOG.warn("{} - cursor got duplicate from store {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
                duplicate(message);
            }
        } else {
            LOG.warn("{} - cursor got duplicate send {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
            if (gotToTheStore(message)) {
                duplicate(message);
            }
        }
        return recovered;
    }

    /**
     * Tells whether a duplicate replayed from the store is expected and can be
     * ignored.
     *
     * @param message the duplicate message
     * @return the value of {@link Message#isRecievedByDFBridge()}
     */
    protected boolean duplicateFromStoreExcepted(Message message) {
        // expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true for
        // which this existing unused flag has been repurposed
        return message.isRecievedByDFBridge();
    }

    /**
     * Determines whether a duplicate message actually reached the store.
     *
     * @param message the duplicate message
     * @return {@code true} if the message is not flagged via
     *         {@link Message#isRecievedByDFBridge()}, or if (after waiting on any
     *         pending store future) its sequence is a {@code Long} other than -1
     * @throws Exception if waiting on the store future fails
     */
    public static boolean gotToTheStore(Message message) throws Exception {
        if (message.isRecievedByDFBridge()) {
            // concurrent store and dispatch - wait to see if the message gets to the store to see
            // if the index suppressed it (original still present), or whether it was stored and needs to be removed
            Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
            if (possibleFuture instanceof Future) {
                ((Future<?>) possibleFuture).get();
            }
            // need to access again after wait on future
            Object sequence = message.getMessageId().getFutureOrSequenceLong();
            return sequence instanceof Long && ((Long) sequence).longValue() != -1L;
        }
        return true;
    }

    /** Queues a duplicate for later processing by {@link #dealWithDuplicates()}. */
    private void duplicate(Message message) {
        duplicatesFromStore.add(message);
    }

    /**
     * Hands every queued duplicate to the destination and clears the queue.
     */
    void dealWithDuplicates() {
        for (Message message : duplicatesFromStore) {
            regionDestination.duplicateFromStore(message, getSubscription());
        }
        duplicatesFromStore.clear();
    }

    /**
     * Prepares the cursor for iteration: fills the batch when it is empty,
     * refreshes the iterator if one existed and re-validates the size.
     *
     * @throws RuntimeException wrapping any failure to fill the batch
     */
    public final synchronized void reset() {
        if (batchList.isEmpty()) {
            try {
                fillBatch();
            } catch (Exception e) {
                LOG.error("{} - Failed to fill batch", this, e);
                throw new RuntimeException(e);
            }
        }
        clearIterator(true);
        size();
    }


    /** Drops the current iterator without creating a new one. */
    public synchronized void release() {
        clearIterator(false);
    }

    /**
     * Discards the current iterator.
     *
     * @param ensureIterator when {@code true} and an iterator existed, a fresh
     *                       iterator over the batch is created immediately
     */
    private synchronized void clearIterator(boolean ensureIterator) {
        boolean haveIterator = this.iterator != null;
        this.iterator = null;
        if (haveIterator && ensureIterator) {
            ensureIterator();
        }
    }

    /** Lazily creates the iterator over {@link #batchList}. */
    private synchronized void ensureIterator() {
        if (this.iterator == null) {
            this.iterator = this.batchList.iterator();
        }
    }


    /** No-op. */
    public final void finished() {
    }


    /**
     * Tells whether another message is available, filling the batch first if it
     * is empty and creating the iterator if needed.
     *
     * @return {@code true} if the batch iterator has a further element
     * @throws RuntimeException wrapping any failure to fill the batch
     */
    public final synchronized boolean hasNext() {
        if (batchList.isEmpty()) {
            try {
                fillBatch();
            } catch (Exception e) {
                LOG.error("{} - Failed to fill batch", this, e);
                throw new RuntimeException(e);
            }
        }
        ensureIterator();
        return this.iterator.hasNext();
    }


    /**
     * Returns the next message of the batch and increments its reference count.
     * <p>
     * The iterator is only created by {@link #hasNext()} (or refreshed when one
     * already existed), so callers must invoke {@link #hasNext()} first.
     *
     * @return the next message reference, or {@code null} when the batch is
     *         empty or exhausted
     */
    public final synchronized MessageReference next() {
        MessageReference result = null;
        if (!this.batchList.isEmpty() && this.iterator.hasNext()) {
            result = this.iterator.next();
        }
        last = result;
        if (result != null) {
            result.incrementReferenceCount();
        }
        return result;
    }

    /**
     * Accounts for a newly added message. While there is space and the cache is
     * enabled the message is also placed in the in-memory batch; once space runs
     * out the cache is synchronised with the store and disabled.
     *
     * @param node the added message
     * @return {@code false} if the message was rejected as a duplicate by the
     *         cache, {@code true} otherwise
     * @throws Exception if caching the message or syncing with the store fails
     */
    public synchronized boolean addMessageLast(MessageReference node) throws Exception {
        boolean disableCache = false;
        if (hasSpace()) {
            if (isCacheEnabled()) {
                if (recoverMessage(node.getMessage(), true)) {
                    trackLastCached(node);
                } else {
                    dealWithDuplicates();
                    return false;
                }
            }
        } else {
            disableCache = true;
        }

        if (disableCache && isCacheEnabled()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
            }
            syncWithStore(node.getMessage());
            setCacheEnabled(false);
        }
        size++;
        return true;
    }

    /**
     * Reports whether the cache is enabled. Note that this is not a pure
     * getter: when the cache is currently disabled it will be switched on if
     * {@link #canEnableCash()} allows it.
     */
    @Override
    public synchronized boolean isCacheEnabled() {
        return super.isCacheEnabled() || enableCacheNow();
    }

    /**
     * Enables the cache if {@link #canEnableCash()} permits.
     *
     * @return {@code true} if the cache was enabled by this call
     */
    protected boolean enableCacheNow() {
        boolean result = false;
        if (canEnableCash()) {
            setCacheEnabled(true);
            result = true;
            if (LOG.isTraceEnabled()) {
                LOG.trace("{} enabling cache on empty store", this);
            }
        }
        return result;
    }

    /**
     * @return {@code true} when caching is allowed, the cursor is empty, has
     *         space and is started
     */
    protected boolean canEnableCash() {
        return useCache && size == 0 && hasSpace() && isStarted();
    }

    /**
     * Chooses the store position from which the batch should resume once the
     * cache is being disabled, then clears all cache tracking state.
     * <p>
     * The most recent pending async add is consulted (waiting up to five seconds
     * on its future); the resulting async id is used unless the current add is
     * synchronous and has a lower sequence, in which case the last sync id is
     * used instead.
     *
     * @param currentAdd the message whose add triggered the cache disable
     * @throws Exception if {@link #setBatch(MessageId)} fails
     */
    private void syncWithStore(Message currentAdd) throws Exception {
        pruneLastCached();
        // walk pending async adds newest-first; only non-cancelled entries end the walk
        for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {
            MessageId lastPending = it.previous();
            Object futureOrLong = lastPending.getFutureOrSequenceLong();
            if (futureOrLong instanceof Future) {
                Future<?> future = (Future<?>) futureOrLong;
                if (future.isCancelled()) {
                    continue;
                }
                try {
                    future.get(5, TimeUnit.SECONDS);
                    setLastCachedId(ASYNC_ADD, lastPending);
                } catch (CancellationException ok) {
                    continue;
                } catch (TimeoutException potentialDeadlock) {
                    LOG.debug("{} timed out waiting for async add", this, potentialDeadlock);
                } catch (Exception worstCaseWeReplay) {
                    LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay);
                }
            } else {
                setLastCachedId(ASYNC_ADD, lastPending);
            }
            break;
        }

        MessageId candidate = lastCachedIds[ASYNC_ADD];
        if (candidate != null) {
            // ensure we don't skip current possibly sync add b/c we waited on the future
            if (!isAsync(currentAdd) && Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) < 0) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("no set batch from async:{} >= than current: {}, {}", candidate.getFutureOrSequenceLong(), currentAdd.getMessageId().getFutureOrSequenceLong(), this);
                }
                candidate = null;
            }
        }
        if (candidate == null) {
            candidate = lastCachedIds[SYNC_ADD];
        }
        if (candidate != null) {
            setBatch(candidate);
        }
        // cleanup
        lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null;
        pendingCachedIds.clear();
    }

    /**
     * Remembers a message that was just cached: async adds are appended to the
     * pending list, sync adds update the last sync id directly.
     */
    private void trackLastCached(MessageReference node) {
        if (isAsync(node.getMessage())) {
            pruneLastCached();
            pendingCachedIds.add(node.getMessageId());
        } else {
            setLastCachedId(SYNC_ADD, node.getMessageId());
        }
    }

    /**
     * @return {@code true} if the message is flagged via
     *         {@link Message#isRecievedByDFBridge()} or its id still carries a
     *         {@link Future} instead of a sequence
     */
    private static boolean isAsync(Message message) {
        return message.isRecievedByDFBridge() || message.getMessageId().getFutureOrSequenceLong() instanceof Future;
    }

    /**
     * Folds leading entries of the pending async list into {@link #lastCachedIds}:
     * cancelled futures are dropped, completed entries are recorded and removed,
     * and the scan stops at the first future that is still outstanding.
     */
    private void pruneLastCached() {
        for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
            MessageId candidate = it.next();
            final Object futureOrLong = candidate.getFutureOrSequenceLong();
            if (futureOrLong instanceof Future) {
                Future<?> future = (Future<?>) futureOrLong;
                if (future.isCancelled()) {
                    it.remove();
                } else {
                    // we don't want to wait for work to complete
                    break;
                }
            } else {
                // complete
                setLastCachedId(ASYNC_ADD, candidate);

                // keep lock step with sync adds while order is preserved
                if (lastCachedIds[SYNC_ADD] != null) {
                    long next = 1 + (Long) lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
                    if (Long.compare((Long) futureOrLong, next) == 0) {
                        setLastCachedId(SYNC_ADD, candidate);
                    }
                }
                it.remove();
            }
        }
    }

    /**
     * Updates {@code lastCachedIds[index]} with the candidate when there is no
     * current entry, when the current entry has no sequence, or when the
     * candidate's sequence is strictly greater than the current one.
     *
     * @param index     {@link #SYNC_ADD} or {@link #ASYNC_ADD}
     * @param candidate the id to consider
     */
    private void setLastCachedId(final int index, MessageId candidate) {
        MessageId lastCacheId = lastCachedIds[index];
        if (lastCacheId == null) {
            lastCachedIds[index] = candidate;
        } else {
            Object lastCacheFutureOrSequenceLong = lastCacheId.getFutureOrSequenceLong();
            Object candidateOrSequenceLong = candidate.getFutureOrSequenceLong();
            if (lastCacheFutureOrSequenceLong == null) { // possibly null for topics
                lastCachedIds[index] = candidate;
            } else if (candidateOrSequenceLong != null &&
                    Long.compare(((Long) candidateOrSequenceLong), ((Long) lastCacheFutureOrSequenceLong)) > 0) {
                lastCachedIds[index] = candidate;
            } else if (LOG.isTraceEnabled()) {
                // reached only when the candidate is not newer than the current entry
                LOG.trace("no set last cached[{}] current:{} >= than candidate: {}, {}", index, lastCacheFutureOrSequenceLong, candidateOrSequenceLong, this);
            }
        }
    }

    /**
     * Hook for subclasses to position the store batch at the given message id.
     * The default implementation does nothing.
     *
     * @param messageId the id to position at
     * @throws Exception if positioning fails
     */
    protected void setBatch(MessageId messageId) throws Exception {
    }


    /**
     * Accounts for a message added at the head; only the size counter changes.
     *
     * @param node the added message (unused here)
     * @throws Exception declared for subclasses; not thrown by this implementation
     */
    public synchronized void addMessageFirst(MessageReference node) throws Exception {
        size++;
    }


    /**
     * Removes the element last returned by the iterator (when an iterator
     * exists), decrements the size and releases the reference taken by
     * {@link #next()}.
     */
    public final synchronized void remove() {
        size--;
        if (iterator != null) {
            iterator.remove();
        }
        if (last != null) {
            last.decrementReferenceCount();
        }
    }


    /**
     * Removes the given message from the in-memory batch; when it was present
     * the size is decremented and the cache is disabled.
     *
     * @param node the message to remove
     */
    public final synchronized void remove(MessageReference node) {
        if (batchList.remove(node) != null) {
            size--;
            setCacheEnabled(false);
        }
    }


    /** Equivalent to {@link #gc()}. */
    public final synchronized void clear() {
        gc();
    }


    /**
     * Releases the in-memory batch: rolls back and dereferences each batched
     * message, empties the batch, drops the iterator, flags the batch for reset
     * on the next fill and disables the cache.
     */
    public synchronized void gc() {
        for (MessageReference msg : batchList) {
            rollback(msg.getMessageId());
            msg.decrementReferenceCount();
        }
        batchList.clear();
        clearIterator(false);
        batchResetNeeded = true;
        setCacheEnabled(false);
    }

    /**
     * Fills the in-memory batch from the store when it is empty and the cursor
     * reports pending messages. If a reset was requested (see {@link #gc()}) the
     * size, max batch size and batch position are re-initialised first.
     *
     * @throws RuntimeException wrapping any failure reported by {@link #doFillBatch()}
     */
    protected final synchronized void fillBatch() {
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} fillBatch", this);
        }
        if (batchResetNeeded) {
            resetSize();
            setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size));
            resetBatch();
            this.batchResetNeeded = false;
        }
        if (this.batchList.isEmpty() && this.size > 0) {
            try {
                doFillBatch();
            } catch (Exception e) {
                LOG.error("{} - Failed to fill batch", this, e);
                throw new RuntimeException(e);
            }
        }
    }


    /**
     * @return {@code true} only when the size counter is exactly zero; a
     *         negative counter is not considered empty (see {@link #size()},
     *         which re-reads the store size in that case)
     */
    public final synchronized boolean isEmpty() {
        // negative means more messages added to store through queue.send since last reset
        return size == 0;
    }


    /** @return {@code true} if the in-memory batch holds at least one message */
    public final synchronized boolean hasMessagesBufferedToDeliver() {
        return !batchList.isEmpty();
    }


    /**
     * @return the current size, re-read from {@link #getStoreSize()} first if
     *         the counter has gone negative
     */
    public final synchronized int size() {
        if (size < 0) {
            this.size = getStoreSize();
        }
        return size;
    }

    /**
     * Describes the cursor state for logging. The cache flag is read via the
     * superclass so that building the string never enables the cache as a side
     * effect, and a missing destination is rendered as {@code null}.
     */
    @Override
    public String toString() {
        final String destinationName = regionDestination != null
                ? regionDestination.getActiveMQDestination().getPhysicalName() : "null";
        return super.toString() + ":" + destinationName + ",batchResetNeeded=" + batchResetNeeded
                    + ",size=" + this.size + ",cacheEnabled=" + super.isCacheEnabled()
                    + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size()
                    + ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null")
                    + ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null");
    }

    /**
     * Loads the next batch of messages from the store into the cursor.
     *
     * @throws Exception if the store cannot be read
     */
    protected abstract void doFillBatch() throws Exception;

    /** Resets the store batch position. */
    protected abstract void resetBatch();

    /** @return the number of messages reported by the store for this cursor */
    protected abstract int getStoreSize();

    /** @return {@code true} if the store holds no messages for this cursor */
    protected abstract boolean isStoreEmpty();

    /**
     * @return the subscription this cursor belongs to; {@code null} in this
     *         base implementation
     */
    public Subscription getSubscription() {
        return null;
    }
}