001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.DataInput;
024import java.io.DataOutput;
025import java.io.EOFException;
026import java.io.File;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.InterruptedIOException;
030import java.io.ObjectInputStream;
031import java.io.ObjectOutputStream;
032import java.io.OutputStream;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.Date;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.LinkedHashMap;
042import java.util.LinkedHashSet;
043import java.util.LinkedList;
044import java.util.List;
045import java.util.Map;
046import java.util.Map.Entry;
047import java.util.Set;
048import java.util.SortedSet;
049import java.util.TreeMap;
050import java.util.TreeSet;
051
052import java.util.concurrent.ConcurrentHashMap;
053import java.util.concurrent.ConcurrentMap;
054import java.util.concurrent.Executors;
055import java.util.concurrent.ScheduledExecutorService;
056import java.util.concurrent.ThreadFactory;
057import java.util.concurrent.TimeUnit;
058import java.util.concurrent.atomic.AtomicBoolean;
059import java.util.concurrent.atomic.AtomicLong;
060import java.util.concurrent.atomic.AtomicReference;
061import java.util.concurrent.locks.ReentrantReadWriteLock;
062
063import org.apache.activemq.ActiveMQMessageAuditNoSync;
064import org.apache.activemq.broker.BrokerService;
065import org.apache.activemq.broker.BrokerServiceAware;
066import org.apache.activemq.command.TransactionId;
067import org.apache.activemq.openwire.OpenWireFormat;
068import org.apache.activemq.protobuf.Buffer;
069import org.apache.activemq.store.MessageStore;
070import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
071import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
072import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
073import org.apache.activemq.store.kahadb.data.KahaDestination;
074import org.apache.activemq.store.kahadb.data.KahaEntryType;
075import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
076import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
077import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
078import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
079import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
080import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
081import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
082import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
083import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
084import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
085import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
086import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
087import org.apache.activemq.store.kahadb.disk.index.ListIndex;
088import org.apache.activemq.store.kahadb.disk.journal.DataFile;
089import org.apache.activemq.store.kahadb.disk.journal.Journal;
090import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
091import org.apache.activemq.store.kahadb.disk.journal.Location;
092import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
093import org.apache.activemq.store.kahadb.disk.page.Page;
094import org.apache.activemq.store.kahadb.disk.page.PageFile;
095import org.apache.activemq.store.kahadb.disk.page.Transaction;
096import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
097import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
098import org.apache.activemq.store.kahadb.disk.util.Marshaller;
099import org.apache.activemq.store.kahadb.disk.util.Sequence;
100import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
101import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
102import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
103import org.apache.activemq.util.ByteSequence;
104import org.apache.activemq.util.DataByteArrayInputStream;
105import org.apache.activemq.util.DataByteArrayOutputStream;
106import org.apache.activemq.util.IOExceptionSupport;
107import org.apache.activemq.util.IOHelper;
108import org.apache.activemq.util.ServiceStopper;
109import org.apache.activemq.util.ServiceSupport;
110import org.apache.activemq.util.ThreadPoolUtils;
111import org.slf4j.Logger;
112import org.slf4j.LoggerFactory;
113import org.slf4j.MDC;
114
115public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
116
117    protected BrokerService brokerService;
118
119    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
120    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
121    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
122    protected static final Buffer UNMATCHED;
123    static {
124        UNMATCHED = new Buffer(new byte[]{});
125    }
126    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
127
128    static final int CLOSED_STATE = 1;
129    static final int OPEN_STATE = 2;
130    static final long NOT_ACKED = -1;
131
132    static final int VERSION = 5;
133
134    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
135
136    protected class Metadata {
137        protected Page<Metadata> page;
138        protected int state;
139        protected BTreeIndex<String, StoredDestination> destinations;
140        protected Location lastUpdate;
141        protected Location firstInProgressTransactionLocation;
142        protected Location producerSequenceIdTrackerLocation = null;
143        protected Location ackMessageFileMapLocation = null;
144        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
145        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
146        protected int version = VERSION;
147        protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION;
148
149        public void read(DataInput is) throws IOException {
150            state = is.readInt();
151            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
152            if (is.readBoolean()) {
153                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
154            } else {
155                lastUpdate = null;
156            }
157            if (is.readBoolean()) {
158                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
159            } else {
160                firstInProgressTransactionLocation = null;
161            }
162            try {
163                if (is.readBoolean()) {
164                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
165                } else {
166                    producerSequenceIdTrackerLocation = null;
167                }
168            } catch (EOFException expectedOnUpgrade) {
169            }
170            try {
171                version = is.readInt();
172            } catch (EOFException expectedOnUpgrade) {
173                version = 1;
174            }
175            if (version >= 5 && is.readBoolean()) {
176                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
177            } else {
178                ackMessageFileMapLocation = null;
179            }
180            try {
181                openwireVersion = is.readInt();
182            } catch (EOFException expectedOnUpgrade) {
183                openwireVersion = OpenWireFormat.DEFAULT_VERSION;
184            }
185            LOG.info("KahaDB is version " + version);
186        }
187
188        public void write(DataOutput os) throws IOException {
189            os.writeInt(state);
190            os.writeLong(destinations.getPageId());
191
192            if (lastUpdate != null) {
193                os.writeBoolean(true);
194                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
195            } else {
196                os.writeBoolean(false);
197            }
198
199            if (firstInProgressTransactionLocation != null) {
200                os.writeBoolean(true);
201                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
202            } else {
203                os.writeBoolean(false);
204            }
205
206            if (producerSequenceIdTrackerLocation != null) {
207                os.writeBoolean(true);
208                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
209            } else {
210                os.writeBoolean(false);
211            }
212            os.writeInt(VERSION);
213            if (ackMessageFileMapLocation != null) {
214                os.writeBoolean(true);
215                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
216            } else {
217                os.writeBoolean(false);
218            }
219            os.writeInt(this.openwireVersion);
220        }
221    }
222
223    class MetadataMarshaller extends VariableMarshaller<Metadata> {
224        @Override
225        public Metadata readPayload(DataInput dataIn) throws IOException {
226            Metadata rc = createMetadata();
227            rc.read(dataIn);
228            return rc;
229        }
230
231        @Override
232        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
233            object.write(dataOut);
234        }
235    }
236
237    protected PageFile pageFile;
238    protected Journal journal;
239    protected Metadata metadata = new Metadata();
240
241    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
242
243    protected boolean failIfDatabaseIsLocked;
244
245    protected boolean deleteAllMessages;
246    protected File directory = DEFAULT_DIRECTORY;
247    protected File indexDirectory = null;
248    protected ScheduledExecutorService scheduler;
249    private final Object schedulerLock = new Object();
250
251    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
252    protected boolean archiveDataLogs;
253    protected File directoryArchive;
254    protected AtomicLong journalSize = new AtomicLong(0);
255    long journalDiskSyncInterval = 1000;
256    long checkpointInterval = 5*1000;
257    long cleanupInterval = 30*1000;
258    boolean cleanupOnStop = true;
259    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
260    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
261    boolean enableIndexWriteAsync = false;
262    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
263    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
264    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
265
266    protected AtomicBoolean opened = new AtomicBoolean();
267    private boolean ignoreMissingJournalfiles = false;
268    private int indexCacheSize = 10000;
269    private boolean checkForCorruptJournalFiles = false;
270    private boolean checksumJournalFiles = true;
271    protected boolean forceRecoverIndex = false;
272
273    private boolean archiveCorruptedIndex = false;
274    private boolean useIndexLFRUEviction = false;
275    private float indexLFUEvictionFactor = 0.2f;
276    private boolean enableIndexDiskSyncs = true;
277    private boolean enableIndexRecoveryFile = true;
278    private boolean enableIndexPageCaching = true;
279    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
280
281    private boolean enableAckCompaction = false;
282    private int compactAcksAfterNoGC = 10;
283    private boolean compactAcksIgnoresStoreGrowth = false;
284    private int checkPointCyclesWithNoGC;
285    private int journalLogOnLastCompactionCheck;
286
287    //only set when using JournalDiskSyncStrategy.PERIODIC
288    protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>();
289
290    @Override
291    public void doStart() throws Exception {
292        load();
293    }
294
295    @Override
296    public void doStop(ServiceStopper stopper) throws Exception {
297        unload();
298    }
299
300    public void allowIOResumption() {
301        if (pageFile != null) {
302            pageFile.allowIOResumption();
303        }
304        if (journal != null) {
305            journal.allowIOResumption();
306        }
307    }
308
309    private void loadPageFile() throws IOException {
310        this.indexLock.writeLock().lock();
311        try {
312            final PageFile pageFile = getPageFile();
313            pageFile.load();
314            pageFile.tx().execute(new Transaction.Closure<IOException>() {
315                @Override
316                public void execute(Transaction tx) throws IOException {
317                    if (pageFile.getPageCount() == 0) {
318                        // First time this is created.. Initialize the metadata
319                        Page<Metadata> page = tx.allocate();
320                        assert page.getPageId() == 0;
321                        page.set(metadata);
322                        metadata.page = page;
323                        metadata.state = CLOSED_STATE;
324                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
325
326                        tx.store(metadata.page, metadataMarshaller, true);
327                    } else {
328                        Page<Metadata> page = tx.load(0, metadataMarshaller);
329                        metadata = page.get();
330                        metadata.page = page;
331                    }
332                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
333                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
334                    metadata.destinations.load(tx);
335                }
336            });
337            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
338            // Perhaps we should just keep an index of file
339            storedDestinations.clear();
340            pageFile.tx().execute(new Transaction.Closure<IOException>() {
341                @Override
342                public void execute(Transaction tx) throws IOException {
343                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
344                        Entry<String, StoredDestination> entry = iterator.next();
345                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
346                        storedDestinations.put(entry.getKey(), sd);
347
348                        if (checkForCorruptJournalFiles) {
349                            // sanity check the index also
350                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
351                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
352                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
353                                }
354                            }
355                        }
356                    }
357                }
358            });
359            pageFile.flush();
360        } finally {
361            this.indexLock.writeLock().unlock();
362        }
363    }
364
365    private void startCheckpoint() {
366        if (checkpointInterval == 0 && cleanupInterval == 0) {
367            LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart");
368            return;
369        }
370        synchronized (schedulerLock) {
371            if (scheduler == null || scheduler.isShutdown()) {
372                scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
373
374                    @Override
375                    public Thread newThread(Runnable r) {
376                        Thread schedulerThread = new Thread(r);
377
378                        schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
379                        schedulerThread.setDaemon(true);
380
381                        return schedulerThread;
382                    }
383                });
384
385                // Short intervals for check-point and cleanups
386                long delay;
387                if (journal.isJournalDiskSyncPeriodic()) {
388                    delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500);
389                } else {
390                    delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
391                }
392
393                scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
394            }
395        }
396    }
397
398    private final class CheckpointRunner implements Runnable {
399
400        private long lastCheckpoint = System.currentTimeMillis();
401        private long lastCleanup = System.currentTimeMillis();
402        private long lastSync = System.currentTimeMillis();
403        private Location lastAsyncUpdate = null;
404
405        @Override
406        public void run() {
407            try {
408                // Decide on cleanup vs full checkpoint here.
409                if (opened.get()) {
410                    long now = System.currentTimeMillis();
411                    if (journal.isJournalDiskSyncPeriodic() &&
412                            journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
413                        Location currentUpdate = lastAsyncJournalUpdate.get();
414                        if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) {
415                            lastAsyncUpdate = currentUpdate;
416                            if (LOG.isTraceEnabled()) {
417                                LOG.trace("Writing trace command to trigger journal sync");
418                            }
419                            store(new KahaTraceCommand(), true, null, null);
420                        }
421                        lastSync = now;
422                    }
423                    if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
424                        checkpointCleanup(true);
425                        lastCleanup = now;
426                        lastCheckpoint = now;
427                    } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
428                        checkpointCleanup(false);
429                        lastCheckpoint = now;
430                    }
431                }
432            } catch (IOException ioe) {
433                LOG.error("Checkpoint failed", ioe);
434                brokerService.handleIOException(ioe);
435            } catch (Throwable e) {
436                LOG.error("Checkpoint failed", e);
437                brokerService.handleIOException(IOExceptionSupport.create(e));
438            }
439        }
440    }
441
442    public void open() throws IOException {
443        if( opened.compareAndSet(false, true) ) {
444            getJournal().start();
445            try {
446                loadPageFile();
447            } catch (Throwable t) {
448                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
449                if (LOG.isDebugEnabled()) {
450                    LOG.debug("Index load failure", t);
451                }
452                // try to recover index
453                try {
454                    pageFile.unload();
455                } catch (Exception ignore) {}
456                if (archiveCorruptedIndex) {
457                    pageFile.archive();
458                } else {
459                    pageFile.delete();
460                }
461                metadata = createMetadata();
462                pageFile = null;
463                loadPageFile();
464            }
465            recover();
466            startCheckpoint();
467        }
468    }
469
470    public void load() throws IOException {
471        this.indexLock.writeLock().lock();
472        IOHelper.mkdirs(directory);
473        try {
474            if (deleteAllMessages) {
475                getJournal().setCheckForCorruptionOnStartup(false);
476                getJournal().start();
477                getJournal().delete();
478                getJournal().close();
479                journal = null;
480                getPageFile().delete();
481                LOG.info("Persistence store purged.");
482                deleteAllMessages = false;
483            }
484
485            open();
486            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
487        } finally {
488            this.indexLock.writeLock().unlock();
489        }
490    }
491
492    public void close() throws IOException, InterruptedException {
493        if( opened.compareAndSet(true, false)) {
494            checkpointLock.writeLock().lock();
495            try {
496                if (metadata.page != null) {
497                    checkpointUpdate(getCleanupOnStop());
498                }
499                pageFile.unload();
500                metadata = createMetadata();
501            } finally {
502                checkpointLock.writeLock().unlock();
503            }
504            journal.close();
505            synchronized(schedulerLock) {
506                if (scheduler != null) {
507                    ThreadPoolUtils.shutdownGraceful(scheduler, -1);
508                    scheduler = null;
509                }
510            }
511            // clear the cache and journalSize on shutdown of the store
512            storeCache.clear();
513            journalSize.set(0);
514        }
515    }
516
517    public void unload() throws IOException, InterruptedException {
518        this.indexLock.writeLock().lock();
519        try {
520            if( pageFile != null && pageFile.isLoaded() ) {
521                metadata.state = CLOSED_STATE;
522                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
523
524                if (metadata.page != null) {
525                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
526                        @Override
527                        public void execute(Transaction tx) throws IOException {
528                            tx.store(metadata.page, metadataMarshaller, true);
529                        }
530                    });
531                }
532            }
533        } finally {
534            this.indexLock.writeLock().unlock();
535        }
536        close();
537    }
538
539    // public for testing
540    @SuppressWarnings("rawtypes")
541    public Location[] getInProgressTxLocationRange() {
542        Location[] range = new Location[]{null, null};
543        synchronized (inflightTransactions) {
544            if (!inflightTransactions.isEmpty()) {
545                for (List<Operation> ops : inflightTransactions.values()) {
546                    if (!ops.isEmpty()) {
547                        trackMaxAndMin(range, ops);
548                    }
549                }
550            }
551            if (!preparedTransactions.isEmpty()) {
552                for (List<Operation> ops : preparedTransactions.values()) {
553                    if (!ops.isEmpty()) {
554                        trackMaxAndMin(range, ops);
555                    }
556                }
557            }
558        }
559        return range;
560    }
561
562    @SuppressWarnings("rawtypes")
563    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
564        Location t = ops.get(0).getLocation();
565        if (range[0] == null || t.compareTo(range[0]) <= 0) {
566            range[0] = t;
567        }
568        t = ops.get(ops.size() -1).getLocation();
569        if (range[1] == null || t.compareTo(range[1]) >= 0) {
570            range[1] = t;
571        }
572    }
573
574    class TranInfo {
575        TransactionId id;
576        Location location;
577
578        class opCount {
579            int add;
580            int remove;
581        }
582        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
583
584        @SuppressWarnings("rawtypes")
585        public void track(Operation operation) {
586            if (location == null ) {
587                location = operation.getLocation();
588            }
589            KahaDestination destination;
590            boolean isAdd = false;
591            if (operation instanceof AddOperation) {
592                AddOperation add = (AddOperation) operation;
593                destination = add.getCommand().getDestination();
594                isAdd = true;
595            } else {
596                RemoveOperation removeOpperation = (RemoveOperation) operation;
597                destination = removeOpperation.getCommand().getDestination();
598            }
599            opCount opCount = destinationOpCount.get(destination);
600            if (opCount == null) {
601                opCount = new opCount();
602                destinationOpCount.put(destination, opCount);
603            }
604            if (isAdd) {
605                opCount.add++;
606            } else {
607                opCount.remove++;
608            }
609        }
610
611        @Override
612        public String toString() {
613           StringBuffer buffer = new StringBuffer();
614           buffer.append(location).append(";").append(id).append(";\n");
615           for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
616               buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
617           }
618           return buffer.toString();
619        }
620    }
621
622    @SuppressWarnings("rawtypes")
623    public String getTransactions() {
624
625        ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
626        synchronized (inflightTransactions) {
627            if (!inflightTransactions.isEmpty()) {
628                for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
629                    TranInfo info = new TranInfo();
630                    info.id = entry.getKey();
631                    for (Operation operation : entry.getValue()) {
632                        info.track(operation);
633                    }
634                    infos.add(info);
635                }
636            }
637        }
638        synchronized (preparedTransactions) {
639            if (!preparedTransactions.isEmpty()) {
640                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
641                    TranInfo info = new TranInfo();
642                    info.id = entry.getKey();
643                    for (Operation operation : entry.getValue()) {
644                        info.track(operation);
645                    }
646                    infos.add(info);
647                }
648            }
649        }
650        return infos.toString();
651    }
652
653    public String getPreparedTransaction(TransactionId transactionId) {
654        String result = "";
655        synchronized (preparedTransactions) {
656            List<Operation> operations = preparedTransactions.get(transactionId);
657            if (operations != null) {
658                TranInfo info = new TranInfo();
659                info.id = transactionId;
660                for (Operation operation : preparedTransactions.get(transactionId)) {
661                    info.track(operation);
662                }
663                result = info.toString();
664            }
665        }
666        return result;
667    }
668
669    /**
670     * Move all the messages that were in the journal into long term storage. We
671     * just replay and do a checkpoint.
672     *
673     * @throws IOException
674     * @throws IOException
675     * @throws IllegalStateException
676     */
677    private void recover() throws IllegalStateException, IOException {
678        this.indexLock.writeLock().lock();
679        try {
680
681            long start = System.currentTimeMillis();
682            boolean requiresJournalReplay = recoverProducerAudit();
683            requiresJournalReplay |= recoverAckMessageFileMap();
684            Location lastIndoubtPosition = getRecoveryPosition();
685            Location recoveryPosition = requiresJournalReplay ? journal.getNextLocation(null) : lastIndoubtPosition;
686            if (recoveryPosition != null) {
687                int redoCounter = 0;
688                int dataFileRotationTracker = recoveryPosition.getDataFileId();
689                LOG.info("Recovering from the journal @" + recoveryPosition);
690                while (recoveryPosition != null) {
691                    try {
692                        JournalCommand<?> message = load(recoveryPosition);
693                        metadata.lastUpdate = recoveryPosition;
694                        process(message, recoveryPosition, lastIndoubtPosition);
695                        redoCounter++;
696                    } catch (IOException failedRecovery) {
697                        if (isIgnoreMissingJournalfiles()) {
698                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
699                            // track this dud location
700                            journal.corruptRecoveryLocation(recoveryPosition);
701                        } else {
702                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
703                        }
704                    }
705                    recoveryPosition = journal.getNextLocation(recoveryPosition);
706                    // hold on to the minimum number of open files during recovery
707                    if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
708                        dataFileRotationTracker = recoveryPosition.getDataFileId();
709                        journal.cleanup();
710                    }
711                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
712                        LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
713                    }
714                }
715                if (LOG.isInfoEnabled()) {
716                    long end = System.currentTimeMillis();
717                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
718                }
719            }
720
721            // We may have to undo some index updates.
722            pageFile.tx().execute(new Transaction.Closure<IOException>() {
723                @Override
724                public void execute(Transaction tx) throws IOException {
725                    recoverIndex(tx);
726                }
727            });
728
729            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
730            Set<TransactionId> toRollback = new HashSet<TransactionId>();
731            Set<TransactionId> toDiscard = new HashSet<TransactionId>();
732            synchronized (inflightTransactions) {
733                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
734                    TransactionId id = it.next();
735                    if (id.isLocalTransaction()) {
736                        toRollback.add(id);
737                    } else {
738                        toDiscard.add(id);
739                    }
740                }
741                for (TransactionId tx: toRollback) {
742                    if (LOG.isDebugEnabled()) {
743                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
744                    }
745                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
746                }
747                for (TransactionId tx: toDiscard) {
748                    if (LOG.isDebugEnabled()) {
749                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
750                    }
751                    inflightTransactions.remove(tx);
752                }
753            }
754
755            synchronized (preparedTransactions) {
756                for (TransactionId txId : preparedTransactions.keySet()) {
757                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
758                }
759            }
760
761        } finally {
762            this.indexLock.writeLock().unlock();
763        }
764    }
765
766    @SuppressWarnings("unused")
767    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
768        return TransactionIdConversion.convertToLocal(tx);
769    }
770
771    private Location minimum(Location x,
772                             Location y) {
773        Location min = null;
774        if (x != null) {
775            min = x;
776            if (y != null) {
777                int compare = y.compareTo(x);
778                if (compare < 0) {
779                    min = y;
780                }
781            }
782        } else {
783            min = y;
784        }
785        return min;
786    }
787
788    private boolean recoverProducerAudit() throws IOException {
789        boolean requiresReplay = true;
790        if (metadata.producerSequenceIdTrackerLocation != null) {
791            try {
792                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
793                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
794                int maxNumProducers = getMaxFailoverProducersToTrack();
795                int maxAuditDepth = getFailoverProducersAuditDepth();
796                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
797                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
798                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
799                requiresReplay = false;
800            } catch (Exception e) {
801                LOG.warn("Cannot recover message audit", e);
802            }
803        }
804        // got no audit stored so got to recreate via replay from start of the journal
805        return requiresReplay;
806    }
807
808    @SuppressWarnings("unchecked")
809    private boolean recoverAckMessageFileMap() throws IOException {
810        boolean requiresReplay = true;
811        if (metadata.ackMessageFileMapLocation != null) {
812            try {
813                KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
814                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
815                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
816                requiresReplay = false;
817            } catch (Exception e) {
818                LOG.warn("Cannot recover ackMessageFileMap", e);
819            }
820        }
821        // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
822        return requiresReplay;
823    }
824
825    protected void recoverIndex(Transaction tx) throws IOException {
826        long start = System.currentTimeMillis();
827        // It is possible index updates got applied before the journal updates..
828        // in that case we need to removed references to messages that are not in the journal
829        final Location lastAppendLocation = journal.getLastAppendLocation();
830        long undoCounter=0;
831
832        // Go through all the destinations to see if they have messages past the lastAppendLocation
833        for (StoredDestination sd : storedDestinations.values()) {
834
835            final ArrayList<Long> matches = new ArrayList<Long>();
836            // Find all the Locations that are >= than the last Append Location.
837            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
838                @Override
839                protected void matched(Location key, Long value) {
840                    matches.add(value);
841                }
842            });
843
844            for (Long sequenceId : matches) {
845                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
846                if (keys != null) {
847                    sd.locationIndex.remove(tx, keys.location);
848                    sd.messageIdIndex.remove(tx, keys.messageId);
849                    metadata.producerSequenceIdTracker.rollback(keys.messageId);
850                    undoCounter++;
851                    // TODO: do we need to modify the ack positions for the pub sub case?
852                }
853            }
854        }
855
856        if (undoCounter > 0) {
857            // The rolledback operations are basically in flight journal writes.  To avoid getting
858            // these the end user should do sync writes to the journal.
859            if (LOG.isInfoEnabled()) {
860                long end = System.currentTimeMillis();
861                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
862            }
863        }
864
865        undoCounter = 0;
866        start = System.currentTimeMillis();
867
868        // Lets be extra paranoid here and verify that all the datafiles being referenced
869        // by the indexes still exists.
870
871        final SequenceSet ss = new SequenceSet();
872        for (StoredDestination sd : storedDestinations.values()) {
873            // Use a visitor to cut down the number of pages that we load
874            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
875                int last=-1;
876
877                @Override
878                public boolean isInterestedInKeysBetween(Location first, Location second) {
879                    if( first==null ) {
880                        return !ss.contains(0, second.getDataFileId());
881                    } else if( second==null ) {
882                        return true;
883                    } else {
884                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
885                    }
886                }
887
888                @Override
889                public void visit(List<Location> keys, List<Long> values) {
890                    for (Location l : keys) {
891                        int fileId = l.getDataFileId();
892                        if( last != fileId ) {
893                            ss.add(fileId);
894                            last = fileId;
895                        }
896                    }
897                }
898
899            });
900        }
901        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
902        while (!ss.isEmpty()) {
903            missingJournalFiles.add((int) ss.removeFirst());
904        }
905
906        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
907            missingJournalFiles.add(entry.getKey());
908            for (Integer i : entry.getValue()) {
909                missingJournalFiles.add(i);
910            }
911        }
912
913        missingJournalFiles.removeAll(journal.getFileMap().keySet());
914
915        if (!missingJournalFiles.isEmpty()) {
916            LOG.warn("Some journal files are missing: " + missingJournalFiles);
917        }
918
919        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
920        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
921        for (Integer missing : missingJournalFiles) {
922            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
923        }
924
925        if (checkForCorruptJournalFiles) {
926            Collection<DataFile> dataFiles = journal.getFileMap().values();
927            for (DataFile dataFile : dataFiles) {
928                int id = dataFile.getDataFileId();
929                // eof to next file id
930                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
931                Sequence seq = dataFile.getCorruptedBlocks().getHead();
932                while (seq != null) {
933                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
934                        new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
935                    missingPredicates.add(visitor);
936                    knownCorruption.add(visitor);
937                    seq = seq.getNext();
938                }
939            }
940        }
941
942        if (!missingPredicates.isEmpty()) {
943            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
944                final StoredDestination sd = sdEntry.getValue();
945                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>();
946                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
947                    @Override
948                    protected void matched(Location key, Long value) {
949                        matches.put(value, key);
950                    }
951                });
952
953                // If some message references are affected by the missing data files...
954                if (!matches.isEmpty()) {
955
956                    // We either 'gracefully' recover dropping the missing messages or
957                    // we error out.
958                    if( ignoreMissingJournalfiles ) {
959                        // Update the index to remove the references to the missing data
960                        for (Long sequenceId : matches.keySet()) {
961                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
962                            sd.locationIndex.remove(tx, keys.location);
963                            sd.messageIdIndex.remove(tx, keys.messageId);
964                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
965                            undoCounter++;
966                            // TODO: do we need to modify the ack positions for the pub sub case?
967                        }
968                    } else {
969                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
970                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
971                    }
972                }
973            }
974        }
975
976        if (!ignoreMissingJournalfiles) {
977            if (!knownCorruption.isEmpty()) {
978                LOG.error("Detected corrupt journal files. " + knownCorruption);
979                throw new IOException("Detected corrupt journal files. " + knownCorruption);
980            }
981
982            if (!missingJournalFiles.isEmpty()) {
983                LOG.error("Detected missing journal files. " + missingJournalFiles);
984                throw new IOException("Detected missing journal files. " + missingJournalFiles);
985            }
986        }
987
988        if (undoCounter > 0) {
989            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
990            // should do sync writes to the journal.
991            if (LOG.isInfoEnabled()) {
992                long end = System.currentTimeMillis();
993                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
994            }
995        }
996    }
997
998    private Location nextRecoveryPosition;
999    private Location lastRecoveryPosition;
1000
1001    public void incrementalRecover() throws IOException {
1002        this.indexLock.writeLock().lock();
1003        try {
1004            if( nextRecoveryPosition == null ) {
1005                if( lastRecoveryPosition==null ) {
1006                    nextRecoveryPosition = getRecoveryPosition();
1007                } else {
1008                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
1009                }
1010            }
1011            while (nextRecoveryPosition != null) {
1012                lastRecoveryPosition = nextRecoveryPosition;
1013                metadata.lastUpdate = lastRecoveryPosition;
1014                JournalCommand<?> message = load(lastRecoveryPosition);
1015                process(message, lastRecoveryPosition, (IndexAware) null);
1016                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
1017            }
1018        } finally {
1019            this.indexLock.writeLock().unlock();
1020        }
1021    }
1022
1023    public Location getLastUpdatePosition() throws IOException {
1024        return metadata.lastUpdate;
1025    }
1026
1027    private Location getRecoveryPosition() throws IOException {
1028
1029        if (!this.forceRecoverIndex) {
1030
1031            // If we need to recover the transactions..
1032            if (metadata.firstInProgressTransactionLocation != null) {
1033                return metadata.firstInProgressTransactionLocation;
1034            }
1035
1036            // Perhaps there were no transactions...
1037            if( metadata.lastUpdate!=null) {
1038                // Start replay at the record after the last one recorded in the index file.
1039                return getNextInitializedLocation(metadata.lastUpdate);
1040            }
1041        }
1042        // This loads the first position.
1043        return journal.getNextLocation(null);
1044    }
1045
1046    private Location getNextInitializedLocation(Location location) throws IOException {
1047        Location mayNotBeInitialized = journal.getNextLocation(location);
1048        if (location.getSize() == NOT_SET && mayNotBeInitialized != null && mayNotBeInitialized.getSize() != NOT_SET) {
1049            // need to init size and type to skip
1050            return journal.getNextLocation(mayNotBeInitialized);
1051        } else {
1052            return mayNotBeInitialized;
1053        }
1054    }
1055
1056    protected void checkpointCleanup(final boolean cleanup) throws IOException {
1057        long start;
1058        this.indexLock.writeLock().lock();
1059        try {
1060            start = System.currentTimeMillis();
1061            if( !opened.get() ) {
1062                return;
1063            }
1064        } finally {
1065            this.indexLock.writeLock().unlock();
1066        }
1067        checkpointUpdate(cleanup);
1068        long end = System.currentTimeMillis();
1069        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1070            if (LOG.isInfoEnabled()) {
1071                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
1072            }
1073        }
1074    }
1075
1076    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
1077        int size = data.serializedSizeFramed();
1078        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
1079        os.writeByte(data.type().getNumber());
1080        data.writeFramed(os);
1081        return os.toByteSequence();
1082    }
1083
1084    // /////////////////////////////////////////////////////////////////
1085    // Methods call by the broker to update and query the store.
1086    // /////////////////////////////////////////////////////////////////
1087    public Location store(JournalCommand<?> data) throws IOException {
1088        return store(data, false, null,null);
1089    }
1090
1091    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
1092        return store(data, false, null, null, onJournalStoreComplete);
1093    }
1094
1095    public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException {
1096        return store(data, sync, before, after, null);
1097    }
1098
1099    /**
1100     * All updated are are funneled through this method. The updates are converted
1101     * to a JournalMessage which is logged to the journal and then the data from
1102     * the JournalMessage is used to update the index just like it would be done
1103     * during a recovery process.
1104     */
1105    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
1106        try {
1107            ByteSequence sequence = toByteSequence(data);
1108            Location location;
1109
1110            checkpointLock.readLock().lock();
1111            try {
1112
1113                long start = System.currentTimeMillis();
1114                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
1115                long start2 = System.currentTimeMillis();
1116                //Track the last async update so we know if we need to sync at the next checkpoint
1117                if (!sync && journal.isJournalDiskSyncPeriodic()) {
1118                    lastAsyncJournalUpdate.set(location);
1119                }
1120                process(data, location, before);
1121
1122                long end = System.currentTimeMillis();
1123                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1124                    if (LOG.isInfoEnabled()) {
1125                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
1126                    }
1127                }
1128            } finally {
1129                checkpointLock.readLock().unlock();
1130            }
1131
1132            if (after != null) {
1133                after.run();
1134            }
1135
1136            return location;
1137        } catch (IOException ioe) {
1138            LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe);
1139            brokerService.handleIOException(ioe);
1140            throw ioe;
1141        }
1142    }
1143
1144    /**
1145     * Loads a previously stored JournalMessage
1146     *
1147     * @param location
1148     * @return
1149     * @throws IOException
1150     */
1151    public JournalCommand<?> load(Location location) throws IOException {
1152        long start = System.currentTimeMillis();
1153        ByteSequence data = journal.read(location);
1154        long end = System.currentTimeMillis();
1155        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
1156            if (LOG.isInfoEnabled()) {
1157                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
1158            }
1159        }
1160        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
1161        byte readByte = is.readByte();
1162        KahaEntryType type = KahaEntryType.valueOf(readByte);
1163        if( type == null ) {
1164            try {
1165                is.close();
1166            } catch (IOException e) {}
1167            throw new IOException("Could not load journal record, null type information from: " + readByte + " at location: "+location);
1168        }
1169        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
1170        message.mergeFramed(is);
1171        return message;
1172    }
1173
1174    /**
1175     * do minimal recovery till we reach the last inDoubtLocation
1176     * @param data
1177     * @param location
1178     * @param inDoubtlocation
1179     * @throws IOException
1180     */
1181    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
1182        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
1183            initMessageStore(data);
1184            process(data, location, (IndexAware) null);
1185        } else {
1186            // just recover producer audit
1187            data.visit(new Visitor() {
1188                @Override
1189                public void visit(KahaAddMessageCommand command) throws IOException {
1190                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1191                }
1192            });
1193        }
1194    }
1195
1196    private void initMessageStore(JournalCommand<?> data) throws IOException {
1197        data.visit(new Visitor() {
1198            @Override
1199            public void visit(KahaAddMessageCommand command) throws IOException {
1200                final KahaDestination destination = command.getDestination();
1201                if (!storedDestinations.containsKey(key(destination))) {
1202                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
1203                        @Override
1204                        public void execute(Transaction tx) throws IOException {
1205                            getStoredDestination(destination, tx);
1206                        }
1207                    });
1208                }
1209            }
1210        });
1211    }
1212
1213    // /////////////////////////////////////////////////////////////////
1214    // Journaled record processing methods. Once the record is journaled,
1215    // these methods handle applying the index updates. These may be called
1216    // from the recovery method too so they need to be idempotent
1217    // /////////////////////////////////////////////////////////////////
1218
1219    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
1220        data.visit(new Visitor() {
1221            @Override
1222            public void visit(KahaAddMessageCommand command) throws IOException {
1223                process(command, location, onSequenceAssignedCallback);
1224            }
1225
1226            @Override
1227            public void visit(KahaRemoveMessageCommand command) throws IOException {
1228                process(command, location);
1229            }
1230
1231            @Override
1232            public void visit(KahaPrepareCommand command) throws IOException {
1233                process(command, location);
1234            }
1235
1236            @Override
1237            public void visit(KahaCommitCommand command) throws IOException {
1238                process(command, location, onSequenceAssignedCallback);
1239            }
1240
1241            @Override
1242            public void visit(KahaRollbackCommand command) throws IOException {
1243                process(command, location);
1244            }
1245
1246            @Override
1247            public void visit(KahaRemoveDestinationCommand command) throws IOException {
1248                process(command, location);
1249            }
1250
1251            @Override
1252            public void visit(KahaSubscriptionCommand command) throws IOException {
1253                process(command, location);
1254            }
1255
1256            @Override
1257            public void visit(KahaProducerAuditCommand command) throws IOException {
1258                processLocation(location);
1259            }
1260
1261            @Override
1262            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
1263                processLocation(location);
1264            }
1265
1266            @Override
1267            public void visit(KahaTraceCommand command) {
1268                processLocation(location);
1269            }
1270
1271            @Override
1272            public void visit(KahaUpdateMessageCommand command) throws IOException {
1273                process(command, location);
1274            }
1275
1276            @Override
1277            public void visit(KahaRewrittenDataFileCommand command) throws IOException {
1278                process(command, location);
1279            }
1280        });
1281    }
1282
1283    @SuppressWarnings("rawtypes")
1284    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
1285        if (command.hasTransactionInfo()) {
1286            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1287            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
1288        } else {
1289            this.indexLock.writeLock().lock();
1290            try {
1291                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1292                    @Override
1293                    public void execute(Transaction tx) throws IOException {
1294                        long assignedIndex = updateIndex(tx, command, location);
1295                        if (runWithIndexLock != null) {
1296                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
1297                        }
1298                    }
1299                });
1300
1301            } finally {
1302                this.indexLock.writeLock().unlock();
1303            }
1304        }
1305    }
1306
1307    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
1308        this.indexLock.writeLock().lock();
1309        try {
1310            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1311                @Override
1312                public void execute(Transaction tx) throws IOException {
1313                    updateIndex(tx, command, location);
1314                }
1315            });
1316        } finally {
1317            this.indexLock.writeLock().unlock();
1318        }
1319    }
1320
1321    @SuppressWarnings("rawtypes")
1322    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1323        if (command.hasTransactionInfo()) {
1324           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1325           inflightTx.add(new RemoveOperation(command, location));
1326        } else {
1327            this.indexLock.writeLock().lock();
1328            try {
1329                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1330                    @Override
1331                    public void execute(Transaction tx) throws IOException {
1332                        updateIndex(tx, command, location);
1333                    }
1334                });
1335            } finally {
1336                this.indexLock.writeLock().unlock();
1337            }
1338        }
1339    }
1340
1341    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1342        this.indexLock.writeLock().lock();
1343        try {
1344            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1345                @Override
1346                public void execute(Transaction tx) throws IOException {
1347                    updateIndex(tx, command, location);
1348                }
1349            });
1350        } finally {
1351            this.indexLock.writeLock().unlock();
1352        }
1353    }
1354
1355    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1356        this.indexLock.writeLock().lock();
1357        try {
1358            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1359                @Override
1360                public void execute(Transaction tx) throws IOException {
1361                    updateIndex(tx, command, location);
1362                }
1363            });
1364        } finally {
1365            this.indexLock.writeLock().unlock();
1366        }
1367    }
1368
1369    protected void processLocation(final Location location) {
1370        this.indexLock.writeLock().lock();
1371        try {
1372            metadata.lastUpdate = location;
1373        } finally {
1374            this.indexLock.writeLock().unlock();
1375        }
1376    }
1377
1378    @SuppressWarnings("rawtypes")
1379    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
1380        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1381        List<Operation> inflightTx;
1382        synchronized (inflightTransactions) {
1383            inflightTx = inflightTransactions.remove(key);
1384            if (inflightTx == null) {
1385                inflightTx = preparedTransactions.remove(key);
1386            }
1387        }
1388        if (inflightTx == null) {
1389            // only non persistent messages in this tx
1390            if (before != null) {
1391                before.sequenceAssignedWithIndexLocked(-1);
1392            }
1393            return;
1394        }
1395
1396        final List<Operation> messagingTx = inflightTx;
1397        indexLock.writeLock().lock();
1398        try {
1399            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1400                @Override
1401                public void execute(Transaction tx) throws IOException {
1402                    for (Operation op : messagingTx) {
1403                        op.execute(tx);
1404                        recordAckMessageReferenceLocation(location, op.getLocation());
1405                    }
1406                }
1407            });
1408            metadata.lastUpdate = location;
1409        } finally {
1410            indexLock.writeLock().unlock();
1411        }
1412    }
1413
1414    @SuppressWarnings("rawtypes")
1415    protected void process(KahaPrepareCommand command, Location location) {
1416        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1417        List<Operation> tx = null;
1418        synchronized (inflightTransactions) {
1419            tx = inflightTransactions.remove(key);
1420            if (tx != null) {
1421                preparedTransactions.put(key, tx);
1422            }
1423        }
1424        if (tx != null && !tx.isEmpty()) {
1425            indexLock.writeLock().lock();
1426            try {
1427                for (Operation op : tx) {
1428                    recordAckMessageReferenceLocation(location, op.getLocation());
1429                }
1430            } finally {
1431                indexLock.writeLock().unlock();
1432            }
1433        }
1434    }
1435
1436    @SuppressWarnings("rawtypes")
1437    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1438        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1439        List<Operation> updates = null;
1440        synchronized (inflightTransactions) {
1441            updates = inflightTransactions.remove(key);
1442            if (updates == null) {
1443                updates = preparedTransactions.remove(key);
1444            }
1445        }
1446        if (key.isXATransaction() && updates != null && !updates.isEmpty()) {
1447            indexLock.writeLock().lock();
1448            try {
1449                for (Operation op : updates) {
1450                    recordAckMessageReferenceLocation(location, op.getLocation());
1451                }
1452            } finally {
1453                indexLock.writeLock().unlock();
1454            }
1455        }
1456    }
1457
1458    protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
1459        final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1460
1461        // Mark the current journal file as a compacted file so that gc checks can skip
1462        // over logs that are smaller compaction type logs.
1463        DataFile current = journal.getDataFileById(location.getDataFileId());
1464        current.setTypeCode(command.getRewriteType());
1465
1466        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
1467            // Move offset so that next location read jumps to next file.
1468            location.setOffset(journalMaxFileLength);
1469        }
1470    }
1471
1472    // /////////////////////////////////////////////////////////////////
1473    // These methods do the actual index updates.
1474    // /////////////////////////////////////////////////////////////////
1475
1476    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
1477    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1478
1479    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1480        StoredDestination sd = getExistingStoredDestination(command.getDestination(), tx);
1481        if (sd == null) {
1482            // if the store no longer exists, skip
1483            return -1;
1484        }
1485        // Skip adding the message to the index if this is a topic and there are
1486        // no subscriptions.
1487        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1488            return -1;
1489        }
1490
1491        // Add the message.
1492        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1493        long id = sd.orderIndex.getNextMessageId();
1494        Long previous = sd.locationIndex.put(tx, location, id);
1495        if (previous == null) {
1496            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1497            if (previous == null) {
1498                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1499                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1500                    addAckLocationForNewMessage(tx, sd, id);
1501                }
1502                metadata.lastUpdate = location;
1503            } else {
1504
1505                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
1506                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
1507                    // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
1508                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
1509                }
1510                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1511                sd.locationIndex.remove(tx, location);
1512                id = -1;
1513            }
1514        } else {
1515            // restore the previous value.. Looks like this was a redo of a previously
1516            // added message. We don't want to assign it a new id as the other indexes would
1517            // be wrong..
1518            sd.locationIndex.put(tx, location, previous);
1519            // ensure sequence is not broken
1520            sd.orderIndex.revertNextMessageId();
1521            metadata.lastUpdate = location;
1522        }
1523        // record this id in any event, initial send or recovery
1524        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1525        return id;
1526    }
1527
1528    void trackPendingAdd(KahaDestination destination, Long seq) {
1529        StoredDestination sd = storedDestinations.get(key(destination));
1530        if (sd != null) {
1531            sd.trackPendingAdd(seq);
1532        }
1533    }
1534
1535    void trackPendingAddComplete(KahaDestination destination, Long seq) {
1536        StoredDestination sd = storedDestinations.get(key(destination));
1537        if (sd != null) {
1538            sd.trackPendingAddComplete(seq);
1539        }
1540    }
1541
1542    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
1543        KahaAddMessageCommand command = updateMessageCommand.getMessage();
1544        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1545
1546        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
1547        if (id != null) {
1548            MessageKeys previousKeys = sd.orderIndex.put(
1549                    tx,
1550                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
1551                    id,
1552                    new MessageKeys(command.getMessageId(), location)
1553            );
1554            sd.locationIndex.put(tx, location, id);
1555            // on first update previous is original location, on recovery/replay it may be the updated location
1556            if(previousKeys != null && !previousKeys.location.equals(location)) {
1557                sd.locationIndex.remove(tx, previousKeys.location);
1558            }
1559            metadata.lastUpdate = location;
1560        } else {
1561            //Add the message if it can't be found
1562            this.updateIndex(tx, command, location);
1563        }
1564    }
1565
1566    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1567        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1568        if (!command.hasSubscriptionKey()) {
1569
1570            // In the queue case we just remove the message from the index..
1571            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1572            if (sequenceId != null) {
1573                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1574                if (keys != null) {
1575                    sd.locationIndex.remove(tx, keys.location);
1576                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1577                    metadata.lastUpdate = ackLocation;
1578                }  else if (LOG.isDebugEnabled()) {
1579                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1580                }
1581            } else if (LOG.isDebugEnabled()) {
1582                LOG.debug("message not found in sequence id index: " + command.getMessageId());
1583            }
1584        } else {
1585            // In the topic case we need remove the message once it's been acked
1586            // by all the subs
1587            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1588
1589            // Make sure it's a valid message id...
1590            if (sequence != null) {
1591                String subscriptionKey = command.getSubscriptionKey();
1592                if (command.getAck() != UNMATCHED) {
1593                    sd.orderIndex.get(tx, sequence);
1594                    byte priority = sd.orderIndex.lastGetPriority();
1595                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1596                }
1597
1598                MessageKeys keys = sd.orderIndex.get(tx, sequence);
1599                if (keys != null) {
1600                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1601                }
1602                // The following method handles deleting un-referenced messages.
1603                removeAckLocation(tx, sd, subscriptionKey, sequence);
1604                metadata.lastUpdate = ackLocation;
1605            } else if (LOG.isDebugEnabled()) {
1606                LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1607            }
1608
1609        }
1610    }
1611
1612    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1613        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1614        if (referenceFileIds == null) {
1615            referenceFileIds = new HashSet<Integer>();
1616            referenceFileIds.add(messageLocation.getDataFileId());
1617            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1618        } else {
1619            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1620            if (!referenceFileIds.contains(id)) {
1621                referenceFileIds.add(id);
1622            }
1623        }
1624    }
1625
1626    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
1627        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1628        sd.orderIndex.remove(tx);
1629
1630        sd.locationIndex.clear(tx);
1631        sd.locationIndex.unload(tx);
1632        tx.free(sd.locationIndex.getPageId());
1633
1634        sd.messageIdIndex.clear(tx);
1635        sd.messageIdIndex.unload(tx);
1636        tx.free(sd.messageIdIndex.getPageId());
1637
1638        if (sd.subscriptions != null) {
1639            sd.subscriptions.clear(tx);
1640            sd.subscriptions.unload(tx);
1641            tx.free(sd.subscriptions.getPageId());
1642
1643            sd.subscriptionAcks.clear(tx);
1644            sd.subscriptionAcks.unload(tx);
1645            tx.free(sd.subscriptionAcks.getPageId());
1646
1647            sd.ackPositions.clear(tx);
1648            sd.ackPositions.unload(tx);
1649            tx.free(sd.ackPositions.getHeadPageId());
1650
1651            sd.subLocations.clear(tx);
1652            sd.subLocations.unload(tx);
1653            tx.free(sd.subLocations.getHeadPageId());
1654        }
1655
1656        String key = key(command.getDestination());
1657        storedDestinations.remove(key);
1658        metadata.destinations.remove(tx, key);
1659        storeCache.remove(key(command.getDestination()));
1660    }
1661
1662    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1663        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1664        final String subscriptionKey = command.getSubscriptionKey();
1665
1666        // If set then we are creating it.. otherwise we are destroying the sub
1667        if (command.hasSubscriptionInfo()) {
1668            Location existing = sd.subLocations.get(tx, subscriptionKey);
1669            if (existing != null && existing.compareTo(location) == 0) {
1670                // replay on recovery, ignore
1671                LOG.trace("ignoring journal replay of replay of sub from: " + location);
1672                return;
1673            }
1674
1675            sd.subscriptions.put(tx, subscriptionKey, command);
1676            sd.subLocations.put(tx, subscriptionKey, location);
1677            long ackLocation=NOT_ACKED;
1678            if (!command.getRetroactive()) {
1679                ackLocation = sd.orderIndex.nextMessageId-1;
1680            } else {
1681                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1682            }
1683            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1684            sd.subscriptionCache.add(subscriptionKey);
1685        } else {
1686            // delete the sub...
1687            sd.subscriptions.remove(tx, subscriptionKey);
1688            sd.subLocations.remove(tx, subscriptionKey);
1689            sd.subscriptionAcks.remove(tx, subscriptionKey);
1690            sd.subscriptionCache.remove(subscriptionKey);
1691            removeAckLocationsForSub(tx, sd, subscriptionKey);
1692
1693            if (sd.subscriptions.isEmpty(tx)) {
1694                // remove the stored destination
1695                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
1696                removeDestinationCommand.setDestination(command.getDestination());
1697                updateIndex(tx, removeDestinationCommand, null);
1698            }
1699        }
1700    }
1701
1702    private void checkpointUpdate(final boolean cleanup) throws IOException {
1703        checkpointLock.writeLock().lock();
1704        try {
1705            this.indexLock.writeLock().lock();
1706            try {
1707                Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() {
1708                    @Override
1709                    public Set<Integer> execute(Transaction tx) throws IOException {
1710                        return checkpointUpdate(tx, cleanup);
1711                    }
1712                });
1713                pageFile.flush();
1714                // after the index update such that partial removal does not leave dangling references in the index.
1715                journal.removeDataFiles(filesToGc);
1716            } finally {
1717                this.indexLock.writeLock().unlock();
1718            }
1719
1720        } finally {
1721            checkpointLock.writeLock().unlock();
1722        }
1723    }
1724
1725    /**
1726     * @param tx
1727     * @throws IOException
1728     */
1729    Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
1730        MDC.put("activemq.persistenceDir", getDirectory().getName());
1731        LOG.debug("Checkpoint started.");
1732
1733        // reflect last update exclusive of current checkpoint
1734        Location lastUpdate = metadata.lastUpdate;
1735
1736        metadata.state = OPEN_STATE;
1737        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
1738        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
1739        Location[] inProgressTxRange = getInProgressTxLocationRange();
1740        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
1741        tx.store(metadata.page, metadataMarshaller, true);
1742
1743        final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>();
1744        if (cleanup) {
1745
1746            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1747            gcCandidateSet.addAll(completeFileSet);
1748
1749            if (LOG.isTraceEnabled()) {
1750                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
1751            }
1752
1753            if (lastUpdate != null) {
1754                // we won't delete past the last update, ackCompaction journal can be a candidate in error
1755                gcCandidateSet.removeAll(new TreeSet<Integer>(gcCandidateSet.tailSet(lastUpdate.getDataFileId())));
1756            }
1757
1758            // Don't GC files under replication
1759            if( journalFilesBeingReplicated!=null ) {
1760                gcCandidateSet.removeAll(journalFilesBeingReplicated);
1761            }
1762
1763            if (metadata.producerSequenceIdTrackerLocation != null) {
1764                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
1765                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
1766                    // rewrite so we don't prevent gc
1767                    metadata.producerSequenceIdTracker.setModified(true);
1768                    if (LOG.isTraceEnabled()) {
1769                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
1770                    }
1771                }
1772                gcCandidateSet.remove(dataFileId);
1773                if (LOG.isTraceEnabled()) {
1774                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet);
1775                }
1776            }
1777
1778            if (metadata.ackMessageFileMapLocation != null) {
1779                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
1780                gcCandidateSet.remove(dataFileId);
1781                if (LOG.isTraceEnabled()) {
1782                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet);
1783                }
1784            }
1785
1786            // Don't GC files referenced by in-progress tx
1787            if (inProgressTxRange[0] != null) {
1788                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
1789                    gcCandidateSet.remove(pendingTx);
1790                }
1791            }
1792            if (LOG.isTraceEnabled()) {
1793                LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
1794            }
1795
1796            // Go through all the destinations to see if any of them can remove GC candidates.
1797            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
1798                if( gcCandidateSet.isEmpty() ) {
1799                    break;
1800                }
1801
1802                // Use a visitor to cut down the number of pages that we load
1803                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
1804                    int last=-1;
1805                    @Override
1806                    public boolean isInterestedInKeysBetween(Location first, Location second) {
1807                        if( first==null ) {
1808                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
1809                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1810                                subset.remove(second.getDataFileId());
1811                            }
1812                            return !subset.isEmpty();
1813                        } else if( second==null ) {
1814                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
1815                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1816                                subset.remove(first.getDataFileId());
1817                            }
1818                            return !subset.isEmpty();
1819                        } else {
1820                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
1821                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
1822                                subset.remove(first.getDataFileId());
1823                            }
1824                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
1825                                subset.remove(second.getDataFileId());
1826                            }
1827                            return !subset.isEmpty();
1828                        }
1829                    }
1830
1831                    @Override
1832                    public void visit(List<Location> keys, List<Long> values) {
1833                        for (Location l : keys) {
1834                            int fileId = l.getDataFileId();
1835                            if( last != fileId ) {
1836                                gcCandidateSet.remove(fileId);
1837                                last = fileId;
1838                            }
1839                        }
1840                    }
1841                });
1842
1843                // Durable Subscription
1844                if (entry.getValue().subLocations != null) {
1845                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
1846                    while (iter.hasNext()) {
1847                        Entry<String, Location> subscription = iter.next();
1848                        int dataFileId = subscription.getValue().getDataFileId();
1849
1850                        // Move subscription along if it has no outstanding messages that need ack'd
1851                        // and its in the last log file in the journal.
1852                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
1853                            final StoredDestination destination = entry.getValue();
1854                            final String subscriptionKey = subscription.getKey();
1855                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1856
1857                            // When pending is size one that is the next message Id meaning there
1858                            // are no pending messages currently.
1859                            if (pendingAcks == null || pendingAcks.isEmpty() ||
1860                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
1861
1862                                if (LOG.isTraceEnabled()) {
1863                                    LOG.trace("Found candidate for rewrite: sub {} on {} from file {}", subscriptionKey, entry.getKey(), dataFileId);
1864                                }
1865
1866                                final KahaSubscriptionCommand kahaSub =
1867                                    destination.subscriptions.get(tx, subscriptionKey);
1868                                destination.subLocations.put(
1869                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
1870
1871                                // Skips the remove from candidates if we rewrote the subscription
1872                                // in order to prevent duplicate subscription commands on recover.
1873                                // If another subscription is on the same file and isn't rewritten
1874                                // than it will remove the file from the set.
1875                                continue;
1876                            }
1877                        }
1878
1879                        if (LOG.isTraceEnabled()) {
1880                            final StoredDestination destination = entry.getValue();
1881                            final String subscriptionKey = subscription.getKey();
1882                            final SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
1883                            LOG.trace("sub {} on {} in dataFile {} has pendingCount {}", subscriptionKey, entry.getKey(), dataFileId, pendingAcks.rangeSize()-1);
1884                        }
1885                        gcCandidateSet.remove(dataFileId);
1886                    }
1887                }
1888
1889                if (LOG.isTraceEnabled()) {
1890                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
1891                }
1892            }
1893
1894            // check we are not deleting file with ack for in-use journal files
1895            if (LOG.isTraceEnabled()) {
1896                LOG.trace("gc candidates: " + gcCandidateSet);
1897                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
1898            }
1899
1900            boolean ackMessageFileMapMod = false;
1901            Iterator<Integer> candidates = gcCandidateSet.iterator();
1902            while (candidates.hasNext()) {
1903                Integer candidate = candidates.next();
1904                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
1905                if (referencedFileIds != null) {
1906                    for (Integer referencedFileId : referencedFileIds) {
1907                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
1908                            // active file that is not targeted for deletion is referenced so don't delete
1909                            candidates.remove();
1910                            break;
1911                        }
1912                    }
1913                    if (gcCandidateSet.contains(candidate)) {
1914                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
1915                    } else {
1916                        if (LOG.isTraceEnabled()) {
1917                            LOG.trace("not removing data file: " + candidate
1918                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
1919                        }
1920                    }
1921                }
1922            }
1923
1924            if (!gcCandidateSet.isEmpty()) {
1925                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
1926                for (Integer candidate : gcCandidateSet) {
1927                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
1928                        ackMessageFileMapMod |= ackFiles.remove(candidate);
1929                    }
1930                }
1931                if (ackMessageFileMapMod) {
1932                    checkpointUpdate(tx, false);
1933                }
1934            } else if (isEnableAckCompaction()) {
1935                if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
1936                    // First check length of journal to make sure it makes sense to even try.
1937                    //
1938                    // If there is only one journal file with Acks in it we don't need to move
1939                    // it since it won't be chained to any later logs.
1940                    //
1941                    // If the logs haven't grown since the last time then we need to compact
1942                    // otherwise there seems to still be room for growth and we don't need to incur
1943                    // the overhead.  Depending on configuration this check can be avoided and
1944                    // Ack compaction will run any time the store has not GC'd a journal file in
1945                    // the configured amount of cycles.
1946                    if (metadata.ackMessageFileMap.size() > 1 &&
1947                        (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
1948
1949                        LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
1950                        try {
1951                            scheduler.execute(new AckCompactionRunner());
1952                        } catch (Exception ex) {
1953                            LOG.warn("Error on queueing the Ack Compactor", ex);
1954                        }
1955                    } else {
1956                        LOG.trace("Journal activity detected, no Ack compaction scheduled.");
1957                    }
1958
1959                    checkPointCyclesWithNoGC = 0;
1960                } else {
1961                    LOG.trace("Not yet time to check for compaction: {} of {} cycles",
1962                              checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
1963                }
1964
1965                journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
1966            }
1967        }
1968        MDC.remove("activemq.persistenceDir");
1969
1970        LOG.debug("Checkpoint done.");
1971        return gcCandidateSet;
1972    }
1973
1974    private final class AckCompactionRunner implements Runnable {
1975
1976        @Override
1977        public void run() {
1978
1979            int journalToAdvance = -1;
1980            Set<Integer> journalLogsReferenced = new HashSet<Integer>();
1981
1982            //flag to know whether the ack forwarding completed without an exception
1983            boolean forwarded = false;
1984
1985            try {
1986                //acquire the checkpoint lock to prevent other threads from
1987                //running a checkpoint while this is running
1988                //
1989                //Normally this task runs on the same executor as the checkpoint task
1990                //so this ack compaction runner wouldn't run at the same time as the checkpoint task.
1991                //
1992                //However, there are two cases where this isn't always true.
1993                //First, the checkpoint() method is public and can be called through the
1994                //PersistenceAdapter interface by someone at the same time this is running.
1995                //Second, a checkpoint is called during shutdown without using the executor.
1996                //
1997                //In the future it might be better to just remove the checkpointLock entirely
1998                //and only use the executor but this would need to be examined for any unintended
1999                //consequences
2000                checkpointLock.readLock().lock();
2001
2002                try {
2003
2004                    // Lock index to capture the ackMessageFileMap data
2005                    indexLock.writeLock().lock();
2006
2007                    // Map keys might not be sorted, find the earliest log file to forward acks
2008                    // from and move only those, future cycles can chip away at more as needed.
2009                    // We won't move files that are themselves rewritten on a previous compaction.
2010                    List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
2011                    Collections.sort(journalFileIds);
2012                    for (Integer journalFileId : journalFileIds) {
2013                        DataFile current = journal.getDataFileById(journalFileId);
2014                        if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
2015                            journalToAdvance = journalFileId;
2016                            break;
2017                        }
2018                    }
2019
2020                    // Check if we found one, or if we only found the current file being written to.
2021                    if (journalToAdvance == -1 || blockedFromCompaction(journalToAdvance)) {
2022                        return;
2023                    }
2024
2025                    journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
2026
2027                } finally {
2028                    indexLock.writeLock().unlock();
2029                }
2030
2031                try {
2032                    // Background rewrite of the old acks
2033                    forwardAllAcks(journalToAdvance, journalLogsReferenced);
2034                    forwarded = true;
2035                } catch (IOException ioe) {
2036                    LOG.error("Forwarding of acks failed", ioe);
2037                    brokerService.handleIOException(ioe);
2038                } catch (Throwable e) {
2039                    LOG.error("Forwarding of acks failed", e);
2040                    brokerService.handleIOException(IOExceptionSupport.create(e));
2041                }
2042            } finally {
2043                checkpointLock.readLock().unlock();
2044            }
2045
2046            try {
2047                if (forwarded) {
2048                    // Checkpoint with changes from the ackMessageFileMap
2049                    checkpointUpdate(false);
2050                }
2051            } catch (IOException ioe) {
2052                LOG.error("Checkpoint failed", ioe);
2053                brokerService.handleIOException(ioe);
2054            } catch (Throwable e) {
2055                LOG.error("Checkpoint failed", e);
2056                brokerService.handleIOException(IOExceptionSupport.create(e));
2057            }
2058        }
2059    }
2060
2061    // called with the index lock held
2062    private boolean blockedFromCompaction(int journalToAdvance) {
2063        // don't forward the current data file
2064        if (journalToAdvance == journal.getCurrentDataFileId()) {
2065            return true;
2066        }
2067        // don't forward any data file with inflight transaction records because it will whack the tx - data file link
2068        // in the ack map when all acks are migrated (now that the ack map is not just for acks)
2069        // TODO: prepare records can be dropped but completion records (maybe only commit outcomes) need to be migrated
2070        // as part of the forward work.
2071        Location[] inProgressTxRange = getInProgressTxLocationRange();
2072        if (inProgressTxRange[0] != null) {
2073            for (int pendingTx = inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
2074                if (journalToAdvance == pendingTx) {
2075                    LOG.trace("Compaction target:{} blocked by inflight transaction records: {}", journalToAdvance, inProgressTxRange);
2076                    return true;
2077                }
2078            }
2079        }
2080        return false;
2081    }
2082
2083    private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
2084        LOG.trace("Attempting to move all acks in journal:{} to the front. Referenced files:{}", journalToRead, journalLogsReferenced);
2085
2086        DataFile forwardsFile = journal.reserveDataFile();
2087        forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
2088        LOG.trace("Reserved file for forwarded acks: {}", forwardsFile);
2089
2090        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
2091
2092        try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
2093            KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
2094            compactionMarker.setSourceDataFileId(journalToRead);
2095            compactionMarker.setRewriteType(forwardsFile.getTypeCode());
2096
2097            ByteSequence payload = toByteSequence(compactionMarker);
2098            appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2099            LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
2100
2101            final Location limit = new Location(journalToRead + 1, 0);
2102            Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0), limit);
2103            while (nextLocation != null) {
2104                JournalCommand<?> command = null;
2105                try {
2106                    command = load(nextLocation);
2107                } catch (IOException ex) {
2108                    LOG.trace("Error loading command during ack forward: {}", nextLocation);
2109                }
2110
2111                if (shouldForward(command)) {
2112                    payload = toByteSequence(command);
2113                    Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2114                    updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
2115                }
2116
2117                nextLocation = getNextLocationForAckForward(nextLocation, limit);
2118            }
2119        }
2120
2121        LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
2122
2123        // Lock index while we update the ackMessageFileMap.
2124        indexLock.writeLock().lock();
2125
2126        // Update the ack map with the new locations of the acks
2127        for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
2128            Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
2129            if (referenceFileIds == null) {
2130                referenceFileIds = new HashSet<Integer>();
2131                referenceFileIds.addAll(entry.getValue());
2132                metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
2133            } else {
2134                referenceFileIds.addAll(entry.getValue());
2135            }
2136        }
2137
2138        // remove the old location data from the ack map so that the old journal log file can
2139        // be removed on next GC.
2140        metadata.ackMessageFileMap.remove(journalToRead);
2141
2142        indexLock.writeLock().unlock();
2143
2144        LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
2145    }
2146
2147    private boolean shouldForward(JournalCommand<?> command) {
2148        boolean result = false;
2149        if (command != null) {
2150            if (command instanceof KahaRemoveMessageCommand) {
2151                result = true;
2152            } else if (command instanceof KahaCommitCommand) {
2153                KahaCommitCommand kahaCommitCommand = (KahaCommitCommand) command;
2154                if (kahaCommitCommand.hasTransactionInfo() && kahaCommitCommand.getTransactionInfo().hasXaTransactionId()) {
2155                    result = true;
2156                }
2157            }
2158        }
2159        return result;
2160    }
2161
2162    private Location getNextLocationForAckForward(final Location nextLocation, final Location limit) {
2163        //getNextLocation() can throw an IOException, we should handle it and set
2164        //nextLocation to null and abort gracefully
2165        //Should not happen in the normal case
2166        Location location = null;
2167        try {
2168            location = journal.getNextLocation(nextLocation, limit);
2169        } catch (IOException e) {
2170            LOG.warn("Failed to load next journal location after: {}, reason: {}", nextLocation, e);
2171            if (LOG.isDebugEnabled()) {
2172                LOG.debug("Failed to load next journal location after: {}", nextLocation, e);
2173            }
2174        }
2175        return location;
2176    }
2177
2178    final Runnable nullCompletionCallback = new Runnable() {
2179        @Override
2180        public void run() {
2181        }
2182    };
2183
2184    private Location checkpointProducerAudit() throws IOException {
2185        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
2186            ByteArrayOutputStream baos = new ByteArrayOutputStream();
2187            ObjectOutputStream oout = new ObjectOutputStream(baos);
2188            oout.writeObject(metadata.producerSequenceIdTracker);
2189            oout.flush();
2190            oout.close();
2191            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2192            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
2193            try {
2194                location.getLatch().await();
2195                if (location.getException().get() != null) {
2196                    throw location.getException().get();
2197                }
2198            } catch (InterruptedException e) {
2199                throw new InterruptedIOException(e.toString());
2200            }
2201            return location;
2202        }
2203        return metadata.producerSequenceIdTrackerLocation;
2204    }
2205
2206    private Location checkpointAckMessageFileMap() throws IOException {
2207        ByteArrayOutputStream baos = new ByteArrayOutputStream();
2208        ObjectOutputStream oout = new ObjectOutputStream(baos);
2209        oout.writeObject(metadata.ackMessageFileMap);
2210        oout.flush();
2211        oout.close();
2212        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2213        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
2214        try {
2215            location.getLatch().await();
2216        } catch (InterruptedException e) {
2217            throw new InterruptedIOException(e.toString());
2218        }
2219        return location;
2220    }
2221
2222    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
2223
2224        ByteSequence sequence = toByteSequence(subscription);
2225        Location location = journal.write(sequence, nullCompletionCallback) ;
2226
2227        try {
2228            location.getLatch().await();
2229        } catch (InterruptedException e) {
2230            throw new InterruptedIOException(e.toString());
2231        }
2232        return location;
2233    }
2234
2235    public HashSet<Integer> getJournalFilesBeingReplicated() {
2236        return journalFilesBeingReplicated;
2237    }
2238
2239    // /////////////////////////////////////////////////////////////////
2240    // StoredDestination related implementation methods.
2241    // /////////////////////////////////////////////////////////////////
2242
2243    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
2244
2245    static class MessageKeys {
2246        final String messageId;
2247        final Location location;
2248
2249        public MessageKeys(String messageId, Location location) {
2250            this.messageId=messageId;
2251            this.location=location;
2252        }
2253
2254        @Override
2255        public String toString() {
2256            return "["+messageId+","+location+"]";
2257        }
2258    }
2259
2260    static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
2261        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
2262
2263        @Override
2264        public MessageKeys readPayload(DataInput dataIn) throws IOException {
2265            return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
2266        }
2267
2268        @Override
2269        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
2270            dataOut.writeUTF(object.messageId);
2271            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
2272        }
2273    }
2274
2275    class LastAck {
2276        long lastAckedSequence;
2277        byte priority;
2278
2279        public LastAck(LastAck source) {
2280            this.lastAckedSequence = source.lastAckedSequence;
2281            this.priority = source.priority;
2282        }
2283
2284        public LastAck() {
2285            this.priority = MessageOrderIndex.HI;
2286        }
2287
2288        public LastAck(long ackLocation) {
2289            this.lastAckedSequence = ackLocation;
2290            this.priority = MessageOrderIndex.LO;
2291        }
2292
2293        public LastAck(long ackLocation, byte priority) {
2294            this.lastAckedSequence = ackLocation;
2295            this.priority = priority;
2296        }
2297
2298        @Override
2299        public String toString() {
2300            return "[" + lastAckedSequence + ":" + priority + "]";
2301        }
2302    }
2303
2304    protected class LastAckMarshaller implements Marshaller<LastAck> {
2305
2306        @Override
2307        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
2308            dataOut.writeLong(object.lastAckedSequence);
2309            dataOut.writeByte(object.priority);
2310        }
2311
2312        @Override
2313        public LastAck readPayload(DataInput dataIn) throws IOException {
2314            LastAck lastAcked = new LastAck();
2315            lastAcked.lastAckedSequence = dataIn.readLong();
2316            if (metadata.version >= 3) {
2317                lastAcked.priority = dataIn.readByte();
2318            }
2319            return lastAcked;
2320        }
2321
2322        @Override
2323        public int getFixedSize() {
2324            return 9;
2325        }
2326
2327        @Override
2328        public LastAck deepCopy(LastAck source) {
2329            return new LastAck(source);
2330        }
2331
2332        @Override
2333        public boolean isDeepCopySupported() {
2334            return true;
2335        }
2336    }
2337
2338    class StoredDestination {
2339
2340        MessageOrderIndex orderIndex = new MessageOrderIndex();
2341        BTreeIndex<Location, Long> locationIndex;
2342        BTreeIndex<String, Long> messageIdIndex;
2343
2344        // These bits are only set for Topics
2345        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
2346        BTreeIndex<String, LastAck> subscriptionAcks;
2347        HashMap<String, MessageOrderCursor> subscriptionCursors;
2348        ListIndex<String, SequenceSet> ackPositions;
2349        ListIndex<String, Location> subLocations;
2350
2351        // Transient data used to track which Messages are no longer needed.
2352        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
2353
2354        public void trackPendingAdd(Long seq) {
2355            orderIndex.trackPendingAdd(seq);
2356        }
2357
2358        public void trackPendingAddComplete(Long seq) {
2359            orderIndex.trackPendingAddComplete(seq);
2360        }
2361
2362        @Override
2363        public String toString() {
2364            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
2365        }
2366    }
2367
2368    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
2369
2370        @Override
2371        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
2372            final StoredDestination value = new StoredDestination();
2373            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2374            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
2375            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());
2376
2377            if (dataIn.readBoolean()) {
2378                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
2379                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
2380                if (metadata.version >= 4) {
2381                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
2382                } else {
2383                    // upgrade
2384                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2385                        @Override
2386                        public void execute(Transaction tx) throws IOException {
2387                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();
2388
2389                            if (metadata.version >= 3) {
2390                                // migrate
2391                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
2392                                        new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
2393                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
2394                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
2395                                oldAckPositions.load(tx);
2396
2397
2398                                // Do the initial build of the data in memory before writing into the store
2399                                // based Ack Positions List to avoid a lot of disk thrashing.
2400                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
2401                                while (iterator.hasNext()) {
2402                                    Entry<Long, HashSet<String>> entry = iterator.next();
2403
2404                                    for(String subKey : entry.getValue()) {
2405                                        SequenceSet pendingAcks = temp.get(subKey);
2406                                        if (pendingAcks == null) {
2407                                            pendingAcks = new SequenceSet();
2408                                            temp.put(subKey, pendingAcks);
2409                                        }
2410
2411                                        pendingAcks.add(entry.getKey());
2412                                    }
2413                                }
2414                            }
2415                            // Now move the pending messages to ack data into the store backed
2416                            // structure.
2417                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2418                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2419                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2420                            value.ackPositions.load(tx);
2421                            for(String subscriptionKey : temp.keySet()) {
2422                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
2423                            }
2424
2425                        }
2426                    });
2427                }
2428
2429                if (metadata.version >= 5) {
2430                    value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
2431                } else {
2432                    // upgrade
2433                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
2434                        @Override
2435                        public void execute(Transaction tx) throws IOException {
2436                            value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2437                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2438                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2439                            value.subLocations.load(tx);
2440                        }
2441                    });
2442                }
2443            }
2444            if (metadata.version >= 2) {
2445                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2446                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
2447            } else {
2448                // upgrade
2449                pageFile.tx().execute(new Transaction.Closure<IOException>() {
2450                    @Override
2451                    public void execute(Transaction tx) throws IOException {
2452                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2453                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2454                        value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2455                        value.orderIndex.lowPriorityIndex.load(tx);
2456
2457                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
2458                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
2459                        value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
2460                        value.orderIndex.highPriorityIndex.load(tx);
2461                    }
2462                });
2463            }
2464
2465            return value;
2466        }
2467
2468        @Override
2469        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
2470            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
2471            dataOut.writeLong(value.locationIndex.getPageId());
2472            dataOut.writeLong(value.messageIdIndex.getPageId());
2473            if (value.subscriptions != null) {
2474                dataOut.writeBoolean(true);
2475                dataOut.writeLong(value.subscriptions.getPageId());
2476                dataOut.writeLong(value.subscriptionAcks.getPageId());
2477                dataOut.writeLong(value.ackPositions.getHeadPageId());
2478                dataOut.writeLong(value.subLocations.getHeadPageId());
2479            } else {
2480                dataOut.writeBoolean(false);
2481            }
2482            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
2483            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
2484        }
2485    }
2486
2487    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
2488        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
2489
2490        @Override
2491        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
2492            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
2493            rc.mergeFramed((InputStream)dataIn);
2494            return rc;
2495        }
2496
2497        @Override
2498        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
2499            object.writeFramed((OutputStream)dataOut);
2500        }
2501    }
2502
2503    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2504        String key = key(destination);
2505        StoredDestination rc = storedDestinations.get(key);
2506        if (rc == null) {
2507            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
2508            rc = loadStoredDestination(tx, key, topic);
2509            // Cache it. We may want to remove/unload destinations from the
2510            // cache that are not used for a while
2511            // to reduce memory usage.
2512            storedDestinations.put(key, rc);
2513        }
2514        return rc;
2515    }
2516
2517    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2518        String key = key(destination);
2519        StoredDestination rc = storedDestinations.get(key);
2520        if (rc == null && metadata.destinations.containsKey(tx, key)) {
2521            rc = getStoredDestination(destination, tx);
2522        }
2523        return rc;
2524    }
2525
2526    /**
2527     * @param tx
2528     * @param key
2529     * @param topic
2530     * @return
2531     * @throws IOException
2532     */
2533    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
2534        // Try to load the existing indexes..
2535        StoredDestination rc = metadata.destinations.get(tx, key);
2536        if (rc == null) {
2537            // Brand new destination.. allocate indexes for it.
2538            rc = new StoredDestination();
2539            rc.orderIndex.allocate(tx);
2540            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
2541            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
2542
2543            if (topic) {
2544                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
2545                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
2546                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2547                rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2548            }
2549            metadata.destinations.put(tx, key, rc);
2550        }
2551
2552        // Configure the marshalers and load.
2553        rc.orderIndex.load(tx);
2554
2555        // Figure out the next key using the last entry in the destination.
2556        rc.orderIndex.configureLast(tx);
2557
2558        rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE);
2559        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2560        rc.locationIndex.load(tx);
2561
2562        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
2563        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2564        rc.messageIdIndex.load(tx);
2565
2566        // If it was a topic...
2567        if (topic) {
2568
2569            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
2570            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
2571            rc.subscriptions.load(tx);
2572
2573            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
2574            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
2575            rc.subscriptionAcks.load(tx);
2576
2577            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2578            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2579            rc.ackPositions.load(tx);
2580
2581            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2582            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2583            rc.subLocations.load(tx);
2584
2585            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
2586
2587            if (metadata.version < 3) {
2588
2589                // on upgrade need to fill ackLocation with available messages past last ack
2590                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2591                    Entry<String, LastAck> entry = iterator.next();
2592                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
2593                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
2594                        Long sequence = orderIterator.next().getKey();
2595                        addAckLocation(tx, rc, sequence, entry.getKey());
2596                    }
2597                    // modify so it is upgraded
2598                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
2599                }
2600            }
2601
2602
2603            // Configure the subscription cache
2604            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2605                Entry<String, LastAck> entry = iterator.next();
2606                rc.subscriptionCache.add(entry.getKey());
2607            }
2608
2609            if (rc.orderIndex.nextMessageId == 0) {
2610                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
2611                if (!rc.subscriptionAcks.isEmpty(tx)) {
2612                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
2613                        Entry<String, LastAck> entry = iterator.next();
2614                        rc.orderIndex.nextMessageId =
2615                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
2616                    }
2617                }
2618            } else {
2619                // update based on ackPositions for unmatched, last entry is always the next
2620                Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
2621                while (subscriptions.hasNext()) {
2622                    Entry<String, SequenceSet> subscription = subscriptions.next();
2623                    SequenceSet pendingAcks = subscription.getValue();
2624                    if (pendingAcks != null && !pendingAcks.isEmpty()) {
2625                        for (Long sequenceId : pendingAcks) {
2626                            rc.orderIndex.nextMessageId = Math.max(rc.orderIndex.nextMessageId, sequenceId);
2627                        }
2628                    }
2629                }
2630            }
2631        }
2632
2633        if (metadata.version < VERSION) {
2634            // store again after upgrade
2635            metadata.destinations.put(tx, key, rc);
2636        }
2637        return rc;
2638    }
2639
2640    /**
2641     * This is a map to cache MessageStores for a specific
2642     * KahaDestination key
2643     */
2644    protected final ConcurrentMap<String, MessageStore> storeCache =
2645       new ConcurrentHashMap<>();
2646
2647    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
2648        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2649        if (sequences == null) {
2650            sequences = new SequenceSet();
2651            sequences.add(messageSequence);
2652            sd.ackPositions.add(tx, subscriptionKey, sequences);
2653        } else {
2654            sequences.add(messageSequence);
2655            sd.ackPositions.put(tx, subscriptionKey, sequences);
2656        }
2657    }
2658
2659    // new sub is interested in potentially all existing messages
2660    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2661        SequenceSet allOutstanding = new SequenceSet();
2662        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
2663        while (iterator.hasNext()) {
2664            SequenceSet set = iterator.next().getValue();
2665            for (Long entry : set) {
2666                allOutstanding.add(entry);
2667            }
2668        }
2669        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
2670    }
2671
2672    // on a new message add, all existing subs are interested in this message
2673    private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
2674        for(String subscriptionKey : sd.subscriptionCache) {
2675            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2676            if (sequences == null) {
2677                sequences = new SequenceSet();
2678                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2679                sd.ackPositions.add(tx, subscriptionKey, sequences);
2680            } else {
2681                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2682                sd.ackPositions.put(tx, subscriptionKey, sequences);
2683            }
2684       }
2685    }
2686
2687    private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2688        if (!sd.ackPositions.isEmpty(tx)) {
2689            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2690            if (sequences == null || sequences.isEmpty()) {
2691                return;
2692            }
2693
2694            ArrayList<Long> unreferenced = new ArrayList<Long>();
2695
2696            for(Long sequenceId : sequences) {
2697                if(!isSequenceReferenced(tx, sd, sequenceId)) {
2698                    unreferenced.add(sequenceId);
2699                }
2700            }
2701
2702            for(Long sequenceId : unreferenced) {
2703                // Find all the entries that need to get deleted.
2704                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2705                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2706
2707                // Do the actual deletes.
2708                for (Entry<Long, MessageKeys> entry : deletes) {
2709                    sd.locationIndex.remove(tx, entry.getValue().location);
2710                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2711                    sd.orderIndex.remove(tx, entry.getKey());
2712                }
2713            }
2714        }
2715    }
2716
2717    private boolean isSequenceReferenced(final Transaction tx, final StoredDestination sd, final Long sequenceId) throws IOException {
2718        for(String subscriptionKey : sd.subscriptionCache) {
2719            SequenceSet sequence = sd.ackPositions.get(tx, subscriptionKey);
2720            if (sequence != null && sequence.contains(sequenceId)) {
2721                return true;
2722            }
2723        }
2724        return false;
2725    }
2726
2727    /**
2728     * @param tx
2729     * @param sd
2730     * @param subscriptionKey
2731     * @param messageSequence
2732     * @throws IOException
2733     */
2734    private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
2735        // Remove the sub from the previous location set..
2736        if (messageSequence != null) {
2737            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2738            if (range != null && !range.isEmpty()) {
2739                range.remove(messageSequence);
2740                if (!range.isEmpty()) {
2741                    sd.ackPositions.put(tx, subscriptionKey, range);
2742                } else {
2743                    sd.ackPositions.remove(tx, subscriptionKey);
2744                }
2745
2746                // Check if the message is reference by any other subscription.
2747                if (isSequenceReferenced(tx, sd, messageSequence)) {
2748                    return;
2749                }
2750                // Find all the entries that need to get deleted.
2751                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2752                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2753
2754                // Do the actual deletes.
2755                for (Entry<Long, MessageKeys> entry : deletes) {
2756                    sd.locationIndex.remove(tx, entry.getValue().location);
2757                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2758                    sd.orderIndex.remove(tx, entry.getKey());
2759                }
2760            }
2761        }
2762    }
2763
2764    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2765        return sd.subscriptionAcks.get(tx, subscriptionKey);
2766    }
2767
2768    protected SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2769        if (sd.ackPositions != null) {
2770            final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2771            return messageSequences;
2772        }
2773
2774        return null;
2775    }
2776
2777    protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2778        if (sd.ackPositions != null) {
2779            SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2780            if (messageSequences != null) {
2781                long result = messageSequences.rangeSize();
2782                // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2783                return result > 0 ? result - 1 : 0;
2784            }
2785        }
2786
2787        return 0;
2788    }
2789
2790    protected String key(KahaDestination destination) {
2791        return destination.getType().getNumber() + ":" + destination.getName();
2792    }
2793
2794    // /////////////////////////////////////////////////////////////////
2795    // Transaction related implementation methods.
2796    // /////////////////////////////////////////////////////////////////
2797    @SuppressWarnings("rawtypes")
2798    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2799    @SuppressWarnings("rawtypes")
2800    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
2801
2802    @SuppressWarnings("rawtypes")
2803    private List<Operation> getInflightTx(KahaTransactionInfo info) {
2804        TransactionId key = TransactionIdConversion.convert(info);
2805        List<Operation> tx;
2806        synchronized (inflightTransactions) {
2807            tx = inflightTransactions.get(key);
2808            if (tx == null) {
2809                tx = Collections.synchronizedList(new ArrayList<Operation>());
2810                inflightTransactions.put(key, tx);
2811            }
2812        }
2813        return tx;
2814    }
2815
2816    @SuppressWarnings("unused")
2817    private TransactionId key(KahaTransactionInfo transactionInfo) {
2818        return TransactionIdConversion.convert(transactionInfo);
2819    }
2820
2821    abstract class Operation <T extends JournalCommand<T>> {
2822        final T command;
2823        final Location location;
2824
2825        public Operation(T command, Location location) {
2826            this.command = command;
2827            this.location = location;
2828        }
2829
2830        public Location getLocation() {
2831            return location;
2832        }
2833
2834        public T getCommand() {
2835            return command;
2836        }
2837
2838        abstract public void execute(Transaction tx) throws IOException;
2839    }
2840
2841    class AddOperation extends Operation<KahaAddMessageCommand> {
2842        final IndexAware runWithIndexLock;
2843        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
2844            super(command, location);
2845            this.runWithIndexLock = runWithIndexLock;
2846        }
2847
2848        @Override
2849        public void execute(Transaction tx) throws IOException {
2850            long seq = updateIndex(tx, command, location);
2851            if (runWithIndexLock != null) {
2852                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
2853            }
2854        }
2855    }
2856
2857    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {
2858
2859        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
2860            super(command, location);
2861        }
2862
2863        @Override
2864        public void execute(Transaction tx) throws IOException {
2865            updateIndex(tx, command, location);
2866        }
2867    }
2868
2869    // /////////////////////////////////////////////////////////////////
2870    // Initialization related implementation methods.
2871    // /////////////////////////////////////////////////////////////////
2872
2873    private PageFile createPageFile() throws IOException {
2874        if (indexDirectory == null) {
2875            indexDirectory = directory;
2876        }
2877        IOHelper.mkdirs(indexDirectory);
2878        PageFile index = new PageFile(indexDirectory, "db");
2879        index.setEnableWriteThread(isEnableIndexWriteAsync());
2880        index.setWriteBatchSize(getIndexWriteBatchSize());
2881        index.setPageCacheSize(indexCacheSize);
2882        index.setUseLFRUEviction(isUseIndexLFRUEviction());
2883        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2884        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2885        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2886        index.setEnablePageCaching(isEnableIndexPageCaching());
2887        return index;
2888    }
2889
2890    protected Journal createJournal() throws IOException {
2891        Journal manager = new Journal();
2892        manager.setDirectory(directory);
2893        manager.setMaxFileLength(getJournalMaxFileLength());
2894        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2895        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2896        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2897        manager.setArchiveDataLogs(isArchiveDataLogs());
2898        manager.setSizeAccumulator(journalSize);
2899        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2900        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
2901        manager.setPreallocationStrategy(
2902                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
2903        manager.setJournalDiskSyncStrategy(journalDiskSyncStrategy);
2904        if (getDirectoryArchive() != null) {
2905            IOHelper.mkdirs(getDirectoryArchive());
2906            manager.setDirectoryArchive(getDirectoryArchive());
2907        }
2908        return manager;
2909    }
2910
2911    private Metadata createMetadata() {
2912        Metadata md = new Metadata();
2913        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
2914        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
2915        return md;
2916    }
2917
2918    public int getJournalMaxWriteBatchSize() {
2919        return journalMaxWriteBatchSize;
2920    }
2921
2922    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
2923        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
2924    }
2925
2926    public File getDirectory() {
2927        return directory;
2928    }
2929
2930    public void setDirectory(File directory) {
2931        this.directory = directory;
2932    }
2933
2934    public boolean isDeleteAllMessages() {
2935        return deleteAllMessages;
2936    }
2937
2938    public void setDeleteAllMessages(boolean deleteAllMessages) {
2939        this.deleteAllMessages = deleteAllMessages;
2940    }
2941
2942    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
2943        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
2944    }
2945
2946    public int getIndexWriteBatchSize() {
2947        return setIndexWriteBatchSize;
2948    }
2949
2950    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
2951        this.enableIndexWriteAsync = enableIndexWriteAsync;
2952    }
2953
2954    boolean isEnableIndexWriteAsync() {
2955        return enableIndexWriteAsync;
2956    }
2957
2958    /**
2959     * @deprecated use {@link #getJournalDiskSyncStrategyEnum} or {@link #getJournalDiskSyncStrategy} instead
2960     * @return
2961     */
2962    public boolean isEnableJournalDiskSyncs() {
2963        return journalDiskSyncStrategy == JournalDiskSyncStrategy.ALWAYS;
2964    }
2965
2966    /**
2967     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
2968     * @param syncWrites
2969     */
2970    public void setEnableJournalDiskSyncs(boolean syncWrites) {
2971        if (syncWrites) {
2972            journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;
2973        } else {
2974            journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER;
2975        }
2976    }
2977
2978    public JournalDiskSyncStrategy getJournalDiskSyncStrategyEnum() {
2979        return journalDiskSyncStrategy;
2980    }
2981
2982    public String getJournalDiskSyncStrategy() {
2983        return journalDiskSyncStrategy.name();
2984    }
2985
2986    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
2987        this.journalDiskSyncStrategy = JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase());
2988    }
2989
2990    public long getJournalDiskSyncInterval() {
2991        return journalDiskSyncInterval;
2992    }
2993
2994    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
2995        this.journalDiskSyncInterval = journalDiskSyncInterval;
2996    }
2997
2998    public long getCheckpointInterval() {
2999        return checkpointInterval;
3000    }
3001
3002    public void setCheckpointInterval(long checkpointInterval) {
3003        this.checkpointInterval = checkpointInterval;
3004    }
3005
3006    public long getCleanupInterval() {
3007        return cleanupInterval;
3008    }
3009
3010    public void setCleanupInterval(long cleanupInterval) {
3011        this.cleanupInterval = cleanupInterval;
3012    }
3013
3014    public boolean getCleanupOnStop() {
3015        return cleanupOnStop;
3016    }
3017
3018    public void setCleanupOnStop(boolean cleanupOnStop) {
3019        this.cleanupOnStop = cleanupOnStop;
3020    }
3021
3022    public void setJournalMaxFileLength(int journalMaxFileLength) {
3023        this.journalMaxFileLength = journalMaxFileLength;
3024    }
3025
3026    public int getJournalMaxFileLength() {
3027        return journalMaxFileLength;
3028    }
3029
3030    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
3031        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
3032    }
3033
3034    public int getMaxFailoverProducersToTrack() {
3035        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
3036    }
3037
3038    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
3039        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
3040    }
3041
3042    public int getFailoverProducersAuditDepth() {
3043        return this.metadata.producerSequenceIdTracker.getAuditDepth();
3044    }
3045
3046    public PageFile getPageFile() throws IOException {
3047        if (pageFile == null) {
3048            pageFile = createPageFile();
3049        }
3050        return pageFile;
3051    }
3052
3053    public Journal getJournal() throws IOException {
3054        if (journal == null) {
3055            journal = createJournal();
3056        }
3057        return journal;
3058    }
3059
3060    protected Metadata getMetadata() {
3061        return metadata;
3062    }
3063
3064    public boolean isFailIfDatabaseIsLocked() {
3065        return failIfDatabaseIsLocked;
3066    }
3067
3068    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
3069        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
3070    }
3071
3072    public boolean isIgnoreMissingJournalfiles() {
3073        return ignoreMissingJournalfiles;
3074    }
3075
3076    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
3077        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
3078    }
3079
3080    public int getIndexCacheSize() {
3081        return indexCacheSize;
3082    }
3083
3084    public void setIndexCacheSize(int indexCacheSize) {
3085        this.indexCacheSize = indexCacheSize;
3086    }
3087
3088    public boolean isCheckForCorruptJournalFiles() {
3089        return checkForCorruptJournalFiles;
3090    }
3091
3092    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
3093        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
3094    }
3095
3096    public boolean isChecksumJournalFiles() {
3097        return checksumJournalFiles;
3098    }
3099
3100    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
3101        this.checksumJournalFiles = checksumJournalFiles;
3102    }
3103
3104    @Override
3105    public void setBrokerService(BrokerService brokerService) {
3106        this.brokerService = brokerService;
3107    }
3108
3109    /**
3110     * @return the archiveDataLogs
3111     */
3112    public boolean isArchiveDataLogs() {
3113        return this.archiveDataLogs;
3114    }
3115
3116    /**
3117     * @param archiveDataLogs the archiveDataLogs to set
3118     */
3119    public void setArchiveDataLogs(boolean archiveDataLogs) {
3120        this.archiveDataLogs = archiveDataLogs;
3121    }
3122
3123    /**
3124     * @return the directoryArchive
3125     */
3126    public File getDirectoryArchive() {
3127        return this.directoryArchive;
3128    }
3129
3130    /**
3131     * @param directoryArchive the directoryArchive to set
3132     */
3133    public void setDirectoryArchive(File directoryArchive) {
3134        this.directoryArchive = directoryArchive;
3135    }
3136
3137    public boolean isArchiveCorruptedIndex() {
3138        return archiveCorruptedIndex;
3139    }
3140
3141    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
3142        this.archiveCorruptedIndex = archiveCorruptedIndex;
3143    }
3144
3145    public float getIndexLFUEvictionFactor() {
3146        return indexLFUEvictionFactor;
3147    }
3148
3149    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
3150        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
3151    }
3152
3153    public boolean isUseIndexLFRUEviction() {
3154        return useIndexLFRUEviction;
3155    }
3156
3157    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
3158        this.useIndexLFRUEviction = useIndexLFRUEviction;
3159    }
3160
3161    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
3162        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
3163    }
3164
3165    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
3166        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
3167    }
3168
3169    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
3170        this.enableIndexPageCaching = enableIndexPageCaching;
3171    }
3172
3173    public boolean isEnableIndexDiskSyncs() {
3174        return enableIndexDiskSyncs;
3175    }
3176
3177    public boolean isEnableIndexRecoveryFile() {
3178        return enableIndexRecoveryFile;
3179    }
3180
3181    public boolean isEnableIndexPageCaching() {
3182        return enableIndexPageCaching;
3183    }
3184
3185    // /////////////////////////////////////////////////////////////////
3186    // Internal conversion methods.
3187    // /////////////////////////////////////////////////////////////////
3188
3189    class MessageOrderCursor{
3190        long defaultCursorPosition;
3191        long lowPriorityCursorPosition;
3192        long highPriorityCursorPosition;
3193        MessageOrderCursor(){
3194        }
3195
3196        MessageOrderCursor(long position){
3197            this.defaultCursorPosition=position;
3198            this.lowPriorityCursorPosition=position;
3199            this.highPriorityCursorPosition=position;
3200        }
3201
3202        MessageOrderCursor(MessageOrderCursor other){
3203            this.defaultCursorPosition=other.defaultCursorPosition;
3204            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3205            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3206        }
3207
3208        MessageOrderCursor copy() {
3209            return new MessageOrderCursor(this);
3210        }
3211
3212        void reset() {
3213            this.defaultCursorPosition=0;
3214            this.highPriorityCursorPosition=0;
3215            this.lowPriorityCursorPosition=0;
3216        }
3217
3218        void increment() {
3219            if (defaultCursorPosition!=0) {
3220                defaultCursorPosition++;
3221            }
3222            if (highPriorityCursorPosition!=0) {
3223                highPriorityCursorPosition++;
3224            }
3225            if (lowPriorityCursorPosition!=0) {
3226                lowPriorityCursorPosition++;
3227            }
3228        }
3229
3230        @Override
3231        public String toString() {
3232           return "MessageOrderCursor:[def:" + defaultCursorPosition
3233                   + ", low:" + lowPriorityCursorPosition
3234                   + ", high:" +  highPriorityCursorPosition + "]";
3235        }
3236
3237        public void sync(MessageOrderCursor other) {
3238            this.defaultCursorPosition=other.defaultCursorPosition;
3239            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3240            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3241        }
3242    }
3243
3244    class MessageOrderIndex {
3245        static final byte HI = 9;
3246        static final byte LO = 0;
3247        static final byte DEF = 4;
3248
3249        long nextMessageId;
3250        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
3251        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
3252        BTreeIndex<Long, MessageKeys> highPriorityIndex;
3253        final MessageOrderCursor cursor = new MessageOrderCursor();
3254        Long lastDefaultKey;
3255        Long lastHighKey;
3256        Long lastLowKey;
3257        byte lastGetPriority;
3258        final List<Long> pendingAdditions = new LinkedList<Long>();
3259
3260        MessageKeys remove(Transaction tx, Long key) throws IOException {
3261            MessageKeys result = defaultPriorityIndex.remove(tx, key);
3262            if (result == null && highPriorityIndex!=null) {
3263                result = highPriorityIndex.remove(tx, key);
3264                if (result ==null && lowPriorityIndex!=null) {
3265                    result = lowPriorityIndex.remove(tx, key);
3266                }
3267            }
3268            return result;
3269        }
3270
3271        void load(Transaction tx) throws IOException {
3272            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3273            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
3274            defaultPriorityIndex.load(tx);
3275            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3276            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
3277            lowPriorityIndex.load(tx);
3278            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3279            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
3280            highPriorityIndex.load(tx);
3281        }
3282
3283        void allocate(Transaction tx) throws IOException {
3284            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3285            if (metadata.version >= 2) {
3286                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3287                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3288            }
3289        }
3290
3291        void configureLast(Transaction tx) throws IOException {
3292            // Figure out the next key using the last entry in the destination.
3293            TreeSet<Long> orderedSet = new TreeSet<Long>();
3294
3295            addLast(orderedSet, highPriorityIndex, tx);
3296            addLast(orderedSet, defaultPriorityIndex, tx);
3297            addLast(orderedSet, lowPriorityIndex, tx);
3298
3299            if (!orderedSet.isEmpty()) {
3300                nextMessageId = orderedSet.last() + 1;
3301            }
3302        }
3303
3304        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
3305            if (index != null) {
3306                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
3307                if (lastEntry != null) {
3308                    orderedSet.add(lastEntry.getKey());
3309                }
3310            }
3311        }
3312
3313        void clear(Transaction tx) throws IOException {
3314            this.remove(tx);
3315            this.resetCursorPosition();
3316            this.allocate(tx);
3317            this.load(tx);
3318            this.configureLast(tx);
3319        }
3320
3321        void remove(Transaction tx) throws IOException {
3322            defaultPriorityIndex.clear(tx);
3323            defaultPriorityIndex.unload(tx);
3324            tx.free(defaultPriorityIndex.getPageId());
3325            if (lowPriorityIndex != null) {
3326                lowPriorityIndex.clear(tx);
3327                lowPriorityIndex.unload(tx);
3328
3329                tx.free(lowPriorityIndex.getPageId());
3330            }
3331            if (highPriorityIndex != null) {
3332                highPriorityIndex.clear(tx);
3333                highPriorityIndex.unload(tx);
3334                tx.free(highPriorityIndex.getPageId());
3335            }
3336        }
3337
3338        void resetCursorPosition() {
3339            this.cursor.reset();
3340            lastDefaultKey = null;
3341            lastHighKey = null;
3342            lastLowKey = null;
3343        }
3344
3345        void setBatch(Transaction tx, Long sequence) throws IOException {
3346            if (sequence != null) {
3347                Long nextPosition = new Long(sequence.longValue() + 1);
3348                lastDefaultKey = sequence;
3349                cursor.defaultCursorPosition = nextPosition.longValue();
3350                lastHighKey = sequence;
3351                cursor.highPriorityCursorPosition = nextPosition.longValue();
3352                lastLowKey = sequence;
3353                cursor.lowPriorityCursorPosition = nextPosition.longValue();
3354            }
3355        }
3356
3357        void setBatch(Transaction tx, LastAck last) throws IOException {
3358            setBatch(tx, last.lastAckedSequence);
3359            if (cursor.defaultCursorPosition == 0
3360                    && cursor.highPriorityCursorPosition == 0
3361                    && cursor.lowPriorityCursorPosition == 0) {
3362                long next = last.lastAckedSequence + 1;
3363                switch (last.priority) {
3364                    case DEF:
3365                        cursor.defaultCursorPosition = next;
3366                        cursor.highPriorityCursorPosition = next;
3367                        break;
3368                    case HI:
3369                        cursor.highPriorityCursorPosition = next;
3370                        break;
3371                    case LO:
3372                        cursor.lowPriorityCursorPosition = next;
3373                        cursor.defaultCursorPosition = next;
3374                        cursor.highPriorityCursorPosition = next;
3375                        break;
3376                }
3377            }
3378        }
3379
3380        void stoppedIterating() {
3381            if (lastDefaultKey!=null) {
3382                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
3383            }
3384            if (lastHighKey!=null) {
3385                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
3386            }
3387            if (lastLowKey!=null) {
3388                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
3389            }
3390            lastDefaultKey = null;
3391            lastHighKey = null;
3392            lastLowKey = null;
3393        }
3394
3395        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
3396                throws IOException {
3397            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
3398                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
3399            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
3400                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
3401            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
3402                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
3403            }
3404        }
3405
3406        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
3407                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
3408
3409            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
3410            deletes.add(iterator.next());
3411        }
3412
3413        long getNextMessageId() {
3414            return nextMessageId++;
3415        }
3416
3417        void revertNextMessageId() {
3418            nextMessageId--;
3419        }
3420
3421        MessageKeys get(Transaction tx, Long key) throws IOException {
3422            MessageKeys result = defaultPriorityIndex.get(tx, key);
3423            if (result == null) {
3424                result = highPriorityIndex.get(tx, key);
3425                if (result == null) {
3426                    result = lowPriorityIndex.get(tx, key);
3427                    lastGetPriority = LO;
3428                } else {
3429                    lastGetPriority = HI;
3430                }
3431            } else {
3432                lastGetPriority = DEF;
3433            }
3434            return result;
3435        }
3436
3437        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
3438            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
3439                return defaultPriorityIndex.put(tx, key, value);
3440            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
3441                return highPriorityIndex.put(tx, key, value);
3442            } else {
3443                return lowPriorityIndex.put(tx, key, value);
3444            }
3445        }
3446
3447        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
3448            return new MessageOrderIterator(tx,cursor,this);
3449        }
3450
3451        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
3452            return new MessageOrderIterator(tx,m,this);
3453        }
3454
3455        public byte lastGetPriority() {
3456            return lastGetPriority;
3457        }
3458
3459        public boolean alreadyDispatched(Long sequence) {
3460            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
3461                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
3462                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
3463        }
3464
3465        public void trackPendingAdd(Long seq) {
3466            synchronized (pendingAdditions) {
3467                pendingAdditions.add(seq);
3468            }
3469        }
3470
3471        public void trackPendingAddComplete(Long seq) {
3472            synchronized (pendingAdditions) {
3473                pendingAdditions.remove(seq);
3474            }
3475        }
3476
3477        public Long minPendingAdd() {
3478            synchronized (pendingAdditions) {
3479                if (!pendingAdditions.isEmpty()) {
3480                    return pendingAdditions.get(0);
3481                } else {
3482                    return null;
3483                }
3484            }
3485        }
3486
3487
3488        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
3489            Iterator<Entry<Long, MessageKeys>>currentIterator;
3490            final Iterator<Entry<Long, MessageKeys>>highIterator;
3491            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
3492            final Iterator<Entry<Long, MessageKeys>>lowIterator;
3493
3494            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
3495                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
3496                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
3497                if (highPriorityIndex != null) {
3498                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
3499                } else {
3500                    this.highIterator = null;
3501                }
3502                if (lowPriorityIndex != null) {
3503                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
3504                } else {
3505                    this.lowIterator = null;
3506                }
3507            }
3508
3509            @Override
3510            public boolean hasNext() {
3511                if (currentIterator == null) {
3512                    if (highIterator != null) {
3513                        if (highIterator.hasNext()) {
3514                            currentIterator = highIterator;
3515                            return currentIterator.hasNext();
3516                        }
3517                        if (defaultIterator.hasNext()) {
3518                            currentIterator = defaultIterator;
3519                            return currentIterator.hasNext();
3520                        }
3521                        if (lowIterator.hasNext()) {
3522                            currentIterator = lowIterator;
3523                            return currentIterator.hasNext();
3524                        }
3525                        return false;
3526                    } else {
3527                        currentIterator = defaultIterator;
3528                        return currentIterator.hasNext();
3529                    }
3530                }
3531                if (highIterator != null) {
3532                    if (currentIterator.hasNext()) {
3533                        return true;
3534                    }
3535                    if (currentIterator == highIterator) {
3536                        if (defaultIterator.hasNext()) {
3537                            currentIterator = defaultIterator;
3538                            return currentIterator.hasNext();
3539                        }
3540                        if (lowIterator.hasNext()) {
3541                            currentIterator = lowIterator;
3542                            return currentIterator.hasNext();
3543                        }
3544                        return false;
3545                    }
3546
3547                    if (currentIterator == defaultIterator) {
3548                        if (lowIterator.hasNext()) {
3549                            currentIterator = lowIterator;
3550                            return currentIterator.hasNext();
3551                        }
3552                        return false;
3553                    }
3554                }
3555                return currentIterator.hasNext();
3556            }
3557
3558            @Override
3559            public Entry<Long, MessageKeys> next() {
3560                Entry<Long, MessageKeys> result = currentIterator.next();
3561                if (result != null) {
3562                    Long key = result.getKey();
3563                    if (highIterator != null) {
3564                        if (currentIterator == defaultIterator) {
3565                            lastDefaultKey = key;
3566                        } else if (currentIterator == highIterator) {
3567                            lastHighKey = key;
3568                        } else {
3569                            lastLowKey = key;
3570                        }
3571                    } else {
3572                        lastDefaultKey = key;
3573                    }
3574                }
3575                return result;
3576            }
3577
3578            @Override
3579            public void remove() {
3580                throw new UnsupportedOperationException();
3581            }
3582
3583        }
3584    }
3585
3586    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
3587        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
3588
3589        @Override
3590        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
3591            ByteArrayOutputStream baos = new ByteArrayOutputStream();
3592            ObjectOutputStream oout = new ObjectOutputStream(baos);
3593            oout.writeObject(object);
3594            oout.flush();
3595            oout.close();
3596            byte[] data = baos.toByteArray();
3597            dataOut.writeInt(data.length);
3598            dataOut.write(data);
3599        }
3600
3601        @Override
3602        @SuppressWarnings("unchecked")
3603        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
3604            int dataLen = dataIn.readInt();
3605            byte[] data = new byte[dataLen];
3606            dataIn.readFully(data);
3607            ByteArrayInputStream bais = new ByteArrayInputStream(data);
3608            ObjectInputStream oin = new ObjectInputStream(bais);
3609            try {
3610                return (HashSet<String>) oin.readObject();
3611            } catch (ClassNotFoundException cfe) {
3612                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
3613                ioe.initCause(cfe);
3614                throw ioe;
3615            }
3616        }
3617    }
3618
3619    public File getIndexDirectory() {
3620        return indexDirectory;
3621    }
3622
3623    public void setIndexDirectory(File indexDirectory) {
3624        this.indexDirectory = indexDirectory;
3625    }
3626
3627    interface IndexAware {
3628        public void sequenceAssignedWithIndexLocked(long index);
3629    }
3630
3631    public String getPreallocationScope() {
3632        return preallocationScope;
3633    }
3634
3635    public void setPreallocationScope(String preallocationScope) {
3636        this.preallocationScope = preallocationScope;
3637    }
3638
3639    public String getPreallocationStrategy() {
3640        return preallocationStrategy;
3641    }
3642
3643    public void setPreallocationStrategy(String preallocationStrategy) {
3644        this.preallocationStrategy = preallocationStrategy;
3645    }
3646
3647    public int getCompactAcksAfterNoGC() {
3648        return compactAcksAfterNoGC;
3649    }
3650
3651    /**
3652     * Sets the number of GC cycles where no journal logs were removed before an attempt to
3653     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
3654     * <p>
3655     * A value of -1 will disable this feature.
3656     *
3657     * @param compactAcksAfterNoGC
3658     *      Number of empty GC cycles before we rewrite old ACKS.
3659     */
3660    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
3661        this.compactAcksAfterNoGC = compactAcksAfterNoGC;
3662    }
3663
3664    /**
3665     * Returns whether Ack compaction will ignore that the store is still growing
3666     * and run more often.
3667     *
3668     * @return the compactAcksIgnoresStoreGrowth current value.
3669     */
3670    public boolean isCompactAcksIgnoresStoreGrowth() {
3671        return compactAcksIgnoresStoreGrowth;
3672    }
3673
3674    /**
3675     * Configure if Ack compaction will occur regardless of continued growth of the
3676     * journal logs meaning that the store has not run out of space yet.  Because the
3677     * compaction operation can be costly this value is defaulted to off and the Ack
3678     * compaction is only done when it seems that the store cannot grow and larger.
3679     *
3680     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
3681     */
3682    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
3683        this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
3684    }
3685
3686    /**
3687     * Returns whether Ack compaction is enabled
3688     *
3689     * @return enableAckCompaction
3690     */
3691    public boolean isEnableAckCompaction() {
3692        return enableAckCompaction;
3693    }
3694
3695    /**
3696     * Configure if the Ack compaction task should be enabled to run
3697     *
3698     * @param enableAckCompaction
3699     */
3700    public void setEnableAckCompaction(boolean enableAckCompaction) {
3701        this.enableAckCompaction = enableAckCompaction;
3702    }
3703}