001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.DataInput;
024import java.io.DataOutput;
025import java.io.EOFException;
026import java.io.File;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.InterruptedIOException;
030import java.io.ObjectInputStream;
031import java.io.ObjectOutputStream;
032import java.io.OutputStream;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.Date;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.LinkedHashMap;
042import java.util.LinkedHashSet;
043import java.util.LinkedList;
044import java.util.List;
045import java.util.Map;
046import java.util.Map.Entry;
047import java.util.Set;
048import java.util.SortedSet;
049import java.util.TreeMap;
050import java.util.TreeSet;
051
052import java.util.concurrent.ConcurrentHashMap;
053import java.util.concurrent.ConcurrentMap;
054import java.util.concurrent.Executors;
055import java.util.concurrent.ScheduledExecutorService;
056import java.util.concurrent.ThreadFactory;
057import java.util.concurrent.TimeUnit;
058import java.util.concurrent.atomic.AtomicBoolean;
059import java.util.concurrent.atomic.AtomicLong;
060import java.util.concurrent.atomic.AtomicReference;
061import java.util.concurrent.locks.ReentrantReadWriteLock;
062
063import org.apache.activemq.ActiveMQMessageAuditNoSync;
064import org.apache.activemq.broker.BrokerService;
065import org.apache.activemq.broker.BrokerServiceAware;
066import org.apache.activemq.command.TransactionId;
067import org.apache.activemq.openwire.OpenWireFormat;
068import org.apache.activemq.protobuf.Buffer;
069import org.apache.activemq.store.MessageStore;
070import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
071import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
072import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
073import org.apache.activemq.store.kahadb.data.KahaDestination;
074import org.apache.activemq.store.kahadb.data.KahaEntryType;
075import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
076import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
077import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
078import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
079import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
080import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
081import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
082import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
083import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
084import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
085import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
086import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
087import org.apache.activemq.store.kahadb.disk.index.ListIndex;
088import org.apache.activemq.store.kahadb.disk.journal.DataFile;
089import org.apache.activemq.store.kahadb.disk.journal.Journal;
090import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
091import org.apache.activemq.store.kahadb.disk.journal.Location;
092import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
093import org.apache.activemq.store.kahadb.disk.page.Page;
094import org.apache.activemq.store.kahadb.disk.page.PageFile;
095import org.apache.activemq.store.kahadb.disk.page.Transaction;
096import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
097import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
098import org.apache.activemq.store.kahadb.disk.util.Marshaller;
099import org.apache.activemq.store.kahadb.disk.util.Sequence;
100import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
101import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
102import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
103import org.apache.activemq.util.ByteSequence;
104import org.apache.activemq.util.DataByteArrayInputStream;
105import org.apache.activemq.util.DataByteArrayOutputStream;
106import org.apache.activemq.util.IOExceptionSupport;
107import org.apache.activemq.util.IOHelper;
108import org.apache.activemq.util.ServiceStopper;
109import org.apache.activemq.util.ServiceSupport;
110import org.apache.activemq.util.ThreadPoolUtils;
111import org.slf4j.Logger;
112import org.slf4j.LoggerFactory;
113import org.slf4j.MDC;
114
115public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
116
117    protected BrokerService brokerService;
118
119    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
120    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
121    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
122    protected static final Buffer UNMATCHED;
123    static {
124        UNMATCHED = new Buffer(new byte[]{});
125    }
126    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
127
128    static final int CLOSED_STATE = 1;
129    static final int OPEN_STATE = 2;
130    static final long NOT_ACKED = -1;
131
132    static final int VERSION = 5;
133
134    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
135
136    protected class Metadata {
137        protected Page<Metadata> page;
138        protected int state;
139        protected BTreeIndex<String, StoredDestination> destinations;
140        protected Location lastUpdate;
141        protected Location firstInProgressTransactionLocation;
142        protected Location producerSequenceIdTrackerLocation = null;
143        protected Location ackMessageFileMapLocation = null;
144        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
145        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
146        protected int version = VERSION;
147        protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION;
148
149        public void read(DataInput is) throws IOException {
150            state = is.readInt();
151            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
152            if (is.readBoolean()) {
153                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
154            } else {
155                lastUpdate = null;
156            }
157            if (is.readBoolean()) {
158                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
159            } else {
160                firstInProgressTransactionLocation = null;
161            }
162            try {
163                if (is.readBoolean()) {
164                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
165                } else {
166                    producerSequenceIdTrackerLocation = null;
167                }
168            } catch (EOFException expectedOnUpgrade) {
169            }
170            try {
171                version = is.readInt();
172            } catch (EOFException expectedOnUpgrade) {
173                version = 1;
174            }
175            if (version >= 5 && is.readBoolean()) {
176                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
177            } else {
178                ackMessageFileMapLocation = null;
179            }
180            try {
181                openwireVersion = is.readInt();
182            } catch (EOFException expectedOnUpgrade) {
183                openwireVersion = OpenWireFormat.DEFAULT_VERSION;
184            }
185            LOG.info("KahaDB is version " + version);
186        }
187
188        public void write(DataOutput os) throws IOException {
189            os.writeInt(state);
190            os.writeLong(destinations.getPageId());
191
192            if (lastUpdate != null) {
193                os.writeBoolean(true);
194                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
195            } else {
196                os.writeBoolean(false);
197            }
198
199            if (firstInProgressTransactionLocation != null) {
200                os.writeBoolean(true);
201                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
202            } else {
203                os.writeBoolean(false);
204            }
205
206            if (producerSequenceIdTrackerLocation != null) {
207                os.writeBoolean(true);
208                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
209            } else {
210                os.writeBoolean(false);
211            }
212            os.writeInt(VERSION);
213            if (ackMessageFileMapLocation != null) {
214                os.writeBoolean(true);
215                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
216            } else {
217                os.writeBoolean(false);
218            }
219            os.writeInt(this.openwireVersion);
220        }
221    }
222
223    class MetadataMarshaller extends VariableMarshaller<Metadata> {
224        @Override
225        public Metadata readPayload(DataInput dataIn) throws IOException {
226            Metadata rc = createMetadata();
227            rc.read(dataIn);
228            return rc;
229        }
230
231        @Override
232        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
233            object.write(dataOut);
234        }
235    }
236
237    protected PageFile pageFile;
238    protected Journal journal;
239    protected Metadata metadata = new Metadata();
240
241    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
242
243    protected boolean failIfDatabaseIsLocked;
244
245    protected boolean deleteAllMessages;
246    protected File directory = DEFAULT_DIRECTORY;
247    protected File indexDirectory = null;
248    protected ScheduledExecutorService scheduler;
249    private final Object schedulerLock = new Object();
250
251    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
252    protected boolean archiveDataLogs;
253    protected File directoryArchive;
254    protected AtomicLong journalSize = new AtomicLong(0);
255    long journalDiskSyncInterval = 1000;
256    long checkpointInterval = 5*1000;
257    long cleanupInterval = 30*1000;
258    boolean cleanupOnStop = true;
259    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
260    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
261    boolean enableIndexWriteAsync = false;
262    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
263    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
264    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
265
266    protected AtomicBoolean opened = new AtomicBoolean();
267    private boolean ignoreMissingJournalfiles = false;
268    private int indexCacheSize = 10000;
269    private boolean checkForCorruptJournalFiles = false;
270    private boolean checksumJournalFiles = true;
271    protected boolean forceRecoverIndex = false;
272
273    private boolean archiveCorruptedIndex = false;
274    private boolean useIndexLFRUEviction = false;
275    private float indexLFUEvictionFactor = 0.2f;
276    private boolean enableIndexDiskSyncs = true;
277    private boolean enableIndexRecoveryFile = true;
278    private boolean enableIndexPageCaching = true;
279    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
280
281    private boolean enableAckCompaction = false;
282    private int compactAcksAfterNoGC = 10;
283    private boolean compactAcksIgnoresStoreGrowth = false;
284    private int checkPointCyclesWithNoGC;
285    private int journalLogOnLastCompactionCheck;
286
287    //only set when using JournalDiskSyncStrategy.PERIODIC
288    protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>();
289
290    @Override
291    public void doStart() throws Exception {
292        load();
293    }
294
295    @Override
296    public void doStop(ServiceStopper stopper) throws Exception {
297        unload();
298    }
299
300    public void allowIOResumption() {
301        if (pageFile != null) {
302            pageFile.allowIOResumption();
303        }
304        if (journal != null) {
305            journal.allowIOResumption();
306        }
307    }
308
309    private void loadPageFile() throws IOException {
310        this.indexLock.writeLock().lock();
311        try {
312            final PageFile pageFile = getPageFile();
313            pageFile.load();
314            pageFile.tx().execute(new Transaction.Closure<IOException>() {
315                @Override
316                public void execute(Transaction tx) throws IOException {
317                    if (pageFile.getPageCount() == 0) {
318                        // First time this is created.. Initialize the metadata
319                        Page<Metadata> page = tx.allocate();
320                        assert page.getPageId() == 0;
321                        page.set(metadata);
322                        metadata.page = page;
323                        metadata.state = CLOSED_STATE;
324                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
325
326                        tx.store(metadata.page, metadataMarshaller, true);
327                    } else {
328                        Page<Metadata> page = tx.load(0, metadataMarshaller);
329                        metadata = page.get();
330                        metadata.page = page;
331                    }
332                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
333                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
334                    metadata.destinations.load(tx);
335                }
336            });
337            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
338            // Perhaps we should just keep an index of file
339            storedDestinations.clear();
340            pageFile.tx().execute(new Transaction.Closure<IOException>() {
341                @Override
342                public void execute(Transaction tx) throws IOException {
343                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
344                        Entry<String, StoredDestination> entry = iterator.next();
345                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
346                        storedDestinations.put(entry.getKey(), sd);
347
348                        if (checkForCorruptJournalFiles) {
349                            // sanity check the index also
350                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
351                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
352                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
353                                }
354                            }
355                        }
356                    }
357                }
358            });
359            pageFile.flush();
360        } finally {
361            this.indexLock.writeLock().unlock();
362        }
363    }
364
365    private void startCheckpoint() {
366        if (checkpointInterval == 0 && cleanupInterval == 0) {
367            LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart");
368            return;
369        }
370        synchronized (schedulerLock) {
371            if (scheduler == null || scheduler.isShutdown()) {
372                scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
373
374                    @Override
375                    public Thread newThread(Runnable r) {
376                        Thread schedulerThread = new Thread(r);
377
378                        schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
379                        schedulerThread.setDaemon(true);
380
381                        return schedulerThread;
382                    }
383                });
384
385                // Short intervals for check-point and cleanups
386                long delay;
387                if (journal.isJournalDiskSyncPeriodic()) {
388                    delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500);
389                } else {
390                    delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
391                }
392
393                scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
394            }
395        }
396    }
397
398    private final class CheckpointRunner implements Runnable {
399
400        private long lastCheckpoint = System.currentTimeMillis();
401        private long lastCleanup = System.currentTimeMillis();
402        private long lastSync = System.currentTimeMillis();
403        private Location lastAsyncUpdate = null;
404
405        @Override
406        public void run() {
407            try {
408                // Decide on cleanup vs full checkpoint here.
409                if (opened.get()) {
410                    long now = System.currentTimeMillis();
411                    if (journal.isJournalDiskSyncPeriodic() &&
412                            journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
413                        Location currentUpdate = lastAsyncJournalUpdate.get();
414                        if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) {
415                            lastAsyncUpdate = currentUpdate;
416                            if (LOG.isTraceEnabled()) {
417                                LOG.trace("Writing trace command to trigger journal sync");
418                            }
419                            store(new KahaTraceCommand(), true, null, null);
420                        }
421                        lastSync = now;
422                    }
423                    if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
424                        checkpointCleanup(true);
425                        lastCleanup = now;
426                        lastCheckpoint = now;
427                    } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
428                        checkpointCleanup(false);
429                        lastCheckpoint = now;
430                    }
431                }
432            } catch (IOException ioe) {
433                LOG.error("Checkpoint failed", ioe);
434                brokerService.handleIOException(ioe);
435            } catch (Throwable e) {
436                LOG.error("Checkpoint failed", e);
437                brokerService.handleIOException(IOExceptionSupport.create(e));
438            }
439        }
440    }
441
442    public void open() throws IOException {
443        if( opened.compareAndSet(false, true) ) {
444            getJournal().start();
445            try {
446                loadPageFile();
447            } catch (Throwable t) {
448                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
449                if (LOG.isDebugEnabled()) {
450                    LOG.debug("Index load failure", t);
451                }
452                // try to recover index
453                try {
454                    pageFile.unload();
455                } catch (Exception ignore) {}
456                if (archiveCorruptedIndex) {
457                    pageFile.archive();
458                } else {
459                    pageFile.delete();
460                }
461                metadata = createMetadata();
462                pageFile = null;
463                loadPageFile();
464            }
465            recover();
466            startCheckpoint();
467        }
468    }
469
470    public void load() throws IOException {
471        this.indexLock.writeLock().lock();
472        IOHelper.mkdirs(directory);
473        try {
474            if (deleteAllMessages) {
475                getJournal().setCheckForCorruptionOnStartup(false);
476                getJournal().start();
477                getJournal().delete();
478                getJournal().close();
479                journal = null;
480                getPageFile().delete();
481                LOG.info("Persistence store purged.");
482                deleteAllMessages = false;
483            }
484
485            open();
486            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
487        } finally {
488            this.indexLock.writeLock().unlock();
489        }
490    }
491
492    public void close() throws IOException, InterruptedException {
493        if( opened.compareAndSet(true, false)) {
494            checkpointLock.writeLock().lock();
495            try {
496                if (metadata.page != null) {
497                    checkpointUpdate(getCleanupOnStop());
498                }
499                pageFile.unload();
500                metadata = createMetadata();
501            } finally {
502                checkpointLock.writeLock().unlock();
503            }
504            journal.close();
505            synchronized(schedulerLock) {
506                if (scheduler != null) {
507                    ThreadPoolUtils.shutdownGraceful(scheduler, -1);
508                    scheduler = null;
509                }
510            }
511            // clear the cache and journalSize on shutdown of the store
512            storeCache.clear();
513            journalSize.set(0);
514        }
515    }
516
517    public void unload() throws IOException, InterruptedException {
518        this.indexLock.writeLock().lock();
519        try {
520            if( pageFile != null && pageFile.isLoaded() ) {
521                metadata.state = CLOSED_STATE;
522                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
523
524                if (metadata.page != null) {
525                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
526                        @Override
527                        public void execute(Transaction tx) throws IOException {
528                            tx.store(metadata.page, metadataMarshaller, true);
529                        }
530                    });
531                }
532            }
533        } finally {
534            this.indexLock.writeLock().unlock();
535        }
536        close();
537    }
538
539    // public for testing
540    @SuppressWarnings("rawtypes")
541    public Location[] getInProgressTxLocationRange() {
542        Location[] range = new Location[]{null, null};
543        synchronized (inflightTransactions) {
544            if (!inflightTransactions.isEmpty()) {
545                for (List<Operation> ops : inflightTransactions.values()) {
546                    if (!ops.isEmpty()) {
547                        trackMaxAndMin(range, ops);
548                    }
549                }
550            }
551            if (!preparedTransactions.isEmpty()) {
552                for (List<Operation> ops : preparedTransactions.values()) {
553                    if (!ops.isEmpty()) {
554                        trackMaxAndMin(range, ops);
555                    }
556                }
557            }
558        }
559        return range;
560    }
561
562    @SuppressWarnings("rawtypes")
563    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
564        Location t = ops.get(0).getLocation();
565        if (range[0] == null || t.compareTo(range[0]) <= 0) {
566            range[0] = t;
567        }
568        t = ops.get(ops.size() -1).getLocation();
569        if (range[1] == null || t.compareTo(range[1]) >= 0) {
570            range[1] = t;
571        }
572    }
573
574    class TranInfo {
575        TransactionId id;
576        Location location;
577
578        class opCount {
579            int add;
580            int remove;
581        }
582        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
583
584        @SuppressWarnings("rawtypes")
585        public void track(Operation operation) {
586            if (location == null ) {
587                location = operation.getLocation();
588            }
589            KahaDestination destination;
590            boolean isAdd = false;
591            if (operation instanceof AddOperation) {
592                AddOperation add = (AddOperation) operation;
593                destination = add.getCommand().getDestination();
594                isAdd = true;
595            } else {
596                RemoveOperation removeOpperation = (RemoveOperation) operation;
597                destination = removeOpperation.getCommand().getDestination();
598            }
599            opCount opCount = destinationOpCount.get(destination);
600            if (opCount == null) {
601                opCount = new opCount();
602                destinationOpCount.put(destination, opCount);
603            }
604            if (isAdd) {
605                opCount.add++;
606            } else {
607                opCount.remove++;
608            }
609        }
610
611        @Override
612        public String toString() {
613           StringBuffer buffer = new StringBuffer();
614           buffer.append(location).append(";").append(id).append(";\n");
615           for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
616               buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
617           }
618           return buffer.toString();
619        }
620    }
621
622    @SuppressWarnings("rawtypes")
623    public String getTransactions() {
624
625        ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
626        synchronized (inflightTransactions) {
627            if (!inflightTransactions.isEmpty()) {
628                for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
629                    TranInfo info = new TranInfo();
630                    info.id = entry.getKey();
631                    for (Operation operation : entry.getValue()) {
632                        info.track(operation);
633                    }
634                    infos.add(info);
635                }
636            }
637        }
638        synchronized (preparedTransactions) {
639            if (!preparedTransactions.isEmpty()) {
640                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
641                    TranInfo info = new TranInfo();
642                    info.id = entry.getKey();
643                    for (Operation operation : entry.getValue()) {
644                        info.track(operation);
645                    }
646                    infos.add(info);
647                }
648            }
649        }
650        return infos.toString();
651    }
652
653    /**
654     * Move all the messages that were in the journal into long term storage. We
655     * just replay and do a checkpoint.
656     *
657     * @throws IOException
658     * @throws IOException
659     * @throws IllegalStateException
660     */
661    private void recover() throws IllegalStateException, IOException {
662        this.indexLock.writeLock().lock();
663        try {
664
665            long start = System.currentTimeMillis();
666            boolean requiresJournalReplay = recoverProducerAudit();
667            requiresJournalReplay |= recoverAckMessageFileMap();
668            Location lastIndoubtPosition = getRecoveryPosition();
669            Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
670            if (recoveryPosition != null) {
671                int redoCounter = 0;
672                int dataFileRotationTracker = recoveryPosition.getDataFileId();
673                LOG.info("Recovering from the journal @" + recoveryPosition);
674                while (recoveryPosition != null) {
675                    try {
676                        JournalCommand<?> message = load(recoveryPosition);
677                        metadata.lastUpdate = recoveryPosition;
678                        process(message, recoveryPosition, lastIndoubtPosition);
679                        redoCounter++;
680                    } catch (IOException failedRecovery) {
681                        if (isIgnoreMissingJournalfiles()) {
682                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
683                            // track this dud location
684                            journal.corruptRecoveryLocation(recoveryPosition);
685                        } else {
686                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
687                        }
688                    }
689                    recoveryPosition = journal.getNextLocation(recoveryPosition);
690                    // hold on to the minimum number of open files during recovery
691                    if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
692                        dataFileRotationTracker = recoveryPosition.getDataFileId();
693                        journal.cleanup();
694                    }
695                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
696                        LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
697                    }
698                }
699                if (LOG.isInfoEnabled()) {
700                    long end = System.currentTimeMillis();
701                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
702                }
703            }
704
705            // We may have to undo some index updates.
706            pageFile.tx().execute(new Transaction.Closure<IOException>() {
707                @Override
708                public void execute(Transaction tx) throws IOException {
709                    recoverIndex(tx);
710                }
711            });
712
713            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
714            Set<TransactionId> toRollback = new HashSet<TransactionId>();
715            Set<TransactionId> toDiscard = new HashSet<TransactionId>();
716            synchronized (inflightTransactions) {
717                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
718                    TransactionId id = it.next();
719                    if (id.isLocalTransaction()) {
720                        toRollback.add(id);
721                    } else {
722                        toDiscard.add(id);
723                    }
724                }
725                for (TransactionId tx: toRollback) {
726                    if (LOG.isDebugEnabled()) {
727                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
728                    }
729                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
730                }
731                for (TransactionId tx: toDiscard) {
732                    if (LOG.isDebugEnabled()) {
733                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
734                    }
735                    inflightTransactions.remove(tx);
736                }
737            }
738
739            synchronized (preparedTransactions) {
740                for (TransactionId txId : preparedTransactions.keySet()) {
741                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
742                }
743            }
744
745        } finally {
746            this.indexLock.writeLock().unlock();
747        }
748    }
749
750    @SuppressWarnings("unused")
751    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
752        return TransactionIdConversion.convertToLocal(tx);
753    }
754
755    private Location minimum(Location x,
756                             Location y) {
757        Location min = null;
758        if (x != null) {
759            min = x;
760            if (y != null) {
761                int compare = y.compareTo(x);
762                if (compare < 0) {
763                    min = y;
764                }
765            }
766        } else {
767            min = y;
768        }
769        return min;
770    }
771
772    private boolean recoverProducerAudit() throws IOException {
773        boolean requiresReplay = true;
774        if (metadata.producerSequenceIdTrackerLocation != null) {
775            try {
776                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
777                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
778                int maxNumProducers = getMaxFailoverProducersToTrack();
779                int maxAuditDepth = getFailoverProducersAuditDepth();
780                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
781                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
782                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
783                requiresReplay = false;
784            } catch (Exception e) {
785                LOG.warn("Cannot recover message audit", e);
786            }
787        }
788        // got no audit stored so got to recreate via replay from start of the journal
789        return requiresReplay;
790    }
791
792    @SuppressWarnings("unchecked")
793    private boolean recoverAckMessageFileMap() throws IOException {
794        boolean requiresReplay = true;
795        if (metadata.ackMessageFileMapLocation != null) {
796            try {
797                KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
798                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
799                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
800                requiresReplay = false;
801            } catch (Exception e) {
802                LOG.warn("Cannot recover ackMessageFileMap", e);
803            }
804        }
805        // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
806        return requiresReplay;
807    }
808
809    protected void recoverIndex(Transaction tx) throws IOException {
810        long start = System.currentTimeMillis();
811        // It is possible index updates got applied before the journal updates..
812        // in that case we need to removed references to messages that are not in the journal
813        final Location lastAppendLocation = journal.getLastAppendLocation();
814        long undoCounter=0;
815
816        // Go through all the destinations to see if they have messages past the lastAppendLocation
817        for (StoredDestination sd : storedDestinations.values()) {
818
819            final ArrayList<Long> matches = new ArrayList<Long>();
820            // Find all the Locations that are >= than the last Append Location.
821            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
822                @Override
823                protected void matched(Location key, Long value) {
824                    matches.add(value);
825                }
826            });
827
828            for (Long sequenceId : matches) {
829                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
830                if (keys != null) {
831                    sd.locationIndex.remove(tx, keys.location);
832                    sd.messageIdIndex.remove(tx, keys.messageId);
833                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
834                    undoCounter++;
835                    // TODO: do we need to modify the ack positions for the pub sub case?
836                }
837            }
838        }
839
840        if (undoCounter > 0) {
841            // The rolledback operations are basically in flight journal writes.  To avoid getting
842            // these the end user should do sync writes to the journal.
843            if (LOG.isInfoEnabled()) {
844                long end = System.currentTimeMillis();
845                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
846            }
847        }
848
849        undoCounter = 0;
850        start = System.currentTimeMillis();
851
852        // Lets be extra paranoid here and verify that all the datafiles being referenced
853        // by the indexes still exists.
854
855        final SequenceSet ss = new SequenceSet();
856        for (StoredDestination sd : storedDestinations.values()) {
857            // Use a visitor to cut down the number of pages that we load
858            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
859                int last=-1;
860
861                @Override
862                public boolean isInterestedInKeysBetween(Location first, Location second) {
863                    if( first==null ) {
864                        return !ss.contains(0, second.getDataFileId());
865                    } else if( second==null ) {
866                        return true;
867                    } else {
868                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
869                    }
870                }
871
872                @Override
873                public void visit(List<Location> keys, List<Long> values) {
874                    for (Location l : keys) {
875                        int fileId = l.getDataFileId();
876                        if( last != fileId ) {
877                            ss.add(fileId);
878                            last = fileId;
879                        }
880                    }
881                }
882
883            });
884        }
885        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
886        while (!ss.isEmpty()) {
887            missingJournalFiles.add((int) ss.removeFirst());
888        }
889
890        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
891            missingJournalFiles.add(entry.getKey());
892            for (Integer i : entry.getValue()) {
893                missingJournalFiles.add(i);
894            }
895        }
896
897        missingJournalFiles.removeAll(journal.getFileMap().keySet());
898
899        if (!missingJournalFiles.isEmpty()) {
900            LOG.warn("Some journal files are missing: " + missingJournalFiles);
901        }
902
903        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
904        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
905        for (Integer missing : missingJournalFiles) {
906            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
907        }
908
909        if (checkForCorruptJournalFiles) {
910            Collection<DataFile> dataFiles = journal.getFileMap().values();
911            for (DataFile dataFile : dataFiles) {
912                int id = dataFile.getDataFileId();
913                // eof to next file id
914                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
915                Sequence seq = dataFile.getCorruptedBlocks().getHead();
916                while (seq != null) {
917                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
918                        new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
919                    missingPredicates.add(visitor);
920                    knownCorruption.add(visitor);
921                    seq = seq.getNext();
922                }
923            }
924        }
925
926        if (!missingPredicates.isEmpty()) {
927            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
928                final StoredDestination sd = sdEntry.getValue();
929                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>();
930                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
931                    @Override
932                    protected void matched(Location key, Long value) {
933                        matches.put(value, key);
934                    }
935                });
936
937                // If some message references are affected by the missing data files...
938                if (!matches.isEmpty()) {
939
940                    // We either 'gracefully' recover dropping the missing messages or
941                    // we error out.
942                    if( ignoreMissingJournalfiles ) {
943                        // Update the index to remove the references to the missing data
944                        for (Long sequenceId : matches.keySet()) {
945                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
946                            sd.locationIndex.remove(tx, keys.location);
947                            sd.messageIdIndex.remove(tx, keys.messageId);
948                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
949                            undoCounter++;
950                            // TODO: do we need to modify the ack positions for the pub sub case?
951                        }
952                    } else {
953                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
954                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
955                    }
956                }
957            }
958        }
959
960        if (!ignoreMissingJournalfiles) {
961            if (!knownCorruption.isEmpty()) {
962                LOG.error("Detected corrupt journal files. " + knownCorruption);
963                throw new IOException("Detected corrupt journal files. " + knownCorruption);
964            }
965
966            if (!missingJournalFiles.isEmpty()) {
967                LOG.error("Detected missing journal files. " + missingJournalFiles);
968                throw new IOException("Detected missing journal files. " + missingJournalFiles);
969            }
970        }
971
972        if (undoCounter > 0) {
973            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
974            // should do sync writes to the journal.
975            if (LOG.isInfoEnabled()) {
976                long end = System.currentTimeMillis();
977                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
978            }
979        }
980    }
981
982    private Location nextRecoveryPosition;
983    private Location lastRecoveryPosition;
984
985    public void incrementalRecover() throws IOException {
986        this.indexLock.writeLock().lock();
987        try {
988            if( nextRecoveryPosition == null ) {
989                if( lastRecoveryPosition==null ) {
990                    nextRecoveryPosition = getRecoveryPosition();
991                } else {
992                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
993                }
994            }
995            while (nextRecoveryPosition != null) {
996                lastRecoveryPosition = nextRecoveryPosition;
997                metadata.lastUpdate = lastRecoveryPosition;
998                JournalCommand<?> message = load(lastRecoveryPosition);
999                process(message, lastRecoveryPosition, (IndexAware) null);
1000                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
1001            }
1002        } finally {
1003            this.indexLock.writeLock().unlock();
1004        }
1005    }
1006
1007    public Location getLastUpdatePosition() throws IOException {
1008        return metadata.lastUpdate;
1009    }
1010
1011    private Location getRecoveryPosition() throws IOException {
1012
1013        if (!this.forceRecoverIndex) {
1014
1015            // If we need to recover the transactions..
1016            if (metadata.firstInProgressTransactionLocation != null) {
1017                return metadata.firstInProgressTransactionLocation;
1018            }
1019
1020            // Perhaps there were no transactions...
1021            if( metadata.lastUpdate!=null) {
1022                // Start replay at the record after the last one recorded in the index file.
1023                return getNextInitializedLocation(metadata.lastUpdate);
1024            }
1025        }
1026        // This loads the first position.
1027        return journal.getNextLocation(null);
1028    }
1029
1030    private Location getNextInitializedLocation(Location location) throws IOException {
1031        Location mayNotBeInitialized = journal.getNextLocation(location);
1032        if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) {
1033            // need to init size and type to skip
1034            return journal.getNextLocation(mayNotBeInitialized);
1035        } else {
1036            return mayNotBeInitialized;
1037        }
1038    }
1039
1040    protected void checkpointCleanup(final boolean cleanup) throws IOException {
1041        long start;
1042        this.indexLock.writeLock().lock();
1043        try {
1044            start = System.currentTimeMillis();
1045            if( !opened.get() ) {
1046                return;
1047            }
1048        } finally {
1049            this.indexLock.writeLock().unlock();
1050        }
1051        checkpointUpdate(cleanup);
1052        long end = System.currentTimeMillis();
1053        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1054            if (LOG.isInfoEnabled()) {
1055                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
1056            }
1057        }
1058    }
1059
1060    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
1061        int size = data.serializedSizeFramed();
1062        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
1063        os.writeByte(data.type().getNumber());
1064        data.writeFramed(os);
1065        return os.toByteSequence();
1066    }
1067
1068    // /////////////////////////////////////////////////////////////////
1069    // Methods call by the broker to update and query the store.
1070    // /////////////////////////////////////////////////////////////////
1071    public Location store(JournalCommand<?> data) throws IOException {
1072        return store(data, false, null,null);
1073    }
1074
1075    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
1076        return store(data, false, null, null, onJournalStoreComplete);
1077    }
1078
1079    public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException {
1080        return store(data, sync, before, after, null);
1081    }
1082
1083    /**
1084     * All updated are are funneled through this method. The updates are converted
1085     * to a JournalMessage which is logged to the journal and then the data from
1086     * the JournalMessage is used to update the index just like it would be done
1087     * during a recovery process.
1088     */
1089    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
1090        try {
1091            ByteSequence sequence = toByteSequence(data);
1092            Location location;
1093
1094            checkpointLock.readLock().lock();
1095            try {
1096
1097                long start = System.currentTimeMillis();
1098                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
1099                long start2 = System.currentTimeMillis();
1100                //Track the last async update so we know if we need to sync at the next checkpoint
1101                if (!sync && journal.isJournalDiskSyncPeriodic()) {
1102                    lastAsyncJournalUpdate.set(location);
1103                }
1104                process(data, location, before);
1105
1106                long end = System.currentTimeMillis();
1107                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1108                    if (LOG.isInfoEnabled()) {
1109                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
1110                    }
1111                }
1112            } finally {
1113                checkpointLock.readLock().unlock();
1114            }
1115
1116            if (after != null) {
1117                after.run();
1118            }
1119
1120            return location;
1121        } catch (IOException ioe) {
1122            LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe);
1123            brokerService.handleIOException(ioe);
1124            throw ioe;
1125        }
1126    }
1127
1128    /**
1129     * Loads a previously stored JournalMessage
1130     *
1131     * @param location
1132     * @return
1133     * @throws IOException
1134     */
1135    public JournalCommand<?> load(Location location) throws IOException {
1136        long start = System.currentTimeMillis();
1137        ByteSequence data = journal.read(location);
1138        long end = System.currentTimeMillis();
1139        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
1140            if (LOG.isInfoEnabled()) {
1141                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
1142            }
1143        }
1144        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
1145        byte readByte = is.readByte();
1146        KahaEntryType type = KahaEntryType.valueOf(readByte);
1147        if( type == null ) {
1148            try {
1149                is.close();
1150            } catch (IOException e) {}
1151            throw new IOException("Could not load journal record, null type information from: " + readByte + " at location: "+location);
1152        }
1153        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
1154        message.mergeFramed(is);
1155        return message;
1156    }
1157
1158    /**
1159     * do minimal recovery till we reach the last inDoubtLocation
1160     * @param data
1161     * @param location
1162     * @param inDoubtlocation
1163     * @throws IOException
1164     */
1165    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
1166        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
1167            initMessageStore(data);
1168            process(data, location, (IndexAware) null);
1169        } else {
1170            // just recover producer audit
1171            data.visit(new Visitor() {
1172                @Override
1173                public void visit(KahaAddMessageCommand command) throws IOException {
1174                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1175                }
1176            });
1177        }
1178    }
1179
1180    private void initMessageStore(JournalCommand<?> data) throws IOException {
1181        data.visit(new Visitor() {
1182            @Override
1183            public void visit(KahaAddMessageCommand command) throws IOException {
1184                final KahaDestination destination = command.getDestination();
1185                if (!storedDestinations.containsKey(key(destination))) {
1186                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1187                        @Override
1188                        public void execute(Transaction tx) throws IOException {
1189                            getStoredDestination(destination, tx);
1190                        }
1191                    });
1192                }
1193            }
1194        });
1195    }
1196
1197    // /////////////////////////////////////////////////////////////////
1198    // Journaled record processing methods. Once the record is journaled,
1199    // these methods handle applying the index updates. These may be called
1200    // from the recovery method too so they need to be idempotent
1201    // /////////////////////////////////////////////////////////////////
1202
1203    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
1204        data.visit(new Visitor() {
1205            @Override
1206            public void visit(KahaAddMessageCommand command) throws IOException {
1207                process(command, location, onSequenceAssignedCallback);
1208            }
1209
1210            @Override
1211            public void visit(KahaRemoveMessageCommand command) throws IOException {
1212                process(command, location);
1213            }
1214
1215            @Override
1216            public void visit(KahaPrepareCommand command) throws IOException {
1217                process(command, location);
1218            }
1219
1220            @Override
1221            public void visit(KahaCommitCommand command) throws IOException {
1222                process(command, location, onSequenceAssignedCallback);
1223            }
1224
1225            @Override
1226            public void visit(KahaRollbackCommand command) throws IOException {
1227                process(command, location);
1228            }
1229
1230            @Override
1231            public void visit(KahaRemoveDestinationCommand command) throws IOException {
1232                process(command, location);
1233            }
1234
1235            @Override
1236            public void visit(KahaSubscriptionCommand command) throws IOException {
1237                process(command, location);
1238            }
1239
1240            @Override
1241            public void visit(KahaProducerAuditCommand command) throws IOException {
1242                processLocation(location);
1243            }
1244
1245            @Override
1246            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
1247                processLocation(location);
1248            }
1249
1250            @Override
1251            public void visit(KahaTraceCommand command) {
1252                processLocation(location);
1253            }
1254
1255            @Override
1256            public void visit(KahaUpdateMessageCommand command) throws IOException {
1257                process(command, location);
1258            }
1259
1260            @Override
1261            public void visit(KahaRewrittenDataFileCommand command) throws IOException {
1262                process(command, location);
1263            }
1264        });
1265    }
1266
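    /**
     * Apply an add to the index, or buffer it with the in-flight transaction when the command
     * carries transaction info. The optional callback is invoked with the assigned sequence
     * while the index lock is still held.
     */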
1267    @SuppressWarnings("rawtypes")
1268    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
1269        if (command.hasTransactionInfo()) {
1270            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1271            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
1272        } else {
1273            this.indexLock.writeLock().lock();
1274            try {
1275                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1276                    @Override
1277                    public void execute(Transaction tx) throws IOException {
1278                        long assignedIndex = updateIndex(tx, command, location);
1279                        if (runWithIndexLock != null) {
1280                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
1281                        }
1282                    }
1283                });
1284
1285            } finally {
1286                this.indexLock.writeLock().unlock();
1287            }
1288        }
1289    }
1290
1291    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
1292        this.indexLock.writeLock().lock();
1293        try {
1294            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1295                @Override
1296                public void execute(Transaction tx) throws IOException {
1297                    updateIndex(tx, command, location);
1298                }
1299            });
1300        } finally {
1301            this.indexLock.writeLock().unlock();
1302        }
1303    }
1304
1305    @SuppressWarnings("rawtypes")
1306    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1307        if (command.hasTransactionInfo()) {
1308           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1309           inflightTx.add(new RemoveOperation(command, location));
1310        } else {
1311            this.indexLock.writeLock().lock();
1312            try {
1313                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1314                    @Override
1315                    public void execute(Transaction tx) throws IOException {
1316                        updateIndex(tx, command, location);
1317                    }
1318                });
1319            } finally {
1320                this.indexLock.writeLock().unlock();
1321            }
1322        }
1323    }
1324
1325    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1326        this.indexLock.writeLock().lock();
1327        try {
1328            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1329                @Override
1330                public void execute(Transaction tx) throws IOException {
1331                    updateIndex(tx, command, location);
1332                }
1333            });
1334        } finally {
1335            this.indexLock.writeLock().unlock();
1336        }
1337    }
1338
1339    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1340        this.indexLock.writeLock().lock();
1341        try {
1342            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1343                @Override
1344                public void execute(Transaction tx) throws IOException {
1345                    updateIndex(tx, command, location);
1346                }
1347            });
1348        } finally {
1349            this.indexLock.writeLock().unlock();
1350        }
1351    }
1352
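    /**
     * Record the location as the most recent index update (metadata.lastUpdate) under the
     * index lock; used for commands that carry no per-destination index changes.
     */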
1353    protected void processLocation(final Location location) {
1354        this.indexLock.writeLock().lock();
1355        try {
1356            metadata.lastUpdate = location;
1357        } finally {
1358            this.indexLock.writeLock().unlock();
1359        }
1360    }
1361
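    /**
     * Complete a transaction: replay its buffered operations against the index and record the
     * journal files referenced by those operations. A commit with no buffered operations only
     * notifies the callback.
     */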
1362    @SuppressWarnings("rawtypes")
1363    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
1364        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1365        List<Operation> inflightTx;
1366        synchronized (inflightTransactions) {
1367            inflightTx = inflightTransactions.remove(key);
1368            if (inflightTx == null) {
1369                inflightTx = preparedTransactions.remove(key);
1370            }
1371        }
1372        if (inflightTx == null) {
1373            // only non persistent messages in this tx
1374            if (before != null) {
1375                before.sequenceAssignedWithIndexLocked(-1);
1376            }
1377            return;
1378        }
1379
1380        final List<Operation> messagingTx = inflightTx;
1381        indexLock.writeLock().lock();
1382        try {
1383            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1384                @Override
1385                public void execute(Transaction tx) throws IOException {
1386                    for (Operation op : messagingTx) {
1387                        op.execute(tx);
1388                        recordAckMessageReferenceLocation(location, op.getLocation());
1389                    }
1390                }
1391            });
1392            metadata.lastUpdate = location;
1393        } finally {
1394            indexLock.writeLock().unlock();
1395        }
1396    }
1397
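    /**
     * Move an in-flight transaction into the prepared set and record, against the prepare
     * record's journal file, the files referenced by its buffered operations.
     */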
1398    @SuppressWarnings("rawtypes")
1399    protected void process(KahaPrepareCommand command, Location location) {
1400        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1401        List<Operation> tx = null;
1402        synchronized (inflightTransactions) {
1403            tx = inflightTransactions.remove(key);
1404            if (tx != null) {
1405                preparedTransactions.put(key, tx);
1406            }
1407        }
1408        if (tx != null && !tx.isEmpty()) {
1409            indexLock.writeLock().lock();
1410            try {
1411                for (Operation op : tx) {
1412                    recordAckMessageReferenceLocation(location, op.getLocation());
1413                }
1414            } finally {
1415                indexLock.writeLock().unlock();
1416            }
1417        }
1418    }
1419
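    /**
     * Discard the buffered operations of a rolled-back transaction; for XA transactions the
     * files referenced by those operations are recorded against the rollback record's file.
     */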
1420    @SuppressWarnings("rawtypes")
1421    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1422        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1423        List<Operation> updates = null;
1424        synchronized (inflightTransactions) {
1425            updates = inflightTransactions.remove(key);
1426            if (updates == null) {
1427                updates = preparedTransactions.remove(key);
1428            }
1429        }
1430        if (key.isXATransaction() && updates != null && !updates.isEmpty()) {
1431            indexLock.writeLock().lock();
1432            try {
1433                for (Operation op : updates) {
1434                    recordAckMessageReferenceLocation(location, op.getLocation());
1435                }
1436            } finally {
1437                indexLock.writeLock().unlock();
1438            }
1439        }
1440    }
1441
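    /**
     * Replay handling for an ack compaction marker: tag the containing data file with the
     * rewrite type and, when the source file still exists and skipping is requested, jump the
     * read position to the next journal file.
     */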
1442    protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
1443        final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1444
        // Mark the current journal file as a compacted file so that gc checks can skip
        // over these smaller, compaction-type logs.
1447        DataFile current = journal.getDataFileById(location.getDataFileId());
1448        current.setTypeCode(command.getRewriteType());
1449
1450        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
1451            // Move offset so that next location read jumps to next file.
1452            location.setOffset(journalMaxFileLength);
1453        }
1454    }
1455
1456    // /////////////////////////////////////////////////////////////////
1457    // These methods do the actual index updates.
1458    // /////////////////////////////////////////////////////////////////
1459
1460    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1461    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1462
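    /**
     * Add a message to the destination's location, message id and order indexes, detecting
     * duplicates and journal replays. Returns -1 when the destination no longer exists, the
     * topic has no subscriptions, or the message id is already indexed.
     */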
1463    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1464        StoredDestination sd = getExistingStoredDestination(command.getDestination(), tx);
1465        if (sd == null) {
1466            // if the store no longer exists, skip
1467            return -1;
1468        }
1469        // Skip adding the message to the index if this is a topic and there are
1470        // no subscriptions.
1471        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1472            return -1;
1473        }
1474
1475        // Add the message.
1476        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1477        long id = sd.orderIndex.getNextMessageId();
1478        Long previous = sd.locationIndex.put(tx, location, id);
1479        if (previous == null) {
1480            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1481            if (previous == null) {
1482                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1483                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1484                    addAckLocationForNewMessage(tx, sd, id);
1485                }
1486                metadata.lastUpdate = location;
1487            } else {
1488
1489                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
1490                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
                    // If the message ID is already indexed, the broker asked us to store a duplicate
                    // before the original message was dispatched and acked; ignore this add attempt.
1492                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
1493                }
1494                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1495                sd.locationIndex.remove(tx, location);
1496                id = -1;
1497            }
1498        } else {
            // restore the previous value. Looks like this was a redo of a previously
            // added message. We don't want to assign it a new id as the other indexes would
            // be wrong.
1502            sd.locationIndex.put(tx, location, previous);
1503            // ensure sequence is not broken
1504            sd.orderIndex.revertNextMessageId();
1505            metadata.lastUpdate = location;
1506        }
1507        // record this id in any event, initial send or recovery
1508        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1509        return id;
1510    }
1511
1512    void trackPendingAdd(KahaDestination destination, Long seq) {
1513        StoredDestination sd = storedDestinations.get(key(destination));
1514        if (sd != null) {
1515            sd.trackPendingAdd(seq);
1516        }
1517    }
1518
1519    void trackPendingAddComplete(KahaDestination destination, Long seq) {
1520        StoredDestination sd = storedDestinations.get(key(destination));
1521        if (sd != null) {
1522            sd.trackPendingAddComplete(seq);
1523        }
1524    }
1525
1526    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
1527        KahaAddMessageCommand command = updateMessageCommand.getMessage();
1528        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1529
1530        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
1531        if (id != null) {
1532            MessageKeys previousKeys = sd.orderIndex.put(
1533                    tx,
1534                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
1535                    id,
1536                    new MessageKeys(command.getMessageId(), location)
1537            );
1538            sd.locationIndex.put(tx, location, id);
1539            // on first update previous is original location, on recovery/replay it may be the updated location
1540            if(previousKeys != null && !previousKeys.location.equals(location)) {
1541                sd.locationIndex.remove(tx, previousKeys.location);
1542            }
1543            metadata.lastUpdate = location;
1544        } else {
            // Add the message if it can't be found
1546            this.updateIndex(tx, command, location);
1547        }
1548    }
1549
1550    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1551        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1552        if (!command.hasSubscriptionKey()) {
1553
            // In the queue case we just remove the message from the index.
1555            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1556            if (sequenceId != null) {
1557                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1558                if (keys != null) {
1559                    sd.locationIndex.remove(tx, keys.location);
1560                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1561                    metadata.lastUpdate = ackLocation;
1562                }  else if (LOG.isDebugEnabled()) {
1563                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1564                }
1565            } else if (LOG.isDebugEnabled()) {
1566                LOG.debug("message not found in sequence id index: " + command.getMessageId());
1567            }
1568        } else {
            // In the topic case we need to remove the message once it's been acked
            // by all the subs.
1571            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1572
1573            // Make sure it's a valid message id...
1574            if (sequence != null) {
1575                String subscriptionKey = command.getSubscriptionKey();
1576                if (command.getAck() != UNMATCHED) {
1577                    sd.orderIndex.get(tx, sequence);
1578                    byte priority = sd.orderIndex.lastGetPriority();
1579                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1580                }
1581
1582                MessageKeys keys = sd.orderIndex.get(tx, sequence);
1583                if (keys != null) {
1584                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1585                }
1586                // The following method handles deleting un-referenced messages.
1587                removeAckLocation(tx, sd, subscriptionKey, sequence);
1588                metadata.lastUpdate = ackLocation;
1589            } else if (LOG.isDebugEnabled()) {
1590                LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1591            }
1592
1593        }
1594    }
1595
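    /**
     * Record in the ack message file map that the journal file containing the ack references
     * the journal file containing the message, so gc will not remove the ack's file while the
     * referenced message file is still in use.
     */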
1596    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1597        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1598        if (referenceFileIds == null) {
1599            referenceFileIds = new HashSet<Integer>();
1600            referenceFileIds.add(messageLocation.getDataFileId());
1601            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1602        } else {
1603            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1604            if (!referenceFileIds.contains(id)) {
1605                referenceFileIds.add(id);
1606            }
1607        }
1608    }
1609
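    /**
     * Remove a destination: clear and free all of its indexes (including the topic
     * subscription structures) and drop it from the in-memory caches and the persistent
     * destination index.
     */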
1610    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1611        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1612        sd.orderIndex.remove(tx);
1613
1614        sd.locationIndex.clear(tx);
1615        sd.locationIndex.unload(tx);
1616        tx.free(sd.locationIndex.getPageId());
1617
1618        sd.messageIdIndex.clear(tx);
1619        sd.messageIdIndex.unload(tx);
1620        tx.free(sd.messageIdIndex.getPageId());
1621
1622        if (sd.subscriptions != null) {
1623            sd.subscriptions.clear(tx);
1624            sd.subscriptions.unload(tx);
1625            tx.free(sd.subscriptions.getPageId());
1626
1627            sd.subscriptionAcks.clear(tx);
1628            sd.subscriptionAcks.unload(tx);
1629            tx.free(sd.subscriptionAcks.getPageId());
1630
1631            sd.ackPositions.clear(tx);
1632            sd.ackPositions.unload(tx);
1633            tx.free(sd.ackPositions.getHeadPageId());
1634
1635            sd.subLocations.clear(tx);
1636            sd.subLocations.unload(tx);
1637            tx.free(sd.subLocations.getHeadPageId());
1638        }
1639
1640        String key = key(command.getDestination());
1641        storedDestinations.remove(key);
1642        metadata.destinations.remove(tx, key);
1643        storeCache.remove(key(command.getDestination()));
1644    }
1645
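    /**
     * Add or remove a durable subscription. On add, the command and its location are stored
     * and an initial last ack is recorded; on remove, the subscription state is deleted and
     * the destination itself is removed once no subscriptions remain.
     */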
1646    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1647        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1648        final String subscriptionKey = command.getSubscriptionKey();
1649
        // If the subscription info is set we are creating the sub, otherwise we are destroying it
1651        if (command.hasSubscriptionInfo()) {
1652            Location existing = sd.subLocations.get(tx, subscriptionKey);
1653            if (existing != null && existing.compareTo(location) == 0) {
1654                // replay on recovery, ignore
                LOG.trace("ignoring journal replay of sub from: " + location);
1656                return;
1657            }
1658
1659            sd.subscriptions.put(tx, subscriptionKey, command);
1660            sd.subLocations.put(tx, subscriptionKey, location);
1661            long ackLocation=NOT_ACKED;
1662            if (!command.getRetroactive()) {
1663                ackLocation = sd.orderIndex.nextMessageId-1;
1664            } else {
1665                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1666            }
1667            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1668            sd.subscriptionCache.add(subscriptionKey);
1669        } else {
1670            // delete the sub...
1671            sd.subscriptions.remove(tx, subscriptionKey);
1672            sd.subLocations.remove(tx, subscriptionKey);
1673            sd.subscriptionAcks.remove(tx, subscriptionKey);
1674            sd.subscriptionCache.remove(subscriptionKey);
1675            removeAckLocationsForSub(tx, sd, subscriptionKey);
1676
1677            if (sd.subscriptions.isEmpty(tx)) {
1678                // remove the stored destination
1679                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
1680                removeDestinationCommand.setDestination(command.getDestination());
1681                updateIndex(tx, removeDestinationCommand, null);
1682            }
1683        }
1684    }
1685
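    /**
     * Run a checkpoint under the checkpoint and index locks: flush the index to disk and, when
     * cleanup is requested, remove the journal data files identified as garbage.
     */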
1686    private void checkpointUpdate(final boolean cleanup) throws IOException {
1687        checkpointLock.writeLock().lock();
1688        try {
1689            this.indexLock.writeLock().lock();
1690            try {
1691                Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() {
1692                    @Override
1693                    public Set<Integer> execute(Transaction tx) throws IOException {
1694                        return checkpointUpdate(tx, cleanup);
1695                    }
1696                });
1697                pageFile.flush();
                // Remove data files only after the index update so that a partial removal does not leave dangling references in the index.
1699                journal.removeDataFiles(filesToGc);
1700            } finally {
1701                this.indexLock.writeLock().unlock();
1702            }
1703
1704        } finally {
1705            checkpointLock.writeLock().unlock();
1706        }
1707    }
1708
    /**
     * Store the index metadata and, when cleanup is enabled, work out which journal data files
     * are no longer referenced and can be removed.
     * @param tx the index transaction to update within
     * @param cleanup whether to compute gc candidates on this cycle
     * @return the set of journal data file ids that are safe to remove
     * @throws IOException
     */
1713    Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1714        MDC.put("activemq.persistenceDir", getDirectory().getName());
1715        LOG.debug("Checkpoint started.");
1716
1717        // reflect last update exclusive of current checkpoint
1718        Location lastUpdate = metadata.lastUpdate;
1719
1720        metadata.state = OPEN_STATE;
1721        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1722        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
1723        Location[] inProgressTxRange = getInProgressTxLocationRange();
1724        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
1725        tx.store(metadata.page, metadataMarshaller, true);
1726
1727        final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>();
1728        if (cleanup) {
1729
1730            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1731            gcCandidateSet.addAll(completeFileSet);
1732
1733            if (LOG.isTraceEnabled()) {
1734                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
1735            }
1736
1737            if (lastUpdate != null) {
                // we won't delete past the last update; otherwise the ackCompaction journal could become a candidate in error
1739                gcCandidateSet.removeAll(new TreeSet<Integer>(gcCandidateSet.tailSet(lastUpdate.getDataFileId())));
1740            }
1741
1742            // Don't GC files under replication
1743            if( journalFilesBeingReplicated!=null ) {
1744                gcCandidateSet.removeAll(journalFilesBeingReplicated);
1745            }
1746
1747            if (metadata.producerSequenceIdTrackerLocation != null) {
1748                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
1749                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
1750                    // rewrite so we don't prevent gc
1751                    metadata.producerSequenceIdTracker.setModified(true);
1752                    if (LOG.isTraceEnabled()) {
1753                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
1754                    }
1755                }
1756                gcCandidateSet.remove(dataFileId);
1757                if (LOG.isTraceEnabled()) {
1758                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet);
1759                }
1760            }
1761
1762            if (metadata.ackMessageFileMapLocation != null) {
1763                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
1764                gcCandidateSet.remove(dataFileId);
1765                if (LOG.isTraceEnabled()) {
1766                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet);
1767                }
1768            }
1769
1770            // Don't GC files referenced by in-progress tx
1771            if (inProgressTxRange[0] != null) {
1772                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
1773                    gcCandidateSet.remove(pendingTx);
1774                }
1775            }
1776            if (LOG.isTraceEnabled()) {
1777                LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
1778            }
1779
1780            // Go through all the destinations to see if any of them can remove GC candidates.
1781            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1782                if( gcCandidateSet.isEmpty() ) {
1783                    break;
1784                }
1785
1786                // Use a visitor to cut down the number of pages that we load
1787                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1788                    int last=-1;
1789                    @Override
1790                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1791                        if( first==null ) {
1792                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1793                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1794                                subset.remove(second.getDataFileId());
1795                            }
1796                            return !subset.isEmpty();
1797                        } else if( second==null ) {
1798                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1799                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1800                                subset.remove(first.getDataFileId());
1801                            }
1802                            return !subset.isEmpty();
1803                        } else {
1804                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1805                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1806                                subset.remove(first.getDataFileId());
1807                            }
1808                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1809                                subset.remove(second.getDataFileId());
1810                            }
1811                            return !subset.isEmpty();
1812                        }
1813                    }
1814
1815                    @Override
1816                    public void visit(List<Location> keys, List<Long> values) {
1817                        for (Location l : keys) {
1818                            int fileId = l.getDataFileId();
1819                            if( last != fileId ) {
1820                                gcCandidateSet.remove(fileId);
1821                                last = fileId;
1822                            }
1823                        }
1824                    }
1825                });
1826
1827                // Durable Subscription
1828                if (entry.getValue().subLocations != null) {
1829                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
1830                    while (iter.hasNext()) {
1831                        Entry<String, Location> subscription = iter.next();
1832                        int dataFileId = subscription.getValue().getDataFileId();
1833
                        // Move the subscription along if it has no outstanding messages that need to be
                        // acked and it's in the oldest gc candidate log file in the journal.
1836                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
1837                            final StoredDestination destination = entry.getValue();
1838                            final String subscriptionKey = subscription.getKey();
1839                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1840
                            // When pending has size one it only holds the next message id, meaning
                            // there are no pending messages currently.
1843                            if (pendingAcks == null || pendingAcks.isEmpty() ||
1844                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
1845
1846                                if (LOG.isTraceEnabled()) {
1847                                    LOG.trace("Found candidate for rewrite: sub {} on {} from file {}", subscriptionKey, entry.getKey(), dataFileId);
1848                                }
1849
1850                                final KahaSubscriptionCommand kahaSub =
1851                                    destination.subscriptions.get(tx, subscriptionKey);
1852                                destination.subLocations.put(
1853                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
1854
                                // Skip the remove from the candidates if we rewrote the subscription,
                                // in order to prevent duplicate subscription commands on recovery.
                                // If another subscription is in the same file and isn't rewritten
                                // then it will remove the file from the set.
1859                                continue;
1860                            }
1861                        }
1862
1863                        if (LOG.isTraceEnabled()) {
1864                            final StoredDestination destination = entry.getValue();
1865                            final String subscriptionKey = subscription.getKey();
1866                            final SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1867                            LOG.trace("sub {} on {} in dataFile {} has pendingCount {}", subscriptionKey, entry.getKey(), dataFileId, pendingAcks.rangeSize()-1);
1868                        }
1869                        gcCandidateSet.remove(dataFileId);
1870                    }
1871                }
1872
1873                if (LOG.isTraceEnabled()) {
1874                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1875                }
1876            }
1877
            // check we are not deleting a file whose acks refer to in-use journal files
1879            if (LOG.isTraceEnabled()) {
1880                LOG.trace("gc candidates: " + gcCandidateSet);
1881                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
1882            }
1883
1884            boolean ackMessageFileMapMod = false;
1885            Iterator<Integer> candidates = gcCandidateSet.iterator();
1886            while (candidates.hasNext()) {
1887                Integer candidate = candidates.next();
1888                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
1889                if (referencedFileIds != null) {
1890                    for (Integer referencedFileId : referencedFileIds) {
1891                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
                            // an active file that is not targeted for deletion is referenced, so don't delete
1893                            candidates.remove();
1894                            break;
1895                        }
1896                    }
1897                    if (gcCandidateSet.contains(candidate)) {
1898                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
1899                    } else {
1900                        if (LOG.isTraceEnabled()) {
1901                            LOG.trace("not removing data file: " + candidate
1902                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1903                        }
1904                    }
1905                }
1906            }
1907
1908            if (!gcCandidateSet.isEmpty()) {
1909                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
1910                for (Integer candidate : gcCandidateSet) {
1911                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
1912                        ackMessageFileMapMod |= ackFiles.remove(candidate);
1913                    }
1914                }
1915                if (ackMessageFileMapMod) {
1916                    checkpointUpdate(tx, false);
1917                }
1918            } else if (isEnableAckCompaction()) {
1919                if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
1920                    // First check length of journal to make sure it makes sense to even try.
1921                    //
1922                    // If there is only one journal file with Acks in it we don't need to move
1923                    // it since it won't be chained to any later logs.
1924                    //
                    // If the logs haven't grown since the last time then we need to compact;
                    // otherwise there seems to still be room for growth and we don't need to incur
                    // the overhead.  Depending on configuration this check can be avoided and
                    // Ack compaction will run any time the store has not GC'd a journal file in
                    // the configured number of cycles.
1930                    if (metadata.ackMessageFileMap.size() > 1 &&
1931                        (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
1932
1933                        LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
1934                        try {
1935                            scheduler.execute(new AckCompactionRunner());
1936                        } catch (Exception ex) {
1937                            LOG.warn("Error on queueing the Ack Compactor", ex);
1938                        }
1939                    } else {
1940                        LOG.trace("Journal activity detected, no Ack compaction scheduled.");
1941                    }
1942
1943                    checkPointCyclesWithNoGC = 0;
1944                } else {
1945                    LOG.trace("Not yet time to check for compaction: {} of {} cycles",
1946                              checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
1947                }
1948
1949                journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
1950            }
1951        }
1952        MDC.remove("activemq.persistenceDir");
1953
1954        LOG.debug("Checkpoint done.");
1955        return gcCandidateSet;
1956    }
1957
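    /**
     * Background task that picks the oldest ack-bearing journal file not already produced by a
     * compaction, forwards its acks into a new file and then checkpoints so the old file can
     * be gc'd.
     */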
1958    private final class AckCompactionRunner implements Runnable {
1959
1960        @Override
1961        public void run() {
1962
1963            int journalToAdvance = -1;
1964            Set<Integer> journalLogsReferenced = new HashSet<Integer>();
1965
            // flag to know whether the ack forwarding completed without an exception
1967            boolean forwarded = false;
1968
1969            try {
                // Acquire the checkpoint lock to prevent other threads from
                // running a checkpoint while this is running.
                //
                // Normally this task runs on the same executor as the checkpoint task
                // so this ack compaction runner wouldn't run at the same time as the checkpoint task.
                //
                // However, there are two cases where this isn't always true.
                // First, the checkpoint() method is public and can be called through the
                // PersistenceAdapter interface by someone at the same time this is running.
                // Second, a checkpoint is called during shutdown without using the executor.
                //
                // In the future it might be better to just remove the checkpointLock entirely
                // and only use the executor, but this would need to be examined for any unintended
                // consequences.
1984                checkpointLock.readLock().lock();
1985
1986                try {
1987
1988                    // Lock index to capture the ackMessageFileMap data
1989                    indexLock.writeLock().lock();
1990
                    // Map keys might not be sorted, so find the earliest log file to forward acks
                    // from and move only those; future cycles can chip away at more as needed.
                    // We won't move files that were themselves rewritten by a previous compaction.
1994                    List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
1995                    Collections.sort(journalFileIds);
1996                    for (Integer journalFileId : journalFileIds) {
1997                        DataFile current = journal.getDataFileById(journalFileId);
1998                        if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
1999                            journalToAdvance = journalFileId;
2000                            break;
2001                        }
2002                    }
2003
2004                    // Check if we found one, or if we only found the current file being written to.
2005                    if (journalToAdvance == -1 || blockedFromCompaction(journalToAdvance)) {
2006                        return;
2007                    }
2008
2009                    journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
2010
2011                } finally {
2012                    indexLock.writeLock().unlock();
2013                }
2014
2015                try {
2016                    // Background rewrite of the old acks
2017                    forwardAllAcks(journalToAdvance, journalLogsReferenced);
2018                    forwarded = true;
2019                } catch (IOException ioe) {
2020                    LOG.error("Forwarding of acks failed", ioe);
2021                    brokerService.handleIOException(ioe);
2022                } catch (Throwable e) {
2023                    LOG.error("Forwarding of acks failed", e);
2024                    brokerService.handleIOException(IOExceptionSupport.create(e));
2025                }
2026            } finally {
2027                checkpointLock.readLock().unlock();
2028            }
2029
2030            try {
2031                if (forwarded) {
2032                    // Checkpoint with changes from the ackMessageFileMap
2033                    checkpointUpdate(false);
2034                }
2035            } catch (IOException ioe) {
2036                LOG.error("Checkpoint failed", ioe);
2037                brokerService.handleIOException(ioe);
2038            } catch (Throwable e) {
2039                LOG.error("Checkpoint failed", e);
2040                brokerService.handleIOException(IOExceptionSupport.create(e));
2041            }
2042        }
2043    }
2044
2045    // called with the index lock held
2046    private boolean blockedFromCompaction(int journalToAdvance) {
2047        // don't forward the current data file
2048        if (journalToAdvance == journal.getCurrentDataFileId()) {
2049            return true;
2050        }
2051        // don't forward any data file with inflight transaction records because it will whack the tx - data file link
2052        // in the ack map when all acks are migrated (now that the ack map is not just for acks)
2053        // TODO: prepare records can be dropped but completion records (maybe only commit outcomes) need to be migrated
2054        // as part of the forward work.
2055        Location[] inProgressTxRange = getInProgressTxLocationRange();
2056        if (inProgressTxRange[0] != null) {
2057            for (int pendingTx = inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
2058                if (journalToAdvance == pendingTx) {
2059                    LOG.trace("Compaction target:{} blocked by inflight transaction records: {}", journalToAdvance, inProgressTxRange);
2060                    return true;
2061                }
2062            }
2063        }
2064        return false;
2065    }
2066
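    /**
     * Copy the forwardable commands (message removes and XA commit outcomes) from the given
     * journal file into a newly reserved compaction file and repoint the ack message file map
     * at the new file.
     */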
2067    private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
2068        LOG.trace("Attempting to move all acks in journal:{} to the front. Referenced files:{}", journalToRead, journalLogsReferenced);
2069
2070        DataFile forwardsFile = journal.reserveDataFile();
2071        forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
2072        LOG.trace("Reserved file for forwarded acks: {}", forwardsFile);
2073
2074        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
2075
        try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile)) {
2077            KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
2078            compactionMarker.setSourceDataFileId(journalToRead);
2079            compactionMarker.setRewriteType(forwardsFile.getTypeCode());
2080
2081            ByteSequence payload = toByteSequence(compactionMarker);
2082            appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2083            LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
2084
2085            final Location limit = new Location(journalToRead + 1, 0);
2086            Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0), limit);
2087            while (nextLocation != null) {
2088                JournalCommand<?> command = null;
2089                try {
2090                    command = load(nextLocation);
2091                } catch (IOException ex) {
2092                    LOG.trace("Error loading command during ack forward: {}", nextLocation);
2093                }
2094
2095                if (shouldForward(command)) {
2096                    payload = toByteSequence(command);
2097                    Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2098                    updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
2099                }
2100
2101                nextLocation = getNextLocationForAckForward(nextLocation, limit);
2102            }
2103        }
2104
2105        LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
2106
        // Lock the index while we update the ackMessageFileMap; use try/finally so the write
        // lock is released even if the map update fails.
        indexLock.writeLock().lock();
        try {
            // Update the ack map with the new locations of the acks
            for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
                Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
                if (referenceFileIds == null) {
                    referenceFileIds = new HashSet<Integer>();
                    referenceFileIds.addAll(entry.getValue());
                    metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
                } else {
                    referenceFileIds.addAll(entry.getValue());
                }
            }

            // remove the old location data from the ack map so that the old journal log file can
            // be removed on the next GC.
            metadata.ackMessageFileMap.remove(journalToRead);
        } finally {
            indexLock.writeLock().unlock();
        }
2127
2128        LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
2129    }
2130
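    /**
     * Only message removes and XA commit outcomes need to be carried forward into the
     * compaction file; other records in the source file can be left behind.
     */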
2131    private boolean shouldForward(JournalCommand<?> command) {
2132        boolean result = false;
2133        if (command != null) {
2134            if (command instanceof KahaRemoveMessageCommand) {
2135                result = true;
2136            } else if (command instanceof KahaCommitCommand) {
2137                KahaCommitCommand kahaCommitCommand = (KahaCommitCommand) command;
2138                if (kahaCommitCommand.hasTransactionInfo() && kahaCommitCommand.getTransactionInfo().hasXaTransactionId()) {
2139                    result = true;
2140                }
2141            }
2142        }
2143        return result;
2144    }
2145
2146    private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) {
        // getNextLocation() can throw an IOException; handle it and return null so the caller
        // can abort gracefully. This should not happen in the normal case.
2150        Location location = null;
2151        try {
2152            location = journal.getNextLocation(nextLocation, limit);
2153        } catch (IOException e) {
2154            LOG.warn("Failed to load next journal location after: {}, reason: {}", nextLocation, e);
2155            if (LOG.isDebugEnabled()) {
2156                LOG.debug("Failed to load next journal location after: {}", nextLocation, e);
2157            }
2158        }
2159        return location;
2160    }
2161
2162    final Runnable nullCompletionCallback = new Runnable() {
2163        @Override
2164        public void run() {
2165        }
2166    };
2167
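    /**
     * Serialize the producer sequence id tracker and append it to the journal when it has been
     * modified; otherwise return the previously stored location.
     */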
2168    private Location checkpointProducerAudit() throws IOException {
2169        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
2170            ByteArrayOutputStream baos = new ByteArrayOutputStream();
2171            ObjectOutputStream oout = new ObjectOutputStream(baos);
2172            oout.writeObject(metadata.producerSequenceIdTracker);
2173            oout.flush();
2174            oout.close();
2175            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2176            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
2177            try {
2178                location.getLatch().await();
2179                if (location.getException().get() != null) {
2180                    throw location.getException().get();
2181                }
2182            } catch (InterruptedException e) {
2183                throw new InterruptedIOException(e.toString());
2184            }
2185            return location;
2186        }
2187        return metadata.producerSequenceIdTrackerLocation;
2188    }
2189
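    /**
     * Serialize the current ack message file map and append it to the journal, returning the
     * new location for the metadata.
     */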
2190    private Location checkpointAckMessageFileMap() throws IOException {
2191        ByteArrayOutputStream baos = new ByteArrayOutputStream();
2192        ObjectOutputStream oout = new ObjectOutputStream(baos);
2193        oout.writeObject(metadata.ackMessageFileMap);
2194        oout.flush();
2195        oout.close();
2196        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2197        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
2198        try {
2199            location.getLatch().await();
2200        } catch (InterruptedException e) {
2201            throw new InterruptedIOException(e.toString());
2202        }
2203        return location;
2204    }
2205
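    /**
     * Re-append a durable subscription command to the current journal file so that the file
     * holding the original record can be garbage collected.
     */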
2206    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
2207
2208        ByteSequence sequence = toByteSequence(subscription);
        Location location = journal.write(sequence, nullCompletionCallback);
2210
2211        try {
2212            location.getLatch().await();
2213        } catch (InterruptedException e) {
2214            throw new InterruptedIOException(e.toString());
2215        }
2216        return location;
2217    }
2218
2219    public HashSet<Integer> getJournalFilesBeingReplicated() {
2220        return journalFilesBeingReplicated;
2221    }
2222
2223    // /////////////////////////////////////////////////////////////////
2224    // StoredDestination related implementation methods.
2225    // /////////////////////////////////////////////////////////////////
2226
2227    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
2228
2229    static class MessageKeys {
2230        final String messageId;
2231        final Location location;
2232
2233        public MessageKeys(String messageId, Location location) {
2234            this.messageId=messageId;
2235            this.location=location;
2236        }
2237
2238        @Override
2239        public String toString() {
2240            return "["+messageId+","+location+"]";
2241        }
2242    }
2243
    protected static class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
2245        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
2246
2247        @Override
2248        public MessageKeys readPayload(DataInput dataIn) throws IOException {
2249            return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
2250        }
2251
2252        @Override
2253        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
2254            dataOut.writeUTF(object.messageId);
2255            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
2256        }
2257    }
2258
2259    class LastAck {
2260        long lastAckedSequence;
2261        byte priority;
2262
2263        public LastAck(LastAck source) {
2264            this.lastAckedSequence = source.lastAckedSequence;
2265            this.priority = source.priority;
2266        }
2267
2268        public LastAck() {
2269            this.priority = MessageOrderIndex.HI;
2270        }
2271
2272        public LastAck(long ackLocation) {
2273            this.lastAckedSequence = ackLocation;
2274            this.priority = MessageOrderIndex.LO;
2275        }
2276
2277        public LastAck(long ackLocation, byte priority) {
2278            this.lastAckedSequence = ackLocation;
2279            this.priority = priority;
2280        }
2281
2282        @Override
2283        public String toString() {
2284            return "[" + lastAckedSequence + ":" + priority + "]";
2285        }
2286    }
2287
2288    protected class LastAckMarshaller implements Marshaller<LastAck> {
2289
2290        @Override
2291        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
2292            dataOut.writeLong(object.lastAckedSequence);
2293            dataOut.writeByte(object.priority);
2294        }
2295
2296        @Override
2297        public LastAck readPayload(DataInput dataIn) throws IOException {
2298            LastAck lastAcked = new LastAck();
2299            lastAcked.lastAckedSequence = dataIn.readLong();
2300            if (metadata.version >= 3) {
2301                lastAcked.priority = dataIn.readByte();
2302            }
2303            return lastAcked;
2304        }
2305
2306        @Override
2307        public int getFixedSize() {
2308            return 9;
2309        }
2310
2311        @Override
2312        public LastAck deepCopy(LastAck source) {
2313            return new LastAck(source);
2314        }
2315
2316        @Override
2317        public boolean isDeepCopySupported() {
2318            return true;
2319        }
2320    }
2321
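    /**
     * In-memory handle to the index structures of a single destination; the subscription
     * related members are only populated for topics.
     */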
2322    class StoredDestination {
2323
2324        MessageOrderIndex orderIndex = new MessageOrderIndex();
2325        BTreeIndex<Location, Long> locationIndex;
2326        BTreeIndex<String, Long> messageIdIndex;
2327
2328        // These bits are only set for Topics
2329        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
2330        BTreeIndex<String, LastAck> subscriptionAcks;
2331        HashMap<String, MessageOrderCursor> subscriptionCursors;
2332        ListIndex<String, SequenceSet> ackPositions;
2333        ListIndex<String, Location> subLocations;
2334
2335        // Transient data used to track which Messages are no longer needed.
2336        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
2337
2338        public void trackPendingAdd(Long seq) {
2339            orderIndex.trackPendingAdd(seq);
2340        }
2341
2342        public void trackPendingAddComplete(Long seq) {
2343            orderIndex.trackPendingAddComplete(seq);
2344        }
2345
2346        @Override
2347        public String toString() {
2348            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
2349        }
2350    }
2351
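    /**
     * Marshals the page ids of a destination's index structures; on read it also performs
     * in-place upgrades of data written by older metadata versions.
     */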
2352    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
2353
2354        @Override
2355        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
2356            final StoredDestination value = new StoredDestination();
2357            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2358            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
2359            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
2360
2361            if (dataIn.readBoolean()) {
2362                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
2363                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
2364                if (metadata.version >= 4) {
2365                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
2366                } else {
2367                    // upgrade
2368                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2369                        @Override
2370                        public void execute(Transaction tx) throws IOException {
2371                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
2372
2373                            if (metadata.version >= 3) {
2374                                // migrate
2375                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
2376                                        new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
2377                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
2378                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
2379                                oldAckPositions.load(tx);
2380
2381
                                // Do the initial build of the data in memory before writing into the
                                // store-based Ack Positions List to avoid a lot of disk thrashing.
2384                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
2385                                while (iterator.hasNext()) {
2386                                    Entry<Long, HashSet<String>> entry = iterator.next();
2387
2388                                    for(String subKey : entry.getValue()) {
2389                                        SequenceSet pendingAcks = temp.get(subKey);
2390                                        if (pendingAcks == null) {
2391                                            pendingAcks = new SequenceSet();
2392                                            temp.put(subKey, pendingAcks);
2393                                        }
2394
2395                                        pendingAcks.add(entry.getKey());
2396                                    }
2397                                }
2398                            }
2399                            // Now move the pending message-ack data into the store-backed
2400                            // structure.
2401                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2402                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2403                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2404                            value.ackPositions.load(tx);
2405                            for(String subscriptionKey : temp.keySet()) {
2406                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
2407                            }
2408
2409                        }
2410                    });
2411                }
2412
2413                if (metadata.version >= 5) {
2414                    value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
2415                } else {
2416                    // upgrade
2417                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2418                        @Override
2419                        public void execute(Transaction tx) throws IOException {
2420                            value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2421                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2422                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2423                            value.subLocations.load(tx);
2424                        }
2425                    });
2426                }
2427            }
2428            if (metadata.version >= 2) {
2429                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2430                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2431            } else {
2432                // upgrade
2433                pageFile.tx().execute(new Transaction.Closure<IOException>() {
2434                    @Override
2435                    public void execute(Transaction tx) throws IOException {
2436                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2437                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2438                        value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2439                        value.orderIndex.lowPriorityIndex.load(tx);
2440
2441                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2442                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2443                        value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2444                        value.orderIndex.highPriorityIndex.load(tx);
2445                    }
2446                });
2447            }
2448
2449            return value;
2450        }
2451
2452        @Override
2453        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
2454            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
2455            dataOut.writeLong(value.locationIndex.getPageId());
2456            dataOut.writeLong(value.messageIdIndex.getPageId());
2457            if (value.subscriptions != null) {
2458                dataOut.writeBoolean(true);
2459                dataOut.writeLong(value.subscriptions.getPageId());
2460                dataOut.writeLong(value.subscriptionAcks.getPageId());
2461                dataOut.writeLong(value.ackPositions.getHeadPageId());
2462                dataOut.writeLong(value.subLocations.getHeadPageId());
2463            } else {
2464                dataOut.writeBoolean(false);
2465            }
2466            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
2467            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
2468        }
2469    }
2470
2471    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
2472        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
2473
2474        @Override
2475        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
2476            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
2477            rc.mergeFramed((InputStream)dataIn);
2478            return rc;
2479        }
2480
2481        @Override
2482        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
2483            object.writeFramed((OutputStream)dataOut);
2484        }
2485    }
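
    /*
     * Destinations are cached in memory by the key produced by key(KahaDestination) below,
     * i.e. the numeric destination type followed by ':' and the destination name.
     * getStoredDestination() creates and indexes a brand new destination on first use,
     * while getExistingStoredDestination() only returns one that already exists in the
     * cache or in the metadata index.
     */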
2486
2487    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2488        String key = key(destination);
2489        StoredDestination rc = storedDestinations.get(key);
2490        if (rc == null) {
2491            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
2492            rc = loadStoredDestination(tx, key, topic);
2493            // Cache it. We may want to remove/unload destinations from the
2494            // cache that have not been used for a while
2495            // in order to reduce memory usage.
2496            storedDestinations.put(key, rc);
2497        }
2498        return rc;
2499    }
2500
2501    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2502        String key = key(destination);
2503        StoredDestination rc = storedDestinations.get(key);
2504        if (rc == null && metadata.destinations.containsKey(tx, key)) {
2505            rc = getStoredDestination(destination, tx);
2506        }
2507        return rc;
2508    }
2509
2510    /**
2511     * @param tx the index transaction to load or allocate the destination under
2512     * @param key the destination key as produced by {@link #key(KahaDestination)}
2513     * @param topic true if the destination is a topic and needs its subscription indexes
2514     * @return the loaded (or newly allocated) StoredDestination
2515     * @throws IOException on a failure to read or update the index
2516     */
2517    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
2518        // Try to load the existing indexes..
2519        StoredDestination rc = metadata.destinations.get(tx, key);
2520        if (rc == null) {
2521            // Brand new destination.. allocate indexes for it.
2522            rc = new StoredDestination();
2523            rc.orderIndex.allocate(tx);
2524            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
2525            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
2526
2527            if (topic) {
2528                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
2529                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
2530                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2531                rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2532            }
2533            metadata.destinations.put(tx, key, rc);
2534        }
2535
2536        // Configure the marshalers and load.
2537        rc.orderIndex.load(tx);
2538
2539        // Figure out the next key using the last entry in the destination.
2540        rc.orderIndex.configureLast(tx);
2541
2542        rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE);
2543        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2544        rc.locationIndex.load(tx);
2545
2546        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
2547        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2548        rc.messageIdIndex.load(tx);
2549
2550        // If it was a topic...
2551        if (topic) {
2552
2553            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
2554            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
2555            rc.subscriptions.load(tx);
2556
2557            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
2558            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
2559            rc.subscriptionAcks.load(tx);
2560
2561            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2562            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2563            rc.ackPositions.load(tx);
2564
2565            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2566            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2567            rc.subLocations.load(tx);
2568
2569            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
2570
2571            if (metadata.version < 3) {
2572
2573                // on upgrade we need to fill the ack locations with the available messages past the last ack
2574                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2575                    Entry<String, LastAck> entry = iterator.next();
2576                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
2577                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
2578                        Long sequence = orderIterator.next().getKey();
2579                        addAckLocation(tx, rc, sequence, entry.getKey());
2580                    }
2581                    // re-store the entry so it is persisted in the upgraded format
2582                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
2583                }
2584            }
2585
2586
2587            // Configure the subscription cache
2588            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2589                Entry<String, LastAck> entry = iterator.next();
2590                rc.subscriptionCache.add(entry.getKey());
2591            }
2592
2593            if (rc.orderIndex.nextMessageId == 0) {
2594                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
2595                if (!rc.subscriptionAcks.isEmpty(tx)) {
2596                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
2597                        Entry<String, LastAck> entry = iterator.next();
2598                        rc.orderIndex.nextMessageId =
2599                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
2600                    }
2601                }
2602            } else {
2603                // update based on ackPositions for unmatched, last entry is always the next
2604                Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
2605                while (subscriptions.hasNext()) {
2606                    Entry<String, SequenceSet> subscription = subscriptions.next();
2607                    SequenceSet pendingAcks = subscription.getValue();
2608                    if (pendingAcks != null && !pendingAcks.isEmpty()) {
2609                        for (Long sequenceId : pendingAcks) {
2610                            rc.orderIndex.nextMessageId = Math.max(rc.orderIndex.nextMessageId, sequenceId);
2611                        }
2612                    }
2613                }
2614            }
2615        }
2616
2617        if (metadata.version < VERSION) {
2618            // store again after upgrade
2619            metadata.destinations.put(tx, key, rc);
2620        }
2621        return rc;
2622    }
2623
2624    /**
2625     * Caches the MessageStore created for each
2626     * KahaDestination key.
2627     */
2628    protected final ConcurrentMap<String, MessageStore> storeCache =
2629       new ConcurrentHashMap<>();
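
    /*
     * Durable topic acks are tracked per subscription in StoredDestination.ackPositions, a
     * list index of subscriptionKey -> SequenceSet of message sequences that subscription
     * has not yet acknowledged.  The helpers below keep that structure consistent:
     *
     *   addAckLocation()                  - records a single pending sequence for one subscription
     *   addAckLocationForRetroactiveSub() - a new retroactive subscription inherits every outstanding sequence
     *   addAckLocationForNewMessage()     - every cached subscription becomes interested in a newly added message
     *   removeAckLocation()               - drops one sequence for one subscription and deletes the message
     *                                       indexes once no subscription references it
     *   removeAckLocationsForSub()        - drops a whole subscription and deletes any now-unreferenced messages
     */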
2630
2631    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
2632        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2633        if (sequences == null) {
2634            sequences = new SequenceSet();
2635            sequences.add(messageSequence);
2636            sd.ackPositions.add(tx, subscriptionKey, sequences);
2637        } else {
2638            sequences.add(messageSequence);
2639            sd.ackPositions.put(tx, subscriptionKey, sequences);
2640        }
2641    }
2642
2643    // new sub is interested in potentially all existing messages
2644    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2645        SequenceSet allOutstanding = new SequenceSet();
2646        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
2647        while (iterator.hasNext()) {
2648            SequenceSet set = iterator.next().getValue();
2649            for (Long entry : set) {
2650                allOutstanding.add(entry);
2651            }
2652        }
2653        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
2654    }
2655
2656    // on a new message add, all existing subs are interested in this message
2657    private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
2658        for(String subscriptionKey : sd.subscriptionCache) {
2659            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2660            if (sequences == null) {
2661                sequences = new SequenceSet();
2662                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2663                sd.ackPositions.add(tx, subscriptionKey, sequences);
2664            } else {
2665                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2666                sd.ackPositions.put(tx, subscriptionKey, sequences);
2667            }
2668       }
2669    }
2670
2671    private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2672        if (!sd.ackPositions.isEmpty(tx)) {
2673            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2674            if (sequences == null || sequences.isEmpty()) {
2675                return;
2676            }
2677
2678            ArrayList<Long> unreferenced = new ArrayList<Long>();
2679
2680            for(Long sequenceId : sequences) {
2681                if(!isSequenceReferenced(tx, sd, sequenceId)) {
2682                    unreferenced.add(sequenceId);
2683                }
2684            }
2685
2686            for(Long sequenceId : unreferenced) {
2687                // Find all the entries that need to get deleted.
2688                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2689                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2690
2691                // Do the actual deletes.
2692                for (Entry<Long, MessageKeys> entry : deletes) {
2693                    sd.locationIndex.remove(tx, entry.getValue().location);
2694                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2695                    sd.orderIndex.remove(tx, entry.getKey());
2696                }
2697            }
2698        }
2699    }
2700
2701    private boolean isSequenceReferenced(final Transaction tx, final StoredDestination sd, final Long sequenceId) throws IOException {
2702        for(String subscriptionKey : sd.subscriptionCache) {
2703            SequenceSet sequence = sd.ackPositions.get(tx, subscriptionKey);
2704            if (sequence != null && sequence.contains(sequenceId)) {
2705                return true;
2706            }
2707        }
2708        return false;
2709    }
2710
2711    /**
2712     * @param tx the index transaction to update under
2713     * @param sd the stored destination holding the ack positions
2714     * @param subscriptionKey the subscription acknowledging the message
2715     * @param messageSequence the acknowledged message sequence to remove
2716     * @throws IOException on a failure to update the index
2717     */
2718    private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
2719        // Remove the sub from the previous location set..
2720        if (messageSequence != null) {
2721            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2722            if (range != null && !range.isEmpty()) {
2723                range.remove(messageSequence);
2724                if (!range.isEmpty()) {
2725                    sd.ackPositions.put(tx, subscriptionKey, range);
2726                } else {
2727                    sd.ackPositions.remove(tx, subscriptionKey);
2728                }
2729
2730                // Check if the message is referenced by any other subscription.
2731                if (isSequenceReferenced(tx, sd, messageSequence)) {
2732                    return;
2733                }
2734                // Find all the entries that need to get deleted.
2735                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2736                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2737
2738                // Do the actual deletes.
2739                for (Entry<Long, MessageKeys> entry : deletes) {
2740                    sd.locationIndex.remove(tx, entry.getValue().location);
2741                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2742                    sd.orderIndex.remove(tx, entry.getKey());
2743                }
2744            }
2745        }
2746    }
2747
2748    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2749        return sd.subscriptionAcks.get(tx, subscriptionKey);
2750    }
2751
2752    protected SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2753        if (sd.ackPositions != null) {
2754            final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2755            return messageSequences;
2756        }
2757
2758        return null;
2759    }
2760
2761    protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2762        if (sd.ackPositions != null) {
2763            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2764            if (messageSequences != null) {
2765                long result = messageSequences.rangeSize();
2766                // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2767                return result > 0 ? result - 1 : 0;
2768            }
2769        }
2770
2771        return 0;
2772    }
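
    /*
     * Worked example for getStoredMessageCount() above: per the comment inside the method,
     * the last value in a non-empty pending SequenceSet is the next-message marker, so a
     * set holding 5, 6, 7 plus the marker 8 has rangeSize() == 4 and reports 3 stored
     * messages.
     */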
2773
2774    protected String key(KahaDestination destination) {
2775        return destination.getType().getNumber() + ":" + destination.getName();
2776    }
2777
2778    // /////////////////////////////////////////////////////////////////
2779    // Transaction related implementation methods.
2780    // /////////////////////////////////////////////////////////////////
2781    @SuppressWarnings("rawtypes")
2782    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2783    @SuppressWarnings("rawtypes")
2784    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
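
    /*
     * Operations for a transaction are buffered in the two maps above until its outcome is
     * known: inflightTransactions collects the Add/Remove operations of running transactions
     * as they are journalled, and preparedTransactions holds those of XA transactions that
     * have been prepared.  Each buffered Operation.execute(tx) is later replayed against the
     * index once the transaction outcome is processed.
     */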
2785
2786    @SuppressWarnings("rawtypes")
2787    private List<Operation> getInflightTx(KahaTransactionInfo info) {
2788        TransactionId key = TransactionIdConversion.convert(info);
2789        List<Operation> tx;
2790        synchronized (inflightTransactions) {
2791            tx = inflightTransactions.get(key);
2792            if (tx == null) {
2793                tx = Collections.synchronizedList(new ArrayList<Operation>());
2794                inflightTransactions.put(key, tx);
2795            }
2796        }
2797        return tx;
2798    }
2799
2800    @SuppressWarnings("unused")
2801    private TransactionId key(KahaTransactionInfo transactionInfo) {
2802        return TransactionIdConversion.convert(transactionInfo);
2803    }
2804
2805    abstract class Operation <T extends JournalCommand<T>> {
2806        final T command;
2807        final Location location;
2808
2809        public Operation(T command, Location location) {
2810            this.command = command;
2811            this.location = location;
2812        }
2813
2814        public Location getLocation() {
2815            return location;
2816        }
2817
2818        public T getCommand() {
2819            return command;
2820        }
2821
2822        public abstract void execute(Transaction tx) throws IOException;
2823    }
2824
2825    class AddOperation extends Operation<KahaAddMessageCommand> {
2826        final IndexAware runWithIndexLock;
2827        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
2828            super(command, location);
2829            this.runWithIndexLock = runWithIndexLock;
2830        }
2831
2832        @Override
2833        public void execute(Transaction tx) throws IOException {
2834            long seq = updateIndex(tx, command, location);
2835            if (runWithIndexLock != null) {
2836                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
2837            }
2838        }
2839    }
2840
2841    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
2842
2843        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
2844            super(command, location);
2845        }
2846
2847        @Override
2848        public void execute(Transaction tx) throws IOException {
2849            updateIndex(tx, command, location);
2850        }
2851    }
2852
2853    // /////////////////////////////////////////////////////////////////
2854    // Initialization related implementation methods.
2855    // /////////////////////////////////////////////////////////////////
2856
2857    private PageFile createPageFile() throws IOException {
2858        if (indexDirectory == null) {
2859            indexDirectory = directory;
2860        }
2861        IOHelper.mkdirs(indexDirectory);
2862        PageFile index = new PageFile(indexDirectory, "db");
2863        index.setEnableWriteThread(isEnableIndexWriteAsync());
2864        index.setWriteBatchSize(getIndexWriteBatchSize());
2865        index.setPageCacheSize(indexCacheSize);
2866        index.setUseLFRUEviction(isUseIndexLFRUEviction());
2867        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2868        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2869        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2870        index.setEnablePageCaching(isEnableIndexPageCaching());
2871        return index;
2872    }
2873
2874    protected Journal createJournal() throws IOException {
2875        Journal manager = new Journal();
2876        manager.setDirectory(directory);
2877        manager.setMaxFileLength(getJournalMaxFileLength());
2878        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2879        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2880        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2881        manager.setArchiveDataLogs(isArchiveDataLogs());
2882        manager.setSizeAccumulator(journalSize);
2883        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2884        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
2885        manager.setPreallocationStrategy(
2886                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
2887        manager.setJournalDiskSyncStrategy(journalDiskSyncStrategy);
2888        if (getDirectoryArchive() != null) {
2889            IOHelper.mkdirs(getDirectoryArchive());
2890            manager.setDirectoryArchive(getDirectoryArchive());
2891        }
2892        return manager;
2893    }
2894
2895    private Metadata createMetadata() {
2896        Metadata md = new Metadata();
2897        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
2898        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
2899        return md;
2900    }
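
    /*
     * A minimal configuration sketch using only setters defined in this class (the 'store'
     * variable is a placeholder for a concrete instance and the values are illustrative
     * only, not recommendations):
     *
     *   store.setDirectory(new File("data/kahadb"));
     *   store.setJournalMaxFileLength(32 * 1024 * 1024);
     *   store.setIndexCacheSize(10000);
     *   store.setCheckpointInterval(5 * 1000);
     *   store.setCleanupInterval(30 * 1000);
     *   store.setJournalDiskSyncStrategy("ALWAYS");
     */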
2901
2902    public int getJournalMaxWriteBatchSize() {
2903        return journalMaxWriteBatchSize;
2904    }
2905
2906    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
2907        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
2908    }
2909
2910    public File getDirectory() {
2911        return directory;
2912    }
2913
2914    public void setDirectory(File directory) {
2915        this.directory = directory;
2916    }
2917
2918    public boolean isDeleteAllMessages() {
2919        return deleteAllMessages;
2920    }
2921
2922    public void setDeleteAllMessages(boolean deleteAllMessages) {
2923        this.deleteAllMessages = deleteAllMessages;
2924    }
2925
2926    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
2927        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
2928    }
2929
2930    public int getIndexWriteBatchSize() {
2931        return setIndexWriteBatchSize;
2932    }
2933
2934    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
2935        this.enableIndexWriteAsync = enableIndexWriteAsync;
2936    }
2937
2938    boolean isEnableIndexWriteAsync() {
2939        return enableIndexWriteAsync;
2940    }
2941
2942    /**
2943     * @deprecated use {@link #getJournalDiskSyncStrategyEnum} or {@link #getJournalDiskSyncStrategy} instead
2944     * @return true if the configured journal disk sync strategy is ALWAYS
2945     */
2946    public boolean isEnableJournalDiskSyncs() {
2947        return journalDiskSyncStrategy == JournalDiskSyncStrategy.ALWAYS;
2948    }
2949
2950    /**
2951     * @deprecated use {@link #setJournalDiskSyncStrategy} instead
2952     * @param syncWrites true selects the ALWAYS strategy, false selects NEVER
2953     */
2954    public void setEnableJournalDiskSyncs(boolean syncWrites) {
2955        if (syncWrites) {
2956            journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
2957        } else {
2958            journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER;
2959        }
2960    }
2961
2962    public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() {
2963        return journalDiskSyncStrategy;
2964    }
2965
2966    public String getJournalDiskSyncStrategy() {
2967        return journalDiskSyncStrategy.name();
2968    }
2969
2970    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
2971        this.journalDiskSyncStrategy = JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase());
2972    }
2973
2974    public long getJournalDiskSyncInterval() {
2975        return journalDiskSyncInterval;
2976    }
2977
2978    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
2979        this.journalDiskSyncInterval = journalDiskSyncInterval;
2980    }
2981
2982    public long getCheckpointInterval() {
2983        return checkpointInterval;
2984    }
2985
2986    public void setCheckpointInterval(long checkpointInterval) {
2987        this.checkpointInterval = checkpointInterval;
2988    }
2989
2990    public long getCleanupInterval() {
2991        return cleanupInterval;
2992    }
2993
2994    public void setCleanupInterval(long cleanupInterval) {
2995        this.cleanupInterval = cleanupInterval;
2996    }
2997
2998    public boolean getCleanupOnStop() {
2999        return cleanupOnStop;
3000    }
3001
3002    public void setCleanupOnStop(boolean cleanupOnStop) {
3003        this.cleanupOnStop = cleanupOnStop;
3004    }
3005
3006    public void setJournalMaxFileLength(int journalMaxFileLength) {
3007        this.journalMaxFileLength = journalMaxFileLength;
3008    }
3009
3010    public int getJournalMaxFileLength() {
3011        return journalMaxFileLength;
3012    }
3013
3014    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
3015        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
3016    }
3017
3018    public int getMaxFailoverProducersToTrack() {
3019        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
3020    }
3021
3022    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
3023        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
3024    }
3025
3026    public int getFailoverProducersAuditDepth() {
3027        return this.metadata.producerSequenceIdTracker.getAuditDepth();
3028    }
3029
3030    public PageFile getPageFile() throws IOException {
3031        if (pageFile == null) {
3032            pageFile = createPageFile();
3033        }
3034        return pageFile;
3035    }
3036
3037    public Journal getJournal() throws IOException {
3038        if (journal == null) {
3039            journal = createJournal();
3040        }
3041        return journal;
3042    }
3043
3044    protected Metadata getMetadata() {
3045        return metadata;
3046    }
3047
3048    public boolean isFailIfDatabaseIsLocked() {
3049        return failIfDatabaseIsLocked;
3050    }
3051
3052    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
3053        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
3054    }
3055
3056    public boolean isIgnoreMissingJournalfiles() {
3057        return ignoreMissingJournalfiles;
3058    }
3059
3060    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
3061        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
3062    }
3063
3064    public int getIndexCacheSize() {
3065        return indexCacheSize;
3066    }
3067
3068    public void setIndexCacheSize(int indexCacheSize) {
3069        this.indexCacheSize = indexCacheSize;
3070    }
3071
3072    public boolean isCheckForCorruptJournalFiles() {
3073        return checkForCorruptJournalFiles;
3074    }
3075
3076    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
3077        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
3078    }
3079
3080    public boolean isChecksumJournalFiles() {
3081        return checksumJournalFiles;
3082    }
3083
3084    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
3085        this.checksumJournalFiles = checksumJournalFiles;
3086    }
3087
3088    @Override
3089    public void setBrokerService(BrokerService brokerService) {
3090        this.brokerService = brokerService;
3091    }
3092
3093    /**
3094     * @return the archiveDataLogs
3095     */
3096    public boolean isArchiveDataLogs() {
3097        return this.archiveDataLogs;
3098    }
3099
3100    /**
3101     * @param archiveDataLogs the archiveDataLogs to set
3102     */
3103    public void setArchiveDataLogs(boolean archiveDataLogs) {
3104        this.archiveDataLogs = archiveDataLogs;
3105    }
3106
3107    /**
3108     * @return the directoryArchive
3109     */
3110    public File getDirectoryArchive() {
3111        return this.directoryArchive;
3112    }
3113
3114    /**
3115     * @param directoryArchive the directoryArchive to set
3116     */
3117    public void setDirectoryArchive(File directoryArchive) {
3118        this.directoryArchive = directoryArchive;
3119    }
3120
3121    public boolean isArchiveCorruptedIndex() {
3122        return archiveCorruptedIndex;
3123    }
3124
3125    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
3126        this.archiveCorruptedIndex = archiveCorruptedIndex;
3127    }
3128
3129    public float getIndexLFUEvictionFactor() {
3130        return indexLFUEvictionFactor;
3131    }
3132
3133    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
3134        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
3135    }
3136
3137    public boolean isUseIndexLFRUEviction() {
3138        return useIndexLFRUEviction;
3139    }
3140
3141    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
3142        this.useIndexLFRUEviction = useIndexLFRUEviction;
3143    }
3144
3145    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
3146        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
3147    }
3148
3149    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
3150        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
3151    }
3152
3153    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
3154        this.enableIndexPageCaching = enableIndexPageCaching;
3155    }
3156
3157    public boolean isEnableIndexDiskSyncs() {
3158        return enableIndexDiskSyncs;
3159    }
3160
3161    public boolean isEnableIndexRecoveryFile() {
3162        return enableIndexRecoveryFile;
3163    }
3164
3165    public boolean isEnableIndexPageCaching() {
3166        return enableIndexPageCaching;
3167    }
3168
3169    // /////////////////////////////////////////////////////////////////
3170    // Internal conversion methods.
3171    // /////////////////////////////////////////////////////////////////
3172
3173    class MessageOrderCursor{
3174        long defaultCursorPosition;
3175        long lowPriorityCursorPosition;
3176        long highPriorityCursorPosition;
3177        MessageOrderCursor(){
3178        }
3179
3180        MessageOrderCursor(long position){
3181            this.defaultCursorPosition=position;
3182            this.lowPriorityCursorPosition=position;
3183            this.highPriorityCursorPosition=position;
3184        }
3185
3186        MessageOrderCursor(MessageOrderCursor other){
3187            this.defaultCursorPosition=other.defaultCursorPosition;
3188            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3189            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3190        }
3191
3192        MessageOrderCursor copy() {
3193            return new MessageOrderCursor(this);
3194        }
3195
3196        void reset() {
3197            this.defaultCursorPosition=0;
3198            this.highPriorityCursorPosition=0;
3199            this.lowPriorityCursorPosition=0;
3200        }
3201
3202        void increment() {
3203            if (defaultCursorPosition!=0) {
3204                defaultCursorPosition++;
3205            }
3206            if (highPriorityCursorPosition!=0) {
3207                highPriorityCursorPosition++;
3208            }
3209            if (lowPriorityCursorPosition!=0) {
3210                lowPriorityCursorPosition++;
3211            }
3212        }
3213
3214        @Override
3215        public String toString() {
3216           return "MessageOrderCursor:[def:" + defaultCursorPosition
3217                   + ", low:" + lowPriorityCursorPosition
3218                   + ", high:" +  highPriorityCursorPosition + "]";
3219        }
3220
3221        public void sync(MessageOrderCursor other) {
3222            this.defaultCursorPosition=other.defaultCursorPosition;
3223            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3224            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3225        }
3226    }
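
    /*
     * A MessageOrderCursor holds one position into each of the three priority bands kept by
     * the MessageOrderIndex below (high, default and low).  increment() only advances
     * positions that are already non-zero; positions still at their reset value of 0 are
     * left alone.
     */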
3227
3228    class MessageOrderIndex {
3229        static final byte HI = 9;
3230        static final byte LO = 0;
3231        static final byte DEF = 4;
3232
3233        long nextMessageId;
3234        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
3235        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
3236        BTreeIndex<Long, MessageKeys> highPriorityIndex;
3237        final MessageOrderCursor cursor = new MessageOrderCursor();
3238        Long lastDefaultKey;
3239        Long lastHighKey;
3240        Long lastLowKey;
3241        byte lastGetPriority;
3242        final List<Long> pendingAdditions = new LinkedList<Long>();
3243
3244        MessageKeys remove(Transaction tx, Long key) throws IOException {
3245            MessageKeys result = defaultPriorityIndex.remove(tx, key);
3246            if (result == null && highPriorityIndex!=null) {
3247                result = highPriorityIndex.remove(tx, key);
3248                if (result ==null && lowPriorityIndex!=null) {
3249                    result = lowPriorityIndex.remove(tx, key);
3250                }
3251            }
3252            return result;
3253        }
3254
3255        void load(Transaction tx) throws IOException {
3256            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3257            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
3258            defaultPriorityIndex.load(tx);
3259            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3260            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
3261            lowPriorityIndex.load(tx);
3262            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3263            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
3264            highPriorityIndex.load(tx);
3265        }
3266
3267        void allocate(Transaction tx) throws IOException {
3268            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3269            if (metadata.version >= 2) {
3270                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3271                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3272            }
3273        }
3274
3275        void configureLast(Transaction tx) throws IOException {
3276            // Figure out the next key using the last entry in the destination.
3277            TreeSet<Long> orderedSet = new TreeSet<Long>();
3278
3279            addLast(orderedSet, highPriorityIndex, tx);
3280            addLast(orderedSet, defaultPriorityIndex, tx);
3281            addLast(orderedSet, lowPriorityIndex, tx);
3282
3283            if (!orderedSet.isEmpty()) {
3284                nextMessageId = orderedSet.last() + 1;
3285            }
3286        }
3287
3288        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
3289            if (index != null) {
3290                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
3291                if (lastEntry != null) {
3292                    orderedSet.add(lastEntry.getKey());
3293                }
3294            }
3295        }
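
        /*
         * configureLast() above seeds nextMessageId from the largest key present in any
         * priority band: for example, if the high, default and low indexes currently end at
         * keys 12, 40 and 7, nextMessageId becomes 41.
         */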
3296
3297        void clear(Transaction tx) throws IOException {
3298            this.remove(tx);
3299            this.resetCursorPosition();
3300            this.allocate(tx);
3301            this.load(tx);
3302            this.configureLast(tx);
3303        }
3304
3305        void remove(Transaction tx) throws IOException {
3306            defaultPriorityIndex.clear(tx);
3307            defaultPriorityIndex.unload(tx);
3308            tx.free(defaultPriorityIndex.getPageId());
3309            if (lowPriorityIndex != null) {
3310                lowPriorityIndex.clear(tx);
3311                lowPriorityIndex.unload(tx);
3312
3313                tx.free(lowPriorityIndex.getPageId());
3314            }
3315            if (highPriorityIndex != null) {
3316                highPriorityIndex.clear(tx);
3317                highPriorityIndex.unload(tx);
3318                tx.free(highPriorityIndex.getPageId());
3319            }
3320        }
3321
3322        void resetCursorPosition() {
3323            this.cursor.reset();
3324            lastDefaultKey = null;
3325            lastHighKey = null;
3326            lastLowKey = null;
3327        }
3328
3329        void setBatch(Transaction tx, Long sequence) throws IOException {
3330            if (sequence != null) {
3331                Long nextPosition = Long.valueOf(sequence.longValue() + 1);
3332                lastDefaultKey = sequence;
3333                cursor.defaultCursorPosition = nextPosition.longValue();
3334                lastHighKey = sequence;
3335                cursor.highPriorityCursorPosition = nextPosition.longValue();
3336                lastLowKey = sequence;
3337                cursor.lowPriorityCursorPosition = nextPosition.longValue();
3338            }
3339        }
3340
3341        void setBatch(Transaction tx, LastAck last) throws IOException {
3342            setBatch(tx, last.lastAckedSequence);
3343            if (cursor.defaultCursorPosition == 0
3344                    && cursor.highPriorityCursorPosition == 0
3345                    && cursor.lowPriorityCursorPosition == 0) {
3346                long next = last.lastAckedSequence + 1;
3347                switch (last.priority) {
3348                    case DEF:
3349                        cursor.defaultCursorPosition = next;
3350                        cursor.highPriorityCursorPosition = next;
3351                        break;
3352                    case HI:
3353                        cursor.highPriorityCursorPosition = next;
3354                        break;
3355                    case LO:
3356                        cursor.lowPriorityCursorPosition = next;
3357                        cursor.defaultCursorPosition = next;
3358                        cursor.highPriorityCursorPosition = next;
3359                        break;
3360                }
3361            }
3362        }
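
        /*
         * The priority-aware setBatch() above only adjusts the cursor when the plain
         * sequence-based batch left every position at 0.  It then skips past the acked
         * sequence in the band of the last ack and in every higher-priority band, on the
         * assumption that a lower-priority ack implies the higher-priority bands were
         * already drained up to that point.
         */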
3363
3364        void stoppedIterating() {
3365            if (lastDefaultKey!=null) {
3366                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
3367            }
3368            if (lastHighKey!=null) {
3369                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
3370            }
3371            if (lastLowKey!=null) {
3372                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
3373            }
3374            lastDefaultKey = null;
3375            lastHighKey = null;
3376            lastLowKey = null;
3377        }
3378
3379        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
3380                throws IOException {
3381            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
3382                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
3383            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
3384                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
3385            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
3386                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
3387            }
3388        }
3389
3390        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
3391                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
3392
3393            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
3394            deletes.add(iterator.next());
3395        }
3396
3397        long getNextMessageId() {
3398            return nextMessageId++;
3399        }
3400
3401        void revertNextMessageId() {
3402            nextMessageId--;
3403        }
3404
3405        MessageKeys get(Transaction tx, Long key) throws IOException {
3406            MessageKeys result = defaultPriorityIndex.get(tx, key);
3407            if (result == null) {
3408                result = highPriorityIndex.get(tx, key);
3409                if (result == null) {
3410                    result = lowPriorityIndex.get(tx, key);
3411                    lastGetPriority = LO;
3412                } else {
3413                    lastGetPriority = HI;
3414                }
3415            } else {
3416                lastGetPriority = DEF;
3417            }
3418            return result;
3419        }
3420
3421        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
3422            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
3423                return defaultPriorityIndex.put(tx, key, value);
3424            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
3425                return highPriorityIndex.put(tx, key, value);
3426            } else {
3427                return lowPriorityIndex.put(tx, key, value);
3428            }
3429        }
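
        /*
         * put() files a message by its JMS priority: exactly DEFAULT_PRIORITY (4) goes to
         * the default index, anything higher (5-9) to the high priority index and anything
         * lower (0-3) to the low priority index, matching the HI/DEF/LO constants above.
         */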
3430
3431        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException {
3432            return new MessageOrderIterator(tx, cursor, this);
3433        }
3434
3435        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException {
3436            return new MessageOrderIterator(tx, m, this);
3437        }
3438
3439        public byte lastGetPriority() {
3440            return lastGetPriority;
3441        }
3442
3443        public boolean alreadyDispatched(Long sequence) {
3444            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
3445                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
3446                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
3447        }
3448
3449        public void trackPendingAdd(Long seq) {
3450            synchronized (pendingAdditions) {
3451                pendingAdditions.add(seq);
3452            }
3453        }
3454
3455        public void trackPendingAddComplete(Long seq) {
3456            synchronized (pendingAdditions) {
3457                pendingAdditions.remove(seq);
3458            }
3459        }
3460
3461        public Long minPendingAdd() {
3462            synchronized (pendingAdditions) {
3463                if (!pendingAdditions.isEmpty()) {
3464                    return pendingAdditions.get(0);
3465                } else {
3466                    return null;
3467                }
3468            }
3469        }
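
        /*
         * pendingAdditions records sequences that have been assigned but whose index updates
         * have not yet completed; minPendingAdd() is handed to the per-priority iterators in
         * MessageOrderIterator below as a limit so a batch does not read past an add that is
         * still in flight.
         */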
3470
3471
3472        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>> {
3473            Iterator<Entry<Long, MessageKeys>> currentIterator;
3474            final Iterator<Entry<Long, MessageKeys>> highIterator;
3475            final Iterator<Entry<Long, MessageKeys>> defaultIterator;
3476            final Iterator<Entry<Long, MessageKeys>> lowIterator;
3477
3478            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
3479                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
3480                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
3481                if (highPriorityIndex != null) {
3482                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
3483                } else {
3484                    this.highIterator = null;
3485                }
3486                if (lowPriorityIndex != null) {
3487                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
3488                } else {
3489                    this.lowIterator = null;
3490                }
3491            }
3492
3493            @Override
3494            public boolean hasNext() {
3495                if (currentIterator == null) {
3496                    if (highIterator != null) {
3497                        if (highIterator.hasNext()) {
3498                            currentIterator = highIterator;
3499                            return currentIterator.hasNext();
3500                        }
3501                        if (defaultIterator.hasNext()) {
3502                            currentIterator = defaultIterator;
3503                            return currentIterator.hasNext();
3504                        }
3505                        if (lowIterator.hasNext()) {
3506                            currentIterator = lowIterator;
3507                            return currentIterator.hasNext();
3508                        }
3509                        return false;
3510                    } else {
3511                        currentIterator = defaultIterator;
3512                        return currentIterator.hasNext();
3513                    }
3514                }
3515                if (highIterator != null) {
3516                    if (currentIterator.hasNext()) {
3517                        return true;
3518                    }
3519                    if (currentIterator == highIterator) {
3520                        if (defaultIterator.hasNext()) {
3521                            currentIterator = defaultIterator;
3522                            return currentIterator.hasNext();
3523                        }
3524                        if (lowIterator.hasNext()) {
3525                            currentIterator = lowIterator;
3526                            return currentIterator.hasNext();
3527                        }
3528                        return false;
3529                    }
3530
3531                    if (currentIterator == defaultIterator) {
3532                        if (lowIterator.hasNext()) {
3533                            currentIterator = lowIterator;
3534                            return currentIterator.hasNext();
3535                        }
3536                        return false;
3537                    }
3538                }
3539                return currentIterator.hasNext();
3540            }
3541
3542            @Override
3543            public Entry<Long, MessageKeys> next() {
3544                Entry<Long, MessageKeys> result = currentIterator.next();
3545                if (result != null) {
3546                    Long key = result.getKey();
3547                    if (highIterator != null) {
3548                        if (currentIterator == defaultIterator) {
3549                            lastDefaultKey = key;
3550                        } else if (currentIterator == highIterator) {
3551                            lastHighKey = key;
3552                        } else {
3553                            lastLowKey = key;
3554                        }
3555                    } else {
3556                        lastDefaultKey = key;
3557                    }
3558                }
3559                return result;
3560            }
3561
3562            @Override
3563            public void remove() {
3564                throw new UnsupportedOperationException();
3565            }
3566
3567        }
3568    }
3569
3570    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
3571        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
3572
3573        @Override
3574        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
3575            ByteArrayOutputStream baos = new ByteArrayOutputStream();
3576            ObjectOutputStream oout = new ObjectOutputStream(baos);
3577            oout.writeObject(object);
3578            oout.flush();
3579            oout.close();
3580            byte[] data = baos.toByteArray();
3581            dataOut.writeInt(data.length);
3582            dataOut.write(data);
3583        }
3584
3585        @Override
3586        @SuppressWarnings("unchecked")
3587        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
3588            int dataLen = dataIn.readInt();
3589            byte[] data = new byte[dataLen];
3590            dataIn.readFully(data);
3591            ByteArrayInputStream bais = new ByteArrayInputStream(data);
3592            ObjectInputStream oin = new ObjectInputStream(bais);
3593            try {
3594                return (HashSet<String>) oin.readObject();
3595            } catch (ClassNotFoundException cfe) {
3596                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
3597                ioe.initCause(cfe);
3598                throw ioe;
3599            }
3600        }
3601    }
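
    /*
     * HashSetStringMarshaller above relies on plain Java serialization; it is used by
     * StoredDestinationMarshaller.readPayload to decode the old (version 3) ackPositions
     * index of Long -> HashSet<String> entries when upgrading a store to the current
     * list-based format.
     */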
3602
3603    public File getIndexDirectory() {
3604        return indexDirectory;
3605    }
3606
3607    public void setIndexDirectory(File indexDirectory) {
3608        this.indexDirectory = indexDirectory;
3609    }
3610
3611    interface IndexAware {
3612        public void sequenceAssignedWithIndexLocked(long index);
3613    }
3614
3615    public String getPreallocationScope() {
3616        return preallocationScope;
3617    }
3618
3619    public void setPreallocationScope(String preallocationScope) {
3620        this.preallocationScope = preallocationScope;
3621    }
3622
3623    public String getPreallocationStrategy() {
3624        return preallocationStrategy;
3625    }
3626
3627    public void setPreallocationStrategy(String preallocationStrategy) {
3628        this.preallocationStrategy = preallocationStrategy;
3629    }
3630
3631    public int getCompactAcksAfterNoGC() {
3632        return compactAcksAfterNoGC;
3633    }
3634
3635    /**
3636     * Sets the number of GC cycles where no journal logs were removed before an attempt to
3637     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
3638     * <p>
3639     * A value of -1 will disable this feature.
3640     *
3641     * @param compactAcksAfterNoGC
3642     *      Number of empty GC cycles before we rewrite old ACKS.
3643     */
3644    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
3645        this.compactAcksAfterNoGC = compactAcksAfterNoGC;
3646    }
3647
3648    /**
3649     * Returns whether Ack compaction will ignore that the store is still growing
3650     * and run more often.
3651     *
3652     * @return the compactAcksIgnoresStoreGrowth current value.
3653     */
3654    public boolean isCompactAcksIgnoresStoreGrowth() {
3655        return compactAcksIgnoresStoreGrowth;
3656    }
3657
3658    /**
3659     * Configures whether Ack compaction will run even while the journal logs are still
3660     * growing, i.e. while the store has not yet run out of space.  Because the
3661     * compaction operation can be costly, this value defaults to off and Ack
3662     * compaction is only done once it appears the store cannot grow any larger.
3663     *
3664     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
3665     */
3666    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
3667        this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
3668    }
3669
3670    /**
3671     * Returns whether Ack compaction is enabled
3672     *
3673     * @return enableAckCompaction
3674     */
3675    public boolean isEnableAckCompaction() {
3676        return enableAckCompaction;
3677    }
3678
3679    /**
3680     * Configure whether the Ack compaction task is enabled to run.
3681     *
3682     * @param enableAckCompaction true to enable the Ack compaction task
3683     */
3684    public void setEnableAckCompaction(boolean enableAckCompaction) {
3685        this.enableAckCompaction = enableAckCompaction;
3686    }
3687}