/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.broker.region.cursors;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.ListIterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Store-based cursor
 */
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class);
    protected final Destination regionDestination;
    protected final PendingList batchList;
    private Iterator<MessageReference> iterator = null;
    protected boolean batchResetNeeded = false;
    protected int size;
    private final LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
    private static final int SYNC_ADD = 0;
    private static final int ASYNC_ADD = 1;
    final MessageId[] lastCachedIds = new MessageId[2];
    protected boolean hadSpace = false;

    protected AbstractStoreCursor(Destination destination) {
        super(destination != null ? destination.isPrioritizedMessages() : false);
        this.regionDestination = destination;
        if (this.prioritizedMessages) {
            this.batchList = new PrioritizedPendingList();
        } else {
            this.batchList = new OrderedPendingList();
        }
    }

    public final synchronized void start() throws Exception {
        if (!isStarted()) {
            super.start();
            resetBatch();
            resetSize();
            setCacheEnabled(size == 0 && useCache);
        }
    }

    protected void resetSize() {
        this.size = getStoreSize();
    }

    @Override
    public void rebase() {
        MessageId lastAdded = lastCachedIds[SYNC_ADD];
        if (lastAdded != null) {
            try {
                setBatch(lastAdded);
            } catch (Exception e) {
                LOG.error("{} - Failed to set batch on rebase", this, e);
                throw new RuntimeException(e);
            }
        }
    }

    public final synchronized void stop() throws Exception {
        resetBatch();
        super.stop();
        gc();
    }

    public final boolean recoverMessage(Message message) throws Exception {
        return recoverMessage(message, false);
    }

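    /**
     * Callback used both by store recovery (via {@link MessageRecoveryListener}) and by the
     * cache path in {@link #addMessageLast(MessageReference)}. A message whose id passes the
     * duplicate check is reference-counted and appended to the in-memory batch. Duplicates are
     * handed to {@link #duplicate(Message)} for later handling outside the store index lock,
     * except for expected replays with concurrentStoreAndDispatchQueues.
     *
     * @param cached true when the message comes from the cache (a direct send), false when it
     *               was recovered from the store
     * @return true if the message was added to the batch
     */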
    public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
        boolean recovered = false;
        if (recordUniqueId(message.getMessageId())) {
            if (!cached) {
                message.setRegionDestination(regionDestination);
                if (message.getMemoryUsage() == null) {
                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
                }
            }
            message.incrementReferenceCount();
            batchList.addMessageLast(message);
            clearIterator(true);
            recovered = true;
        } else if (!cached) {
            // a duplicate from the store (!cached) - needs to be removed/acked, otherwise it will be redispatched on restart
            if (duplicateFromStoreExcepted(message)) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
                }
            } else {
                LOG.warn("{} - cursor got duplicate from store {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
                duplicate(message);
            }
        } else {
            LOG.warn("{} - cursor got duplicate send {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
            if (gotToTheStore(message)) {
                duplicate(message);
            }
        }
        return recovered;
    }

    protected boolean duplicateFromStoreExcepted(Message message) {
        // expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true, for
        // which this existing, otherwise unused flag has been repurposed
        return message.isRecievedByDFBridge();
    }

    public static boolean gotToTheStore(Message message) throws Exception {
        if (message.isRecievedByDFBridge()) {
            // concurrent store and dispatch - wait to see if the message gets to the store, to determine
            // whether the index suppressed it (original still present) or whether it was stored and needs to be removed
            Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
            if (possibleFuture instanceof Future) {
                try {
                    ((Future) possibleFuture).get();
                } catch (Exception okToErrorOrCancelStoreOp) {}
            }
            // need to access again after the wait on the future
            Object sequence = message.getMessageId().getFutureOrSequenceLong();
            return sequence instanceof Long && Long.compare((Long) sequence, -1L) != 0;
        }
        return true;
    }

    // track for processing outside of the store index lock so we can DLQ
    final LinkedList<Message> duplicatesFromStore = new LinkedList<Message>();

    private void duplicate(Message message) {
        duplicatesFromStore.add(message);
    }

    void dealWithDuplicates() {
        for (Message message : duplicatesFromStore) {
            regionDestination.duplicateFromStore(message, getSubscription());
        }
        duplicatesFromStore.clear();
    }

    public final synchronized void reset() {
        if (batchList.isEmpty()) {
            try {
                fillBatch();
            } catch (Exception e) {
                LOG.error("{} - Failed to fill batch", this, e);
                throw new RuntimeException(e);
            }
        }
        clearIterator(true);
        size();
    }

    public synchronized void release() {
        clearIterator(false);
    }

    private synchronized void clearIterator(boolean ensureIterator) {
        boolean haveIterator = this.iterator != null;
        this.iterator = null;
        if (haveIterator && ensureIterator) {
            ensureIterator();
        }
    }

    private synchronized void ensureIterator() {
        if (this.iterator == null) {
            this.iterator = this.batchList.iterator();
        }
    }

    public final void finished() {
    }

    public final synchronized boolean hasNext() {
        if (batchList.isEmpty()) {
            try {
                fillBatch();
            } catch (Exception e) {
                LOG.error("{} - Failed to fill batch", this, e);
                throw new RuntimeException(e);
            }
        }
        ensureIterator();
        return this.iterator.hasNext();
    }

    public final synchronized MessageReference next() {
        MessageReference result = null;
        if (!this.batchList.isEmpty() && this.iterator.hasNext()) {
            result = this.iterator.next();
        }
        last = result;
        if (result != null) {
            result.incrementReferenceCount();
        }
        return result;
    }

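    /**
     * Adds a newly sent message to the cursor. While the cache is enabled and there is space, the
     * message is kept in memory via {@link #recoverMessage(Message, boolean)} and its id is tracked
     * so the store batch can later be positioned after it. When space runs out, the cache is
     * disabled and the batch is synced with the last id known to be in the store, so subsequent
     * fills page in only messages that were never dispatched from the cache.
     *
     * @return true unless the message turned out to be a duplicate send
     */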
    public synchronized boolean addMessageLast(MessageReference node) throws Exception {
        boolean disableCache = false;
        if (hasSpace()) {
            if (isCacheEnabled()) {
                if (recoverMessage(node.getMessage(), true)) {
                    trackLastCached(node);
                } else {
                    dealWithDuplicates();
                    return false;
                }
            }
        } else {
            disableCache = true;
        }

        if (disableCache && isCacheEnabled()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
            }
            syncWithStore(node.getMessage());
            setCacheEnabled(false);
        }
        size++;
        return true;
    }

    @Override
    public synchronized boolean isCacheEnabled() {
        return super.isCacheEnabled() || enableCacheNow();
    }

    protected boolean enableCacheNow() {
        boolean result = false;
        if (canEnableCash()) {
            setCacheEnabled(true);
            result = true;
            if (LOG.isTraceEnabled()) {
                LOG.trace("{} enabling cache on empty store", this);
            }
        }
        return result;
    }

    protected boolean canEnableCash() {
        return useCache && size == 0 && hasSpace() && isStarted();
    }

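    /**
     * Invoked when the cache is being disabled. Waits (briefly) for the most recent pending
     * asynchronous store additions to complete, then positions the store batch after the newest
     * message id that is known to have reached the store, so that the next fill does not replay
     * messages already dispatched from the cache. The tracking state is cleared afterwards.
     */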
    private void syncWithStore(Message currentAdd) throws Exception {
        pruneLastCached();
        for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {
            MessageId lastPending = it.previous();
            Object futureOrLong = lastPending.getFutureOrSequenceLong();
            if (futureOrLong instanceof Future) {
                Future future = (Future) futureOrLong;
                if (future.isCancelled()) {
                    continue;
                }
                try {
                    future.get(5, TimeUnit.SECONDS);
                    setLastCachedId(ASYNC_ADD, lastPending);
                } catch (CancellationException ok) {
                    continue;
                } catch (TimeoutException potentialDeadlock) {
                    LOG.debug("{} timed out waiting for async add", this, potentialDeadlock);
                } catch (Exception worstCaseWeReplay) {
                    LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay);
                }
            } else {
                setLastCachedId(ASYNC_ADD, lastPending);
            }
            break;
        }

        MessageId candidate = lastCachedIds[ASYNC_ADD];
        if (candidate != null) {
            // ensure we don't skip the current, possibly sync, add because we waited on the future
            if (!isAsync(currentAdd) && Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) < 0) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("no set batch from async: " + candidate.getFutureOrSequenceLong() + " >= current: " + currentAdd.getMessageId().getFutureOrSequenceLong() + ", " + this);
                }
                candidate = null;
            }
        }
        if (candidate == null) {
            candidate = lastCachedIds[SYNC_ADD];
        }
        if (candidate != null) {
            setBatch(candidate);
        }
        // cleanup
        lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null;
        pendingCachedIds.clear();
    }

    private void trackLastCached(MessageReference node) {
        if (isAsync(node.getMessage())) {
            pruneLastCached();
            pendingCachedIds.add(node.getMessageId());
        } else {
            setLastCachedId(SYNC_ADD, node.getMessageId());
        }
    }

    private static final boolean isAsync(Message message) {
        return message.isRecievedByDFBridge() || message.getMessageId().getFutureOrSequenceLong() instanceof Future;
    }

    private void pruneLastCached() {
        for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
            MessageId candidate = it.next();
            final Object futureOrLong = candidate.getFutureOrSequenceLong();
            if (futureOrLong instanceof Future) {
                Future future = (Future) futureOrLong;
                if (future.isDone()) {
                    if (future.isCancelled()) {
                        it.remove();
                    } else {
                        // check for an exception; we may be seeing old state
                        try {
                            future.get(0, TimeUnit.SECONDS);
                            // stale entry; once the result is in, the next prune will see the Long sequence
                        } catch (ExecutionException expected) {
                            it.remove();
                        } catch (Exception unexpected) {
                            LOG.debug("{} unexpected exception verifying exception state of future", this, unexpected);
                        }
                    }
                } else {
                    // we don't want to wait for work to complete
                    break;
                }
            } else {
                // complete
                setLastCachedId(ASYNC_ADD, candidate);

                // keep lock step with sync adds while order is preserved
                if (lastCachedIds[SYNC_ADD] != null) {
                    long next = 1 + (Long) lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
                    if (Long.compare((Long) futureOrLong, next) == 0) {
                        setLastCachedId(SYNC_ADD, candidate);
                    }
                }
                it.remove();
            }
        }
    }

    private void setLastCachedId(final int index, MessageId candidate) {
        MessageId lastCacheId = lastCachedIds[index];
        if (lastCacheId == null) {
            lastCachedIds[index] = candidate;
        } else {
            Object lastCacheFutureOrSequenceLong = lastCacheId.getFutureOrSequenceLong();
            Object candidateOrSequenceLong = candidate.getFutureOrSequenceLong();
            if (lastCacheFutureOrSequenceLong == null) { // possibly null for topics
                lastCachedIds[index] = candidate;
            } else if (candidateOrSequenceLong != null &&
                       Long.compare(((Long) candidateOrSequenceLong), ((Long) lastCacheFutureOrSequenceLong)) > 0) {
                lastCachedIds[index] = candidate;
            } else if (LOG.isTraceEnabled()) {
                LOG.trace("no set last cached[" + index + "] candidate: " + candidateOrSequenceLong + " <= current: " + lastCacheFutureOrSequenceLong + ", " + this);
            }
        }
    }

    protected void setBatch(MessageId messageId) throws Exception {
    }

    public synchronized void addMessageFirst(MessageReference node) throws Exception {
        size++;
    }

    public final synchronized void remove() {
        size--;
        if (iterator != null) {
            iterator.remove();
        }
        if (last != null) {
            last.decrementReferenceCount();
        }
    }

    public final synchronized void remove(MessageReference node) {
        if (batchList.remove(node) != null) {
            size--;
            setCacheEnabled(false);
        }
    }

    public final synchronized void clear() {
        gc();
    }

    public synchronized void gc() {
        for (MessageReference msg : batchList) {
            rollback(msg.getMessageId());
            msg.decrementReferenceCount();
        }
        batchList.clear();
        clearIterator(false);
        batchResetNeeded = true;
        setCacheEnabled(false);
    }

    protected final synchronized void fillBatch() {
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} fillBatch", this);
        }
        if (batchResetNeeded) {
            resetSize();
            setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size));
            resetBatch();
            this.batchResetNeeded = false;
        }
        if (this.batchList.isEmpty() && this.size > 0) {
            try {
                doFillBatch();
            } catch (Exception e) {
                LOG.error("{} - Failed to fill batch", this, e);
                throw new RuntimeException(e);
            }
        }
    }

    public final synchronized boolean isEmpty() {
        // negative means more messages were added to the store through queue.send since the last reset
        return size == 0;
    }

    public final synchronized boolean hasMessagesBufferedToDeliver() {
        return !batchList.isEmpty();
    }

    public final synchronized int size() {
        if (size < 0) {
            this.size = getStoreSize();
        }
        return size;
    }

    @Override
    public String toString() {
        return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
                + ",size=" + this.size + ",cacheEnabled=" + cacheEnabled
                + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size()
                + ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null")
                + ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null");
    }

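    /*
     * Subclass contract (illustrative sketch only; the "store" field and exact call are
     * assumptions about a typical concrete cursor, not part of this class): implementations are
     * expected to fill the batch from their message store, passing this cursor as the
     * MessageRecoveryListener so each recovered message flows through recoverMessage(..), e.g.
     *
     *   protected void doFillBatch() throws Exception {
     *       hadSpace = this.hasSpace();
     *       if (hadSpace) {
     *           // the store calls back into recoverMessage(..) for each paged-in message
     *           store.recoverNextMessages(maxBatchSize, this);
     *       }
     *   }
     */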
    protected abstract void doFillBatch() throws Exception;

    protected abstract void resetBatch();

    protected abstract int getStoreSize();

    protected abstract boolean isStoreEmpty();

    public Subscription getSubscription() {
        return null;
    }
}