001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.broker.region.cursors;
018
019import java.util.Iterator;
020import java.util.LinkedList;
021import java.util.ListIterator;
022import java.util.concurrent.CancellationException;
023import java.util.concurrent.Future;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.TimeoutException;
026import org.apache.activemq.broker.region.Destination;
027import org.apache.activemq.broker.region.MessageReference;
028import org.apache.activemq.broker.region.Subscription;
029import org.apache.activemq.command.Message;
030import org.apache.activemq.command.MessageId;
031import org.apache.activemq.store.MessageRecoveryListener;
032import org.slf4j.Logger;
033import org.slf4j.LoggerFactory;
034
035/**
036 *  Store based cursor
037 *
038 */
039public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener {
040    private static final Logger LOG = LoggerFactory.getLogger(AbstractStoreCursor.class);
041    protected final Destination regionDestination;
042    protected final PendingList batchList;
043    private Iterator<MessageReference> iterator = null;
044    protected boolean batchResetNeeded = false;
045    protected int size;
046    private LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
047    private static int SYNC_ADD = 0;
048    private static int ASYNC_ADD = 1;
049    final MessageId[] lastCachedIds = new MessageId[2];
050    protected boolean hadSpace = false;
051
052    protected AbstractStoreCursor(Destination destination) {
053        super((destination != null ? destination.isPrioritizedMessages():false));
054        this.regionDestination=destination;
055        if (this.prioritizedMessages) {
056            this.batchList= new PrioritizedPendingList();
057        } else {
058            this.batchList = new OrderedPendingList();
059        }
060    }
061
062
063    public final synchronized void start() throws Exception{
064        if (!isStarted()) {
065            super.start();
066            resetBatch();
067            resetSize();
068            setCacheEnabled(size==0&&useCache);
069        }
070    }
071
072    protected void resetSize() {
073        this.size = getStoreSize();
074    }
075
076    @Override
077    public void rebase() {
078        MessageId lastAdded = lastCachedIds[SYNC_ADD];
079        if (lastAdded != null) {
080            try {
081                setBatch(lastAdded);
082            } catch (Exception e) {
083                LOG.error("{} - Failed to set batch on rebase", this, e);
084                throw new RuntimeException(e);
085            }
086        }
087    }
088
089    public final synchronized void stop() throws Exception {
090        resetBatch();
091        super.stop();
092        gc();
093    }
094
095
096    public final boolean recoverMessage(Message message) throws Exception {
097        return recoverMessage(message,false);
098    }
099
100    public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
101        boolean recovered = false;
102        if (recordUniqueId(message.getMessageId())) {
103            if (!cached) {
104                message.setRegionDestination(regionDestination);
105                if( message.getMemoryUsage()==null ) {
106                    message.setMemoryUsage(this.getSystemUsage().getMemoryUsage());
107                }
108            }
109            message.incrementReferenceCount();
110            batchList.addMessageLast(message);
111            clearIterator(true);
112            recovered = true;
113        } else if (!cached) {
114            // a duplicate from the store (!cached) - needs to be removed/acked - otherwise it will get re dispatched on restart
115            if (duplicateFromStoreExcepted(message)) {
116                if (LOG.isTraceEnabled()) {
117                    LOG.trace("{} store replayed pending message due to concurrentStoreAndDispatchQueues {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
118                }
119            } else {
120                LOG.warn("{} - cursor got duplicate from store {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
121                duplicate(message);
122            }
123        } else {
124            LOG.warn("{} - cursor got duplicate send {} seq: {}", this, message.getMessageId(), message.getMessageId().getFutureOrSequenceLong());
125            if (gotToTheStore(message)) {
126                duplicate(message);
127            }
128        }
129        return recovered;
130    }
131
132    protected boolean duplicateFromStoreExcepted(Message message) {
133        // expected for messages pending acks with kahadb.concurrentStoreAndDispatchQueues=true for
134        // which this existing unused flag has been repurposed
135        return message.isRecievedByDFBridge();
136    }
137
138    public static boolean gotToTheStore(Message message) throws Exception {
139        if (message.isRecievedByDFBridge()) {
140            // concurrent store and dispatch - wait to see if the message gets to the store to see
141            // if the index suppressed it (original still present), or whether it was stored and needs to be removed
142            Object possibleFuture = message.getMessageId().getFutureOrSequenceLong();
143            if (possibleFuture instanceof Future) {
144                ((Future) possibleFuture).get();
145            }
146            // need to access again after wait on future
147            Object sequence = message.getMessageId().getFutureOrSequenceLong();
148            return (sequence != null && sequence instanceof Long && Long.compare((Long) sequence, -1l) != 0);
149        }
150        return true;
151    }
152
153    // track for processing outside of store index lock so we can dlq
154    final LinkedList<Message> duplicatesFromStore = new LinkedList<Message>();
155    private void duplicate(Message message) {
156        duplicatesFromStore.add(message);
157    }
158
159    void dealWithDuplicates() {
160        for (Message message : duplicatesFromStore) {
161            regionDestination.duplicateFromStore(message, getSubscription());
162        }
163        duplicatesFromStore.clear();
164    }
165
166    public final synchronized void reset() {
167        if (batchList.isEmpty()) {
168            try {
169                fillBatch();
170            } catch (Exception e) {
171                LOG.error("{} - Failed to fill batch", this, e);
172                throw new RuntimeException(e);
173            }
174        }
175        clearIterator(true);
176        size();
177    }
178
179
180    public synchronized void release() {
181        clearIterator(false);
182    }
183
184    private synchronized void clearIterator(boolean ensureIterator) {
185        boolean haveIterator = this.iterator != null;
186        this.iterator=null;
187        if(haveIterator&&ensureIterator) {
188            ensureIterator();
189        }
190    }
191
192    private synchronized void ensureIterator() {
193        if(this.iterator==null) {
194            this.iterator=this.batchList.iterator();
195        }
196    }
197
198
199    public final void finished() {
200    }
201
202
203    public final synchronized boolean hasNext() {
204        if (batchList.isEmpty()) {
205            try {
206                fillBatch();
207            } catch (Exception e) {
208                LOG.error("{} - Failed to fill batch", this, e);
209                throw new RuntimeException(e);
210            }
211        }
212        ensureIterator();
213        return this.iterator.hasNext();
214    }
215
216
217    public final synchronized MessageReference next() {
218        MessageReference result = null;
219        if (!this.batchList.isEmpty()&&this.iterator.hasNext()) {
220            result = this.iterator.next();
221        }
222        last = result;
223        if (result != null) {
224            result.incrementReferenceCount();
225        }
226        return result;
227    }
228
229    public synchronized boolean addMessageLast(MessageReference node) throws Exception {
230        boolean disableCache = false;
231        if (hasSpace()) {
232            if (isCacheEnabled()) {
233                if (recoverMessage(node.getMessage(),true)) {
234                    trackLastCached(node);
235                } else {
236                    dealWithDuplicates();
237                    return false;
238                }
239            }
240        } else {
241            disableCache = true;
242        }
243
244        if (disableCache && isCacheEnabled()) {
245            if (LOG.isTraceEnabled()) {
246                LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
247            }
248            syncWithStore(node.getMessage());
249            setCacheEnabled(false);
250        }
251        size++;
252        return true;
253    }
254
255    @Override
256    public synchronized boolean isCacheEnabled() {
257        return super.isCacheEnabled() || enableCacheNow();
258    }
259
260    protected boolean enableCacheNow() {
261        boolean result = false;
262        if (canEnableCash()) {
263            setCacheEnabled(true);
264            result = true;
265            if (LOG.isTraceEnabled()) {
266                LOG.trace("{} enabling cache on empty store", this);
267            }
268        }
269        return result;
270    }
271
272    protected boolean canEnableCash() {
273        return useCache && size==0 && hasSpace() && isStarted();
274    }
275
276    private void syncWithStore(Message currentAdd) throws Exception {
277        pruneLastCached();
278        for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {
279            MessageId lastPending = it.previous();
280            Object futureOrLong = lastPending.getFutureOrSequenceLong();
281            if (futureOrLong instanceof Future) {
282                Future future = (Future) futureOrLong;
283                if (future.isCancelled()) {
284                    continue;
285                }
286                try {
287                    future.get(5, TimeUnit.SECONDS);
288                    setLastCachedId(ASYNC_ADD, lastPending);
289                } catch (CancellationException ok) {
290                    continue;
291                } catch (TimeoutException potentialDeadlock) {
292                    LOG.debug("{} timed out waiting for async add", this, potentialDeadlock);
293                } catch (Exception worstCaseWeReplay) {
294                    LOG.debug("{} exception waiting for async add", this, worstCaseWeReplay);
295                }
296            } else {
297                setLastCachedId(ASYNC_ADD, lastPending);
298            }
299            break;
300        }
301
302        MessageId candidate = lastCachedIds[ASYNC_ADD];
303        if (candidate != null) {
304            // ensure we don't skip current possibly sync add b/c we waited on the future
305            if (!isAsync(currentAdd) && Long.compare(((Long) currentAdd.getMessageId().getFutureOrSequenceLong()), ((Long) lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong())) < 0) {
306                if (LOG.isTraceEnabled()) {
307                    LOG.trace("no set batch from async:" + candidate.getFutureOrSequenceLong() + " >= than current: " + currentAdd.getMessageId().getFutureOrSequenceLong() + ", " + this);
308                }
309                candidate = null;
310            }
311        }
312        if (candidate == null) {
313            candidate = lastCachedIds[SYNC_ADD];
314        }
315        if (candidate != null) {
316            setBatch(candidate);
317        }
318        // cleanup
319        lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null;
320        pendingCachedIds.clear();
321    }
322
323    private void trackLastCached(MessageReference node) {
324        if (isAsync(node.getMessage())) {
325            pruneLastCached();
326            pendingCachedIds.add(node.getMessageId());
327        } else {
328            setLastCachedId(SYNC_ADD, node.getMessageId());
329        }
330    }
331
332    private static final boolean isAsync(Message message) {
333        return message.isRecievedByDFBridge() || message.getMessageId().getFutureOrSequenceLong() instanceof Future;
334    }
335
336    private void pruneLastCached() {
337        for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
338            MessageId candidate = it.next();
339            final Object futureOrLong = candidate.getFutureOrSequenceLong();
340            if (futureOrLong instanceof Future) {
341                Future future = (Future) futureOrLong;
342                if (future.isCancelled()) {
343                    it.remove();
344                } else {
345                    // we don't want to wait for work to complete
346                    break;
347                }
348            } else {
349                // complete
350                setLastCachedId(ASYNC_ADD, candidate);
351
352                // keep lock step with sync adds while order is preserved
353                if (lastCachedIds[SYNC_ADD] != null) {
354                    long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
355                    if (Long.compare((Long)futureOrLong, next) == 0) {
356                        setLastCachedId(SYNC_ADD, candidate);
357                    }
358                }
359                it.remove();
360            }
361        }
362    }
363
364    private void setLastCachedId(final int index, MessageId candidate) {
365        MessageId lastCacheId = lastCachedIds[index];
366        if (lastCacheId == null) {
367            lastCachedIds[index] = candidate;
368        } else {
369            Object lastCacheFutureOrSequenceLong = lastCacheId.getFutureOrSequenceLong();
370            Object candidateOrSequenceLong = candidate.getFutureOrSequenceLong();
371            if (lastCacheFutureOrSequenceLong == null) { // possibly null for topics
372                lastCachedIds[index] = candidate;
373            } else if (candidateOrSequenceLong != null &&
374                    Long.compare(((Long) candidateOrSequenceLong), ((Long) lastCacheFutureOrSequenceLong)) > 0) {
375                lastCachedIds[index] = candidate;
376            } else if (LOG.isTraceEnabled()) {
377                LOG.trace("no set last cached[" + index + "] current:" + lastCacheFutureOrSequenceLong + " <= than candidate: " + candidateOrSequenceLong+ ", " + this);
378            }
379        }
380    }
381
382    protected void setBatch(MessageId messageId) throws Exception {
383    }
384
385
386    public synchronized void addMessageFirst(MessageReference node) throws Exception {
387        size++;
388    }
389
390
391    public final synchronized void remove() {
392        size--;
393        if (iterator!=null) {
394            iterator.remove();
395        }
396        if (last != null) {
397            last.decrementReferenceCount();
398        }
399    }
400
401
402    public final synchronized void remove(MessageReference node) {
403        if (batchList.remove(node) != null) {
404            size--;
405            setCacheEnabled(false);
406        }
407    }
408
409
410    public final synchronized void clear() {
411        gc();
412    }
413
414
415    public synchronized void gc() {
416        for (MessageReference msg : batchList) {
417            rollback(msg.getMessageId());
418            msg.decrementReferenceCount();
419        }
420        batchList.clear();
421        clearIterator(false);
422        batchResetNeeded = true;
423        setCacheEnabled(false);
424    }
425
426    protected final synchronized void fillBatch() {
427        if (LOG.isTraceEnabled()) {
428            LOG.trace("{} fillBatch", this);
429        }
430        if (batchResetNeeded) {
431            resetSize();
432            setMaxBatchSize(Math.min(regionDestination.getMaxPageSize(), size));
433            resetBatch();
434            this.batchResetNeeded = false;
435        }
436        if (this.batchList.isEmpty() && this.size >0) {
437            try {
438                doFillBatch();
439            } catch (Exception e) {
440                LOG.error("{} - Failed to fill batch", this, e);
441                throw new RuntimeException(e);
442            }
443        }
444    }
445
446
447    public final synchronized boolean isEmpty() {
448        // negative means more messages added to store through queue.send since last reset
449        return size == 0;
450    }
451
452
453    public final synchronized boolean hasMessagesBufferedToDeliver() {
454        return !batchList.isEmpty();
455    }
456
457
458    public final synchronized int size() {
459        if (size < 0) {
460            this.size = getStoreSize();
461        }
462        return size;
463    }
464
465    @Override
466    public String toString() {
467        return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
468                    + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
469                    + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size()
470                    + ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null")
471                    + ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null");
472    }
473
474    protected abstract void doFillBatch() throws Exception;
475
476    protected abstract void resetBatch();
477
478    protected abstract int getStoreSize();
479
480    protected abstract boolean isStoreEmpty();
481
482    public Subscription getSubscription() {
483        return null;
484    }
485}