001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.DataInput;
024import java.io.DataOutput;
025import java.io.EOFException;
026import java.io.File;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.InterruptedIOException;
030import java.io.ObjectInputStream;
031import java.io.ObjectOutputStream;
032import java.io.OutputStream;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.Date;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.LinkedHashMap;
042import java.util.LinkedHashSet;
043import java.util.LinkedList;
044import java.util.List;
045import java.util.Map;
046import java.util.Map.Entry;
047import java.util.Set;
048import java.util.SortedSet;
049import java.util.TreeMap;
050import java.util.TreeSet;
051
052import java.util.concurrent.ConcurrentHashMap;
053import java.util.concurrent.ConcurrentMap;
054import java.util.concurrent.Executors;
055import java.util.concurrent.ScheduledExecutorService;
056import java.util.concurrent.ThreadFactory;
057import java.util.concurrent.TimeUnit;
058import java.util.concurrent.atomic.AtomicBoolean;
059import java.util.concurrent.atomic.AtomicLong;
060import java.util.concurrent.atomic.AtomicReference;
061import java.util.concurrent.locks.ReentrantReadWriteLock;
062
063import org.apache.activemq.ActiveMQMessageAuditNoSync;
064import org.apache.activemq.broker.BrokerService;
065import org.apache.activemq.broker.BrokerServiceAware;
066import org.apache.activemq.command.TransactionId;
067import org.apache.activemq.openwire.OpenWireFormat;
068import org.apache.activemq.protobuf.Buffer;
069import org.apache.activemq.store.MessageStore;
070import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
071import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
072import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
073import org.apache.activemq.store.kahadb.data.KahaDestination;
074import org.apache.activemq.store.kahadb.data.KahaEntryType;
075import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
076import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
077import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
078import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
079import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
080import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
081import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
082import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
083import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
084import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
085import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
086import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
087import org.apache.activemq.store.kahadb.disk.index.ListIndex;
088import org.apache.activemq.store.kahadb.disk.journal.DataFile;
089import org.apache.activemq.store.kahadb.disk.journal.Journal;
090import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
091import org.apache.activemq.store.kahadb.disk.journal.Location;
092import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
093import org.apache.activemq.store.kahadb.disk.page.Page;
094import org.apache.activemq.store.kahadb.disk.page.PageFile;
095import org.apache.activemq.store.kahadb.disk.page.Transaction;
096import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
097import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
098import org.apache.activemq.store.kahadb.disk.util.Marshaller;
099import org.apache.activemq.store.kahadb.disk.util.Sequence;
100import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
101import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
102import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
103import org.apache.activemq.util.ByteSequence;
104import org.apache.activemq.util.DataByteArrayInputStream;
105import org.apache.activemq.util.DataByteArrayOutputStream;
106import org.apache.activemq.util.IOExceptionSupport;
107import org.apache.activemq.util.IOHelper;
108import org.apache.activemq.util.ServiceStopper;
109import org.apache.activemq.util.ServiceSupport;
110import org.apache.activemq.util.ThreadPoolUtils;
111import org.slf4j.Logger;
112import org.slf4j.LoggerFactory;
113import org.slf4j.MDC;
114
115public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
116
117    protected BrokerService brokerService;
118
119    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
120    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
121    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
122    protected static final Buffer UNMATCHED;
123    static {
124        UNMATCHED = new Buffer(new byte[]{});
125    }
126    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
127
128    static final int CLOSED_STATE = 1;
129    static final int OPEN_STATE = 2;
130    static final long NOT_ACKED = -1;
131
132    static final int VERSION = 5;
133
134    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
135
136    protected class Metadata {
137        protected Page<Metadata> page;
138        protected int state;
139        protected BTreeIndex<String, StoredDestination> destinations;
140        protected Location lastUpdate;
141        protected Location firstInProgressTransactionLocation;
142        protected Location producerSequenceIdTrackerLocation = null;
143        protected Location ackMessageFileMapLocation = null;
144        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
145        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
146        protected int version = VERSION;
147        protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION;
148
149        public void read(DataInput is) throws IOException {
150            state = is.readInt();
151            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
152            if (is.readBoolean()) {
153                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
154            } else {
155                lastUpdate = null;
156            }
157            if (is.readBoolean()) {
158                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
159            } else {
160                firstInProgressTransactionLocation = null;
161            }
162            try {
163                if (is.readBoolean()) {
164                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
165                } else {
166                    producerSequenceIdTrackerLocation = null;
167                }
168            } catch (EOFException expectedOnUpgrade) {
169            }
170            try {
171                version = is.readInt();
172            } catch (EOFException expectedOnUpgrade) {
173                version = 1;
174            }
175            if (version >= 5 && is.readBoolean()) {
176                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
177            } else {
178                ackMessageFileMapLocation = null;
179            }
180            try {
181                openwireVersion = is.readInt();
182            } catch (EOFException expectedOnUpgrade) {
183                openwireVersion = OpenWireFormat.DEFAULT_VERSION;
184            }
185            LOG.info("KahaDB is version " + version);
186        }
187
188        public void write(DataOutput os) throws IOException {
189            os.writeInt(state);
190            os.writeLong(destinations.getPageId());
191
192            if (lastUpdate != null) {
193                os.writeBoolean(true);
194                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
195            } else {
196                os.writeBoolean(false);
197            }
198
199            if (firstInProgressTransactionLocation != null) {
200                os.writeBoolean(true);
201                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
202            } else {
203                os.writeBoolean(false);
204            }
205
206            if (producerSequenceIdTrackerLocation != null) {
207                os.writeBoolean(true);
208                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
209            } else {
210                os.writeBoolean(false);
211            }
212            os.writeInt(VERSION);
213            if (ackMessageFileMapLocation != null) {
214                os.writeBoolean(true);
215                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
216            } else {
217                os.writeBoolean(false);
218            }
219            os.writeInt(this.openwireVersion);
220        }
221    }
222
223    class MetadataMarshaller extends VariableMarshaller<Metadata> {
224        @Override
225        public Metadata readPayload(DataInput dataIn) throws IOException {
226            Metadata rc = createMetadata();
227            rc.read(dataIn);
228            return rc;
229        }
230
231        @Override
232        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
233            object.write(dataOut);
234        }
235    }
236
237    protected PageFile pageFile;
238    protected Journal journal;
239    protected Metadata metadata = new Metadata();
240
241    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
242
243    protected boolean failIfDatabaseIsLocked;
244
245    protected boolean deleteAllMessages;
246    protected File directory = DEFAULT_DIRECTORY;
247    protected File indexDirectory = null;
248    protected ScheduledExecutorService scheduler;
249    private final Object schedulerLock = new Object();
250
251    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
252    protected boolean archiveDataLogs;
253    protected File directoryArchive;
254    protected AtomicLong journalSize = new AtomicLong(0);
255    long journalDiskSyncInterval = 1000;
256    long checkpointInterval = 5*1000;
257    long cleanupInterval = 30*1000;
258    boolean cleanupOnStop = true;
259    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
260    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
261    boolean enableIndexWriteAsync = false;
262    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
263    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
264    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
265
266    protected AtomicBoolean opened = new AtomicBoolean();
267    private boolean ignoreMissingJournalfiles = false;
268    private int indexCacheSize = 10000;
269    private boolean checkForCorruptJournalFiles = false;
270    private boolean checksumJournalFiles = true;
271    protected boolean forceRecoverIndex = false;
272
273    private boolean archiveCorruptedIndex = false;
274    private boolean useIndexLFRUEviction = false;
275    private float indexLFUEvictionFactor = 0.2f;
276    private boolean enableIndexDiskSyncs = true;
277    private boolean enableIndexRecoveryFile = true;
278    private boolean enableIndexPageCaching = true;
279    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
280
281    private boolean enableAckCompaction = false;
282    private int compactAcksAfterNoGC = 10;
283    private boolean compactAcksIgnoresStoreGrowth = false;
284    private int checkPointCyclesWithNoGC;
285    private int journalLogOnLastCompactionCheck;
286
287    //only set when using JournalDiskSyncStrategy.PERIODIC
288    protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>();
289
290    @Override
291    public void doStart() throws Exception {
292        load();
293    }
294
295    @Override
296    public void doStop(ServiceStopper stopper) throws Exception {
297        unload();
298    }
299
300    public void allowIOResumption() {
301        if (pageFile != null) {
302            pageFile.allowIOResumption();
303        }
304        if (journal != null) {
305            journal.allowIOResumption();
306        }
307    }
308
309    private void loadPageFile() throws IOException {
310        this.indexLock.writeLock().lock();
311        try {
312            final PageFile pageFile = getPageFile();
313            pageFile.load();
314            pageFile.tx().execute(new Transaction.Closure<IOException>() {
315                @Override
316                public void execute(Transaction tx) throws IOException {
317                    if (pageFile.getPageCount() == 0) {
318                        // First time this is created.. Initialize the metadata
319                        Page<Metadata> page = tx.allocate();
320                        assert page.getPageId() == 0;
321                        page.set(metadata);
322                        metadata.page = page;
323                        metadata.state = CLOSED_STATE;
324                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
325
326                        tx.store(metadata.page, metadataMarshaller, true);
327                    } else {
328                        Page<Metadata> page = tx.load(0, metadataMarshaller);
329                        metadata = page.get();
330                        metadata.page = page;
331                    }
332                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
333                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
334                    metadata.destinations.load(tx);
335                }
336            });
337            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
338            // Perhaps we should just keep an index of file
339            storedDestinations.clear();
340            pageFile.tx().execute(new Transaction.Closure<IOException>() {
341                @Override
342                public void execute(Transaction tx) throws IOException {
343                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
344                        Entry<String, StoredDestination> entry = iterator.next();
345                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
346                        storedDestinations.put(entry.getKey(), sd);
347
348                        if (checkForCorruptJournalFiles) {
349                            // sanity check the index also
350                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
351                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
352                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
353                                }
354                            }
355                        }
356                    }
357                }
358            });
359            pageFile.flush();
360        } finally {
361            this.indexLock.writeLock().unlock();
362        }
363    }
364
365    private void startCheckpoint() {
366        if (checkpointInterval == 0 && cleanupInterval == 0) {
367            LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart");
368            return;
369        }
370        synchronized (schedulerLock) {
371            if (scheduler == null || scheduler.isShutdown()) {
372                scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
373
374                    @Override
375                    public Thread newThread(Runnable r) {
376                        Thread schedulerThread = new Thread(r);
377
378                        schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
379                        schedulerThread.setDaemon(true);
380
381                        return schedulerThread;
382                    }
383                });
384
385                // Short intervals for check-point and cleanups
386                long delay;
387                if (journal.isJournalDiskSyncPeriodic()) {
388                    delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500);
389                } else {
390                    delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
391                }
392
393                scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
394            }
395        }
396    }
397
398    private final class CheckpointRunner implements Runnable {
399
400        private long lastCheckpoint = System.currentTimeMillis();
401        private long lastCleanup = System.currentTimeMillis();
402        private long lastSync = System.currentTimeMillis();
403        private Location lastAsyncUpdate = null;
404
405        @Override
406        public void run() {
407            try {
408                // Decide on cleanup vs full checkpoint here.
409                if (opened.get()) {
410                    long now = System.currentTimeMillis();
411                    if (journal.isJournalDiskSyncPeriodic() &&
412                            journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
413                        Location currentUpdate = lastAsyncJournalUpdate.get();
414                        if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) {
415                            lastAsyncUpdate = currentUpdate;
416                            if (LOG.isTraceEnabled()) {
417                                LOG.trace("Writing trace command to trigger journal sync");
418                            }
419                            store(new KahaTraceCommand(), true, null, null);
420                        }
421                        lastSync = now;
422                    }
423                    if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
424                        checkpointCleanup(true);
425                        lastCleanup = now;
426                        lastCheckpoint = now;
427                    } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
428                        checkpointCleanup(false);
429                        lastCheckpoint = now;
430                    }
431                }
432            } catch (IOException ioe) {
433                LOG.error("Checkpoint failed", ioe);
434                brokerService.handleIOException(ioe);
435            } catch (Throwable e) {
436                LOG.error("Checkpoint failed", e);
437                brokerService.handleIOException(IOExceptionSupport.create(e));
438            }
439        }
440    }
441
442    public void open() throws IOException {
443        if( opened.compareAndSet(false, true) ) {
444            getJournal().start();
445            try {
446                loadPageFile();
447            } catch (Throwable t) {
448                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
449                if (LOG.isDebugEnabled()) {
450                    LOG.debug("Index load failure", t);
451                }
452                // try to recover index
453                try {
454                    pageFile.unload();
455                } catch (Exception ignore) {}
456                if (archiveCorruptedIndex) {
457                    pageFile.archive();
458                } else {
459                    pageFile.delete();
460                }
461                metadata = createMetadata();
462                pageFile = null;
463                loadPageFile();
464            }
465            startCheckpoint();
466            recover();
467        }
468    }
469
470    public void load() throws IOException {
471        this.indexLock.writeLock().lock();
472        IOHelper.mkdirs(directory);
473        try {
474            if (deleteAllMessages) {
475                getJournal().setCheckForCorruptionOnStartup(false);
476                getJournal().start();
477                getJournal().delete();
478                getJournal().close();
479                journal = null;
480                getPageFile().delete();
481                LOG.info("Persistence store purged.");
482                deleteAllMessages = false;
483            }
484
485            open();
486            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
487        } finally {
488            this.indexLock.writeLock().unlock();
489        }
490    }
491
492    public void close() throws IOException, InterruptedException {
493        if( opened.compareAndSet(true, false)) {
494            checkpointLock.writeLock().lock();
495            try {
496                if (metadata.page != null) {
497                    checkpointUpdate(getCleanupOnStop());
498                }
499                pageFile.unload();
500                metadata = createMetadata();
501            } finally {
502                checkpointLock.writeLock().unlock();
503            }
504            journal.close();
505            synchronized(schedulerLock) {
506                if (scheduler != null) {
507                    ThreadPoolUtils.shutdownGraceful(scheduler, -1);
508                    scheduler = null;
509                }
510            }
511            // clear the cache and journalSize on shutdown of the store
512            storeCache.clear();
513            journalSize.set(0);
514        }
515    }
516
517    public void unload() throws IOException, InterruptedException {
518        this.indexLock.writeLock().lock();
519        try {
520            if( pageFile != null && pageFile.isLoaded() ) {
521                metadata.state = CLOSED_STATE;
522                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
523
524                if (metadata.page != null) {
525                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
526                        @Override
527                        public void execute(Transaction tx) throws IOException {
528                            tx.store(metadata.page, metadataMarshaller, true);
529                        }
530                    });
531                }
532            }
533        } finally {
534            this.indexLock.writeLock().unlock();
535        }
536        close();
537    }
538
539    // public for testing
540    @SuppressWarnings("rawtypes")
541    public Location[] getInProgressTxLocationRange() {
542        Location[] range = new Location[]{null, null};
543        synchronized (inflightTransactions) {
544            if (!inflightTransactions.isEmpty()) {
545                for (List<Operation> ops : inflightTransactions.values()) {
546                    if (!ops.isEmpty()) {
547                        trackMaxAndMin(range, ops);
548                    }
549                }
550            }
551            if (!preparedTransactions.isEmpty()) {
552                for (List<Operation> ops : preparedTransactions.values()) {
553                    if (!ops.isEmpty()) {
554                        trackMaxAndMin(range, ops);
555                    }
556                }
557            }
558        }
559        return range;
560    }
561
562    @SuppressWarnings("rawtypes")
563    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
564        Location t = ops.get(0).getLocation();
565        if (range[0] == null || t.compareTo(range[0]) <= 0) {
566            range[0] = t;
567        }
568        t = ops.get(ops.size() -1).getLocation();
569        if (range[1] == null || t.compareTo(range[1]) >= 0) {
570            range[1] = t;
571        }
572    }
573
574    class TranInfo {
575        TransactionId id;
576        Location location;
577
578        class opCount {
579            int add;
580            int remove;
581        }
582        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
583
584        @SuppressWarnings("rawtypes")
585        public void track(Operation operation) {
586            if (location == null ) {
587                location = operation.getLocation();
588            }
589            KahaDestination destination;
590            boolean isAdd = false;
591            if (operation instanceof AddOperation) {
592                AddOperation add = (AddOperation) operation;
593                destination = add.getCommand().getDestination();
594                isAdd = true;
595            } else {
596                RemoveOperation removeOpperation = (RemoveOperation) operation;
597                destination = removeOpperation.getCommand().getDestination();
598            }
599            opCount opCount = destinationOpCount.get(destination);
600            if (opCount == null) {
601                opCount = new opCount();
602                destinationOpCount.put(destination, opCount);
603            }
604            if (isAdd) {
605                opCount.add++;
606            } else {
607                opCount.remove++;
608            }
609        }
610
611        @Override
612        public String toString() {
613           StringBuffer buffer = new StringBuffer();
614           buffer.append(location).append(";").append(id).append(";\n");
615           for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
616               buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
617           }
618           return buffer.toString();
619        }
620    }
621
622    @SuppressWarnings("rawtypes")
623    public String getTransactions() {
624
625        ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
626        synchronized (inflightTransactions) {
627            if (!inflightTransactions.isEmpty()) {
628                for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
629                    TranInfo info = new TranInfo();
630                    info.id = entry.getKey();
631                    for (Operation operation : entry.getValue()) {
632                        info.track(operation);
633                    }
634                    infos.add(info);
635                }
636            }
637        }
638        synchronized (preparedTransactions) {
639            if (!preparedTransactions.isEmpty()) {
640                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
641                    TranInfo info = new TranInfo();
642                    info.id = entry.getKey();
643                    for (Operation operation : entry.getValue()) {
644                        info.track(operation);
645                    }
646                    infos.add(info);
647                }
648            }
649        }
650        return infos.toString();
651    }
652
653    /**
654     * Move all the messages that were in the journal into long term storage. We
655     * just replay and do a checkpoint.
656     *
657     * @throws IOException
658     * @throws IOException
659     * @throws IllegalStateException
660     */
661    private void recover() throws IllegalStateException, IOException {
662        this.indexLock.writeLock().lock();
663        try {
664
665            long start = System.currentTimeMillis();
666            boolean requiresJournalReplay = recoverProducerAudit();
667            requiresJournalReplay |= recoverAckMessageFileMap();
668            Location lastIndoubtPosition = getRecoveryPosition();
669            Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
670            if (recoveryPosition != null) {
671                int redoCounter = 0;
672                int dataFileRotationTracker = recoveryPosition.getDataFileId();
673                LOG.info("Recovering from the journal @" + recoveryPosition);
674                while (recoveryPosition != null) {
675                    try {
676                        JournalCommand<?> message = load(recoveryPosition);
677                        metadata.lastUpdate = recoveryPosition;
678                        process(message, recoveryPosition, lastIndoubtPosition);
679                        redoCounter++;
680                    } catch (IOException failedRecovery) {
681                        if (isIgnoreMissingJournalfiles()) {
682                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
683                            // track this dud location
684                            journal.corruptRecoveryLocation(recoveryPosition);
685                        } else {
686                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
687                        }
688                    }
689                    recoveryPosition = journal.getNextLocation(recoveryPosition);
690                    // hold on to the minimum number of open files during recovery
691                    if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
692                        dataFileRotationTracker = recoveryPosition.getDataFileId();
693                        journal.cleanup();
694                    }
695                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
696                        LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
697                    }
698                }
699                if (LOG.isInfoEnabled()) {
700                    long end = System.currentTimeMillis();
701                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
702                }
703            }
704
705            // We may have to undo some index updates.
706            pageFile.tx().execute(new Transaction.Closure<IOException>() {
707                @Override
708                public void execute(Transaction tx) throws IOException {
709                    recoverIndex(tx);
710                }
711            });
712
713            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
714            Set<TransactionId> toRollback = new HashSet<TransactionId>();
715            Set<TransactionId> toDiscard = new HashSet<TransactionId>();
716            synchronized (inflightTransactions) {
717                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
718                    TransactionId id = it.next();
719                    if (id.isLocalTransaction()) {
720                        toRollback.add(id);
721                    } else {
722                        toDiscard.add(id);
723                    }
724                }
725                for (TransactionId tx: toRollback) {
726                    if (LOG.isDebugEnabled()) {
727                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
728                    }
729                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
730                }
731                for (TransactionId tx: toDiscard) {
732                    if (LOG.isDebugEnabled()) {
733                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
734                    }
735                    inflightTransactions.remove(tx);
736                }
737            }
738
739            synchronized (preparedTransactions) {
740                for (TransactionId txId : preparedTransactions.keySet()) {
741                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
742                }
743            }
744
745        } finally {
746            this.indexLock.writeLock().unlock();
747        }
748    }
749
750    @SuppressWarnings("unused")
751    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
752        return TransactionIdConversion.convertToLocal(tx);
753    }
754
755    private Location minimum(Location x,
756                             Location y) {
757        Location min = null;
758        if (x != null) {
759            min = x;
760            if (y != null) {
761                int compare = y.compareTo(x);
762                if (compare < 0) {
763                    min = y;
764                }
765            }
766        } else {
767            min = y;
768        }
769        return min;
770    }
771
772    private boolean recoverProducerAudit() throws IOException {
773        boolean requiresReplay = true;
774        if (metadata.producerSequenceIdTrackerLocation != null) {
775            try {
776                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
777                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
778                int maxNumProducers = getMaxFailoverProducersToTrack();
779                int maxAuditDepth = getFailoverProducersAuditDepth();
780                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
781                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
782                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
783                requiresReplay = false;
784            } catch (Exception e) {
785                LOG.warn("Cannot recover message audit", e);
786            }
787        }
788        // got no audit stored so got to recreate via replay from start of the journal
789        return requiresReplay;
790    }
791
792    @SuppressWarnings("unchecked")
793    private boolean recoverAckMessageFileMap() throws IOException {
794        boolean requiresReplay = true;
795        if (metadata.ackMessageFileMapLocation != null) {
796            try {
797                KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
798                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
799                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
800                requiresReplay = false;
801            } catch (Exception e) {
802                LOG.warn("Cannot recover ackMessageFileMap", e);
803            }
804        }
805        // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
806        return requiresReplay;
807    }
808
809    protected void recoverIndex(Transaction tx) throws IOException {
810        long start = System.currentTimeMillis();
811        // It is possible index updates got applied before the journal updates..
812        // in that case we need to removed references to messages that are not in the journal
813        final Location lastAppendLocation = journal.getLastAppendLocation();
814        long undoCounter=0;
815
816        // Go through all the destinations to see if they have messages past the lastAppendLocation
817        for (StoredDestination sd : storedDestinations.values()) {
818
819            final ArrayList<Long> matches = new ArrayList<Long>();
820            // Find all the Locations that are >= than the last Append Location.
821            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
822                @Override
823                protected void matched(Location key, Long value) {
824                    matches.add(value);
825                }
826            });
827
828            for (Long sequenceId : matches) {
829                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
830                sd.locationIndex.remove(tx, keys.location);
831                sd.messageIdIndex.remove(tx, keys.messageId);
832                metadata.producerSequenceIdTracker.rollback(keys.messageId);
833                undoCounter++;
834                // TODO: do we need to modify the ack positions for the pub sub case?
835            }
836        }
837
838        if (undoCounter > 0) {
839            // The rolledback operations are basically in flight journal writes.  To avoid getting
840            // these the end user should do sync writes to the journal.
841            if (LOG.isInfoEnabled()) {
842                long end = System.currentTimeMillis();
843                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
844            }
845        }
846
847        undoCounter = 0;
848        start = System.currentTimeMillis();
849
850        // Lets be extra paranoid here and verify that all the datafiles being referenced
851        // by the indexes still exists.
852
853        final SequenceSet ss = new SequenceSet();
854        for (StoredDestination sd : storedDestinations.values()) {
855            // Use a visitor to cut down the number of pages that we load
856            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
857                int last=-1;
858
859                @Override
860                public boolean isInterestedInKeysBetween(Location first, Location second) {
861                    if( first==null ) {
862                        return !ss.contains(0, second.getDataFileId());
863                    } else if( second==null ) {
864                        return true;
865                    } else {
866                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
867                    }
868                }
869
870                @Override
871                public void visit(List<Location> keys, List<Long> values) {
872                    for (Location l : keys) {
873                        int fileId = l.getDataFileId();
874                        if( last != fileId ) {
875                            ss.add(fileId);
876                            last = fileId;
877                        }
878                    }
879                }
880
881            });
882        }
883        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
884        while (!ss.isEmpty()) {
885            missingJournalFiles.add((int) ss.removeFirst());
886        }
887
888        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
889            missingJournalFiles.add(entry.getKey());
890            for (Integer i : entry.getValue()) {
891                missingJournalFiles.add(i);
892            }
893        }
894
895        missingJournalFiles.removeAll(journal.getFileMap().keySet());
896
897        if (!missingJournalFiles.isEmpty()) {
898            LOG.warn("Some journal files are missing: " + missingJournalFiles);
899        }
900
901        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
902        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
903        for (Integer missing : missingJournalFiles) {
904            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
905        }
906
907        if (checkForCorruptJournalFiles) {
908            Collection<DataFile> dataFiles = journal.getFileMap().values();
909            for (DataFile dataFile : dataFiles) {
910                int id = dataFile.getDataFileId();
911                // eof to next file id
912                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
913                Sequence seq = dataFile.getCorruptedBlocks().getHead();
914                while (seq != null) {
915                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
916                        new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
917                    missingPredicates.add(visitor);
918                    knownCorruption.add(visitor);
919                    seq = seq.getNext();
920                }
921            }
922        }
923
924        if (!missingPredicates.isEmpty()) {
925            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
926                final StoredDestination sd = sdEntry.getValue();
927                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>();
928                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
929                    @Override
930                    protected void matched(Location key, Long value) {
931                        matches.put(value, key);
932                    }
933                });
934
935                // If some message references are affected by the missing data files...
936                if (!matches.isEmpty()) {
937
938                    // We either 'gracefully' recover dropping the missing messages or
939                    // we error out.
940                    if( ignoreMissingJournalfiles ) {
941                        // Update the index to remove the references to the missing data
942                        for (Long sequenceId : matches.keySet()) {
943                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
944                            sd.locationIndex.remove(tx, keys.location);
945                            sd.messageIdIndex.remove(tx, keys.messageId);
946                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
947                            undoCounter++;
948                            // TODO: do we need to modify the ack positions for the pub sub case?
949                        }
950                    } else {
951                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
952                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
953                    }
954                }
955            }
956        }
957
958        if (!ignoreMissingJournalfiles) {
959            if (!knownCorruption.isEmpty()) {
960                LOG.error("Detected corrupt journal files. " + knownCorruption);
961                throw new IOException("Detected corrupt journal files. " + knownCorruption);
962            }
963
964            if (!missingJournalFiles.isEmpty()) {
965                LOG.error("Detected missing journal files. " + missingJournalFiles);
966                throw new IOException("Detected missing journal files. " + missingJournalFiles);
967            }
968        }
969
970        if (undoCounter > 0) {
971            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
972            // should do sync writes to the journal.
973            if (LOG.isInfoEnabled()) {
974                long end = System.currentTimeMillis();
975                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
976            }
977        }
978    }
979
980    private Location nextRecoveryPosition;
981    private Location lastRecoveryPosition;
982
983    public void incrementalRecover() throws IOException {
984        this.indexLock.writeLock().lock();
985        try {
986            if( nextRecoveryPosition == null ) {
987                if( lastRecoveryPosition==null ) {
988                    nextRecoveryPosition = getRecoveryPosition();
989                } else {
990                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
991                }
992            }
993            while (nextRecoveryPosition != null) {
994                lastRecoveryPosition = nextRecoveryPosition;
995                metadata.lastUpdate = lastRecoveryPosition;
996                JournalCommand<?> message = load(lastRecoveryPosition);
997                process(message, lastRecoveryPosition, (IndexAware) null);
998                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
999            }
1000        } finally {
1001            this.indexLock.writeLock().unlock();
1002        }
1003    }
1004
1005    public Location getLastUpdatePosition() throws IOException {
1006        return metadata.lastUpdate;
1007    }
1008
1009    private Location getRecoveryPosition() throws IOException {
1010
1011        if (!this.forceRecoverIndex) {
1012
1013            // If we need to recover the transactions..
1014            if (metadata.firstInProgressTransactionLocation != null) {
1015                return metadata.firstInProgressTransactionLocation;
1016            }
1017
1018            // Perhaps there were no transactions...
1019            if( metadata.lastUpdate!=null) {
1020                // Start replay at the record after the last one recorded in the index file.
1021                return getNextInitializedLocation(metadata.lastUpdate);
1022            }
1023        }
1024        // This loads the first position.
1025        return journal.getNextLocation(null);
1026    }
1027
1028    private Location getNextInitializedLocation(Location location) throws IOException {
1029        Location mayNotBeInitialized = journal.getNextLocation(location);
1030        if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) {
1031            // need to init size and type to skip
1032            return journal.getNextLocation(mayNotBeInitialized);
1033        } else {
1034            return mayNotBeInitialized;
1035        }
1036    }
1037
1038    protected void checkpointCleanup(final boolean cleanup) throws IOException {
1039        long start;
1040        this.indexLock.writeLock().lock();
1041        try {
1042            start = System.currentTimeMillis();
1043            if( !opened.get() ) {
1044                return;
1045            }
1046        } finally {
1047            this.indexLock.writeLock().unlock();
1048        }
1049        checkpointUpdate(cleanup);
1050        long end = System.currentTimeMillis();
1051        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1052            if (LOG.isInfoEnabled()) {
1053                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
1054            }
1055        }
1056    }
1057
1058    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
1059        int size = data.serializedSizeFramed();
1060        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
1061        os.writeByte(data.type().getNumber());
1062        data.writeFramed(os);
1063        return os.toByteSequence();
1064    }
1065
1066    // /////////////////////////////////////////////////////////////////
1067    // Methods call by the broker to update and query the store.
1068    // /////////////////////////////////////////////////////////////////
1069    public Location store(JournalCommand<?> data) throws IOException {
1070        return store(data, false, null,null);
1071    }
1072
1073    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
1074        return store(data, false, null, null, onJournalStoreComplete);
1075    }
1076
1077    public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException {
1078        return store(data, sync, before, after, null);
1079    }
1080
1081    /**
1082     * All updated are are funneled through this method. The updates are converted
1083     * to a JournalMessage which is logged to the journal and then the data from
1084     * the JournalMessage is used to update the index just like it would be done
1085     * during a recovery process.
1086     */
1087    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
1088        try {
1089            ByteSequence sequence = toByteSequence(data);
1090            Location location;
1091
1092            checkpointLock.readLock().lock();
1093            try {
1094
1095                long start = System.currentTimeMillis();
1096                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
1097                long start2 = System.currentTimeMillis();
1098                //Track the last async update so we know if we need to sync at the next checkpoint
1099                if (!sync && journal.isJournalDiskSyncPeriodic()) {
1100                    lastAsyncJournalUpdate.set(location);
1101                }
1102                process(data, location, before);
1103
1104                long end = System.currentTimeMillis();
1105                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1106                    if (LOG.isInfoEnabled()) {
1107                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
1108                    }
1109                }
1110            } finally {
1111                checkpointLock.readLock().unlock();
1112            }
1113
1114            if (after != null) {
1115                after.run();
1116            }
1117
1118            return location;
1119        } catch (IOException ioe) {
1120            LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe);
1121            brokerService.handleIOException(ioe);
1122            throw ioe;
1123        }
1124    }
1125
1126    /**
1127     * Loads a previously stored JournalMessage
1128     *
1129     * @param location
1130     * @return
1131     * @throws IOException
1132     */
1133    public JournalCommand<?> load(Location location) throws IOException {
1134        long start = System.currentTimeMillis();
1135        ByteSequence data = journal.read(location);
1136        long end = System.currentTimeMillis();
1137        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
1138            if (LOG.isInfoEnabled()) {
1139                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
1140            }
1141        }
1142        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
1143        byte readByte = is.readByte();
1144        KahaEntryType type = KahaEntryType.valueOf(readByte);
1145        if( type == null ) {
1146            try {
1147                is.close();
1148            } catch (IOException e) {}
1149            throw new IOException("Could not load journal record, null type information from: " + readByte + " at location: "+location);
1150        }
1151        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
1152        message.mergeFramed(is);
1153        return message;
1154    }
1155
1156    /**
1157     * do minimal recovery till we reach the last inDoubtLocation
1158     * @param data
1159     * @param location
1160     * @param inDoubtlocation
1161     * @throws IOException
1162     */
1163    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
1164        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
1165            process(data, location, (IndexAware) null);
1166        } else {
1167            // just recover producer audit
1168            data.visit(new Visitor() {
1169                @Override
1170                public void visit(KahaAddMessageCommand command) throws IOException {
1171                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1172                }
1173            });
1174        }
1175    }
1176
1177    // /////////////////////////////////////////////////////////////////
1178    // Journaled record processing methods. Once the record is journaled,
1179    // these methods handle applying the index updates. These may be called
1180    // from the recovery method too so they need to be idempotent
1181    // /////////////////////////////////////////////////////////////////
1182
1183    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
1184        data.visit(new Visitor() {
1185            @Override
1186            public void visit(KahaAddMessageCommand command) throws IOException {
1187                process(command, location, onSequenceAssignedCallback);
1188            }
1189
1190            @Override
1191            public void visit(KahaRemoveMessageCommand command) throws IOException {
1192                process(command, location);
1193            }
1194
1195            @Override
1196            public void visit(KahaPrepareCommand command) throws IOException {
1197                process(command, location);
1198            }
1199
1200            @Override
1201            public void visit(KahaCommitCommand command) throws IOException {
1202                process(command, location, onSequenceAssignedCallback);
1203            }
1204
1205            @Override
1206            public void visit(KahaRollbackCommand command) throws IOException {
1207                process(command, location);
1208            }
1209
1210            @Override
1211            public void visit(KahaRemoveDestinationCommand command) throws IOException {
1212                process(command, location);
1213            }
1214
1215            @Override
1216            public void visit(KahaSubscriptionCommand command) throws IOException {
1217                process(command, location);
1218            }
1219
1220            @Override
1221            public void visit(KahaProducerAuditCommand command) throws IOException {
1222                processLocation(location);
1223            }
1224
1225            @Override
1226            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
1227                processLocation(location);
1228            }
1229
1230            @Override
1231            public void visit(KahaTraceCommand command) {
1232                processLocation(location);
1233            }
1234
1235            @Override
1236            public void visit(KahaUpdateMessageCommand command) throws IOException {
1237                process(command, location);
1238            }
1239
1240            @Override
1241            public void visit(KahaRewrittenDataFileCommand command) throws IOException {
1242                process(command, location);
1243            }
1244        });
1245    }
1246
1247    @SuppressWarnings("rawtypes")
1248    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
1249        if (command.hasTransactionInfo()) {
1250            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1251            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
1252        } else {
1253            this.indexLock.writeLock().lock();
1254            try {
1255                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1256                    @Override
1257                    public void execute(Transaction tx) throws IOException {
1258                        long assignedIndex = updateIndex(tx, command, location);
1259                        if (runWithIndexLock != null) {
1260                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
1261                        }
1262                    }
1263                });
1264
1265            } finally {
1266                this.indexLock.writeLock().unlock();
1267            }
1268        }
1269    }
1270
1271    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
1272        this.indexLock.writeLock().lock();
1273        try {
1274            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1275                @Override
1276                public void execute(Transaction tx) throws IOException {
1277                    updateIndex(tx, command, location);
1278                }
1279            });
1280        } finally {
1281            this.indexLock.writeLock().unlock();
1282        }
1283    }
1284
1285    @SuppressWarnings("rawtypes")
1286    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1287        if (command.hasTransactionInfo()) {
1288           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1289           inflightTx.add(new RemoveOperation(command, location));
1290        } else {
1291            this.indexLock.writeLock().lock();
1292            try {
1293                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1294                    @Override
1295                    public void execute(Transaction tx) throws IOException {
1296                        updateIndex(tx, command, location);
1297                    }
1298                });
1299            } finally {
1300                this.indexLock.writeLock().unlock();
1301            }
1302        }
1303    }
1304
1305    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1306        this.indexLock.writeLock().lock();
1307        try {
1308            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1309                @Override
1310                public void execute(Transaction tx) throws IOException {
1311                    updateIndex(tx, command, location);
1312                }
1313            });
1314        } finally {
1315            this.indexLock.writeLock().unlock();
1316        }
1317    }
1318
1319    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1320        this.indexLock.writeLock().lock();
1321        try {
1322            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1323                @Override
1324                public void execute(Transaction tx) throws IOException {
1325                    updateIndex(tx, command, location);
1326                }
1327            });
1328        } finally {
1329            this.indexLock.writeLock().unlock();
1330        }
1331    }
1332
1333    protected void processLocation(final Location location) {
1334        this.indexLock.writeLock().lock();
1335        try {
1336            metadata.lastUpdate = location;
1337        } finally {
1338            this.indexLock.writeLock().unlock();
1339        }
1340    }
1341
1342    @SuppressWarnings("rawtypes")
1343    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
1344        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1345        List<Operation> inflightTx;
1346        synchronized (inflightTransactions) {
1347            inflightTx = inflightTransactions.remove(key);
1348            if (inflightTx == null) {
1349                inflightTx = preparedTransactions.remove(key);
1350            }
1351        }
1352        if (inflightTx == null) {
1353            // only non persistent messages in this tx
1354            if (before != null) {
1355                before.sequenceAssignedWithIndexLocked(-1);
1356            }
1357            return;
1358        }
1359
1360        final List<Operation> messagingTx = inflightTx;
1361        indexLock.writeLock().lock();
1362        try {
1363            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1364                @Override
1365                public void execute(Transaction tx) throws IOException {
1366                    for (Operation op : messagingTx) {
1367                        op.execute(tx);
1368                        recordAckMessageReferenceLocation(location, op.getLocation());
1369                    }
1370                }
1371            });
1372            metadata.lastUpdate = location;
1373        } finally {
1374            indexLock.writeLock().unlock();
1375        }
1376    }
1377
1378    @SuppressWarnings("rawtypes")
1379    protected void process(KahaPrepareCommand command, Location location) {
1380        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1381        List<Operation> tx = null;
1382        synchronized (inflightTransactions) {
1383            tx = inflightTransactions.remove(key);
1384            if (tx != null) {
1385                preparedTransactions.put(key, tx);
1386            }
1387        }
1388        if (tx != null && !tx.isEmpty()) {
1389            indexLock.writeLock().lock();
1390            try {
1391                for (Operation op : tx) {
1392                    recordAckMessageReferenceLocation(location, op.getLocation());
1393                }
1394            } finally {
1395                indexLock.writeLock().unlock();
1396            }
1397        }
1398    }
1399
1400    @SuppressWarnings("rawtypes")
1401    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1402        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1403        List<Operation> updates = null;
1404        synchronized (inflightTransactions) {
1405            updates = inflightTransactions.remove(key);
1406            if (updates == null) {
1407                updates = preparedTransactions.remove(key);
1408            }
1409        }
1410        if (key.isXATransaction() && updates != null && !updates.isEmpty()) {
1411            indexLock.writeLock().lock();
1412            try {
1413                for (Operation op : updates) {
1414                    recordAckMessageReferenceLocation(location, op.getLocation());
1415                }
1416            } finally {
1417                indexLock.writeLock().unlock();
1418            }
1419        }
1420    }
1421
1422    protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
1423        final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1424
1425        // Mark the current journal file as a compacted file so that gc checks can skip
1426        // over logs that are smaller compaction type logs.
1427        DataFile current = journal.getDataFileById(location.getDataFileId());
1428        current.setTypeCode(command.getRewriteType());
1429
1430        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
1431            // Move offset so that next location read jumps to next file.
1432            location.setOffset(journalMaxFileLength);
1433        }
1434    }
1435
1436    // /////////////////////////////////////////////////////////////////
1437    // These methods do the actual index updates.
1438    // /////////////////////////////////////////////////////////////////
1439
1440    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1441    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1442
1443    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1444        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1445
1446        // Skip adding the message to the index if this is a topic and there are
1447        // no subscriptions.
1448        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1449            return -1;
1450        }
1451
1452        // Add the message.
1453        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1454        long id = sd.orderIndex.getNextMessageId(priority);
1455        Long previous = sd.locationIndex.put(tx, location, id);
1456        if (previous == null) {
1457            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1458            if (previous == null) {
1459                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1460                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1461                    addAckLocationForNewMessage(tx, sd, id);
1462                }
1463                metadata.lastUpdate = location;
1464            } else {
1465
1466                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
1467                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
1468                    // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
1469                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
1470                }
1471                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1472                sd.locationIndex.remove(tx, location);
1473                id = -1;
1474            }
1475        } else {
1476            // restore the previous value.. Looks like this was a redo of a previously
1477            // added message. We don't want to assign it a new id as the other indexes would
1478            // be wrong..
1479            sd.locationIndex.put(tx, location, previous);
1480            metadata.lastUpdate = location;
1481        }
1482        // record this id in any event, initial send or recovery
1483        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1484        return id;
1485    }
1486
1487    void trackPendingAdd(KahaDestination destination, Long seq) {
1488        StoredDestination sd = storedDestinations.get(key(destination));
1489        if (sd != null) {
1490            sd.trackPendingAdd(seq);
1491        }
1492    }
1493
1494    void trackPendingAddComplete(KahaDestination destination, Long seq) {
1495        StoredDestination sd = storedDestinations.get(key(destination));
1496        if (sd != null) {
1497            sd.trackPendingAddComplete(seq);
1498        }
1499    }
1500
1501    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
1502        KahaAddMessageCommand command = updateMessageCommand.getMessage();
1503        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1504
1505        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
1506        if (id != null) {
1507            MessageKeys previousKeys = sd.orderIndex.put(
1508                    tx,
1509                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
1510                    id,
1511                    new MessageKeys(command.getMessageId(), location)
1512            );
1513            sd.locationIndex.put(tx, location, id);
1514            // on first update previous is original location, on recovery/replay it may be the updated location
1515            if(previousKeys != null && !previousKeys.location.equals(location)) {
1516                sd.locationIndex.remove(tx, previousKeys.location);
1517            }
1518            metadata.lastUpdate = location;
1519        } else {
1520            //Add the message if it can't be found
1521            this.updateIndex(tx, command, location);
1522        }
1523    }
1524
1525    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1526        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1527        if (!command.hasSubscriptionKey()) {
1528
1529            // In the queue case we just remove the message from the index..
1530            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1531            if (sequenceId != null) {
1532                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1533                if (keys != null) {
1534                    sd.locationIndex.remove(tx, keys.location);
1535                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1536                    metadata.lastUpdate = ackLocation;
1537                }  else if (LOG.isDebugEnabled()) {
1538                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1539                }
1540            } else if (LOG.isDebugEnabled()) {
1541                LOG.debug("message not found in sequence id index: " + command.getMessageId());
1542            }
1543        } else {
1544            // In the topic case we need remove the message once it's been acked
1545            // by all the subs
1546            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1547
1548            // Make sure it's a valid message id...
1549            if (sequence != null) {
1550                String subscriptionKey = command.getSubscriptionKey();
1551                if (command.getAck() != UNMATCHED) {
1552                    sd.orderIndex.get(tx, sequence);
1553                    byte priority = sd.orderIndex.lastGetPriority();
1554                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1555                }
1556
1557                MessageKeys keys = sd.orderIndex.get(tx, sequence);
1558                if (keys != null) {
1559                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1560                }
1561                // The following method handles deleting un-referenced messages.
1562                removeAckLocation(tx, sd, subscriptionKey, sequence);
1563                metadata.lastUpdate = ackLocation;
1564            } else if (LOG.isDebugEnabled()) {
1565                LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1566            }
1567
1568        }
1569    }
1570
1571    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1572        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1573        if (referenceFileIds == null) {
1574            referenceFileIds = new HashSet<Integer>();
1575            referenceFileIds.add(messageLocation.getDataFileId());
1576            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1577        } else {
1578            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1579            if (!referenceFileIds.contains(id)) {
1580                referenceFileIds.add(id);
1581            }
1582        }
1583    }
1584
1585    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1586        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1587        sd.orderIndex.remove(tx);
1588
1589        sd.locationIndex.clear(tx);
1590        sd.locationIndex.unload(tx);
1591        tx.free(sd.locationIndex.getPageId());
1592
1593        sd.messageIdIndex.clear(tx);
1594        sd.messageIdIndex.unload(tx);
1595        tx.free(sd.messageIdIndex.getPageId());
1596
1597        if (sd.subscriptions != null) {
1598            sd.subscriptions.clear(tx);
1599            sd.subscriptions.unload(tx);
1600            tx.free(sd.subscriptions.getPageId());
1601
1602            sd.subscriptionAcks.clear(tx);
1603            sd.subscriptionAcks.unload(tx);
1604            tx.free(sd.subscriptionAcks.getPageId());
1605
1606            sd.ackPositions.clear(tx);
1607            sd.ackPositions.unload(tx);
1608            tx.free(sd.ackPositions.getHeadPageId());
1609
1610            sd.subLocations.clear(tx);
1611            sd.subLocations.unload(tx);
1612            tx.free(sd.subLocations.getHeadPageId());
1613        }
1614
1615        String key = key(command.getDestination());
1616        storedDestinations.remove(key);
1617        metadata.destinations.remove(tx, key);
1618        storeCache.remove(key(command.getDestination()));
1619    }
1620
1621    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1622        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1623        final String subscriptionKey = command.getSubscriptionKey();
1624
1625        // If set then we are creating it.. otherwise we are destroying the sub
1626        if (command.hasSubscriptionInfo()) {
1627            Location existing = sd.subLocations.get(tx, subscriptionKey);
1628            if (existing != null && existing.compareTo(location) == 0) {
1629                // replay on recovery, ignore
1630                LOG.trace("ignoring journal replay of replay of sub from: " + location);
1631                return;
1632            }
1633
1634            sd.subscriptions.put(tx, subscriptionKey, command);
1635            sd.subLocations.put(tx, subscriptionKey, location);
1636            long ackLocation=NOT_ACKED;
1637            if (!command.getRetroactive()) {
1638                ackLocation = sd.orderIndex.nextMessageId-1;
1639            } else {
1640                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1641            }
1642            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1643            sd.subscriptionCache.add(subscriptionKey);
1644        } else {
1645            // delete the sub...
1646            sd.subscriptions.remove(tx, subscriptionKey);
1647            sd.subLocations.remove(tx, subscriptionKey);
1648            sd.subscriptionAcks.remove(tx, subscriptionKey);
1649            sd.subscriptionCache.remove(subscriptionKey);
1650            removeAckLocationsForSub(tx, sd, subscriptionKey);
1651
1652            if (sd.subscriptions.isEmpty(tx)) {
1653                // remove the stored destination
1654                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
1655                removeDestinationCommand.setDestination(command.getDestination());
1656                updateIndex(tx, removeDestinationCommand, null);
1657            }
1658        }
1659    }
1660
1661    private void checkpointUpdate(final boolean cleanup) throws IOException {
1662        checkpointLock.writeLock().lock();
1663        try {
1664            this.indexLock.writeLock().lock();
1665            try {
1666                Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() {
1667                    @Override
1668                    public Set<Integer> execute(Transaction tx) throws IOException {
1669                        return checkpointUpdate(tx, cleanup);
1670                    }
1671                });
1672                pageFile.flush();
1673                // after the index update such that partial removal does not leave dangling references in the index.
1674                journal.removeDataFiles(filesToGc);
1675            } finally {
1676                this.indexLock.writeLock().unlock();
1677            }
1678
1679        } finally {
1680            checkpointLock.writeLock().unlock();
1681        }
1682    }
1683
1684    /**
1685     * @param tx
1686     * @throws IOException
1687     */
1688    Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1689        MDC.put("activemq.persistenceDir", getDirectory().getName());
1690        LOG.debug("Checkpoint started.");
1691
1692        // reflect last update exclusive of current checkpoint
1693        Location lastUpdate = metadata.lastUpdate;
1694
1695        metadata.state = OPEN_STATE;
1696        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1697        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
1698        Location[] inProgressTxRange = getInProgressTxLocationRange();
1699        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
1700        tx.store(metadata.page, metadataMarshaller, true);
1701
1702        final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>();
1703        if (cleanup) {
1704
1705            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1706            gcCandidateSet.addAll(completeFileSet);
1707
1708            if (LOG.isTraceEnabled()) {
1709                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
1710            }
1711
1712            if (lastUpdate != null) {
1713                // we won't delete past the last update, ackCompaction journal can be a candidate in error
1714                gcCandidateSet.removeAll(new TreeSet<Integer>(gcCandidateSet.tailSet(lastUpdate.getDataFileId())));
1715            }
1716
1717            // Don't GC files under replication
1718            if( journalFilesBeingReplicated!=null ) {
1719                gcCandidateSet.removeAll(journalFilesBeingReplicated);
1720            }
1721
1722            if (metadata.producerSequenceIdTrackerLocation != null) {
1723                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
1724                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
1725                    // rewrite so we don't prevent gc
1726                    metadata.producerSequenceIdTracker.setModified(true);
1727                    if (LOG.isTraceEnabled()) {
1728                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
1729                    }
1730                }
1731                gcCandidateSet.remove(dataFileId);
1732                if (LOG.isTraceEnabled()) {
1733                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet);
1734                }
1735            }
1736
1737            if (metadata.ackMessageFileMapLocation != null) {
1738                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
1739                gcCandidateSet.remove(dataFileId);
1740                if (LOG.isTraceEnabled()) {
1741                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet);
1742                }
1743            }
1744
1745            // Don't GC files referenced by in-progress tx
1746            if (inProgressTxRange[0] != null) {
1747                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
1748                    gcCandidateSet.remove(pendingTx);
1749                }
1750            }
1751            if (LOG.isTraceEnabled()) {
1752                LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
1753            }
1754
1755            // Go through all the destinations to see if any of them can remove GC candidates.
1756            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1757                if( gcCandidateSet.isEmpty() ) {
1758                    break;
1759                }
1760
1761                // Use a visitor to cut down the number of pages that we load
1762                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1763                    int last=-1;
1764                    @Override
1765                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1766                        if( first==null ) {
1767                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1768                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1769                                subset.remove(second.getDataFileId());
1770                            }
1771                            return !subset.isEmpty();
1772                        } else if( second==null ) {
1773                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1774                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1775                                subset.remove(first.getDataFileId());
1776                            }
1777                            return !subset.isEmpty();
1778                        } else {
1779                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1780                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1781                                subset.remove(first.getDataFileId());
1782                            }
1783                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1784                                subset.remove(second.getDataFileId());
1785                            }
1786                            return !subset.isEmpty();
1787                        }
1788                    }
1789
1790                    @Override
1791                    public void visit(List<Location> keys, List<Long> values) {
1792                        for (Location l : keys) {
1793                            int fileId = l.getDataFileId();
1794                            if( last != fileId ) {
1795                                gcCandidateSet.remove(fileId);
1796                                last = fileId;
1797                            }
1798                        }
1799                    }
1800                });
1801
1802                // Durable Subscription
1803                if (entry.getValue().subLocations != null) {
1804                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
1805                    while (iter.hasNext()) {
1806                        Entry<String, Location> subscription = iter.next();
1807                        int dataFileId = subscription.getValue().getDataFileId();
1808
1809                        // Move subscription along if it has no outstanding messages that need ack'd
1810                        // and its in the last log file in the journal.
1811                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
1812                            final StoredDestination destination = entry.getValue();
1813                            final String subscriptionKey = subscription.getKey();
1814                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1815
1816                            // When pending is size one that is the next message Id meaning there
1817                            // are no pending messages currently.
1818                            if (pendingAcks == null || pendingAcks.isEmpty() ||
1819                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
1820
1821                                if (LOG.isTraceEnabled()) {
1822                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
1823                                }
1824
1825                                final KahaSubscriptionCommand kahaSub =
1826                                    destination.subscriptions.get(tx, subscriptionKey);
1827                                destination.subLocations.put(
1828                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
1829
1830                                // Skips the remove from candidates if we rewrote the subscription
1831                                // in order to prevent duplicate subscription commands on recover.
1832                                // If another subscription is on the same file and isn't rewritten
1833                                // than it will remove the file from the set.
1834                                continue;
1835                            }
1836                        }
1837
1838                        gcCandidateSet.remove(dataFileId);
1839                    }
1840                }
1841
1842                if (LOG.isTraceEnabled()) {
1843                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1844                }
1845            }
1846
1847            // check we are not deleting file with ack for in-use journal files
1848            if (LOG.isTraceEnabled()) {
1849                LOG.trace("gc candidates: " + gcCandidateSet);
1850                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
1851            }
1852
1853            boolean ackMessageFileMapMod = false;
1854            Iterator<Integer> candidates = gcCandidateSet.iterator();
1855            while (candidates.hasNext()) {
1856                Integer candidate = candidates.next();
1857                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
1858                if (referencedFileIds != null) {
1859                    for (Integer referencedFileId : referencedFileIds) {
1860                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
1861                            // active file that is not targeted for deletion is referenced so don't delete
1862                            candidates.remove();
1863                            break;
1864                        }
1865                    }
1866                    if (gcCandidateSet.contains(candidate)) {
1867                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
1868                    } else {
1869                        if (LOG.isTraceEnabled()) {
1870                            LOG.trace("not removing data file: " + candidate
1871                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1872                        }
1873                    }
1874                }
1875            }
1876
1877            if (!gcCandidateSet.isEmpty()) {
1878                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
1879                for (Integer candidate : gcCandidateSet) {
1880                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
1881                        ackMessageFileMapMod |= ackFiles.remove(candidate);
1882                    }
1883                }
1884                if (ackMessageFileMapMod) {
1885                    checkpointUpdate(tx, false);
1886                }
1887            } else if (isEnableAckCompaction()) {
1888                if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
1889                    // First check length of journal to make sure it makes sense to even try.
1890                    //
1891                    // If there is only one journal file with Acks in it we don't need to move
1892                    // it since it won't be chained to any later logs.
1893                    //
1894                    // If the logs haven't grown since the last time then we need to compact
1895                    // otherwise there seems to still be room for growth and we don't need to incur
1896                    // the overhead.  Depending on configuration this check can be avoided and
1897                    // Ack compaction will run any time the store has not GC'd a journal file in
1898                    // the configured amount of cycles.
1899                    if (metadata.ackMessageFileMap.size() > 1 &&
1900                        (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
1901
1902                        LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
1903                        try {
1904                            scheduler.execute(new AckCompactionRunner());
1905                        } catch (Exception ex) {
1906                            LOG.warn("Error on queueing the Ack Compactor", ex);
1907                        }
1908                    } else {
1909                        LOG.trace("Journal activity detected, no Ack compaction scheduled.");
1910                    }
1911
1912                    checkPointCyclesWithNoGC = 0;
1913                } else {
1914                    LOG.trace("Not yet time to check for compaction: {} of {} cycles",
1915                              checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
1916                }
1917
1918                journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
1919            }
1920        }
1921        MDC.remove("activemq.persistenceDir");
1922
1923        LOG.debug("Checkpoint done.");
1924        return gcCandidateSet;
1925    }
1926
1927    private final class AckCompactionRunner implements Runnable {
1928
1929        @Override
1930        public void run() {
1931
1932            int journalToAdvance = -1;
1933            Set<Integer> journalLogsReferenced = new HashSet<Integer>();
1934
1935            //flag to know whether the ack forwarding completed without an exception
1936            boolean forwarded = false;
1937
1938            try {
1939                //acquire the checkpoint lock to prevent other threads from
1940                //running a checkpoint while this is running
1941                //
1942                //Normally this task runs on the same executor as the checkpoint task
1943                //so this ack compaction runner wouldn't run at the same time as the checkpoint task.
1944                //
1945                //However, there are two cases where this isn't always true.
1946                //First, the checkpoint() method is public and can be called through the
1947                //PersistenceAdapter interface by someone at the same time this is running.
1948                //Second, a checkpoint is called during shutdown without using the executor.
1949                //
1950                //In the future it might be better to just remove the checkpointLock entirely
1951                //and only use the executor but this would need to be examined for any unintended
1952                //consequences
1953                checkpointLock.readLock().lock();
1954
1955                try {
1956
1957                    // Lock index to capture the ackMessageFileMap data
1958                    indexLock.writeLock().lock();
1959
1960                    // Map keys might not be sorted, find the earliest log file to forward acks
1961                    // from and move only those, future cycles can chip away at more as needed.
1962                    // We won't move files that are themselves rewritten on a previous compaction.
1963                    List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
1964                    Collections.sort(journalFileIds);
1965                    for (Integer journalFileId : journalFileIds) {
1966                        DataFile current = journal.getDataFileById(journalFileId);
1967                        if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
1968                            journalToAdvance = journalFileId;
1969                            break;
1970                        }
1971                    }
1972
1973                    // Check if we found one, or if we only found the current file being written to.
1974                    if (journalToAdvance == -1 || blockedFromCompaction(journalToAdvance)) {
1975                        return;
1976                    }
1977
1978                    journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
1979
1980                } finally {
1981                    indexLock.writeLock().unlock();
1982                }
1983
1984                try {
1985                    // Background rewrite of the old acks
1986                    forwardAllAcks(journalToAdvance, journalLogsReferenced);
1987                    forwarded = true;
1988                } catch (IOException ioe) {
1989                    LOG.error("Forwarding of acks failed", ioe);
1990                    brokerService.handleIOException(ioe);
1991                } catch (Throwable e) {
1992                    LOG.error("Forwarding of acks failed", e);
1993                    brokerService.handleIOException(IOExceptionSupport.create(e));
1994                }
1995            } finally {
1996                checkpointLock.readLock().unlock();
1997            }
1998
1999            try {
2000                if (forwarded) {
2001                    // Checkpoint with changes from the ackMessageFileMap
2002                    checkpointUpdate(false);
2003                }
2004            } catch (IOException ioe) {
2005                LOG.error("Checkpoint failed", ioe);
2006                brokerService.handleIOException(ioe);
2007            } catch (Throwable e) {
2008                LOG.error("Checkpoint failed", e);
2009                brokerService.handleIOException(IOExceptionSupport.create(e));
2010            }
2011        }
2012    }
2013
2014    // called with the index lock held
2015    private boolean blockedFromCompaction(int journalToAdvance) {
2016        // don't forward the current data file
2017        if (journalToAdvance == journal.getCurrentDataFileId()) {
2018            return true;
2019        }
2020        // don't forward any data file with inflight transaction records because it will whack the tx - data file link
2021        // in the ack map when all acks are migrated (now that the ack map is not just for acks)
2022        // TODO: prepare records can be dropped but completion records (maybe only commit outcomes) need to be migrated
2023        // as part of the forward work.
2024        Location[] inProgressTxRange = getInProgressTxLocationRange();
2025        if (inProgressTxRange[0] != null) {
2026            for (int pendingTx = inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
2027                if (journalToAdvance == pendingTx) {
2028                    LOG.trace("Compaction target:{} blocked by inflight transaction records: {}", journalToAdvance, inProgressTxRange);
2029                    return true;
2030                }
2031            }
2032        }
2033        return false;
2034    }
2035
2036    private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
2037        LOG.trace("Attempting to move all acks in journal:{} to the front. Referenced files:{}", journalToRead, journalLogsReferenced);
2038
2039        DataFile forwardsFile = journal.reserveDataFile();
2040        forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
2041        LOG.trace("Reserved file for forwarded acks: {}", forwardsFile);
2042
2043        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
2044
2045        try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
2046            KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
2047            compactionMarker.setSourceDataFileId(journalToRead);
2048            compactionMarker.setRewriteType(forwardsFile.getTypeCode());
2049
2050            ByteSequence payload = toByteSequence(compactionMarker);
2051            appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2052            LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
2053
2054            final Location limit = new Location(journalToRead + 1, 0);
2055            Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0), limit);
2056            while (nextLocation != null) {
2057                JournalCommand<?> command = null;
2058                try {
2059                    command = load(nextLocation);
2060                } catch (IOException ex) {
2061                    LOG.trace("Error loading command during ack forward: {}", nextLocation);
2062                }
2063
2064                if (shouldForward(command)) {
2065                    payload = toByteSequence(command);
2066                    Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2067                    updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
2068                }
2069
2070                nextLocation = getNextLocationForAckForward(nextLocation, limit);
2071            }
2072        }
2073
2074        LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
2075
2076        // Lock index while we update the ackMessageFileMap.
2077        indexLock.writeLock().lock();
2078
2079        // Update the ack map with the new locations of the acks
2080        for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
2081            Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
2082            if (referenceFileIds == null) {
2083                referenceFileIds = new HashSet<Integer>();
2084                referenceFileIds.addAll(entry.getValue());
2085                metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
2086            } else {
2087                referenceFileIds.addAll(entry.getValue());
2088            }
2089        }
2090
2091        // remove the old location data from the ack map so that the old journal log file can
2092        // be removed on next GC.
2093        metadata.ackMessageFileMap.remove(journalToRead);
2094
2095        indexLock.writeLock().unlock();
2096
2097        LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
2098    }
2099
2100    private boolean shouldForward(JournalCommand<?> command) {
2101        boolean result = false;
2102        if (command != null) {
2103            if (command instanceof KahaRemoveMessageCommand) {
2104                result = true;
2105            } else if (command instanceof KahaCommitCommand) {
2106                KahaCommitCommand kahaCommitCommand = (KahaCommitCommand) command;
2107                if (kahaCommitCommand.hasTransactionInfo() && kahaCommitCommand.getTransactionInfo().hasXaTransactionId()) {
2108                    result = true;
2109                }
2110            }
2111        }
2112        return result;
2113    }
2114
2115    private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) {
2116        //getNextLocation() can throw an IOException, we should handle it and set
2117        //nextLocation to null and abort gracefully
2118        //Should not happen in the normal case
2119        Location location = null;
2120        try {
2121            location = journal.getNextLocation(nextLocation, limit);
2122        } catch (IOException e) {
2123            LOG.warn("Failed to load next journal location after: {}, reason: {}", nextLocation, e);
2124            if (LOG.isDebugEnabled()) {
2125                LOG.debug("Failed to load next journal location after: {}", nextLocation, e);
2126            }
2127        }
2128        return location;
2129    }
2130
2131    final Runnable nullCompletionCallback = new Runnable() {
2132        @Override
2133        public void run() {
2134        }
2135    };
2136
2137    private Location checkpointProducerAudit() throws IOException {
2138        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
2139            ByteArrayOutputStream baos = new ByteArrayOutputStream();
2140            ObjectOutputStream oout = new ObjectOutputStream(baos);
2141            oout.writeObject(metadata.producerSequenceIdTracker);
2142            oout.flush();
2143            oout.close();
2144            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2145            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
2146            try {
2147                location.getLatch().await();
2148                if (location.getException().get() != null) {
2149                    throw location.getException().get();
2150                }
2151            } catch (InterruptedException e) {
2152                throw new InterruptedIOException(e.toString());
2153            }
2154            return location;
2155        }
2156        return metadata.producerSequenceIdTrackerLocation;
2157    }
2158
2159    private Location checkpointAckMessageFileMap() throws IOException {
2160        ByteArrayOutputStream baos = new ByteArrayOutputStream();
2161        ObjectOutputStream oout = new ObjectOutputStream(baos);
2162        oout.writeObject(metadata.ackMessageFileMap);
2163        oout.flush();
2164        oout.close();
2165        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2166        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
2167        try {
2168            location.getLatch().await();
2169        } catch (InterruptedException e) {
2170            throw new InterruptedIOException(e.toString());
2171        }
2172        return location;
2173    }
2174
2175    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
2176
2177        ByteSequence sequence = toByteSequence(subscription);
2178        Location location = journal.write(sequence, nullCompletionCallback) ;
2179
2180        try {
2181            location.getLatch().await();
2182        } catch (InterruptedException e) {
2183            throw new InterruptedIOException(e.toString());
2184        }
2185        return location;
2186    }
2187
2188    public HashSet<Integer> getJournalFilesBeingReplicated() {
2189        return journalFilesBeingReplicated;
2190    }
2191
2192    // /////////////////////////////////////////////////////////////////
2193    // StoredDestination related implementation methods.
2194    // /////////////////////////////////////////////////////////////////
2195
2196    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
2197
2198    static class MessageKeys {
2199        final String messageId;
2200        final Location location;
2201
2202        public MessageKeys(String messageId, Location location) {
2203            this.messageId=messageId;
2204            this.location=location;
2205        }
2206
2207        @Override
2208        public String toString() {
2209            return "["+messageId+","+location+"]";
2210        }
2211    }
2212
2213    static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
2214        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
2215
2216        @Override
2217        public MessageKeys readPayload(DataInput dataIn) throws IOException {
2218            return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
2219        }
2220
2221        @Override
2222        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
2223            dataOut.writeUTF(object.messageId);
2224            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
2225        }
2226    }
2227
2228    class LastAck {
2229        long lastAckedSequence;
2230        byte priority;
2231
2232        public LastAck(LastAck source) {
2233            this.lastAckedSequence = source.lastAckedSequence;
2234            this.priority = source.priority;
2235        }
2236
2237        public LastAck() {
2238            this.priority = MessageOrderIndex.HI;
2239        }
2240
2241        public LastAck(long ackLocation) {
2242            this.lastAckedSequence = ackLocation;
2243            this.priority = MessageOrderIndex.LO;
2244        }
2245
2246        public LastAck(long ackLocation, byte priority) {
2247            this.lastAckedSequence = ackLocation;
2248            this.priority = priority;
2249        }
2250
2251        @Override
2252        public String toString() {
2253            return "[" + lastAckedSequence + ":" + priority + "]";
2254        }
2255    }
2256
2257    protected class LastAckMarshaller implements Marshaller<LastAck> {
2258
2259        @Override
2260        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
2261            dataOut.writeLong(object.lastAckedSequence);
2262            dataOut.writeByte(object.priority);
2263        }
2264
2265        @Override
2266        public LastAck readPayload(DataInput dataIn) throws IOException {
2267            LastAck lastAcked = new LastAck();
2268            lastAcked.lastAckedSequence = dataIn.readLong();
2269            if (metadata.version >= 3) {
2270                lastAcked.priority = dataIn.readByte();
2271            }
2272            return lastAcked;
2273        }
2274
2275        @Override
2276        public int getFixedSize() {
2277            return 9;
2278        }
2279
2280        @Override
2281        public LastAck deepCopy(LastAck source) {
2282            return new LastAck(source);
2283        }
2284
2285        @Override
2286        public boolean isDeepCopySupported() {
2287            return true;
2288        }
2289    }
2290
2291    class StoredDestination {
2292
2293        MessageOrderIndex orderIndex = new MessageOrderIndex();
2294        BTreeIndex<Location, Long> locationIndex;
2295        BTreeIndex<String, Long> messageIdIndex;
2296
2297        // These bits are only set for Topics
2298        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
2299        BTreeIndex<String, LastAck> subscriptionAcks;
2300        HashMap<String, MessageOrderCursor> subscriptionCursors;
2301        ListIndex<String, SequenceSet> ackPositions;
2302        ListIndex<String, Location> subLocations;
2303
2304        // Transient data used to track which Messages are no longer needed.
2305        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
2306        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
2307
2308        public void trackPendingAdd(Long seq) {
2309            orderIndex.trackPendingAdd(seq);
2310        }
2311
2312        public void trackPendingAddComplete(Long seq) {
2313            orderIndex.trackPendingAddComplete(seq);
2314        }
2315
2316        @Override
2317        public String toString() {
2318            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
2319        }
2320    }
2321
2322    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
2323
2324        @Override
2325        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
2326            final StoredDestination value = new StoredDestination();
2327            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2328            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
2329            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
2330
2331            if (dataIn.readBoolean()) {
2332                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
2333                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
2334                if (metadata.version >= 4) {
2335                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
2336                } else {
2337                    // upgrade
2338                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2339                        @Override
2340                        public void execute(Transaction tx) throws IOException {
2341                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
2342
2343                            if (metadata.version >= 3) {
2344                                // migrate
2345                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
2346                                        new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
2347                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
2348                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
2349                                oldAckPositions.load(tx);
2350
2351
2352                                // Do the initial build of the data in memory before writing into the store
2353                                // based Ack Positions List to avoid a lot of disk thrashing.
2354                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
2355                                while (iterator.hasNext()) {
2356                                    Entry<Long, HashSet<String>> entry = iterator.next();
2357
2358                                    for(String subKey : entry.getValue()) {
2359                                        SequenceSet pendingAcks = temp.get(subKey);
2360                                        if (pendingAcks == null) {
2361                                            pendingAcks = new SequenceSet();
2362                                            temp.put(subKey, pendingAcks);
2363                                        }
2364
2365                                        pendingAcks.add(entry.getKey());
2366                                    }
2367                                }
2368                            }
2369                            // Now move the pending messages to ack data into the store backed
2370                            // structure.
2371                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2372                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2373                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2374                            value.ackPositions.load(tx);
2375                            for(String subscriptionKey : temp.keySet()) {
2376                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
2377                            }
2378
2379                        }
2380                    });
2381                }
2382
2383                if (metadata.version >= 5) {
2384                    value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
2385                } else {
2386                    // upgrade
2387                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2388                        @Override
2389                        public void execute(Transaction tx) throws IOException {
2390                            value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2391                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2392                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2393                            value.subLocations.load(tx);
2394                        }
2395                    });
2396                }
2397            }
2398            if (metadata.version >= 2) {
2399                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2400                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2401            } else {
2402                // upgrade
2403                pageFile.tx().execute(new Transaction.Closure<IOException>() {
2404                    @Override
2405                    public void execute(Transaction tx) throws IOException {
2406                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2407                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2408                        value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2409                        value.orderIndex.lowPriorityIndex.load(tx);
2410
2411                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2412                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2413                        value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2414                        value.orderIndex.highPriorityIndex.load(tx);
2415                    }
2416                });
2417            }
2418
2419            return value;
2420        }
2421
2422        @Override
2423        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
2424            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
2425            dataOut.writeLong(value.locationIndex.getPageId());
2426            dataOut.writeLong(value.messageIdIndex.getPageId());
2427            if (value.subscriptions != null) {
2428                dataOut.writeBoolean(true);
2429                dataOut.writeLong(value.subscriptions.getPageId());
2430                dataOut.writeLong(value.subscriptionAcks.getPageId());
2431                dataOut.writeLong(value.ackPositions.getHeadPageId());
2432                dataOut.writeLong(value.subLocations.getHeadPageId());
2433            } else {
2434                dataOut.writeBoolean(false);
2435            }
2436            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
2437            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
2438        }
2439    }
2440
2441    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
2442        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
2443
2444        @Override
2445        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
2446            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
2447            rc.mergeFramed((InputStream)dataIn);
2448            return rc;
2449        }
2450
2451        @Override
2452        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
2453            object.writeFramed((OutputStream)dataOut);
2454        }
2455    }
2456
2457    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2458        String key = key(destination);
2459        StoredDestination rc = storedDestinations.get(key);
2460        if (rc == null) {
2461            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
2462            rc = loadStoredDestination(tx, key, topic);
2463            // Cache it. We may want to remove/unload destinations from the
2464            // cache that are not used for a while
2465            // to reduce memory usage.
2466            storedDestinations.put(key, rc);
2467        }
2468        return rc;
2469    }
2470
2471    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2472        String key = key(destination);
2473        StoredDestination rc = storedDestinations.get(key);
2474        if (rc == null && metadata.destinations.containsKey(tx, key)) {
2475            rc = getStoredDestination(destination, tx);
2476        }
2477        return rc;
2478    }
2479
2480    /**
2481     * @param tx
2482     * @param key
2483     * @param topic
2484     * @return
2485     * @throws IOException
2486     */
2487    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
2488        // Try to load the existing indexes..
2489        StoredDestination rc = metadata.destinations.get(tx, key);
2490        if (rc == null) {
2491            // Brand new destination.. allocate indexes for it.
2492            rc = new StoredDestination();
2493            rc.orderIndex.allocate(tx);
2494            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
2495            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
2496
2497            if (topic) {
2498                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
2499                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
2500                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2501                rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2502            }
2503            metadata.destinations.put(tx, key, rc);
2504        }
2505
2506        // Configure the marshalers and load.
2507        rc.orderIndex.load(tx);
2508
2509        // Figure out the next key using the last entry in the destination.
2510        rc.orderIndex.configureLast(tx);
2511
2512        rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE);
2513        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2514        rc.locationIndex.load(tx);
2515
2516        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
2517        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2518        rc.messageIdIndex.load(tx);
2519
2520        // If it was a topic...
2521        if (topic) {
2522
2523            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
2524            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
2525            rc.subscriptions.load(tx);
2526
2527            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
2528            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
2529            rc.subscriptionAcks.load(tx);
2530
2531            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2532            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2533            rc.ackPositions.load(tx);
2534
2535            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2536            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2537            rc.subLocations.load(tx);
2538
2539            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
2540
2541            if (metadata.version < 3) {
2542
2543                // on upgrade need to fill ackLocation with available messages past last ack
2544                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2545                    Entry<String, LastAck> entry = iterator.next();
2546                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
2547                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
2548                        Long sequence = orderIterator.next().getKey();
2549                        addAckLocation(tx, rc, sequence, entry.getKey());
2550                    }
2551                    // modify so it is upgraded
2552                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
2553                }
2554            }
2555
2556            // Configure the message references index
2557            Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
2558            while (subscriptions.hasNext()) {
2559                Entry<String, SequenceSet> subscription = subscriptions.next();
2560                SequenceSet pendingAcks = subscription.getValue();
2561                if (pendingAcks != null && !pendingAcks.isEmpty()) {
2562                    Long lastPendingAck = pendingAcks.getTail().getLast();
2563                    for(Long sequenceId : pendingAcks) {
2564                        Long current = rc.messageReferences.get(sequenceId);
2565                        if (current == null) {
2566                            current = new Long(0);
2567                        }
2568
2569                        // We always add a trailing empty entry for the next position to start from
2570                        // so we need to ensure we don't count that as a message reference on reload.
2571                        if (!sequenceId.equals(lastPendingAck)) {
2572                            current = current.longValue() + 1;
2573                        }
2574
2575                        rc.messageReferences.put(sequenceId, current);
2576                    }
2577                }
2578            }
2579
2580            // Configure the subscription cache
2581            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2582                Entry<String, LastAck> entry = iterator.next();
2583                rc.subscriptionCache.add(entry.getKey());
2584            }
2585
2586            if (rc.orderIndex.nextMessageId == 0) {
2587                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
2588                if (!rc.subscriptionAcks.isEmpty(tx)) {
2589                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
2590                        Entry<String, LastAck> entry = iterator.next();
2591                        rc.orderIndex.nextMessageId =
2592                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
2593                    }
2594                }
2595            } else {
2596                // update based on ackPositions for unmatched, last entry is always the next
2597                if (!rc.messageReferences.isEmpty()) {
2598                    Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
2599                    rc.orderIndex.nextMessageId =
2600                            Math.max(rc.orderIndex.nextMessageId, nextMessageId);
2601                }
2602            }
2603        }
2604
2605        if (metadata.version < VERSION) {
2606            // store again after upgrade
2607            metadata.destinations.put(tx, key, rc);
2608        }
2609        return rc;
2610    }
2611
2612    /**
2613     * This is a map to cache MessageStores for a specific
2614     * KahaDestination key
2615     */
2616    protected final ConcurrentMap<String, MessageStore> storeCache =
2617       new ConcurrentHashMap<>();
2618
2619    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
2620        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2621        if (sequences == null) {
2622            sequences = new SequenceSet();
2623            sequences.add(messageSequence);
2624            sd.ackPositions.add(tx, subscriptionKey, sequences);
2625        } else {
2626            sequences.add(messageSequence);
2627            sd.ackPositions.put(tx, subscriptionKey, sequences);
2628        }
2629
2630        Long count = sd.messageReferences.get(messageSequence);
2631        if (count == null) {
2632            count = Long.valueOf(0L);
2633        }
2634        count = count.longValue() + 1;
2635        sd.messageReferences.put(messageSequence, count);
2636    }
2637
2638    // new sub is interested in potentially all existing messages
2639    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2640        SequenceSet allOutstanding = new SequenceSet();
2641        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
2642        while (iterator.hasNext()) {
2643            SequenceSet set = iterator.next().getValue();
2644            for (Long entry : set) {
2645                allOutstanding.add(entry);
2646            }
2647        }
2648        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
2649
2650        for (Long ackPosition : allOutstanding) {
2651            Long count = sd.messageReferences.get(ackPosition);
2652            count = count.longValue() + 1;
2653            sd.messageReferences.put(ackPosition, count);
2654        }
2655    }
2656
2657    // on a new message add, all existing subs are interested in this message
2658    private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
2659        for(String subscriptionKey : sd.subscriptionCache) {
2660            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2661            if (sequences == null) {
2662                sequences = new SequenceSet();
2663                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2664                sd.ackPositions.add(tx, subscriptionKey, sequences);
2665            } else {
2666                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2667                sd.ackPositions.put(tx, subscriptionKey, sequences);
2668            }
2669
2670            Long count = sd.messageReferences.get(messageSequence);
2671            if (count == null) {
2672                count = Long.valueOf(0L);
2673            }
2674            count = count.longValue() + 1;
2675            sd.messageReferences.put(messageSequence, count);
2676            sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
2677        }
2678    }
2679
2680    private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2681        if (!sd.ackPositions.isEmpty(tx)) {
2682            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2683            if (sequences == null || sequences.isEmpty()) {
2684                return;
2685            }
2686
2687            ArrayList<Long> unreferenced = new ArrayList<Long>();
2688
2689            for(Long sequenceId : sequences) {
2690                Long references = sd.messageReferences.get(sequenceId);
2691                if (references != null) {
2692                    references = references.longValue() - 1;
2693
2694                    if (references.longValue() > 0) {
2695                        sd.messageReferences.put(sequenceId, references);
2696                    } else {
2697                        sd.messageReferences.remove(sequenceId);
2698                        unreferenced.add(sequenceId);
2699                    }
2700                }
2701            }
2702
2703            for(Long sequenceId : unreferenced) {
2704                // Find all the entries that need to get deleted.
2705                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2706                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2707
2708                // Do the actual deletes.
2709                for (Entry<Long, MessageKeys> entry : deletes) {
2710                    sd.locationIndex.remove(tx, entry.getValue().location);
2711                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2712                    sd.orderIndex.remove(tx, entry.getKey());
2713                }
2714            }
2715        }
2716    }
2717
2718    /**
2719     * @param tx
2720     * @param sd
2721     * @param subscriptionKey
2722     * @param messageSequence
2723     * @throws IOException
2724     */
2725    private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
2726        // Remove the sub from the previous location set..
2727        if (messageSequence != null) {
2728            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2729            if (range != null && !range.isEmpty()) {
2730                range.remove(messageSequence);
2731                if (!range.isEmpty()) {
2732                    sd.ackPositions.put(tx, subscriptionKey, range);
2733                } else {
2734                    sd.ackPositions.remove(tx, subscriptionKey);
2735                }
2736
2737                // Check if the message is reference by any other subscription.
2738                Long count = sd.messageReferences.get(messageSequence);
2739                if (count != null){
2740                long references = count.longValue() - 1;
2741                    if (references > 0) {
2742                        sd.messageReferences.put(messageSequence, Long.valueOf(references));
2743                        return;
2744                    } else {
2745                        sd.messageReferences.remove(messageSequence);
2746                    }
2747                }
2748
2749                // Find all the entries that need to get deleted.
2750                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2751                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2752
2753                // Do the actual deletes.
2754                for (Entry<Long, MessageKeys> entry : deletes) {
2755                    sd.locationIndex.remove(tx, entry.getValue().location);
2756                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2757                    sd.orderIndex.remove(tx, entry.getKey());
2758                }
2759            }
2760        }
2761    }
2762
2763    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2764        return sd.subscriptionAcks.get(tx, subscriptionKey);
2765    }
2766
2767    public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2768        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2769        if (messageSequences != null) {
2770            long result = messageSequences.rangeSize();
2771            // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2772            return result > 0 ? result - 1 : 0;
2773        }
2774
2775        return 0;
2776    }
2777
2778    protected String key(KahaDestination destination) {
2779        return destination.getType().getNumber() + ":" + destination.getName();
2780    }
2781
2782    // /////////////////////////////////////////////////////////////////
2783    // Transaction related implementation methods.
2784    // /////////////////////////////////////////////////////////////////
2785    @SuppressWarnings("rawtypes")
2786    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2787    @SuppressWarnings("rawtypes")
2788    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2789
2790    @SuppressWarnings("rawtypes")
2791    private List<Operation> getInflightTx(KahaTransactionInfo info) {
2792        TransactionId key = TransactionIdConversion.convert(info);
2793        List<Operation> tx;
2794        synchronized (inflightTransactions) {
2795            tx = inflightTransactions.get(key);
2796            if (tx == null) {
2797                tx = Collections.synchronizedList(new ArrayList<Operation>());
2798                inflightTransactions.put(key, tx);
2799            }
2800        }
2801        return tx;
2802    }
2803
2804    @SuppressWarnings("unused")
2805    private TransactionId key(KahaTransactionInfo transactionInfo) {
2806        return TransactionIdConversion.convert(transactionInfo);
2807    }
2808
2809    abstract class Operation <T extends JournalCommand<T>> {
2810        final T command;
2811        final Location location;
2812
2813        public Operation(T command, Location location) {
2814            this.command = command;
2815            this.location = location;
2816        }
2817
2818        public Location getLocation() {
2819            return location;
2820        }
2821
2822        public T getCommand() {
2823            return command;
2824        }
2825
2826        abstract public void execute(Transaction tx) throws IOException;
2827    }
2828
2829    class AddOperation extends Operation<KahaAddMessageCommand> {
2830        final IndexAware runWithIndexLock;
2831        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
2832            super(command, location);
2833            this.runWithIndexLock = runWithIndexLock;
2834        }
2835
2836        @Override
2837        public void execute(Transaction tx) throws IOException {
2838            long seq = updateIndex(tx, command, location);
2839            if (runWithIndexLock != null) {
2840                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
2841            }
2842        }
2843    }
2844
2845    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
2846
2847        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
2848            super(command, location);
2849        }
2850
2851        @Override
2852        public void execute(Transaction tx) throws IOException {
2853            updateIndex(tx, command, location);
2854        }
2855    }
2856
2857    // /////////////////////////////////////////////////////////////////
2858    // Initialization related implementation methods.
2859    // /////////////////////////////////////////////////////////////////
2860
2861    private PageFile createPageFile() throws IOException {
2862        if (indexDirectory == null) {
2863            indexDirectory = directory;
2864        }
2865        IOHelper.mkdirs(indexDirectory);
2866        PageFile index = new PageFile(indexDirectory, "db");
2867        index.setEnableWriteThread(isEnableIndexWriteAsync());
2868        index.setWriteBatchSize(getIndexWriteBatchSize());
2869        index.setPageCacheSize(indexCacheSize);
2870        index.setUseLFRUEviction(isUseIndexLFRUEviction());
2871        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2872        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2873        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2874        index.setEnablePageCaching(isEnableIndexPageCaching());
2875        return index;
2876    }
2877
2878    protected Journal createJournal() throws IOException {
2879        Journal manager = new Journal();
2880        manager.setDirectory(directory);
2881        manager.setMaxFileLength(getJournalMaxFileLength());
2882        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2883        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2884        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2885        manager.setArchiveDataLogs(isArchiveDataLogs());
2886        manager.setSizeAccumulator(journalSize);
2887        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2888        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
2889        manager.setPreallocationStrategy(
2890                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
2891        manager.setJournalDiskSyncStrategy(journalDiskSyncStrategy);
2892        if (getDirectoryArchive() != null) {
2893            IOHelper.mkdirs(getDirectoryArchive());
2894            manager.setDirectoryArchive(getDirectoryArchive());
2895        }
2896        return manager;
2897    }
2898
2899    private Metadata createMetadata() {
2900        Metadata md = new Metadata();
2901        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
2902        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
2903        return md;
2904    }
2905
2906    public int getJournalMaxWriteBatchSize() {
2907        return journalMaxWriteBatchSize;
2908    }
2909
2910    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
2911        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
2912    }
2913
2914    public File getDirectory() {
2915        return directory;
2916    }
2917
2918    public void setDirectory(File directory) {
2919        this.directory = directory;
2920    }
2921
2922    public boolean isDeleteAllMessages() {
2923        return deleteAllMessages;
2924    }
2925
2926    public void setDeleteAllMessages(boolean deleteAllMessages) {
2927        this.deleteAllMessages = deleteAllMessages;
2928    }
2929
2930    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
2931        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
2932    }
2933
2934    public int getIndexWriteBatchSize() {
2935        return setIndexWriteBatchSize;
2936    }
2937
2938    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
2939        this.enableIndexWriteAsync = enableIndexWriteAsync;
2940    }
2941
2942    boolean isEnableIndexWriteAsync() {
2943        return enableIndexWriteAsync;
2944    }
2945
2946    /**
2947     * @deprecated use {@link #getJournalDiskSyncStrategyEnum} or {@link #getJournalDiskSyncStrategy} instead
2948     * @return
2949     */
2950    public boolean isEnableJournalDiskSyncs() {
2951        return journalDiskSyncStrategy == JournalDiskSyncStrategy.ALWAYS;
2952    }
2953
2954    /**
2955     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
2956     * @param syncWrites
2957     */
2958    public void setEnableJournalDiskSyncs(boolean syncWrites) {
2959        if (syncWrites) {
2960            journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
2961        } else {
2962            journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER;
2963        }
2964    }
2965
2966    public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() {
2967        return journalDiskSyncStrategy;
2968    }
2969
2970    public String getJournalDiskSyncStrategy() {
2971        return journalDiskSyncStrategy.name();
2972    }
2973
2974    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
2975        this.journalDiskSyncStrategy = JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase());
2976    }
2977
2978    public long getJournalDiskSyncInterval() {
2979        return journalDiskSyncInterval;
2980    }
2981
2982    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
2983        this.journalDiskSyncInterval = journalDiskSyncInterval;
2984    }
2985
2986    public long getCheckpointInterval() {
2987        return checkpointInterval;
2988    }
2989
2990    public void setCheckpointInterval(long checkpointInterval) {
2991        this.checkpointInterval = checkpointInterval;
2992    }
2993
2994    public long getCleanupInterval() {
2995        return cleanupInterval;
2996    }
2997
2998    public void setCleanupInterval(long cleanupInterval) {
2999        this.cleanupInterval = cleanupInterval;
3000    }
3001
3002    public boolean getCleanupOnStop() {
3003        return cleanupOnStop;
3004    }
3005
3006    public void setCleanupOnStop(boolean cleanupOnStop) {
3007        this.cleanupOnStop = cleanupOnStop;
3008    }
3009
3010    public void setJournalMaxFileLength(int journalMaxFileLength) {
3011        this.journalMaxFileLength = journalMaxFileLength;
3012    }
3013
3014    public int getJournalMaxFileLength() {
3015        return journalMaxFileLength;
3016    }
3017
3018    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
3019        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
3020    }
3021
3022    public int getMaxFailoverProducersToTrack() {
3023        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
3024    }
3025
3026    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
3027        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
3028    }
3029
3030    public int getFailoverProducersAuditDepth() {
3031        return this.metadata.producerSequenceIdTracker.getAuditDepth();
3032    }
3033
3034    public PageFile getPageFile() throws IOException {
3035        if (pageFile == null) {
3036            pageFile = createPageFile();
3037        }
3038        return pageFile;
3039    }
3040
3041    public Journal getJournal() throws IOException {
3042        if (journal == null) {
3043            journal = createJournal();
3044        }
3045        return journal;
3046    }
3047
3048    protected Metadata getMetadata() {
3049        return metadata;
3050    }
3051
3052    public boolean isFailIfDatabaseIsLocked() {
3053        return failIfDatabaseIsLocked;
3054    }
3055
3056    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
3057        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
3058    }
3059
3060    public boolean isIgnoreMissingJournalfiles() {
3061        return ignoreMissingJournalfiles;
3062    }
3063
3064    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
3065        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
3066    }
3067
3068    public int getIndexCacheSize() {
3069        return indexCacheSize;
3070    }
3071
3072    public void setIndexCacheSize(int indexCacheSize) {
3073        this.indexCacheSize = indexCacheSize;
3074    }
3075
3076    public boolean isCheckForCorruptJournalFiles() {
3077        return checkForCorruptJournalFiles;
3078    }
3079
3080    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
3081        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
3082    }
3083
3084    public boolean isChecksumJournalFiles() {
3085        return checksumJournalFiles;
3086    }
3087
3088    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
3089        this.checksumJournalFiles = checksumJournalFiles;
3090    }
3091
3092    @Override
3093    public void setBrokerService(BrokerService brokerService) {
3094        this.brokerService = brokerService;
3095    }
3096
3097    /**
3098     * @return the archiveDataLogs
3099     */
3100    public boolean isArchiveDataLogs() {
3101        return this.archiveDataLogs;
3102    }
3103
3104    /**
3105     * @param archiveDataLogs the archiveDataLogs to set
3106     */
3107    public void setArchiveDataLogs(boolean archiveDataLogs) {
3108        this.archiveDataLogs = archiveDataLogs;
3109    }
3110
3111    /**
3112     * @return the directoryArchive
3113     */
3114    public File getDirectoryArchive() {
3115        return this.directoryArchive;
3116    }
3117
3118    /**
3119     * @param directoryArchive the directoryArchive to set
3120     */
3121    public void setDirectoryArchive(File directoryArchive) {
3122        this.directoryArchive = directoryArchive;
3123    }
3124
3125    public boolean isArchiveCorruptedIndex() {
3126        return archiveCorruptedIndex;
3127    }
3128
3129    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
3130        this.archiveCorruptedIndex = archiveCorruptedIndex;
3131    }
3132
3133    public float getIndexLFUEvictionFactor() {
3134        return indexLFUEvictionFactor;
3135    }
3136
3137    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
3138        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
3139    }
3140
3141    public boolean isUseIndexLFRUEviction() {
3142        return useIndexLFRUEviction;
3143    }
3144
3145    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
3146        this.useIndexLFRUEviction = useIndexLFRUEviction;
3147    }
3148
3149    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
3150        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
3151    }
3152
3153    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
3154        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
3155    }
3156
3157    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
3158        this.enableIndexPageCaching = enableIndexPageCaching;
3159    }
3160
3161    public boolean isEnableIndexDiskSyncs() {
3162        return enableIndexDiskSyncs;
3163    }
3164
3165    public boolean isEnableIndexRecoveryFile() {
3166        return enableIndexRecoveryFile;
3167    }
3168
3169    public boolean isEnableIndexPageCaching() {
3170        return enableIndexPageCaching;
3171    }
3172
3173    // /////////////////////////////////////////////////////////////////
3174    // Internal conversion methods.
3175    // /////////////////////////////////////////////////////////////////
3176
3177    class MessageOrderCursor{
3178        long defaultCursorPosition;
3179        long lowPriorityCursorPosition;
3180        long highPriorityCursorPosition;
3181        MessageOrderCursor(){
3182        }
3183
3184        MessageOrderCursor(long position){
3185            this.defaultCursorPosition=position;
3186            this.lowPriorityCursorPosition=position;
3187            this.highPriorityCursorPosition=position;
3188        }
3189
3190        MessageOrderCursor(MessageOrderCursor other){
3191            this.defaultCursorPosition=other.defaultCursorPosition;
3192            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3193            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3194        }
3195
3196        MessageOrderCursor copy() {
3197            return new MessageOrderCursor(this);
3198        }
3199
3200        void reset() {
3201            this.defaultCursorPosition=0;
3202            this.highPriorityCursorPosition=0;
3203            this.lowPriorityCursorPosition=0;
3204        }
3205
3206        void increment() {
3207            if (defaultCursorPosition!=0) {
3208                defaultCursorPosition++;
3209            }
3210            if (highPriorityCursorPosition!=0) {
3211                highPriorityCursorPosition++;
3212            }
3213            if (lowPriorityCursorPosition!=0) {
3214                lowPriorityCursorPosition++;
3215            }
3216        }
3217
3218        @Override
3219        public String toString() {
3220           return "MessageOrderCursor:[def:" + defaultCursorPosition
3221                   + ", low:" + lowPriorityCursorPosition
3222                   + ", high:" +  highPriorityCursorPosition + "]";
3223        }
3224
3225        public void sync(MessageOrderCursor other) {
3226            this.defaultCursorPosition=other.defaultCursorPosition;
3227            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3228            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3229        }
3230    }
3231
3232    class MessageOrderIndex {
3233        static final byte HI = 9;
3234        static final byte LO = 0;
3235        static final byte DEF = 4;
3236
3237        long nextMessageId;
3238        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
3239        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
3240        BTreeIndex<Long, MessageKeys> highPriorityIndex;
3241        final MessageOrderCursor cursor = new MessageOrderCursor();
3242        Long lastDefaultKey;
3243        Long lastHighKey;
3244        Long lastLowKey;
3245        byte lastGetPriority;
3246        final List<Long> pendingAdditions = new LinkedList<Long>();
3247
3248        MessageKeys remove(Transaction tx, Long key) throws IOException {
3249            MessageKeys result = defaultPriorityIndex.remove(tx, key);
3250            if (result == null && highPriorityIndex!=null) {
3251                result = highPriorityIndex.remove(tx, key);
3252                if (result ==null && lowPriorityIndex!=null) {
3253                    result = lowPriorityIndex.remove(tx, key);
3254                }
3255            }
3256            return result;
3257        }
3258
3259        void load(Transaction tx) throws IOException {
3260            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3261            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
3262            defaultPriorityIndex.load(tx);
3263            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3264            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
3265            lowPriorityIndex.load(tx);
3266            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3267            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
3268            highPriorityIndex.load(tx);
3269        }
3270
3271        void allocate(Transaction tx) throws IOException {
3272            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3273            if (metadata.version >= 2) {
3274                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3275                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3276            }
3277        }
3278
3279        void configureLast(Transaction tx) throws IOException {
3280            // Figure out the next key using the last entry in the destination.
3281            TreeSet<Long> orderedSet = new TreeSet<Long>();
3282
3283            addLast(orderedSet, highPriorityIndex, tx);
3284            addLast(orderedSet, defaultPriorityIndex, tx);
3285            addLast(orderedSet, lowPriorityIndex, tx);
3286
3287            if (!orderedSet.isEmpty()) {
3288                nextMessageId = orderedSet.last() + 1;
3289            }
3290        }
3291
3292        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
3293            if (index != null) {
3294                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
3295                if (lastEntry != null) {
3296                    orderedSet.add(lastEntry.getKey());
3297                }
3298            }
3299        }
3300
3301        void clear(Transaction tx) throws IOException {
3302            this.remove(tx);
3303            this.resetCursorPosition();
3304            this.allocate(tx);
3305            this.load(tx);
3306            this.configureLast(tx);
3307        }
3308
3309        void remove(Transaction tx) throws IOException {
3310            defaultPriorityIndex.clear(tx);
3311            defaultPriorityIndex.unload(tx);
3312            tx.free(defaultPriorityIndex.getPageId());
3313            if (lowPriorityIndex != null) {
3314                lowPriorityIndex.clear(tx);
3315                lowPriorityIndex.unload(tx);
3316
3317                tx.free(lowPriorityIndex.getPageId());
3318            }
3319            if (highPriorityIndex != null) {
3320                highPriorityIndex.clear(tx);
3321                highPriorityIndex.unload(tx);
3322                tx.free(highPriorityIndex.getPageId());
3323            }
3324        }
3325
3326        void resetCursorPosition() {
3327            this.cursor.reset();
3328            lastDefaultKey = null;
3329            lastHighKey = null;
3330            lastLowKey = null;
3331        }
3332
3333        void setBatch(Transaction tx, Long sequence) throws IOException {
3334            if (sequence != null) {
3335                Long nextPosition = new Long(sequence.longValue() + 1);
3336                lastDefaultKey = sequence;
3337                cursor.defaultCursorPosition = nextPosition.longValue();
3338                lastHighKey = sequence;
3339                cursor.highPriorityCursorPosition = nextPosition.longValue();
3340                lastLowKey = sequence;
3341                cursor.lowPriorityCursorPosition = nextPosition.longValue();
3342            }
3343        }
3344
3345        void setBatch(Transaction tx, LastAck last) throws IOException {
3346            setBatch(tx, last.lastAckedSequence);
3347            if (cursor.defaultCursorPosition == 0
3348                    && cursor.highPriorityCursorPosition == 0
3349                    && cursor.lowPriorityCursorPosition == 0) {
3350                long next = last.lastAckedSequence + 1;
3351                switch (last.priority) {
3352                    case DEF:
3353                        cursor.defaultCursorPosition = next;
3354                        cursor.highPriorityCursorPosition = next;
3355                        break;
3356                    case HI:
3357                        cursor.highPriorityCursorPosition = next;
3358                        break;
3359                    case LO:
3360                        cursor.lowPriorityCursorPosition = next;
3361                        cursor.defaultCursorPosition = next;
3362                        cursor.highPriorityCursorPosition = next;
3363                        break;
3364                }
3365            }
3366        }
3367
3368        void stoppedIterating() {
3369            if (lastDefaultKey!=null) {
3370                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
3371            }
3372            if (lastHighKey!=null) {
3373                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
3374            }
3375            if (lastLowKey!=null) {
3376                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
3377            }
3378            lastDefaultKey = null;
3379            lastHighKey = null;
3380            lastLowKey = null;
3381        }
3382
3383        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
3384                throws IOException {
3385            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
3386                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
3387            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
3388                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
3389            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
3390                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
3391            }
3392        }
3393
3394        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
3395                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
3396
3397            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
3398            deletes.add(iterator.next());
3399        }
3400
3401        long getNextMessageId(int priority) {
3402            return nextMessageId++;
3403        }
3404
3405        MessageKeys get(Transaction tx, Long key) throws IOException {
3406            MessageKeys result = defaultPriorityIndex.get(tx, key);
3407            if (result == null) {
3408                result = highPriorityIndex.get(tx, key);
3409                if (result == null) {
3410                    result = lowPriorityIndex.get(tx, key);
3411                    lastGetPriority = LO;
3412                } else {
3413                    lastGetPriority = HI;
3414                }
3415            } else {
3416                lastGetPriority = DEF;
3417            }
3418            return result;
3419        }
3420
3421        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
3422            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
3423                return defaultPriorityIndex.put(tx, key, value);
3424            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
3425                return highPriorityIndex.put(tx, key, value);
3426            } else {
3427                return lowPriorityIndex.put(tx, key, value);
3428            }
3429        }
3430
3431        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
3432            return new MessageOrderIterator(tx,cursor,this);
3433        }
3434
3435        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
3436            return new MessageOrderIterator(tx,m,this);
3437        }
3438
3439        public byte lastGetPriority() {
3440            return lastGetPriority;
3441        }
3442
3443        public boolean alreadyDispatched(Long sequence) {
3444            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
3445                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
3446                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
3447        }
3448
3449        public void trackPendingAdd(Long seq) {
3450            synchronized (pendingAdditions) {
3451                pendingAdditions.add(seq);
3452            }
3453        }
3454
3455        public void trackPendingAddComplete(Long seq) {
3456            synchronized (pendingAdditions) {
3457                pendingAdditions.remove(seq);
3458            }
3459        }
3460
3461        public Long minPendingAdd() {
3462            synchronized (pendingAdditions) {
3463                if (!pendingAdditions.isEmpty()) {
3464                    return pendingAdditions.get(0);
3465                } else {
3466                    return null;
3467                }
3468            }
3469        }
3470
3471
3472        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
3473            Iterator<Entry<Long, MessageKeys>>currentIterator;
3474            final Iterator<Entry<Long, MessageKeys>>highIterator;
3475            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
3476            final Iterator<Entry<Long, MessageKeys>>lowIterator;
3477
3478            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
3479                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
3480                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
3481                if (highPriorityIndex != null) {
3482                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
3483                } else {
3484                    this.highIterator = null;
3485                }
3486                if (lowPriorityIndex != null) {
3487                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
3488                } else {
3489                    this.lowIterator = null;
3490                }
3491            }
3492
3493            @Override
3494            public boolean hasNext() {
3495                if (currentIterator == null) {
3496                    if (highIterator != null) {
3497                        if (highIterator.hasNext()) {
3498                            currentIterator = highIterator;
3499                            return currentIterator.hasNext();
3500                        }
3501                        if (defaultIterator.hasNext()) {
3502                            currentIterator = defaultIterator;
3503                            return currentIterator.hasNext();
3504                        }
3505                        if (lowIterator.hasNext()) {
3506                            currentIterator = lowIterator;
3507                            return currentIterator.hasNext();
3508                        }
3509                        return false;
3510                    } else {
3511                        currentIterator = defaultIterator;
3512                        return currentIterator.hasNext();
3513                    }
3514                }
3515                if (highIterator != null) {
3516                    if (currentIterator.hasNext()) {
3517                        return true;
3518                    }
3519                    if (currentIterator == highIterator) {
3520                        if (defaultIterator.hasNext()) {
3521                            currentIterator = defaultIterator;
3522                            return currentIterator.hasNext();
3523                        }
3524                        if (lowIterator.hasNext()) {
3525                            currentIterator = lowIterator;
3526                            return currentIterator.hasNext();
3527                        }
3528                        return false;
3529                    }
3530
3531                    if (currentIterator == defaultIterator) {
3532                        if (lowIterator.hasNext()) {
3533                            currentIterator = lowIterator;
3534                            return currentIterator.hasNext();
3535                        }
3536                        return false;
3537                    }
3538                }
3539                return currentIterator.hasNext();
3540            }
3541
3542            @Override
3543            public Entry<Long, MessageKeys> next() {
3544                Entry<Long, MessageKeys> result = currentIterator.next();
3545                if (result != null) {
3546                    Long key = result.getKey();
3547                    if (highIterator != null) {
3548                        if (currentIterator == defaultIterator) {
3549                            lastDefaultKey = key;
3550                        } else if (currentIterator == highIterator) {
3551                            lastHighKey = key;
3552                        } else {
3553                            lastLowKey = key;
3554                        }
3555                    } else {
3556                        lastDefaultKey = key;
3557                    }
3558                }
3559                return result;
3560            }
3561
3562            @Override
3563            public void remove() {
3564                throw new UnsupportedOperationException();
3565            }
3566
3567        }
3568    }
3569
3570    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
3571        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
3572
3573        @Override
3574        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
3575            ByteArrayOutputStream baos = new ByteArrayOutputStream();
3576            ObjectOutputStream oout = new ObjectOutputStream(baos);
3577            oout.writeObject(object);
3578            oout.flush();
3579            oout.close();
3580            byte[] data = baos.toByteArray();
3581            dataOut.writeInt(data.length);
3582            dataOut.write(data);
3583        }
3584
3585        @Override
3586        @SuppressWarnings("unchecked")
3587        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
3588            int dataLen = dataIn.readInt();
3589            byte[] data = new byte[dataLen];
3590            dataIn.readFully(data);
3591            ByteArrayInputStream bais = new ByteArrayInputStream(data);
3592            ObjectInputStream oin = new ObjectInputStream(bais);
3593            try {
3594                return (HashSet<String>) oin.readObject();
3595            } catch (ClassNotFoundException cfe) {
3596                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
3597                ioe.initCause(cfe);
3598                throw ioe;
3599            }
3600        }
3601    }
3602
3603    public File getIndexDirectory() {
3604        return indexDirectory;
3605    }
3606
3607    public void setIndexDirectory(File indexDirectory) {
3608        this.indexDirectory = indexDirectory;
3609    }
3610
3611    interface IndexAware {
3612        public void sequenceAssignedWithIndexLocked(long index);
3613    }
3614
3615    public String getPreallocationScope() {
3616        return preallocationScope;
3617    }
3618
3619    public void setPreallocationScope(String preallocationScope) {
3620        this.preallocationScope = preallocationScope;
3621    }
3622
3623    public String getPreallocationStrategy() {
3624        return preallocationStrategy;
3625    }
3626
3627    public void setPreallocationStrategy(String preallocationStrategy) {
3628        this.preallocationStrategy = preallocationStrategy;
3629    }
3630
3631    public int getCompactAcksAfterNoGC() {
3632        return compactAcksAfterNoGC;
3633    }
3634
3635    /**
3636     * Sets the number of GC cycles where no journal logs were removed before an attempt to
3637     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
3638     * <p>
3639     * A value of -1 will disable this feature.
3640     *
3641     * @param compactAcksAfterNoGC
3642     *      Number of empty GC cycles before we rewrite old ACKS.
3643     */
3644    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
3645        this.compactAcksAfterNoGC = compactAcksAfterNoGC;
3646    }
3647
3648    /**
3649     * Returns whether Ack compaction will ignore that the store is still growing
3650     * and run more often.
3651     *
3652     * @return the compactAcksIgnoresStoreGrowth current value.
3653     */
3654    public boolean isCompactAcksIgnoresStoreGrowth() {
3655        return compactAcksIgnoresStoreGrowth;
3656    }
3657
3658    /**
3659     * Configure if Ack compaction will occur regardless of continued growth of the
3660     * journal logs meaning that the store has not run out of space yet.  Because the
3661     * compaction operation can be costly this value is defaulted to off and the Ack
3662     * compaction is only done when it seems that the store cannot grow and larger.
3663     *
3664     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
3665     */
3666    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
3667        this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
3668    }
3669
3670    /**
3671     * Returns whether Ack compaction is enabled
3672     *
3673     * @return enableAckCompaction
3674     */
3675    public boolean isEnableAckCompaction() {
3676        return enableAckCompaction;
3677    }
3678
3679    /**
3680     * Configure if the Ack compaction task should be enabled to run
3681     *
3682     * @param enableAckCompaction
3683     */
3684    public void setEnableAckCompaction(boolean enableAckCompaction) {
3685        this.enableAckCompaction = enableAckCompaction;
3686    }
3687}