001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb;
018
019import static org.apache.activemq.store.kahadb.disk.journal.Location.NOT_SET;
020
021import java.io.ByteArrayInputStream;
022import java.io.ByteArrayOutputStream;
023import java.io.DataInput;
024import java.io.DataOutput;
025import java.io.EOFException;
026import java.io.File;
027import java.io.IOException;
028import java.io.InputStream;
029import java.io.InterruptedIOException;
030import java.io.ObjectInputStream;
031import java.io.ObjectOutputStream;
032import java.io.OutputStream;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.Date;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.Iterator;
041import java.util.LinkedHashMap;
042import java.util.LinkedHashSet;
043import java.util.LinkedList;
044import java.util.List;
045import java.util.Map;
046import java.util.Map.Entry;
047import java.util.Set;
048import java.util.SortedSet;
049import java.util.TreeMap;
050import java.util.TreeSet;
051
052import java.util.concurrent.Executors;
053import java.util.concurrent.ScheduledExecutorService;
054import java.util.concurrent.ThreadFactory;
055import java.util.concurrent.TimeUnit;
056import java.util.concurrent.atomic.AtomicBoolean;
057import java.util.concurrent.atomic.AtomicLong;
058import java.util.concurrent.atomic.AtomicReference;
059import java.util.concurrent.locks.ReentrantReadWriteLock;
060
061import org.apache.activemq.ActiveMQMessageAuditNoSync;
062import org.apache.activemq.broker.BrokerService;
063import org.apache.activemq.broker.BrokerServiceAware;
064import org.apache.activemq.command.MessageAck;
065import org.apache.activemq.command.TransactionId;
066import org.apache.activemq.openwire.OpenWireFormat;
067import org.apache.activemq.protobuf.Buffer;
068import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
069import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
070import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
071import org.apache.activemq.store.kahadb.data.KahaDestination;
072import org.apache.activemq.store.kahadb.data.KahaEntryType;
073import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
074import org.apache.activemq.store.kahadb.data.KahaProducerAuditCommand;
075import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
076import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
077import org.apache.activemq.store.kahadb.data.KahaRewrittenDataFileCommand;
078import org.apache.activemq.store.kahadb.data.KahaRollbackCommand;
079import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
080import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
081import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
082import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
083import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
084import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
085import org.apache.activemq.store.kahadb.disk.index.ListIndex;
086import org.apache.activemq.store.kahadb.disk.journal.DataFile;
087import org.apache.activemq.store.kahadb.disk.journal.Journal;
088import org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
089import org.apache.activemq.store.kahadb.disk.journal.Location;
090import org.apache.activemq.store.kahadb.disk.journal.TargetedDataFileAppender;
091import org.apache.activemq.store.kahadb.disk.page.Page;
092import org.apache.activemq.store.kahadb.disk.page.PageFile;
093import org.apache.activemq.store.kahadb.disk.page.Transaction;
094import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
095import org.apache.activemq.store.kahadb.disk.util.LongMarshaller;
096import org.apache.activemq.store.kahadb.disk.util.Marshaller;
097import org.apache.activemq.store.kahadb.disk.util.Sequence;
098import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
099import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
100import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
101import org.apache.activemq.util.ByteSequence;
102import org.apache.activemq.util.DataByteArrayInputStream;
103import org.apache.activemq.util.DataByteArrayOutputStream;
104import org.apache.activemq.util.IOExceptionSupport;
105import org.apache.activemq.util.IOHelper;
106import org.apache.activemq.util.ServiceStopper;
107import org.apache.activemq.util.ServiceSupport;
108import org.apache.activemq.util.ThreadPoolUtils;
109import org.slf4j.Logger;
110import org.slf4j.LoggerFactory;
111import org.slf4j.MDC;
112
113public abstract class MessageDatabase extends ServiceSupport implements BrokerServiceAware {
114
115    protected BrokerService brokerService;
116
117    public static final String PROPERTY_LOG_SLOW_ACCESS_TIME = "org.apache.activemq.store.kahadb.LOG_SLOW_ACCESS_TIME";
118    public static final int LOG_SLOW_ACCESS_TIME = Integer.getInteger(PROPERTY_LOG_SLOW_ACCESS_TIME, 0);
119    public static final File DEFAULT_DIRECTORY = new File("KahaDB");
120    protected static final Buffer UNMATCHED;
121    static {
122        UNMATCHED = new Buffer(new byte[]{});
123    }
124    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabase.class);
125
126    static final int CLOSED_STATE = 1;
127    static final int OPEN_STATE = 2;
128    static final long NOT_ACKED = -1;
129
130    static final int VERSION = 5;
131
132    static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
133
134    protected class Metadata {
135        protected Page<Metadata> page;
136        protected int state;
137        protected BTreeIndex<String, StoredDestination> destinations;
138        protected Location lastUpdate;
139        protected Location firstInProgressTransactionLocation;
140        protected Location producerSequenceIdTrackerLocation = null;
141        protected Location ackMessageFileMapLocation = null;
142        protected transient ActiveMQMessageAuditNoSync producerSequenceIdTracker = new ActiveMQMessageAuditNoSync();
143        protected transient Map<Integer, Set<Integer>> ackMessageFileMap = new HashMap<Integer, Set<Integer>>();
144        protected int version = VERSION;
145        protected int openwireVersion = OpenWireFormat.DEFAULT_VERSION;
146
147        public void read(DataInput is) throws IOException {
148            state = is.readInt();
149            destinations = new BTreeIndex<String, StoredDestination>(pageFile, is.readLong());
150            if (is.readBoolean()) {
151                lastUpdate = LocationMarshaller.INSTANCE.readPayload(is);
152            } else {
153                lastUpdate = null;
154            }
155            if (is.readBoolean()) {
156                firstInProgressTransactionLocation = LocationMarshaller.INSTANCE.readPayload(is);
157            } else {
158                firstInProgressTransactionLocation = null;
159            }
160            try {
161                if (is.readBoolean()) {
162                    producerSequenceIdTrackerLocation = LocationMarshaller.INSTANCE.readPayload(is);
163                } else {
164                    producerSequenceIdTrackerLocation = null;
165                }
166            } catch (EOFException expectedOnUpgrade) {
167            }
168            try {
169                version = is.readInt();
170            } catch (EOFException expectedOnUpgrade) {
171                version = 1;
172            }
173            if (version >= 5 && is.readBoolean()) {
174                ackMessageFileMapLocation = LocationMarshaller.INSTANCE.readPayload(is);
175            } else {
176                ackMessageFileMapLocation = null;
177            }
178            try {
179                openwireVersion = is.readInt();
180            } catch (EOFException expectedOnUpgrade) {
181                openwireVersion = OpenWireFormat.DEFAULT_VERSION;
182            }
183            LOG.info("KahaDB is version " + version);
184        }
185
186        public void write(DataOutput os) throws IOException {
187            os.writeInt(state);
188            os.writeLong(destinations.getPageId());
189
190            if (lastUpdate != null) {
191                os.writeBoolean(true);
192                LocationMarshaller.INSTANCE.writePayload(lastUpdate, os);
193            } else {
194                os.writeBoolean(false);
195            }
196
197            if (firstInProgressTransactionLocation != null) {
198                os.writeBoolean(true);
199                LocationMarshaller.INSTANCE.writePayload(firstInProgressTransactionLocation, os);
200            } else {
201                os.writeBoolean(false);
202            }
203
204            if (producerSequenceIdTrackerLocation != null) {
205                os.writeBoolean(true);
206                LocationMarshaller.INSTANCE.writePayload(producerSequenceIdTrackerLocation, os);
207            } else {
208                os.writeBoolean(false);
209            }
210            os.writeInt(VERSION);
211            if (ackMessageFileMapLocation != null) {
212                os.writeBoolean(true);
213                LocationMarshaller.INSTANCE.writePayload(ackMessageFileMapLocation, os);
214            } else {
215                os.writeBoolean(false);
216            }
217            os.writeInt(this.openwireVersion);
218        }
219    }
220
221    class MetadataMarshaller extends VariableMarshaller<Metadata> {
222        @Override
223        public Metadata readPayload(DataInput dataIn) throws IOException {
224            Metadata rc = createMetadata();
225            rc.read(dataIn);
226            return rc;
227        }
228
229        @Override
230        public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
231            object.write(dataOut);
232        }
233    }
234
235    protected PageFile pageFile;
236    protected Journal journal;
237    protected Metadata metadata = new Metadata();
238
239    protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
240
241    protected boolean failIfDatabaseIsLocked;
242
243    protected boolean deleteAllMessages;
244    protected File directory = DEFAULT_DIRECTORY;
245    protected File indexDirectory = null;
246    protected ScheduledExecutorService scheduler;
247    private final Object schedulerLock = new Object();
248
249    protected String journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
250    protected boolean archiveDataLogs;
251    protected File directoryArchive;
252    protected AtomicLong journalSize = new AtomicLong(0);
253    long journalDiskSyncInterval = 1000;
254    long checkpointInterval = 5*1000;
255    long cleanupInterval = 30*1000;
256    int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
257    int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
258    boolean enableIndexWriteAsync = false;
259    int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
260    private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
261    private String preallocationStrategy = Journal.PreallocationStrategy.SPARSE_FILE.name();
262
263    protected AtomicBoolean opened = new AtomicBoolean();
264    private boolean ignoreMissingJournalfiles = false;
265    private int indexCacheSize = 10000;
266    private boolean checkForCorruptJournalFiles = false;
267    private boolean checksumJournalFiles = true;
268    protected boolean forceRecoverIndex = false;
269
270    private boolean archiveCorruptedIndex = false;
271    private boolean useIndexLFRUEviction = false;
272    private float indexLFUEvictionFactor = 0.2f;
273    private boolean enableIndexDiskSyncs = true;
274    private boolean enableIndexRecoveryFile = true;
275    private boolean enableIndexPageCaching = true;
276    ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
277
278    private boolean enableAckCompaction = false;
279    private int compactAcksAfterNoGC = 10;
280    private boolean compactAcksIgnoresStoreGrowth = false;
281    private int checkPointCyclesWithNoGC;
282    private int journalLogOnLastCompactionCheck;
283
284    //only set when using JournalDiskSyncStrategy.PERIODIC
285    protected final AtomicReference<Location> lastAsyncJournalUpdate = new AtomicReference<>();
286
287    @Override
288    public void doStart() throws Exception {
289        load();
290    }
291
292    @Override
293    public void doStop(ServiceStopper stopper) throws Exception {
294        unload();
295    }
296
297    public void allowIOResumption() {
298        if (pageFile != null) {
299            pageFile.allowIOResumption();
300        }
301        if (journal != null) {
302            journal.allowIOResumption();
303        }
304    }
305
306    private void loadPageFile() throws IOException {
307        this.indexLock.writeLock().lock();
308        try {
309            final PageFile pageFile = getPageFile();
310            pageFile.load();
311            pageFile.tx().execute(new Transaction.Closure<IOException>() {
312                @Override
313                public void execute(Transaction tx) throws IOException {
314                    if (pageFile.getPageCount() == 0) {
315                        // First time this is created.. Initialize the metadata
316                        Page<Metadata> page = tx.allocate();
317                        assert page.getPageId() == 0;
318                        page.set(metadata);
319                        metadata.page = page;
320                        metadata.state = CLOSED_STATE;
321                        metadata.destinations = new BTreeIndex<String, StoredDestination>(pageFile, tx.allocate().getPageId());
322
323                        tx.store(metadata.page, metadataMarshaller, true);
324                    } else {
325                        Page<Metadata> page = tx.load(0, metadataMarshaller);
326                        metadata = page.get();
327                        metadata.page = page;
328                    }
329                    metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE);
330                    metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller());
331                    metadata.destinations.load(tx);
332                }
333            });
334            // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted.
335            // Perhaps we should just keep an index of file
336            storedDestinations.clear();
337            pageFile.tx().execute(new Transaction.Closure<IOException>() {
338                @Override
339                public void execute(Transaction tx) throws IOException {
340                    for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) {
341                        Entry<String, StoredDestination> entry = iterator.next();
342                        StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null);
343                        storedDestinations.put(entry.getKey(), sd);
344
345                        if (checkForCorruptJournalFiles) {
346                            // sanity check the index also
347                            if (!entry.getValue().locationIndex.isEmpty(tx)) {
348                                if (entry.getValue().orderIndex.nextMessageId <= 0) {
349                                    throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey());
350                                }
351                            }
352                        }
353                    }
354                }
355            });
356            pageFile.flush();
357        } finally {
358            this.indexLock.writeLock().unlock();
359        }
360    }
361
362    private void startCheckpoint() {
363        if (checkpointInterval == 0 && cleanupInterval == 0) {
364            LOG.info("periodic checkpoint/cleanup disabled, will ocurr on clean shutdown/restart");
365            return;
366        }
367        synchronized (schedulerLock) {
368            if (scheduler == null || scheduler.isShutdown()) {
369                scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
370
371                    @Override
372                    public Thread newThread(Runnable r) {
373                        Thread schedulerThread = new Thread(r);
374
375                        schedulerThread.setName("ActiveMQ Journal Checkpoint Worker");
376                        schedulerThread.setDaemon(true);
377
378                        return schedulerThread;
379                    }
380                });
381
382                // Short intervals for check-point and cleanups
383                long delay;
384                if (journal.isJournalDiskSyncPeriodic()) {
385                    delay = Math.min(journalDiskSyncInterval > 0 ? journalDiskSyncInterval : checkpointInterval, 500);
386                } else {
387                    delay = Math.min(checkpointInterval > 0 ? checkpointInterval : cleanupInterval, 500);
388                }
389
390                scheduler.scheduleWithFixedDelay(new CheckpointRunner(), 0, delay, TimeUnit.MILLISECONDS);
391            }
392        }
393    }
394
395    private final class CheckpointRunner implements Runnable {
396
397        private long lastCheckpoint = System.currentTimeMillis();
398        private long lastCleanup = System.currentTimeMillis();
399        private long lastSync = System.currentTimeMillis();
400        private Location lastAsyncUpdate = null;
401
402        @Override
403        public void run() {
404            try {
405                // Decide on cleanup vs full checkpoint here.
406                if (opened.get()) {
407                    long now = System.currentTimeMillis();
408                    if (journal.isJournalDiskSyncPeriodic() &&
409                            journalDiskSyncInterval > 0 && (now - lastSync >= journalDiskSyncInterval)) {
410                        Location currentUpdate = lastAsyncJournalUpdate.get();
411                        if (currentUpdate != null && !currentUpdate.equals(lastAsyncUpdate)) {
412                            lastAsyncUpdate = currentUpdate;
413                            if (LOG.isTraceEnabled()) {
414                                LOG.trace("Writing trace command to trigger journal sync");
415                            }
416                            store(new KahaTraceCommand(), true, null, null);
417                        }
418                        lastSync = now;
419                    }
420                    if (cleanupInterval > 0 && (now - lastCleanup >= cleanupInterval)) {
421                        checkpointCleanup(true);
422                        lastCleanup = now;
423                        lastCheckpoint = now;
424                    } else if (checkpointInterval > 0 && (now - lastCheckpoint >= checkpointInterval)) {
425                        checkpointCleanup(false);
426                        lastCheckpoint = now;
427                    }
428                }
429            } catch (IOException ioe) {
430                LOG.error("Checkpoint failed", ioe);
431                brokerService.handleIOException(ioe);
432            } catch (Throwable e) {
433                LOG.error("Checkpoint failed", e);
434                brokerService.handleIOException(IOExceptionSupport.create(e));
435            }
436        }
437    }
438
439    public void open() throws IOException {
440        if( opened.compareAndSet(false, true) ) {
441            getJournal().start();
442            try {
443                loadPageFile();
444            } catch (Throwable t) {
445                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t);
446                if (LOG.isDebugEnabled()) {
447                    LOG.debug("Index load failure", t);
448                }
449                // try to recover index
450                try {
451                    pageFile.unload();
452                } catch (Exception ignore) {}
453                if (archiveCorruptedIndex) {
454                    pageFile.archive();
455                } else {
456                    pageFile.delete();
457                }
458                metadata = createMetadata();
459                pageFile = null;
460                loadPageFile();
461            }
462            startCheckpoint();
463            recover();
464        }
465    }
466
467    public void load() throws IOException {
468        this.indexLock.writeLock().lock();
469        IOHelper.mkdirs(directory);
470        try {
471            if (deleteAllMessages) {
472                getJournal().start();
473                getJournal().delete();
474                getJournal().close();
475                journal = null;
476                getPageFile().delete();
477                LOG.info("Persistence store purged.");
478                deleteAllMessages = false;
479            }
480
481            open();
482            store(new KahaTraceCommand().setMessage("LOADED " + new Date()));
483        } finally {
484            this.indexLock.writeLock().unlock();
485        }
486    }
487
488    public void close() throws IOException, InterruptedException {
489        if( opened.compareAndSet(true, false)) {
490            checkpointLock.writeLock().lock();
491            try {
492                if (metadata.page != null) {
493                    checkpointUpdate(true);
494                }
495                pageFile.unload();
496                metadata = createMetadata();
497            } finally {
498                checkpointLock.writeLock().unlock();
499            }
500            journal.close();
501            synchronized(schedulerLock) {
502                if (scheduler != null) {
503                    ThreadPoolUtils.shutdownGraceful(scheduler, -1);
504                    scheduler = null;
505                }
506            }
507            journalSize.set(0);
508        }
509    }
510
511    public void unload() throws IOException, InterruptedException {
512        this.indexLock.writeLock().lock();
513        try {
514            if( pageFile != null && pageFile.isLoaded() ) {
515                metadata.state = CLOSED_STATE;
516                metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0];
517
518                if (metadata.page != null) {
519                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
520                        @Override
521                        public void execute(Transaction tx) throws IOException {
522                            tx.store(metadata.page, metadataMarshaller, true);
523                        }
524                    });
525                }
526            }
527        } finally {
528            this.indexLock.writeLock().unlock();
529        }
530        close();
531    }
532
533    // public for testing
534    @SuppressWarnings("rawtypes")
535    public Location[] getInProgressTxLocationRange() {
536        Location[] range = new Location[]{null, null};
537        synchronized (inflightTransactions) {
538            if (!inflightTransactions.isEmpty()) {
539                for (List<Operation> ops : inflightTransactions.values()) {
540                    if (!ops.isEmpty()) {
541                        trackMaxAndMin(range, ops);
542                    }
543                }
544            }
545            if (!preparedTransactions.isEmpty()) {
546                for (List<Operation> ops : preparedTransactions.values()) {
547                    if (!ops.isEmpty()) {
548                        trackMaxAndMin(range, ops);
549                    }
550                }
551            }
552        }
553        return range;
554    }
555
556    @SuppressWarnings("rawtypes")
557    private void trackMaxAndMin(Location[] range, List<Operation> ops) {
558        Location t = ops.get(0).getLocation();
559        if (range[0] == null || t.compareTo(range[0]) <= 0) {
560            range[0] = t;
561        }
562        t = ops.get(ops.size() -1).getLocation();
563        if (range[1] == null || t.compareTo(range[1]) >= 0) {
564            range[1] = t;
565        }
566    }
567
568    class TranInfo {
569        TransactionId id;
570        Location location;
571
572        class opCount {
573            int add;
574            int remove;
575        }
576        HashMap<KahaDestination, opCount> destinationOpCount = new HashMap<KahaDestination, opCount>();
577
578        @SuppressWarnings("rawtypes")
579        public void track(Operation operation) {
580            if (location == null ) {
581                location = operation.getLocation();
582            }
583            KahaDestination destination;
584            boolean isAdd = false;
585            if (operation instanceof AddOperation) {
586                AddOperation add = (AddOperation) operation;
587                destination = add.getCommand().getDestination();
588                isAdd = true;
589            } else {
590                RemoveOperation removeOpperation = (RemoveOperation) operation;
591                destination = removeOpperation.getCommand().getDestination();
592            }
593            opCount opCount = destinationOpCount.get(destination);
594            if (opCount == null) {
595                opCount = new opCount();
596                destinationOpCount.put(destination, opCount);
597            }
598            if (isAdd) {
599                opCount.add++;
600            } else {
601                opCount.remove++;
602            }
603        }
604
605        @Override
606        public String toString() {
607           StringBuffer buffer = new StringBuffer();
608           buffer.append(location).append(";").append(id).append(";\n");
609           for (Entry<KahaDestination, opCount> op : destinationOpCount.entrySet()) {
610               buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';');
611           }
612           return buffer.toString();
613        }
614    }
615
616    @SuppressWarnings("rawtypes")
617    public String getTransactions() {
618
619        ArrayList<TranInfo> infos = new ArrayList<TranInfo>();
620        synchronized (inflightTransactions) {
621            if (!inflightTransactions.isEmpty()) {
622                for (Entry<TransactionId, List<Operation>> entry : inflightTransactions.entrySet()) {
623                    TranInfo info = new TranInfo();
624                    info.id = entry.getKey();
625                    for (Operation operation : entry.getValue()) {
626                        info.track(operation);
627                    }
628                    infos.add(info);
629                }
630            }
631        }
632        synchronized (preparedTransactions) {
633            if (!preparedTransactions.isEmpty()) {
634                for (Entry<TransactionId, List<Operation>> entry : preparedTransactions.entrySet()) {
635                    TranInfo info = new TranInfo();
636                    info.id = entry.getKey();
637                    for (Operation operation : entry.getValue()) {
638                        info.track(operation);
639                    }
640                    infos.add(info);
641                }
642            }
643        }
644        return infos.toString();
645    }
646
647    /**
648     * Move all the messages that were in the journal into long term storage. We
649     * just replay and do a checkpoint.
650     *
651     * @throws IOException
652     * @throws IOException
653     * @throws IllegalStateException
654     */
655    private void recover() throws IllegalStateException, IOException {
656        this.indexLock.writeLock().lock();
657        try {
658
659            long start = System.currentTimeMillis();
660            Location afterProducerAudit = recoverProducerAudit();
661            Location afterAckMessageFile = recoverAckMessageFileMap();
662            Location lastIndoubtPosition = getRecoveryPosition();
663
664            if (afterProducerAudit != null && afterProducerAudit.equals(metadata.ackMessageFileMapLocation)) {
665                // valid checkpoint, possible recover from afterAckMessageFile
666                afterProducerAudit = null;
667            }
668            Location recoveryPosition = minimum(afterProducerAudit, afterAckMessageFile);
669            recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition);
670
671            if (recoveryPosition != null) {
672                int redoCounter = 0;
673                int dataFileRotationTracker = recoveryPosition.getDataFileId();
674                LOG.info("Recovering from the journal @" + recoveryPosition);
675                while (recoveryPosition != null) {
676                    try {
677                        JournalCommand<?> message = load(recoveryPosition);
678                        metadata.lastUpdate = recoveryPosition;
679                        process(message, recoveryPosition, lastIndoubtPosition);
680                        redoCounter++;
681                    } catch (IOException failedRecovery) {
682                        if (isIgnoreMissingJournalfiles()) {
683                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
684                            // track this dud location
685                            journal.corruptRecoveryLocation(recoveryPosition);
686                        } else {
687                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
688                        }
689                    }
690                    recoveryPosition = journal.getNextLocation(recoveryPosition);
691                    // hold on to the minimum number of open files during recovery
692                    if (recoveryPosition != null && dataFileRotationTracker != recoveryPosition.getDataFileId()) {
693                        dataFileRotationTracker = recoveryPosition.getDataFileId();
694                        journal.cleanup();
695                    }
696                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
697                        LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
698                    }
699                }
700                if (LOG.isInfoEnabled()) {
701                    long end = System.currentTimeMillis();
702                    LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds.");
703                }
704            }
705
706            // We may have to undo some index updates.
707            pageFile.tx().execute(new Transaction.Closure<IOException>() {
708                @Override
709                public void execute(Transaction tx) throws IOException {
710                    recoverIndex(tx);
711                }
712            });
713
714            // rollback any recovered inflight local transactions, and discard any inflight XA transactions.
715            Set<TransactionId> toRollback = new HashSet<TransactionId>();
716            Set<TransactionId> toDiscard = new HashSet<TransactionId>();
717            synchronized (inflightTransactions) {
718                for (Iterator<TransactionId> it = inflightTransactions.keySet().iterator(); it.hasNext(); ) {
719                    TransactionId id = it.next();
720                    if (id.isLocalTransaction()) {
721                        toRollback.add(id);
722                    } else {
723                        toDiscard.add(id);
724                    }
725                }
726                for (TransactionId tx: toRollback) {
727                    if (LOG.isDebugEnabled()) {
728                        LOG.debug("rolling back recovered indoubt local transaction " + tx);
729                    }
730                    store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null);
731                }
732                for (TransactionId tx: toDiscard) {
733                    if (LOG.isDebugEnabled()) {
734                        LOG.debug("discarding recovered in-flight XA transaction " + tx);
735                    }
736                    inflightTransactions.remove(tx);
737                }
738            }
739
740            synchronized (preparedTransactions) {
741                for (TransactionId txId : preparedTransactions.keySet()) {
742                    LOG.warn("Recovered prepared XA TX: [{}]", txId);
743                }
744            }
745
746        } finally {
747            this.indexLock.writeLock().unlock();
748        }
749    }
750
    /**
     * Converts the given transaction id into a {@link KahaTransactionInfo} by
     * delegating to {@code TransactionIdConversion.convertToLocal(tx)}.
     *
     * @param tx the transaction id to convert
     * @return whatever {@code TransactionIdConversion.convertToLocal} produces for {@code tx}
     */
    @SuppressWarnings("unused")
    private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) {
        return TransactionIdConversion.convertToLocal(tx);
    }
755
756    private Location minimum(Location x,
757                             Location y) {
758        Location min = null;
759        if (x != null) {
760            min = x;
761            if (y != null) {
762                int compare = y.compareTo(x);
763                if (compare < 0) {
764                    min = y;
765                }
766            }
767        } else {
768            min = y;
769        }
770        return min;
771    }
772
773    private Location recoverProducerAudit() throws IOException {
774        if (metadata.producerSequenceIdTrackerLocation != null) {
775            try {
776                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
777                ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
778                int maxNumProducers = getMaxFailoverProducersToTrack();
779                int maxAuditDepth = getFailoverProducersAuditDepth();
780                metadata.producerSequenceIdTracker = (ActiveMQMessageAuditNoSync) objectIn.readObject();
781                metadata.producerSequenceIdTracker.setAuditDepth(maxAuditDepth);
782                metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxNumProducers);
783                return getNextInitializedLocation(metadata.producerSequenceIdTrackerLocation);
784            } catch (Exception e) {
785                LOG.warn("Cannot recover message audit", e);
786                return journal.getNextLocation(null);
787            }
788        } else {
789            // got no audit stored so got to recreate via replay from start of the journal
790            return journal.getNextLocation(null);
791        }
792    }
793
794    @SuppressWarnings("unchecked")
795    private Location recoverAckMessageFileMap() throws IOException {
796        if (metadata.ackMessageFileMapLocation != null) {
797            try {
798                KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
799                ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
800                metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
801                return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
802            } catch (Exception e) {
803                LOG.warn("Cannot recover ackMessageFileMap", e);
804                return journal.getNextLocation(null);
805            }
806        } else {
807            // got no ackMessageFileMap stored so got to recreate via replay from start of the journal
808            return journal.getNextLocation(null);
809        }
810    }
811
812    protected void recoverIndex(Transaction tx) throws IOException {
813        long start = System.currentTimeMillis();
814        // It is possible index updates got applied before the journal updates..
815        // in that case we need to removed references to messages that are not in the journal
816        final Location lastAppendLocation = journal.getLastAppendLocation();
817        long undoCounter=0;
818
819        // Go through all the destinations to see if they have messages past the lastAppendLocation
820        for (StoredDestination sd : storedDestinations.values()) {
821
822            final ArrayList<Long> matches = new ArrayList<Long>();
823            // Find all the Locations that are >= than the last Append Location.
824            sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<Location, Long>(lastAppendLocation) {
825                @Override
826                protected void matched(Location key, Long value) {
827                    matches.add(value);
828                }
829            });
830
831            for (Long sequenceId : matches) {
832                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
833                sd.locationIndex.remove(tx, keys.location);
834                sd.messageIdIndex.remove(tx, keys.messageId);
835                metadata.producerSequenceIdTracker.rollback(keys.messageId);
836                undoCounter++;
837                // TODO: do we need to modify the ack positions for the pub sub case?
838            }
839        }
840
841        if (undoCounter > 0) {
842            // The rolledback operations are basically in flight journal writes.  To avoid getting
843            // these the end user should do sync writes to the journal.
844            if (LOG.isInfoEnabled()) {
845                long end = System.currentTimeMillis();
846                LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
847            }
848        }
849
850        undoCounter = 0;
851        start = System.currentTimeMillis();
852
853        // Lets be extra paranoid here and verify that all the datafiles being referenced
854        // by the indexes still exists.
855
856        final SequenceSet ss = new SequenceSet();
857        for (StoredDestination sd : storedDestinations.values()) {
858            // Use a visitor to cut down the number of pages that we load
859            sd.locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
860                int last=-1;
861
862                @Override
863                public boolean isInterestedInKeysBetween(Location first, Location second) {
864                    if( first==null ) {
865                        return !ss.contains(0, second.getDataFileId());
866                    } else if( second==null ) {
867                        return true;
868                    } else {
869                        return !ss.contains(first.getDataFileId(), second.getDataFileId());
870                    }
871                }
872
873                @Override
874                public void visit(List<Location> keys, List<Long> values) {
875                    for (Location l : keys) {
876                        int fileId = l.getDataFileId();
877                        if( last != fileId ) {
878                            ss.add(fileId);
879                            last = fileId;
880                        }
881                    }
882                }
883
884            });
885        }
886        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
887        while (!ss.isEmpty()) {
888            missingJournalFiles.add((int) ss.removeFirst());
889        }
890
891        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet()) {
892            missingJournalFiles.add(entry.getKey());
893            for (Integer i : entry.getValue()) {
894                missingJournalFiles.add(i);
895            }
896        }
897
898        missingJournalFiles.removeAll(journal.getFileMap().keySet());
899
900        if (!missingJournalFiles.isEmpty()) {
901            LOG.warn("Some journal files are missing: " + missingJournalFiles);
902        }
903
904        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
905        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
906        for (Integer missing : missingJournalFiles) {
907            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing, 0), new Location(missing + 1, 0)));
908        }
909
910        if (checkForCorruptJournalFiles) {
911            Collection<DataFile> dataFiles = journal.getFileMap().values();
912            for (DataFile dataFile : dataFiles) {
913                int id = dataFile.getDataFileId();
914                // eof to next file id
915                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
916                Sequence seq = dataFile.getCorruptedBlocks().getHead();
917                while (seq != null) {
918                    BTreeVisitor.BetweenVisitor<Location, Long> visitor =
919                        new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
920                    missingPredicates.add(visitor);
921                    knownCorruption.add(visitor);
922                    seq = seq.getNext();
923                }
924            }
925        }
926
927        if (!missingPredicates.isEmpty()) {
928            for (Entry<String, StoredDestination> sdEntry : storedDestinations.entrySet()) {
929                final StoredDestination sd = sdEntry.getValue();
930                final LinkedHashMap<Long, Location> matches = new LinkedHashMap<Long, Location>();
931                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
932                    @Override
933                    protected void matched(Location key, Long value) {
934                        matches.put(value, key);
935                    }
936                });
937
938                // If some message references are affected by the missing data files...
939                if (!matches.isEmpty()) {
940
941                    // We either 'gracefully' recover dropping the missing messages or
942                    // we error out.
943                    if( ignoreMissingJournalfiles ) {
944                        // Update the index to remove the references to the missing data
945                        for (Long sequenceId : matches.keySet()) {
946                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
947                            sd.locationIndex.remove(tx, keys.location);
948                            sd.messageIdIndex.remove(tx, keys.messageId);
949                            LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
950                            undoCounter++;
951                            // TODO: do we need to modify the ack positions for the pub sub case?
952                        }
953                    } else {
954                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches);
955                        throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
956                    }
957                }
958            }
959        }
960
961        if (!ignoreMissingJournalfiles) {
962            if (!knownCorruption.isEmpty()) {
963                LOG.error("Detected corrupt journal files. " + knownCorruption);
964                throw new IOException("Detected corrupt journal files. " + knownCorruption);
965            }
966
967            if (!missingJournalFiles.isEmpty()) {
968                LOG.error("Detected missing journal files. " + missingJournalFiles);
969                throw new IOException("Detected missing journal files. " + missingJournalFiles);
970            }
971        }
972
973        if (undoCounter > 0) {
974            // The rolledback operations are basically in flight journal writes.  To avoid getting these the end user
975            // should do sync writes to the journal.
976            if (LOG.isInfoEnabled()) {
977                long end = System.currentTimeMillis();
978                LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
979            }
980        }
981    }
982
    // Cursor state for incrementalRecover(): the next journal location still to be
    // replayed; null when it has not been determined yet or the journal is exhausted.
    private Location nextRecoveryPosition;
    // The journal location most recently picked up by incrementalRecover(); null until
    // the first record has been taken.
    private Location lastRecoveryPosition;
985
986    public void incrementalRecover() throws IOException {
987        this.indexLock.writeLock().lock();
988        try {
989            if( nextRecoveryPosition == null ) {
990                if( lastRecoveryPosition==null ) {
991                    nextRecoveryPosition = getRecoveryPosition();
992                } else {
993                    nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
994                }
995            }
996            while (nextRecoveryPosition != null) {
997                lastRecoveryPosition = nextRecoveryPosition;
998                metadata.lastUpdate = lastRecoveryPosition;
999                JournalCommand<?> message = load(lastRecoveryPosition);
1000                process(message, lastRecoveryPosition, (IndexAware) null);
1001                nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
1002            }
1003        } finally {
1004            this.indexLock.writeLock().unlock();
1005        }
1006    }
1007
    /**
     * Returns the journal location currently recorded in {@code metadata.lastUpdate}.
     * The field is read directly, without taking the index lock.
     *
     * @return the value of {@code metadata.lastUpdate}, which may be {@code null}
     * @throws IOException declared by the signature; this body performs no I/O
     */
    public Location getLastUpdatePosition() throws IOException {
        return metadata.lastUpdate;
    }
1011
1012    private Location getRecoveryPosition() throws IOException {
1013
1014        if (!this.forceRecoverIndex) {
1015
1016            // If we need to recover the transactions..
1017            if (metadata.firstInProgressTransactionLocation != null) {
1018                return metadata.firstInProgressTransactionLocation;
1019            }
1020
1021            // Perhaps there were no transactions...
1022            if( metadata.lastUpdate!=null) {
1023                // Start replay at the record after the last one recorded in the index file.
1024                return getNextInitializedLocation(metadata.lastUpdate);
1025            }
1026        }
1027        // This loads the first position.
1028        return journal.getNextLocation(null);
1029    }
1030
1031    private Location getNextInitializedLocation(Location location) throws IOException {
1032        Location mayNotBeInitialized = journal.getNextLocation(location);
1033        if (location.getSize() == NOT_SET && mayNotBeInitialized.getSize() != NOT_SET) {
1034            // need to init size and type to skip
1035            return journal.getNextLocation(mayNotBeInitialized);
1036        } else {
1037            return mayNotBeInitialized;
1038        }
1039    }
1040
1041    protected void checkpointCleanup(final boolean cleanup) throws IOException {
1042        long start;
1043        this.indexLock.writeLock().lock();
1044        try {
1045            start = System.currentTimeMillis();
1046            if( !opened.get() ) {
1047                return;
1048            }
1049        } finally {
1050            this.indexLock.writeLock().unlock();
1051        }
1052        checkpointUpdate(cleanup);
1053        long end = System.currentTimeMillis();
1054        if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1055            if (LOG.isInfoEnabled()) {
1056                LOG.info("Slow KahaDB access: cleanup took " + (end - start));
1057            }
1058        }
1059    }
1060
1061    public ByteSequence toByteSequence(JournalCommand<?> data) throws IOException {
1062        int size = data.serializedSizeFramed();
1063        DataByteArrayOutputStream os = new DataByteArrayOutputStream(size + 1);
1064        os.writeByte(data.type().getNumber());
1065        data.writeFramed(os);
1066        return os.toByteSequence();
1067    }
1068
1069    // /////////////////////////////////////////////////////////////////
    // Methods called by the broker to update and query the store.
1071    // /////////////////////////////////////////////////////////////////
    /**
     * Stores a command with {@code sync} set to {@code false} and no callbacks.
     *
     * @param data the command to journal and apply
     * @return the location returned by the four-argument {@code store} overload
     * @throws IOException if the delegated store fails
     */
    public Location store(JournalCommand<?> data) throws IOException {
        return store(data, false, null,null);
    }
1075
    /**
     * Stores a command with {@code sync} set to {@code false}, passing only the
     * journal-store-complete callback.
     *
     * @param data the command to journal and apply
     * @param onJournalStoreComplete forwarded to the five-argument {@code store} overload
     * @return the location returned by the five-argument {@code store} overload
     * @throws IOException if the delegated store fails
     */
    public Location store(JournalCommand<?> data, Runnable onJournalStoreComplete) throws IOException {
        return store(data, false, null, null, onJournalStoreComplete);
    }
1079
    /**
     * Stores a command without a journal-store-complete callback.
     *
     * @param data the command to journal and apply
     * @param sync forwarded to the five-argument {@code store} overload
     * @param before forwarded to the five-argument {@code store} overload
     * @param after forwarded to the five-argument {@code store} overload
     * @return the location returned by the five-argument {@code store} overload
     * @throws IOException if the delegated store fails
     */
    public Location store(JournalCommand<?> data, boolean sync, IndexAware before,Runnable after) throws IOException {
        return store(data, sync, before, after, null);
    }
1083
1084    /**
1085     * All updated are are funneled through this method. The updates are converted
1086     * to a JournalMessage which is logged to the journal and then the data from
1087     * the JournalMessage is used to update the index just like it would be done
1088     * during a recovery process.
1089     */
1090    public Location store(JournalCommand<?> data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException {
1091        try {
1092            ByteSequence sequence = toByteSequence(data);
1093            Location location;
1094
1095            checkpointLock.readLock().lock();
1096            try {
1097
1098                long start = System.currentTimeMillis();
1099                location = onJournalStoreComplete == null ? journal.write(sequence, sync) : journal.write(sequence, onJournalStoreComplete) ;
1100                long start2 = System.currentTimeMillis();
1101                //Track the last async update so we know if we need to sync at the next checkpoint
1102                if (!sync && journal.isJournalDiskSyncPeriodic()) {
1103                    lastAsyncJournalUpdate.set(location);
1104                }
1105                process(data, location, before);
1106
1107                long end = System.currentTimeMillis();
1108                if (LOG_SLOW_ACCESS_TIME > 0 && end - start > LOG_SLOW_ACCESS_TIME) {
1109                    if (LOG.isInfoEnabled()) {
1110                        LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms");
1111                    }
1112                }
1113            } finally {
1114                checkpointLock.readLock().unlock();
1115            }
1116
1117            if (after != null) {
1118                after.run();
1119            }
1120
1121            if (scheduler == null && opened.get()) {
1122                startCheckpoint();
1123            }
1124            return location;
1125        } catch (IOException ioe) {
1126            LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe);
1127            brokerService.handleIOException(ioe);
1128            throw ioe;
1129        }
1130    }
1131
1132    /**
1133     * Loads a previously stored JournalMessage
1134     *
1135     * @param location
1136     * @return
1137     * @throws IOException
1138     */
1139    public JournalCommand<?> load(Location location) throws IOException {
1140        long start = System.currentTimeMillis();
1141        ByteSequence data = journal.read(location);
1142        long end = System.currentTimeMillis();
1143        if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME) {
1144            if (LOG.isInfoEnabled()) {
1145                LOG.info("Slow KahaDB access: Journal read took: "+(end-start)+" ms");
1146            }
1147        }
1148        DataByteArrayInputStream is = new DataByteArrayInputStream(data);
1149        byte readByte = is.readByte();
1150        KahaEntryType type = KahaEntryType.valueOf(readByte);
1151        if( type == null ) {
1152            try {
1153                is.close();
1154            } catch (IOException e) {}
1155            throw new IOException("Could not load journal record. Invalid location: "+location);
1156        }
1157        JournalCommand<?> message = (JournalCommand<?>)type.createMessage();
1158        message.mergeFramed(is);
1159        return message;
1160    }
1161
1162    /**
1163     * do minimal recovery till we reach the last inDoubtLocation
1164     * @param data
1165     * @param location
1166     * @param inDoubtlocation
1167     * @throws IOException
1168     */
1169    void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
1170        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
1171            process(data, location, (IndexAware) null);
1172        } else {
1173            // just recover producer audit
1174            data.visit(new Visitor() {
1175                @Override
1176                public void visit(KahaAddMessageCommand command) throws IOException {
1177                    metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1178                }
1179            });
1180        }
1181    }
1182
1183    // /////////////////////////////////////////////////////////////////
1184    // Journaled record processing methods. Once the record is journaled,
1185    // these methods handle applying the index updates. These may be called
1186    // from the recovery method too so they need to be idempotent
1187    // /////////////////////////////////////////////////////////////////
1188
    /**
     * Dispatches a journal command to the type-specific {@code process} overload by
     * visiting it.
     *
     * <p>Add-message and commit commands also receive {@code onSequenceAssignedCallback}.
     * Producer-audit, ack-message-file-map and trace commands only go through
     * {@link #processLocation(Location)}.
     *
     * @param data the journal command to apply
     * @param location where the command sits in the journal
     * @param onSequenceAssignedCallback forwarded to the add-message and commit handlers;
     *        may be {@code null}
     * @throws IOException if the selected handler fails
     */
    void process(JournalCommand<?> data, final Location location, final IndexAware onSequenceAssignedCallback) throws IOException {
        data.visit(new Visitor() {
            @Override
            public void visit(KahaAddMessageCommand command) throws IOException {
                process(command, location, onSequenceAssignedCallback);
            }

            @Override
            public void visit(KahaRemoveMessageCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaPrepareCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaCommitCommand command) throws IOException {
                process(command, location, onSequenceAssignedCallback);
            }

            @Override
            public void visit(KahaRollbackCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaRemoveDestinationCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaSubscriptionCommand command) throws IOException {
                process(command, location);
            }

            // The next three command types carry no index update of their own here;
            // they only go through processLocation.
            @Override
            public void visit(KahaProducerAuditCommand command) throws IOException {
                processLocation(location);
            }

            @Override
            public void visit(KahaAckMessageFileMapCommand command) throws IOException {
                processLocation(location);
            }

            @Override
            public void visit(KahaTraceCommand command) {
                processLocation(location);
            }

            @Override
            public void visit(KahaUpdateMessageCommand command) throws IOException {
                process(command, location);
            }

            @Override
            public void visit(KahaRewrittenDataFileCommand command) throws IOException {
                process(command, location);
            }
        });
    }
1252
1253    @SuppressWarnings("rawtypes")
1254    protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException {
1255        if (command.hasTransactionInfo()) {
1256            List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1257            inflightTx.add(new AddOperation(command, location, runWithIndexLock));
1258        } else {
1259            this.indexLock.writeLock().lock();
1260            try {
1261                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1262                    @Override
1263                    public void execute(Transaction tx) throws IOException {
1264                        long assignedIndex = updateIndex(tx, command, location);
1265                        if (runWithIndexLock != null) {
1266                            runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex);
1267                        }
1268                    }
1269                });
1270
1271            } finally {
1272                this.indexLock.writeLock().unlock();
1273            }
1274        }
1275    }
1276
1277    protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException {
1278        this.indexLock.writeLock().lock();
1279        try {
1280            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1281                @Override
1282                public void execute(Transaction tx) throws IOException {
1283                    updateIndex(tx, command, location);
1284                }
1285            });
1286        } finally {
1287            this.indexLock.writeLock().unlock();
1288        }
1289    }
1290
1291    @SuppressWarnings("rawtypes")
1292    protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
1293        if (command.hasTransactionInfo()) {
1294           List<Operation> inflightTx = getInflightTx(command.getTransactionInfo());
1295           inflightTx.add(new RemoveOperation(command, location));
1296        } else {
1297            this.indexLock.writeLock().lock();
1298            try {
1299                pageFile.tx().execute(new Transaction.Closure<IOException>() {
1300                    @Override
1301                    public void execute(Transaction tx) throws IOException {
1302                        updateIndex(tx, command, location);
1303                    }
1304                });
1305            } finally {
1306                this.indexLock.writeLock().unlock();
1307            }
1308        }
1309    }
1310
1311    protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException {
1312        this.indexLock.writeLock().lock();
1313        try {
1314            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1315                @Override
1316                public void execute(Transaction tx) throws IOException {
1317                    updateIndex(tx, command, location);
1318                }
1319            });
1320        } finally {
1321            this.indexLock.writeLock().unlock();
1322        }
1323    }
1324
1325    protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException {
1326        this.indexLock.writeLock().lock();
1327        try {
1328            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1329                @Override
1330                public void execute(Transaction tx) throws IOException {
1331                    updateIndex(tx, command, location);
1332                }
1333            });
1334        } finally {
1335            this.indexLock.writeLock().unlock();
1336        }
1337    }
1338
    /**
     * Records {@code location} as {@code metadata.lastUpdate} while holding the index
     * write lock. No index structure is touched.
     *
     * @param location the journal location to record as the last update
     */
    protected void processLocation(final Location location) {
        this.indexLock.writeLock().lock();
        try {
            metadata.lastUpdate = location;
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }
1347
1348    @SuppressWarnings("rawtypes")
1349    protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException {
1350        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1351        List<Operation> inflightTx;
1352        synchronized (inflightTransactions) {
1353            inflightTx = inflightTransactions.remove(key);
1354            if (inflightTx == null) {
1355                inflightTx = preparedTransactions.remove(key);
1356            }
1357        }
1358        if (inflightTx == null) {
1359            // only non persistent messages in this tx
1360            if (before != null) {
1361                before.sequenceAssignedWithIndexLocked(-1);
1362            }
1363            return;
1364        }
1365
1366        final List<Operation> messagingTx = inflightTx;
1367        indexLock.writeLock().lock();
1368        try {
1369            pageFile.tx().execute(new Transaction.Closure<IOException>() {
1370                @Override
1371                public void execute(Transaction tx) throws IOException {
1372                    for (Operation op : messagingTx) {
1373                        op.execute(tx);
1374                    }
1375                }
1376            });
1377            metadata.lastUpdate = location;
1378        } finally {
1379            indexLock.writeLock().unlock();
1380        }
1381    }
1382
1383    @SuppressWarnings("rawtypes")
1384    protected void process(KahaPrepareCommand command, Location location) {
1385        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1386        synchronized (inflightTransactions) {
1387            List<Operation> tx = inflightTransactions.remove(key);
1388            if (tx != null) {
1389                preparedTransactions.put(key, tx);
1390            }
1391        }
1392    }
1393
1394    @SuppressWarnings("rawtypes")
1395    protected void process(KahaRollbackCommand command, Location location)  throws IOException {
1396        TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
1397        List<Operation> updates = null;
1398        synchronized (inflightTransactions) {
1399            updates = inflightTransactions.remove(key);
1400            if (updates == null) {
1401                updates = preparedTransactions.remove(key);
1402            }
1403        }
1404    }
1405
1406    protected void process(KahaRewrittenDataFileCommand command, Location location)  throws IOException {
1407        final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
1408
1409        // Mark the current journal file as a compacted file so that gc checks can skip
1410        // over logs that are smaller compaction type logs.
1411        DataFile current = journal.getDataFileById(location.getDataFileId());
1412        current.setTypeCode(command.getRewriteType());
1413
1414        if (completeFileSet.contains(command.getSourceDataFileId()) && command.getSkipIfSourceExists()) {
1415            // Move offset so that next location read jumps to next file.
1416            location.setOffset(journalMaxFileLength);
1417        }
1418    }
1419
1420    // /////////////////////////////////////////////////////////////////
1421    // These methods do the actual index updates.
1422    // /////////////////////////////////////////////////////////////////
1423
    // Read/write lock taken (write side) by the index-updating and checkpoint code in this class
    // before it runs a page file transaction.
    protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock();
    // Journal data file ids that checkpoint cleanup always removes from its GC candidate set
    // ("files under replication").
    private final HashSet<Integer> journalFilesBeingReplicated = new HashSet<Integer>();
1426
1427    long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
1428        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1429
1430        // Skip adding the message to the index if this is a topic and there are
1431        // no subscriptions.
1432        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
1433            return -1;
1434        }
1435
1436        // Add the message.
1437        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
1438        long id = sd.orderIndex.getNextMessageId(priority);
1439        Long previous = sd.locationIndex.put(tx, location, id);
1440        if (previous == null) {
1441            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
1442            if (previous == null) {
1443                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
1444                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
1445                    addAckLocationForNewMessage(tx, sd, id);
1446                }
1447                metadata.lastUpdate = location;
1448            } else {
1449
1450                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
1451                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
1452                    // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
1453                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
1454                }
1455                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
1456                sd.locationIndex.remove(tx, location);
1457                id = -1;
1458            }
1459        } else {
1460            // restore the previous value.. Looks like this was a redo of a previously
1461            // added message. We don't want to assign it a new id as the other indexes would
1462            // be wrong..
1463            sd.locationIndex.put(tx, location, previous);
1464            metadata.lastUpdate = location;
1465        }
1466        // record this id in any event, initial send or recovery
1467        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
1468        return id;
1469    }
1470
1471    void trackPendingAdd(KahaDestination destination, Long seq) {
1472        StoredDestination sd = storedDestinations.get(key(destination));
1473        if (sd != null) {
1474            sd.trackPendingAdd(seq);
1475        }
1476    }
1477
1478    void trackPendingAddComplete(KahaDestination destination, Long seq) {
1479        StoredDestination sd = storedDestinations.get(key(destination));
1480        if (sd != null) {
1481            sd.trackPendingAddComplete(seq);
1482        }
1483    }
1484
1485    void updateIndex(Transaction tx, KahaUpdateMessageCommand updateMessageCommand, Location location) throws IOException {
1486        KahaAddMessageCommand command = updateMessageCommand.getMessage();
1487        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1488
1489        Long id = sd.messageIdIndex.get(tx, command.getMessageId());
1490        if (id != null) {
1491            MessageKeys previousKeys = sd.orderIndex.put(
1492                    tx,
1493                    command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY,
1494                    id,
1495                    new MessageKeys(command.getMessageId(), location)
1496            );
1497            sd.locationIndex.put(tx, location, id);
1498            // on first update previous is original location, on recovery/replay it may be the updated location
1499            if(previousKeys != null && !previousKeys.location.equals(location)) {
1500                sd.locationIndex.remove(tx, previousKeys.location);
1501            }
1502            metadata.lastUpdate = location;
1503        } else {
1504            //Add the message if it can't be found
1505            this.updateIndex(tx, command, location);
1506        }
1507    }
1508
1509    void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
1510        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1511        if (!command.hasSubscriptionKey()) {
1512
1513            // In the queue case we just remove the message from the index..
1514            Long sequenceId = sd.messageIdIndex.remove(tx, command.getMessageId());
1515            if (sequenceId != null) {
1516                MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
1517                if (keys != null) {
1518                    sd.locationIndex.remove(tx, keys.location);
1519                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1520                    metadata.lastUpdate = ackLocation;
1521                }  else if (LOG.isDebugEnabled()) {
1522                    LOG.debug("message not found in order index: " + sequenceId  + " for: " + command.getMessageId());
1523                }
1524            } else if (LOG.isDebugEnabled()) {
1525                LOG.debug("message not found in sequence id index: " + command.getMessageId());
1526            }
1527        } else {
1528            // In the topic case we need remove the message once it's been acked
1529            // by all the subs
1530            Long sequence = sd.messageIdIndex.get(tx, command.getMessageId());
1531
1532            // Make sure it's a valid message id...
1533            if (sequence != null) {
1534                String subscriptionKey = command.getSubscriptionKey();
1535                if (command.getAck() != UNMATCHED) {
1536                    sd.orderIndex.get(tx, sequence);
1537                    byte priority = sd.orderIndex.lastGetPriority();
1538                    sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(sequence, priority));
1539                }
1540
1541                MessageKeys keys = sd.orderIndex.get(tx, sequence);
1542                if (keys != null) {
1543                    recordAckMessageReferenceLocation(ackLocation, keys.location);
1544                }
1545                // The following method handles deleting un-referenced messages.
1546                removeAckLocation(tx, sd, subscriptionKey, sequence);
1547                metadata.lastUpdate = ackLocation;
1548            } else if (LOG.isDebugEnabled()) {
1549                LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
1550            }
1551
1552        }
1553    }
1554
1555    private void recordAckMessageReferenceLocation(Location ackLocation, Location messageLocation) {
1556        Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(Integer.valueOf(ackLocation.getDataFileId()));
1557        if (referenceFileIds == null) {
1558            referenceFileIds = new HashSet<Integer>();
1559            referenceFileIds.add(messageLocation.getDataFileId());
1560            metadata.ackMessageFileMap.put(ackLocation.getDataFileId(), referenceFileIds);
1561        } else {
1562            Integer id = Integer.valueOf(messageLocation.getDataFileId());
1563            if (!referenceFileIds.contains(id)) {
1564                referenceFileIds.add(id);
1565            }
1566        }
1567    }
1568
    /**
     * Removes a destination together with all of its indexes.
     * <p>
     * The order index is removed, then every other index of the destination is cleared,
     * unloaded and has its page (page id or head page id) freed in {@code tx}. Finally the
     * destination is dropped from the {@code storedDestinations} map and from
     * {@code metadata.destinations}.
     *
     * @param tx       the index transaction to apply the changes in
     * @param command  identifies the destination to remove
     * @param location not read by this method (the subscription overload passes {@code null})
     * @throws IOException propagated from the index operations
     */
    void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
        sd.orderIndex.remove(tx);

        // Location -> sequence index.
        sd.locationIndex.clear(tx);
        sd.locationIndex.unload(tx);
        tx.free(sd.locationIndex.getPageId());

        // Message id -> sequence index.
        sd.messageIdIndex.clear(tx);
        sd.messageIdIndex.unload(tx);
        tx.free(sd.messageIdIndex.getPageId());

        // The subscription related indexes are only torn down when a subscriptions index is
        // present; the other three are dereferenced without their own null checks.
        if (sd.subscriptions != null) {
            sd.subscriptions.clear(tx);
            sd.subscriptions.unload(tx);
            tx.free(sd.subscriptions.getPageId());

            sd.subscriptionAcks.clear(tx);
            sd.subscriptionAcks.unload(tx);
            tx.free(sd.subscriptionAcks.getPageId());

            // ackPositions and subLocations expose a head page id rather than a page id.
            sd.ackPositions.clear(tx);
            sd.ackPositions.unload(tx);
            tx.free(sd.ackPositions.getHeadPageId());

            sd.subLocations.clear(tx);
            sd.subLocations.unload(tx);
            tx.free(sd.subLocations.getHeadPageId());
        }

        // Forget the destination both in the storedDestinations map and in the stored metadata.
        String key = key(command.getDestination());
        storedDestinations.remove(key);
        metadata.destinations.remove(tx, key);
    }
1603
1604    void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
1605        StoredDestination sd = getStoredDestination(command.getDestination(), tx);
1606        final String subscriptionKey = command.getSubscriptionKey();
1607
1608        // If set then we are creating it.. otherwise we are destroying the sub
1609        if (command.hasSubscriptionInfo()) {
1610            Location existing = sd.subLocations.get(tx, subscriptionKey);
1611            if (existing != null && existing.compareTo(location) == 0) {
1612                // replay on recovery, ignore
1613                LOG.trace("ignoring journal replay of replay of sub from: " + location);
1614                return;
1615            }
1616
1617            sd.subscriptions.put(tx, subscriptionKey, command);
1618            sd.subLocations.put(tx, subscriptionKey, location);
1619            long ackLocation=NOT_ACKED;
1620            if (!command.getRetroactive()) {
1621                ackLocation = sd.orderIndex.nextMessageId-1;
1622            } else {
1623                addAckLocationForRetroactiveSub(tx, sd, subscriptionKey);
1624            }
1625            sd.subscriptionAcks.put(tx, subscriptionKey, new LastAck(ackLocation));
1626            sd.subscriptionCache.add(subscriptionKey);
1627        } else {
1628            // delete the sub...
1629            sd.subscriptions.remove(tx, subscriptionKey);
1630            sd.subLocations.remove(tx, subscriptionKey);
1631            sd.subscriptionAcks.remove(tx, subscriptionKey);
1632            sd.subscriptionCache.remove(subscriptionKey);
1633            removeAckLocationsForSub(tx, sd, subscriptionKey);
1634
1635            if (sd.subscriptions.isEmpty(tx)) {
1636                // remove the stored destination
1637                KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
1638                removeDestinationCommand.setDestination(command.getDestination());
1639                updateIndex(tx, removeDestinationCommand, null);
1640            }
1641        }
1642    }
1643
1644    private void checkpointUpdate(final boolean cleanup) throws IOException {
1645        checkpointLock.writeLock().lock();
1646        try {
1647            this.indexLock.writeLock().lock();
1648            try {
1649                Set<Integer> filesToGc = pageFile.tx().execute(new Transaction.CallableClosure<Set<Integer>, IOException>() {
1650                    @Override
1651                    public Set<Integer> execute(Transaction tx) throws IOException {
1652                        return checkpointUpdate(tx, cleanup);
1653                    }
1654                });
1655                pageFile.flush();
1656                // after the index update such that partial removal does not leave dangling references in the index.
1657                journal.removeDataFiles(filesToGc);
1658            } finally {
1659                this.indexLock.writeLock().unlock();
1660            }
1661
1662        } finally {
1663            checkpointLock.writeLock().unlock();
1664        }
1665    }
1666
    /**
     * Stores the metadata page in the given index transaction and, when {@code cleanup} is
     * set, determines which journal data files are no longer referenced.
     * <p>
     * With {@code cleanup} the candidate set starts as every file id known to the journal and
     * is then pruned of: the file of the last index update, files being replicated, the files
     * holding the producer audit and the ack map checkpoints, the files spanned by in-progress
     * transactions, every file referenced from a destination's location index, files holding a
     * durable subscription command that was not rewritten, and files whose acks refer to an
     * existing file that is not itself a candidate. When nothing can be collected and ack
     * compaction is enabled, an {@code AckCompactionRunner} may be scheduled instead.
     *
     * @param tx      the index transaction the metadata is stored in and the indexes are read through
     * @param cleanup whether to compute the set of journal files that can be garbage collected
     * @return the ids of the journal data files that can be removed; empty when {@code cleanup} is {@code false}
     * @throws IOException propagated from the index and journal operations
     */
    Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
        // NOTE(review): the MDC key is cleared only by the MDC.remove(...) at the end of this method, so an
        // exception thrown in between leaves it set; the nested checkpointUpdate(tx, false) call further down
        // also clears it before this invocation has finished logging.
        MDC.put("activemq.persistenceDir", getDirectory().getName());
        LOG.debug("Checkpoint started.");

        // reflect last update exclusive of current checkpoint
        Location lastUpdate = metadata.lastUpdate;

        // Refresh the metadata fields and store the metadata page in this transaction.
        metadata.state = OPEN_STATE;
        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
        metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
        Location[] inProgressTxRange = getInProgressTxLocationRange();
        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
        tx.store(metadata.page, metadataMarshaller, true);

        // Stays empty (and is returned empty) when no cleanup was requested.
        final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>();
        if (cleanup) {

            // Start from every journal file and remove whatever is still needed.
            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
            gcCandidateSet.addAll(completeFileSet);

            if (LOG.isTraceEnabled()) {
                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
            }

            // Keep the file that holds the last index update.
            if (lastUpdate != null) {
                gcCandidateSet.remove(lastUpdate.getDataFileId());
            }

            // Don't GC files under replication
            if( journalFilesBeingReplicated!=null ) {
                gcCandidateSet.removeAll(journalFilesBeingReplicated);
            }

            if (metadata.producerSequenceIdTrackerLocation != null) {
                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
                // Only when the audit sits in the lowest-numbered remaining candidate is the tracker
                // flagged as modified; the file itself is still kept for this pass (removed below).
                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
                    // rewrite so we don't prevent gc
                    metadata.producerSequenceIdTracker.setModified(true);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
                    }
                }
                gcCandidateSet.remove(dataFileId);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + dataFileId + ", " + gcCandidateSet);
                }
            }

            // Keep the file that holds the checkpointed ack map.
            if (metadata.ackMessageFileMapLocation != null) {
                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
                gcCandidateSet.remove(dataFileId);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + dataFileId + ", " + gcCandidateSet);
                }
            }

            // Don't GC files referenced by in-progress tx
            // (every file id from the first to the last location of the range, inclusive).
            // NOTE(review): inProgressTxRange[1] is dereferenced whenever [0] is non-null — presumably both
            // are set together by getInProgressTxLocationRange(); confirm.
            if (inProgressTxRange[0] != null) {
                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
                    gcCandidateSet.remove(pendingTx);
                }
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("gc candidates after tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
            }

            // Go through all the destinations to see if any of them can remove GC candidates.
            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
                if( gcCandidateSet.isEmpty() ) {
                    break;
                }

                // Use a visitor to cut down the number of pages that we load
                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                    // File id most recently removed by visit(); avoids repeating the removal for runs of keys in one file.
                    int last=-1;
                    @Override
                    public boolean isInterestedInKeysBetween(Location first, Location second) {
                        // headSet/tailSet/subSet return views backed by gcCandidateSet, so the remove(...) calls
                        // below also drop the boundary file ids from the candidate set itself. The range is
                        // interesting only if candidates remain strictly between the boundary files.
                        if( first==null ) {
                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
                                subset.remove(second.getDataFileId());
                            }
                            return !subset.isEmpty();
                        } else if( second==null ) {
                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
                                subset.remove(first.getDataFileId());
                            }
                            return !subset.isEmpty();
                        } else {
                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
                                subset.remove(first.getDataFileId());
                            }
                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
                                subset.remove(second.getDataFileId());
                            }
                            return !subset.isEmpty();
                        }
                    }

                    @Override
                    public void visit(List<Location> keys, List<Long> values) {
                        // Every file that still holds an indexed message location must be kept.
                        for (Location l : keys) {
                            int fileId = l.getDataFileId();
                            if( last != fileId ) {
                                gcCandidateSet.remove(fileId);
                                last = fileId;
                            }
                        }
                    }
                });

                // Durable Subscription
                if (entry.getValue().subLocations != null) {
                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
                    while (iter.hasNext()) {
                        Entry<String, Location> subscription = iter.next();
                        int dataFileId = subscription.getValue().getDataFileId();

                        // Move subscription along if it has no outstanding messages that need ack'd
                        // and it sits in the first (lowest id) remaining GC candidate file.
                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
                            final StoredDestination destination = entry.getValue();
                            final String subscriptionKey = subscription.getKey();
                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);

                            // When pending is size one that is the next message Id meaning there
                            // are no pending messages currently.
                            if (pendingAcks == null || pendingAcks.isEmpty() ||
                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {

                                if (LOG.isTraceEnabled()) {
                                    LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
                                }

                                // Re-checkpoint the subscription command and record the location it returns.
                                final KahaSubscriptionCommand kahaSub =
                                    destination.subscriptions.get(tx, subscriptionKey);
                                destination.subLocations.put(
                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));

                                // Skips the remove from candidates if we rewrote the subscription
                                // in order to prevent duplicate subscription commands on recover.
                                // If another subscription is on the same file and isn't rewritten
                                // than it will remove the file from the set.
                                continue;
                            }
                        }

                        // The subscription command in this file is still needed.
                        gcCandidateSet.remove(dataFileId);
                    }
                }

                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
                }
            }

            // check we are not deleting file with ack for in-use journal files
            if (LOG.isTraceEnabled()) {
                LOG.trace("gc candidates: " + gcCandidateSet);
                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
            }

            // Tracks whether the ack map changed and therefore has to be checkpointed again.
            boolean ackMessageFileMapMod = false;
            Iterator<Integer> candidates = gcCandidateSet.iterator();
            while (candidates.hasNext()) {
                Integer candidate = candidates.next();
                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
                if (referencedFileIds != null) {
                    for (Integer referencedFileId : referencedFileIds) {
                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
                            // active file that is not targeted for deletion is referenced so don't delete
                            candidates.remove();
                            break;
                        }
                    }
                    // Still a candidate: its ack map entry is no longer needed.
                    if (gcCandidateSet.contains(candidate)) {
                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
                    } else {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("not removing data file: " + candidate
                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
                        }
                    }
                }
            }

            if (!gcCandidateSet.isEmpty()) {
                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
                // Drop the files about to be deleted from every remaining reference set in the ack map.
                for (Integer candidate : gcCandidateSet) {
                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
                        ackMessageFileMapMod |= ackFiles.remove(candidate);
                    }
                }
                // The ack map changed after it was checkpointed above, so run the metadata part again
                // (without cleanup) in the same transaction.
                if (ackMessageFileMapMod) {
                    checkpointUpdate(tx, false);
                }
            } else if (isEnableAckCompaction()) {
                // Nothing could be collected this cycle: count it and possibly schedule ack compaction.
                if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
                    // First check length of journal to make sure it makes sense to even try.
                    //
                    // If there is only one journal file with Acks in it we don't need to move
                    // it since it won't be chained to any later logs.
                    //
                    // If the logs haven't grown since the last time then we need to compact
                    // otherwise there seems to still be room for growth and we don't need to incur
                    // the overhead.  Depending on configuration this check can be avoided and
                    // Ack compaction will run any time the store has not GC'd a journal file in
                    // the configured amount of cycles.
                    if (metadata.ackMessageFileMap.size() > 1 &&
                        (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {

                        LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
                        try {
                            scheduler.execute(new AckCompactionRunner());
                        } catch (Exception ex) {
                            // A failure to queue the task is only logged; the checkpoint itself goes on.
                            LOG.warn("Error on queueing the Ack Compactor", ex);
                        }
                    } else {
                        LOG.trace("Journal activity detected, no Ack compaction scheduled.");
                    }

                    checkPointCyclesWithNoGC = 0;
                } else {
                    LOG.trace("Not yet time to check for compaction: {} of {} cycles",
                              checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
                }

                // Remember the current journal file so the next check can tell whether the journal has grown.
                journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
            }
        }
        MDC.remove("activemq.persistenceDir");

        LOG.debug("Checkpoint done.");
        return gcCandidateSet;
    }
1908
1909    private final class AckCompactionRunner implements Runnable {
1910
1911        @Override
1912        public void run() {
1913
1914            int journalToAdvance = -1;
1915            Set<Integer> journalLogsReferenced = new HashSet<Integer>();
1916
1917            //flag to know whether the ack forwarding completed without an exception
1918            boolean forwarded = false;
1919
1920            try {
1921                //acquire the checkpoint lock to prevent other threads from
1922                //running a checkpoint while this is running
1923                //
1924                //Normally this task runs on the same executor as the checkpoint task
1925                //so this ack compaction runner wouldn't run at the same time as the checkpoint task.
1926                //
1927                //However, there are two cases where this isn't always true.
1928                //First, the checkpoint() method is public and can be called through the
1929                //PersistenceAdapter interface by someone at the same time this is running.
1930                //Second, a checkpoint is called during shutdown without using the executor.
1931                //
1932                //In the future it might be better to just remove the checkpointLock entirely
1933                //and only use the executor but this would need to be examined for any unintended
1934                //consequences
1935                checkpointLock.readLock().lock();
1936
1937                try {
1938
1939                    // Lock index to capture the ackMessageFileMap data
1940                    indexLock.writeLock().lock();
1941
1942                    // Map keys might not be sorted, find the earliest log file to forward acks
1943                    // from and move only those, future cycles can chip away at more as needed.
1944                    // We won't move files that are themselves rewritten on a previous compaction.
1945                    List<Integer> journalFileIds = new ArrayList<Integer>(metadata.ackMessageFileMap.keySet());
1946                    Collections.sort(journalFileIds);
1947                    for (Integer journalFileId : journalFileIds) {
1948                        DataFile current = journal.getDataFileById(journalFileId);
1949                        if (current != null && current.getTypeCode() != COMPACTED_JOURNAL_FILE) {
1950                            journalToAdvance = journalFileId;
1951                            break;
1952                        }
1953                    }
1954
1955                    // Check if we found one, or if we only found the current file being written to.
1956                    if (journalToAdvance == -1 || journalToAdvance == journal.getCurrentDataFileId()) {
1957                        return;
1958                    }
1959
1960                    journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance));
1961
1962                } finally {
1963                    indexLock.writeLock().unlock();
1964                }
1965
1966                try {
1967                    // Background rewrite of the old acks
1968                    forwardAllAcks(journalToAdvance, journalLogsReferenced);
1969                    forwarded = true;
1970                } catch (IOException ioe) {
1971                    LOG.error("Forwarding of acks failed", ioe);
1972                    brokerService.handleIOException(ioe);
1973                } catch (Throwable e) {
1974                    LOG.error("Forwarding of acks failed", e);
1975                    brokerService.handleIOException(IOExceptionSupport.create(e));
1976                }
1977            } finally {
1978                checkpointLock.readLock().unlock();
1979            }
1980
1981            try {
1982                if (forwarded) {
1983                    // Checkpoint with changes from the ackMessageFileMap
1984                    checkpointUpdate(false);
1985                }
1986            } catch (IOException ioe) {
1987                LOG.error("Checkpoint failed", ioe);
1988                brokerService.handleIOException(ioe);
1989            } catch (Throwable e) {
1990                LOG.error("Checkpoint failed", e);
1991                brokerService.handleIOException(IOExceptionSupport.create(e));
1992            }
1993        }
1994    }
1995
1996    private void forwardAllAcks(Integer journalToRead, Set<Integer> journalLogsReferenced) throws IllegalStateException, IOException {
1997        LOG.trace("Attempting to move all acks in journal:{} to the front.", journalToRead);
1998
1999        DataFile forwardsFile = journal.reserveDataFile();
2000        forwardsFile.setTypeCode(COMPACTED_JOURNAL_FILE);
2001        LOG.trace("Reserved now file for forwarded acks: {}", forwardsFile);
2002
2003        Map<Integer, Set<Integer>> updatedAckLocations = new HashMap<Integer, Set<Integer>>();
2004
2005        try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) {
2006            KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand();
2007            compactionMarker.setSourceDataFileId(journalToRead);
2008            compactionMarker.setRewriteType(forwardsFile.getTypeCode());
2009
2010            ByteSequence payload = toByteSequence(compactionMarker);
2011            appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2012            LOG.trace("Marked ack rewrites file as replacing file: {}", journalToRead);
2013
2014            Location nextLocation = getNextLocationForAckForward(new Location(journalToRead, 0));
2015            while (nextLocation != null && nextLocation.getDataFileId() == journalToRead) {
2016                JournalCommand<?> command = null;
2017                try {
2018                    command = load(nextLocation);
2019                } catch (IOException ex) {
2020                    LOG.trace("Error loading command during ack forward: {}", nextLocation);
2021                }
2022
2023                if (command != null && command instanceof KahaRemoveMessageCommand) {
2024                    payload = toByteSequence(command);
2025                    Location location = appender.storeItem(payload, Journal.USER_RECORD_TYPE, false);
2026                    updatedAckLocations.put(location.getDataFileId(), journalLogsReferenced);
2027                }
2028
2029                nextLocation = getNextLocationForAckForward(nextLocation);
2030            }
2031        }
2032
2033        LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations);
2034
2035        // Lock index while we update the ackMessageFileMap.
2036        indexLock.writeLock().lock();
2037
2038        // Update the ack map with the new locations of the acks
2039        for (Entry<Integer, Set<Integer>> entry : updatedAckLocations.entrySet()) {
2040            Set<Integer> referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey());
2041            if (referenceFileIds == null) {
2042                referenceFileIds = new HashSet<Integer>();
2043                referenceFileIds.addAll(entry.getValue());
2044                metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds);
2045            } else {
2046                referenceFileIds.addAll(entry.getValue());
2047            }
2048        }
2049
2050        // remove the old location data from the ack map so that the old journal log file can
2051        // be removed on next GC.
2052        metadata.ackMessageFileMap.remove(journalToRead);
2053
2054        indexLock.writeLock().unlock();
2055
2056        LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap);
2057    }
2058
2059    private Location getNextLocationForAckForward(final Location nextLocation) {
2060        //getNextLocation() can throw an IOException, we should handle it and set
2061        //nextLocation to null and abort gracefully
2062        //Should not happen in the normal case
2063        Location location = null;
2064        try {
2065            location = journal.getNextLocation(nextLocation);
2066        } catch (IOException e) {
2067            LOG.warn("Failed to load next journal location: {}", e.getMessage());
2068            if (LOG.isDebugEnabled()) {
2069                LOG.debug("Failed to load next journal location", e);
2070            }
2071        }
2072        return location;
2073    }
2074
    // Completion callback that deliberately does nothing. The checkpoint helpers in this class
    // pass it to store()/journal.write() and then wait on the returned location's latch; per
    // their comments, supplying a callback lets a disk sync be avoided when
    // enableJournalDiskSyncs = false.
    final Runnable nullCompletionCallback = new Runnable() {
        @Override
        public void run() {
            // intentionally empty
        }
    };
2080
2081    private Location checkpointProducerAudit() throws IOException {
2082        if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) {
2083            ByteArrayOutputStream baos = new ByteArrayOutputStream();
2084            ObjectOutputStream oout = new ObjectOutputStream(baos);
2085            oout.writeObject(metadata.producerSequenceIdTracker);
2086            oout.flush();
2087            oout.close();
2088            // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2089            Location location = store(new KahaProducerAuditCommand().setAudit(new Buffer(baos.toByteArray())), nullCompletionCallback);
2090            try {
2091                location.getLatch().await();
2092            } catch (InterruptedException e) {
2093                throw new InterruptedIOException(e.toString());
2094            }
2095            return location;
2096        }
2097        return metadata.producerSequenceIdTrackerLocation;
2098    }
2099
2100    private Location checkpointAckMessageFileMap() throws IOException {
2101        ByteArrayOutputStream baos = new ByteArrayOutputStream();
2102        ObjectOutputStream oout = new ObjectOutputStream(baos);
2103        oout.writeObject(metadata.ackMessageFileMap);
2104        oout.flush();
2105        oout.close();
2106        // using completion callback allows a disk sync to be avoided when enableJournalDiskSyncs = false
2107        Location location = store(new KahaAckMessageFileMapCommand().setAckMessageFileMap(new Buffer(baos.toByteArray())), nullCompletionCallback);
2108        try {
2109            location.getLatch().await();
2110        } catch (InterruptedException e) {
2111            throw new InterruptedIOException(e.toString());
2112        }
2113        return location;
2114    }
2115
2116    private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscription) throws IOException {
2117
2118        ByteSequence sequence = toByteSequence(subscription);
2119        Location location = journal.write(sequence, nullCompletionCallback) ;
2120
2121        try {
2122            location.getLatch().await();
2123        } catch (InterruptedException e) {
2124            throw new InterruptedIOException(e.toString());
2125        }
2126        return location;
2127    }
2128
2129    public HashSet<Integer> getJournalFilesBeingReplicated() {
2130        return journalFilesBeingReplicated;
2131    }
2132
2133    // /////////////////////////////////////////////////////////////////
2134    // StoredDestination related implementation methods.
2135    // /////////////////////////////////////////////////////////////////
2136
    // Cache of loaded destinations, keyed by the string produced by key(destination).
    // getStoredDestination() fills it on first access; nothing in that method evicts entries.
    protected final HashMap<String, StoredDestination> storedDestinations = new HashMap<String, StoredDestination>();
2138
2139    static class MessageKeys {
2140        final String messageId;
2141        final Location location;
2142
2143        public MessageKeys(String messageId, Location location) {
2144            this.messageId=messageId;
2145            this.location=location;
2146        }
2147
2148        @Override
2149        public String toString() {
2150            return "["+messageId+","+location+"]";
2151        }
2152    }
2153
2154    static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
2155        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
2156
2157        @Override
2158        public MessageKeys readPayload(DataInput dataIn) throws IOException {
2159            return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
2160        }
2161
2162        @Override
2163        public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
2164            dataOut.writeUTF(object.messageId);
2165            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
2166        }
2167    }
2168
2169    class LastAck {
2170        long lastAckedSequence;
2171        byte priority;
2172
2173        public LastAck(LastAck source) {
2174            this.lastAckedSequence = source.lastAckedSequence;
2175            this.priority = source.priority;
2176        }
2177
2178        public LastAck() {
2179            this.priority = MessageOrderIndex.HI;
2180        }
2181
2182        public LastAck(long ackLocation) {
2183            this.lastAckedSequence = ackLocation;
2184            this.priority = MessageOrderIndex.LO;
2185        }
2186
2187        public LastAck(long ackLocation, byte priority) {
2188            this.lastAckedSequence = ackLocation;
2189            this.priority = priority;
2190        }
2191
2192        @Override
2193        public String toString() {
2194            return "[" + lastAckedSequence + ":" + priority + "]";
2195        }
2196    }
2197
2198    protected class LastAckMarshaller implements Marshaller<LastAck> {
2199
2200        @Override
2201        public void writePayload(LastAck object, DataOutput dataOut) throws IOException {
2202            dataOut.writeLong(object.lastAckedSequence);
2203            dataOut.writeByte(object.priority);
2204        }
2205
2206        @Override
2207        public LastAck readPayload(DataInput dataIn) throws IOException {
2208            LastAck lastAcked = new LastAck();
2209            lastAcked.lastAckedSequence = dataIn.readLong();
2210            if (metadata.version >= 3) {
2211                lastAcked.priority = dataIn.readByte();
2212            }
2213            return lastAcked;
2214        }
2215
2216        @Override
2217        public int getFixedSize() {
2218            return 9;
2219        }
2220
2221        @Override
2222        public LastAck deepCopy(LastAck source) {
2223            return new LastAck(source);
2224        }
2225
2226        @Override
2227        public boolean isDeepCopySupported() {
2228            return true;
2229        }
2230    }
2231
2232    class StoredDestination {
2233
2234        MessageOrderIndex orderIndex = new MessageOrderIndex();
2235        BTreeIndex<Location, Long> locationIndex;
2236        BTreeIndex<String, Long> messageIdIndex;
2237
2238        // These bits are only set for Topics
2239        BTreeIndex<String, KahaSubscriptionCommand> subscriptions;
2240        BTreeIndex<String, LastAck> subscriptionAcks;
2241        HashMap<String, MessageOrderCursor> subscriptionCursors;
2242        ListIndex<String, SequenceSet> ackPositions;
2243        ListIndex<String, Location> subLocations;
2244
2245        // Transient data used to track which Messages are no longer needed.
2246        final TreeMap<Long, Long> messageReferences = new TreeMap<Long, Long>();
2247        final HashSet<String> subscriptionCache = new LinkedHashSet<String>();
2248
2249        public void trackPendingAdd(Long seq) {
2250            orderIndex.trackPendingAdd(seq);
2251        }
2252
2253        public void trackPendingAddComplete(Long seq) {
2254            orderIndex.trackPendingAddComplete(seq);
2255        }
2256
2257        @Override
2258        public String toString() {
2259            return "nextSeq:" + orderIndex.nextMessageId + ",lastRet:" + orderIndex.cursor + ",pending:" + orderIndex.pendingAdditions.size();
2260        }
2261    }
2262
    /**
     * Marshals a {@link StoredDestination} as the page ids of its indexes.
     *
     * Layout written by {@link #writePayload}: default-priority order index, location index,
     * message id index, a boolean flag that is true when the subscription indexes are present
     * followed (when true) by subscriptions, subscriptionAcks, ackPositions head page and
     * subLocations head page, and finally the low and high priority order indexes.
     *
     * {@link #readPayload} consumes the same sequence but gates some fields on
     * {@code metadata.version}; for older versions the missing indexes are allocated in a page
     * file transaction instead of being read.
     */
    protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {

        /**
         * Rebuilds a {@link StoredDestination} from its stored page ids. The indexes created here
         * from stored page ids are only constructed; the indexes allocated on the upgrade paths
         * are also configured with marshallers and loaded.
         *
         * @param dataIn
         *      source of the stored page ids; reads must stay in the documented order.
         *
         * @return the reconstructed destination.
         *
         * @throws IOException if reading or an upgrade transaction fails.
         */
        @Override
        public StoredDestination readPayload(final DataInput dataIn) throws IOException {
            final StoredDestination value = new StoredDestination();
            value.orderIndex.defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
            value.locationIndex = new BTreeIndex<Location, Long>(pageFile, dataIn.readLong());
            value.messageIdIndex = new BTreeIndex<String, Long>(pageFile, dataIn.readLong());

            // Flag written by writePayload: true when the subscription indexes follow.
            if (dataIn.readBoolean()) {
                value.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, dataIn.readLong());
                value.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, dataIn.readLong());
                if (metadata.version >= 4) {
                    value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, dataIn.readLong());
                } else {
                    // upgrade
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            // subscription key -> sequences, collected in memory first
                            LinkedHashMap<String, SequenceSet> temp = new LinkedHashMap<String, SequenceSet>();

                            if (metadata.version >= 3) {
                                // migrate
                                // Version 3 stored a BTree keyed by sequence whose value is the
                                // set of subscription keys; its page id is read from dataIn here.
                                BTreeIndex<Long, HashSet<String>> oldAckPositions =
                                        new BTreeIndex<Long, HashSet<String>>(pageFile, dataIn.readLong());
                                oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE);
                                oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE);
                                oldAckPositions.load(tx);


                                // Do the initial build of the data in memory before writing into the store
                                // based Ack Positions List to avoid a lot of disk thrashing.
                                Iterator<Entry<Long, HashSet<String>>> iterator = oldAckPositions.iterator(tx);
                                while (iterator.hasNext()) {
                                    Entry<Long, HashSet<String>> entry = iterator.next();

                                    // Invert the mapping: add this sequence to each subscription's set.
                                    for(String subKey : entry.getValue()) {
                                        SequenceSet pendingAcks = temp.get(subKey);
                                        if (pendingAcks == null) {
                                            pendingAcks = new SequenceSet();
                                            temp.put(subKey, pendingAcks);
                                        }

                                        pendingAcks.add(entry.getKey());
                                    }
                                }
                            }
                            // Now move the pending messages to ack data into the store backed
                            // structure.
                            value.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
                            value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
                            value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
                            value.ackPositions.load(tx);
                            for(String subscriptionKey : temp.keySet()) {
                                value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey));
                            }

                        }
                    });
                }

                if (metadata.version >= 5) {
                    value.subLocations = new ListIndex<String, Location>(pageFile, dataIn.readLong());
                } else {
                    // upgrade
                    // Nothing to read for older versions: allocate an empty subLocations list.
                    pageFile.tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            value.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
                            value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
                            value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
                            value.subLocations.load(tx);
                        }
                    });
                }
            }
            if (metadata.version >= 2) {
                value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
                value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, dataIn.readLong());
            } else {
                // upgrade
                // Nothing to read for older versions: allocate empty low/high priority indexes.
                pageFile.tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                        value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                        value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
                        value.orderIndex.lowPriorityIndex.load(tx);

                        value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
                        value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
                        value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
                        value.orderIndex.highPriorityIndex.load(tx);
                    }
                });
            }

            return value;
        }

        /**
         * Writes the page ids of the destination's indexes in the layout described on this class.
         * The subscription indexes are written only when {@code value.subscriptions} is non-null.
         *
         * @param value
         *      the destination whose index page ids are written.
         * @param dataOut
         *      the sink for the page ids.
         *
         * @throws IOException if writing fails.
         */
        @Override
        public void writePayload(StoredDestination value, DataOutput dataOut) throws IOException {
            dataOut.writeLong(value.orderIndex.defaultPriorityIndex.getPageId());
            dataOut.writeLong(value.locationIndex.getPageId());
            dataOut.writeLong(value.messageIdIndex.getPageId());
            if (value.subscriptions != null) {
                dataOut.writeBoolean(true);
                dataOut.writeLong(value.subscriptions.getPageId());
                dataOut.writeLong(value.subscriptionAcks.getPageId());
                dataOut.writeLong(value.ackPositions.getHeadPageId());
                dataOut.writeLong(value.subLocations.getHeadPageId());
            } else {
                dataOut.writeBoolean(false);
            }
            dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
            dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
        }
    }
2381
2382    static class KahaSubscriptionCommandMarshaller extends VariableMarshaller<KahaSubscriptionCommand> {
2383        final static KahaSubscriptionCommandMarshaller INSTANCE = new KahaSubscriptionCommandMarshaller();
2384
2385        @Override
2386        public KahaSubscriptionCommand readPayload(DataInput dataIn) throws IOException {
2387            KahaSubscriptionCommand rc = new KahaSubscriptionCommand();
2388            rc.mergeFramed((InputStream)dataIn);
2389            return rc;
2390        }
2391
2392        @Override
2393        public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) throws IOException {
2394            object.writeFramed((OutputStream)dataOut);
2395        }
2396    }
2397
2398    protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2399        String key = key(destination);
2400        StoredDestination rc = storedDestinations.get(key);
2401        if (rc == null) {
2402            boolean topic = destination.getType() == KahaDestination.DestinationType.TOPIC || destination.getType() == KahaDestination.DestinationType.TEMP_TOPIC;
2403            rc = loadStoredDestination(tx, key, topic);
2404            // Cache it. We may want to remove/unload destinations from the
2405            // cache that are not used for a while
2406            // to reduce memory usage.
2407            storedDestinations.put(key, rc);
2408        }
2409        return rc;
2410    }
2411
2412    protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
2413        String key = key(destination);
2414        StoredDestination rc = storedDestinations.get(key);
2415        if (rc == null && metadata.destinations.containsKey(tx, key)) {
2416            rc = getStoredDestination(destination, tx);
2417        }
2418        return rc;
2419    }
2420
2421    /**
2422     * @param tx
2423     * @param key
2424     * @param topic
2425     * @return
2426     * @throws IOException
2427     */
2428    private StoredDestination loadStoredDestination(Transaction tx, String key, boolean topic) throws IOException {
2429        // Try to load the existing indexes..
2430        StoredDestination rc = metadata.destinations.get(tx, key);
2431        if (rc == null) {
2432            // Brand new destination.. allocate indexes for it.
2433            rc = new StoredDestination();
2434            rc.orderIndex.allocate(tx);
2435            rc.locationIndex = new BTreeIndex<Location, Long>(pageFile, tx.allocate());
2436            rc.messageIdIndex = new BTreeIndex<String, Long>(pageFile, tx.allocate());
2437
2438            if (topic) {
2439                rc.subscriptions = new BTreeIndex<String, KahaSubscriptionCommand>(pageFile, tx.allocate());
2440                rc.subscriptionAcks = new BTreeIndex<String, LastAck>(pageFile, tx.allocate());
2441                rc.ackPositions = new ListIndex<String, SequenceSet>(pageFile, tx.allocate());
2442                rc.subLocations = new ListIndex<String, Location>(pageFile, tx.allocate());
2443            }
2444            metadata.destinations.put(tx, key, rc);
2445        }
2446
2447        // Configure the marshalers and load.
2448        rc.orderIndex.load(tx);
2449
2450        // Figure out the next key using the last entry in the destination.
2451        rc.orderIndex.configureLast(tx);
2452
2453        rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE);
2454        rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2455        rc.locationIndex.load(tx);
2456
2457        rc.messageIdIndex.setKeyMarshaller(StringMarshaller.INSTANCE);
2458        rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
2459        rc.messageIdIndex.load(tx);
2460
2461        // If it was a topic...
2462        if (topic) {
2463
2464            rc.subscriptions.setKeyMarshaller(StringMarshaller.INSTANCE);
2465            rc.subscriptions.setValueMarshaller(KahaSubscriptionCommandMarshaller.INSTANCE);
2466            rc.subscriptions.load(tx);
2467
2468            rc.subscriptionAcks.setKeyMarshaller(StringMarshaller.INSTANCE);
2469            rc.subscriptionAcks.setValueMarshaller(new LastAckMarshaller());
2470            rc.subscriptionAcks.load(tx);
2471
2472            rc.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE);
2473            rc.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE);
2474            rc.ackPositions.load(tx);
2475
2476            rc.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE);
2477            rc.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE);
2478            rc.subLocations.load(tx);
2479
2480            rc.subscriptionCursors = new HashMap<String, MessageOrderCursor>();
2481
2482            if (metadata.version < 3) {
2483
2484                // on upgrade need to fill ackLocation with available messages past last ack
2485                for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2486                    Entry<String, LastAck> entry = iterator.next();
2487                    for (Iterator<Entry<Long, MessageKeys>> orderIterator =
2488                            rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) {
2489                        Long sequence = orderIterator.next().getKey();
2490                        addAckLocation(tx, rc, sequence, entry.getKey());
2491                    }
2492                    // modify so it is upgraded
2493                    rc.subscriptionAcks.put(tx, entry.getKey(), entry.getValue());
2494                }
2495            }
2496
2497            // Configure the message references index
2498            Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
2499            while (subscriptions.hasNext()) {
2500                Entry<String, SequenceSet> subscription = subscriptions.next();
2501                SequenceSet pendingAcks = subscription.getValue();
2502                if (pendingAcks != null && !pendingAcks.isEmpty()) {
2503                    Long lastPendingAck = pendingAcks.getTail().getLast();
2504                    for(Long sequenceId : pendingAcks) {
2505                        Long current = rc.messageReferences.get(sequenceId);
2506                        if (current == null) {
2507                            current = new Long(0);
2508                        }
2509
2510                        // We always add a trailing empty entry for the next position to start from
2511                        // so we need to ensure we don't count that as a message reference on reload.
2512                        if (!sequenceId.equals(lastPendingAck)) {
2513                            current = current.longValue() + 1;
2514                        }
2515
2516                        rc.messageReferences.put(sequenceId, current);
2517                    }
2518                }
2519            }
2520
2521            // Configure the subscription cache
2522            for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
2523                Entry<String, LastAck> entry = iterator.next();
2524                rc.subscriptionCache.add(entry.getKey());
2525            }
2526
2527            if (rc.orderIndex.nextMessageId == 0) {
2528                // check for existing durable sub all acked out - pull next seq from acks as messages are gone
2529                if (!rc.subscriptionAcks.isEmpty(tx)) {
2530                    for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext();) {
2531                        Entry<String, LastAck> entry = iterator.next();
2532                        rc.orderIndex.nextMessageId =
2533                                Math.max(rc.orderIndex.nextMessageId, entry.getValue().lastAckedSequence +1);
2534                    }
2535                }
2536            } else {
2537                // update based on ackPositions for unmatched, last entry is always the next
2538                if (!rc.messageReferences.isEmpty()) {
2539                    Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
2540                    rc.orderIndex.nextMessageId =
2541                            Math.max(rc.orderIndex.nextMessageId, nextMessageId);
2542                }
2543            }
2544        }
2545
2546        if (metadata.version < VERSION) {
2547            // store again after upgrade
2548            metadata.destinations.put(tx, key, rc);
2549        }
2550        return rc;
2551    }
2552
2553    private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
2554        SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2555        if (sequences == null) {
2556            sequences = new SequenceSet();
2557            sequences.add(messageSequence);
2558            sd.ackPositions.add(tx, subscriptionKey, sequences);
2559        } else {
2560            sequences.add(messageSequence);
2561            sd.ackPositions.put(tx, subscriptionKey, sequences);
2562        }
2563
2564        Long count = sd.messageReferences.get(messageSequence);
2565        if (count == null) {
2566            count = Long.valueOf(0L);
2567        }
2568        count = count.longValue() + 1;
2569        sd.messageReferences.put(messageSequence, count);
2570    }
2571
2572    // new sub is interested in potentially all existing messages
2573    private void addAckLocationForRetroactiveSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2574        SequenceSet allOutstanding = new SequenceSet();
2575        Iterator<Map.Entry<String, SequenceSet>> iterator = sd.ackPositions.iterator(tx);
2576        while (iterator.hasNext()) {
2577            SequenceSet set = iterator.next().getValue();
2578            for (Long entry : set) {
2579                allOutstanding.add(entry);
2580            }
2581        }
2582        sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
2583
2584        for (Long ackPosition : allOutstanding) {
2585            Long count = sd.messageReferences.get(ackPosition);
2586            count = count.longValue() + 1;
2587            sd.messageReferences.put(ackPosition, count);
2588        }
2589    }
2590
2591    // on a new message add, all existing subs are interested in this message
2592    private void addAckLocationForNewMessage(Transaction tx, StoredDestination sd, Long messageSequence) throws IOException {
2593        for(String subscriptionKey : sd.subscriptionCache) {
2594            SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
2595            if (sequences == null) {
2596                sequences = new SequenceSet();
2597                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2598                sd.ackPositions.add(tx, subscriptionKey, sequences);
2599            } else {
2600                sequences.add(new Sequence(messageSequence, messageSequence + 1));
2601                sd.ackPositions.put(tx, subscriptionKey, sequences);
2602            }
2603
2604            Long count = sd.messageReferences.get(messageSequence);
2605            if (count == null) {
2606                count = Long.valueOf(0L);
2607            }
2608            count = count.longValue() + 1;
2609            sd.messageReferences.put(messageSequence, count);
2610            sd.messageReferences.put(messageSequence+1, Long.valueOf(0L));
2611        }
2612    }
2613
2614    private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2615        if (!sd.ackPositions.isEmpty(tx)) {
2616            SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
2617            if (sequences == null || sequences.isEmpty()) {
2618                return;
2619            }
2620
2621            ArrayList<Long> unreferenced = new ArrayList<Long>();
2622
2623            for(Long sequenceId : sequences) {
2624                Long references = sd.messageReferences.get(sequenceId);
2625                if (references != null) {
2626                    references = references.longValue() - 1;
2627
2628                    if (references.longValue() > 0) {
2629                        sd.messageReferences.put(sequenceId, references);
2630                    } else {
2631                        sd.messageReferences.remove(sequenceId);
2632                        unreferenced.add(sequenceId);
2633                    }
2634                }
2635            }
2636
2637            for(Long sequenceId : unreferenced) {
2638                // Find all the entries that need to get deleted.
2639                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2640                sd.orderIndex.getDeleteList(tx, deletes, sequenceId);
2641
2642                // Do the actual deletes.
2643                for (Entry<Long, MessageKeys> entry : deletes) {
2644                    sd.locationIndex.remove(tx, entry.getValue().location);
2645                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2646                    sd.orderIndex.remove(tx, entry.getKey());
2647                }
2648            }
2649        }
2650    }
2651
2652    /**
2653     * @param tx
2654     * @param sd
2655     * @param subscriptionKey
2656     * @param messageSequence
2657     * @throws IOException
2658     */
2659    private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
2660        // Remove the sub from the previous location set..
2661        if (messageSequence != null) {
2662            SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
2663            if (range != null && !range.isEmpty()) {
2664                range.remove(messageSequence);
2665                if (!range.isEmpty()) {
2666                    sd.ackPositions.put(tx, subscriptionKey, range);
2667                } else {
2668                    sd.ackPositions.remove(tx, subscriptionKey);
2669                }
2670
2671                // Check if the message is reference by any other subscription.
2672                Long count = sd.messageReferences.get(messageSequence);
2673                if (count != null){
2674                long references = count.longValue() - 1;
2675                    if (references > 0) {
2676                        sd.messageReferences.put(messageSequence, Long.valueOf(references));
2677                        return;
2678                    } else {
2679                        sd.messageReferences.remove(messageSequence);
2680                    }
2681                }
2682
2683                // Find all the entries that need to get deleted.
2684                ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<Entry<Long, MessageKeys>>();
2685                sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
2686
2687                // Do the actual deletes.
2688                for (Entry<Long, MessageKeys> entry : deletes) {
2689                    sd.locationIndex.remove(tx, entry.getValue().location);
2690                    sd.messageIdIndex.remove(tx, entry.getValue().messageId);
2691                    sd.orderIndex.remove(tx, entry.getKey());
2692                }
2693            }
2694        }
2695    }
2696
    /**
     * Looks up the {@link LastAck} stored for a subscription in {@code sd.subscriptionAcks}.
     *
     * @param tx the index transaction used for the lookup
     * @param sd the stored destination holding the subscription acks
     * @param subscriptionKey key of the subscription
     * @return whatever {@code sd.subscriptionAcks.get} yields for the key
     * @throws IOException propagated from the index lookup
     */
    public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
        return sd.subscriptionAcks.get(tx, subscriptionKey);
    }
2700
2701    public long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
2702        SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
2703        if (messageSequences != null) {
2704            long result = messageSequences.rangeSize();
2705            // if there's anything in the range the last value is always the nextMessage marker, so remove 1.
2706            return result > 0 ? result - 1 : 0;
2707        }
2708
2709        return 0;
2710    }
2711
2712    protected String key(KahaDestination destination) {
2713        return destination.getType().getNumber() + ":" + destination.getName();
2714    }
2715
2716    // /////////////////////////////////////////////////////////////////
2717    // Transaction related implementation methods.
2718    // /////////////////////////////////////////////////////////////////
    // Operation lists keyed by transaction id. getInflightTx() looks entries up and creates
    // them on demand while synchronized on this map.
    @SuppressWarnings("rawtypes")
    private final LinkedHashMap<TransactionId, List<Operation>> inflightTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
    // Operation lists keyed by transaction id for prepared transactions.
    // NOTE(review): the code that populates this map is outside this part of the file.
    @SuppressWarnings("rawtypes")
    protected final LinkedHashMap<TransactionId, List<Operation>> preparedTransactions = new LinkedHashMap<TransactionId, List<Operation>>();
    // Producer keys of the last message id of acks registered through trackRecoveredAcks();
    // forgetRecoveredAcks() removes them again.
    protected final Set<String> ackedAndPrepared = new HashSet<String>();
    // Producer keys recorded by forgetRecoveredAcks() when it is called with rollback == true.
    protected final Set<String> rolledBackAcks = new HashSet<String>();
2725
2726    // messages that have prepared (pending) acks cannot be re-dispatched unless the outcome is rollback,
2727    // till then they are skipped by the store.
2728    // 'at most once' XA guarantee
2729    public void trackRecoveredAcks(ArrayList<MessageAck> acks) {
2730        this.indexLock.writeLock().lock();
2731        try {
2732            for (MessageAck ack : acks) {
2733                ackedAndPrepared.add(ack.getLastMessageId().toProducerKey());
2734            }
2735        } finally {
2736            this.indexLock.writeLock().unlock();
2737        }
2738    }
2739
2740    public void forgetRecoveredAcks(ArrayList<MessageAck> acks, boolean rollback) throws IOException {
2741        if (acks != null) {
2742            this.indexLock.writeLock().lock();
2743            try {
2744                for (MessageAck ack : acks) {
2745                    final String id = ack.getLastMessageId().toProducerKey();
2746                    ackedAndPrepared.remove(id);
2747                    if (rollback) {
2748                        rolledBackAcks.add(id);
2749                    }
2750                }
2751            } finally {
2752                this.indexLock.writeLock().unlock();
2753            }
2754        }
2755    }
2756
2757    @SuppressWarnings("rawtypes")
2758    private List<Operation> getInflightTx(KahaTransactionInfo info) {
2759        TransactionId key = TransactionIdConversion.convert(info);
2760        List<Operation> tx;
2761        synchronized (inflightTransactions) {
2762            tx = inflightTransactions.get(key);
2763            if (tx == null) {
2764                tx = Collections.synchronizedList(new ArrayList<Operation>());
2765                inflightTransactions.put(key, tx);
2766            }
2767        }
2768        return tx;
2769    }
2770
    /**
     * Converts a {@link KahaTransactionInfo} into the {@link TransactionId} used as a map key.
     * Marked unused: the visible code calls {@code TransactionIdConversion.convert} directly.
     *
     * @param transactionInfo the transaction info to convert
     * @return the result of {@code TransactionIdConversion.convert(transactionInfo)}
     */
    @SuppressWarnings("unused")
    private TransactionId key(KahaTransactionInfo transactionInfo) {
        return TransactionIdConversion.convert(transactionInfo);
    }
2775
2776    abstract class Operation <T extends JournalCommand<T>> {
2777        final T command;
2778        final Location location;
2779
2780        public Operation(T command, Location location) {
2781            this.command = command;
2782            this.location = location;
2783        }
2784
2785        public Location getLocation() {
2786            return location;
2787        }
2788
2789        public T getCommand() {
2790            return command;
2791        }
2792
2793        abstract public void execute(Transaction tx) throws IOException;
2794    }
2795
2796    class AddOperation extends Operation<KahaAddMessageCommand> {
2797        final IndexAware runWithIndexLock;
2798        public AddOperation(KahaAddMessageCommand command, Location location, IndexAware runWithIndexLock) {
2799            super(command, location);
2800            this.runWithIndexLock = runWithIndexLock;
2801        }
2802
2803        @Override
2804        public void execute(Transaction tx) throws IOException {
2805            long seq = updateIndex(tx, command, location);
2806            if (runWithIndexLock != null) {
2807                runWithIndexLock.sequenceAssignedWithIndexLocked(seq);
2808            }
2809        }
2810    }
2811
    /**
     * Operation that applies a {@link KahaRemoveMessageCommand} to the index by delegating to
     * {@code updateIndex}.
     */
    class RemoveOperation extends Operation<KahaRemoveMessageCommand> {

        /**
         * @param command the remove-message command to apply
         * @param location the journal location associated with the command
         */
        public RemoveOperation(KahaRemoveMessageCommand command, Location location) {
            super(command, location);
        }

        @Override
        public void execute(Transaction tx) throws IOException {
            updateIndex(tx, command, location);
        }
    }
2823
2824    // /////////////////////////////////////////////////////////////////
2825    // Initialization related implementation methods.
2826    // /////////////////////////////////////////////////////////////////
2827
2828    private PageFile createPageFile() throws IOException {
2829        if (indexDirectory == null) {
2830            indexDirectory = directory;
2831        }
2832        IOHelper.mkdirs(indexDirectory);
2833        PageFile index = new PageFile(indexDirectory, "db");
2834        index.setEnableWriteThread(isEnableIndexWriteAsync());
2835        index.setWriteBatchSize(getIndexWriteBatchSize());
2836        index.setPageCacheSize(indexCacheSize);
2837        index.setUseLFRUEviction(isUseIndexLFRUEviction());
2838        index.setLFUEvictionFactor(getIndexLFUEvictionFactor());
2839        index.setEnableDiskSyncs(isEnableIndexDiskSyncs());
2840        index.setEnableRecoveryFile(isEnableIndexRecoveryFile());
2841        index.setEnablePageCaching(isEnableIndexPageCaching());
2842        return index;
2843    }
2844
2845    private Journal createJournal() throws IOException {
2846        Journal manager = new Journal();
2847        manager.setDirectory(directory);
2848        manager.setMaxFileLength(getJournalMaxFileLength());
2849        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
2850        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
2851        manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
2852        manager.setArchiveDataLogs(isArchiveDataLogs());
2853        manager.setSizeAccumulator(journalSize);
2854        manager.setEnableAsyncDiskSync(isEnableJournalDiskSyncs());
2855        manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
2856        manager.setPreallocationStrategy(
2857                Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
2858        manager.setJournalDiskSyncStrategy(
2859                Journal.JournalDiskSyncStrategy.valueOf(journalDiskSyncStrategy.trim().toUpperCase()));
2860        if (getDirectoryArchive() != null) {
2861            IOHelper.mkdirs(getDirectoryArchive());
2862            manager.setDirectoryArchive(getDirectoryArchive());
2863        }
2864        return manager;
2865    }
2866
2867    private Metadata createMetadata() {
2868        Metadata md = new Metadata();
2869        md.producerSequenceIdTracker.setAuditDepth(getFailoverProducersAuditDepth());
2870        md.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(getMaxFailoverProducersToTrack());
2871        return md;
2872    }
2873
    /**
     * @return the journal write batch size that {@code createJournal()} passes to
     *         {@code Journal.setWriteBatchSize}
     */
    public int getJournalMaxWriteBatchSize() {
        return journalMaxWriteBatchSize;
    }
2877
    /**
     * @param journalMaxWriteBatchSize the write batch size read by {@code createJournal()}
     *        when it builds the journal
     */
    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
    }
2881
    /**
     * @return the store directory; {@code createJournal()} uses it as the journal directory and
     *         {@code createPageFile()} falls back to it when no index directory is set
     */
    public File getDirectory() {
        return directory;
    }
2885
    /**
     * @param directory the store directory used for the journal and, by default, the index
     */
    public void setDirectory(File directory) {
        this.directory = directory;
    }
2889
    /**
     * @return the current value of the {@code deleteAllMessages} flag
     */
    public boolean isDeleteAllMessages() {
        return deleteAllMessages;
    }
2893
    /**
     * @param deleteAllMessages new value of the {@code deleteAllMessages} flag
     */
    public void setDeleteAllMessages(boolean deleteAllMessages) {
        this.deleteAllMessages = deleteAllMessages;
    }
2897
    /**
     * Stores the index write batch size. Note that both the parameter and the backing field
     * are named {@code setIndexWriteBatchSize}.
     *
     * @param setIndexWriteBatchSize the batch size later returned by
     *        {@link #getIndexWriteBatchSize()}
     */
    public void setIndexWriteBatchSize(int setIndexWriteBatchSize) {
        this.setIndexWriteBatchSize = setIndexWriteBatchSize;
    }
2901
    /**
     * @return the index write batch size that {@code createPageFile()} passes to
     *         {@code PageFile.setWriteBatchSize}
     */
    public int getIndexWriteBatchSize() {
        return setIndexWriteBatchSize;
    }
2905
    /**
     * @param enableIndexWriteAsync flag that {@code createPageFile()} passes to
     *        {@code PageFile.setEnableWriteThread}
     */
    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
        this.enableIndexWriteAsync = enableIndexWriteAsync;
    }
2909
    /**
     * Package-private, unlike the matching public setter.
     *
     * @return the flag that {@code createPageFile()} passes to
     *         {@code PageFile.setEnableWriteThread}
     */
    boolean isEnableIndexWriteAsync() {
        return enableIndexWriteAsync;
    }
2913
2914    /**
2915     * @deprecated use {@link #getJournalDiskSyncStrategy} instead
2916     * @return
2917     */
2918    public boolean isEnableJournalDiskSyncs() {
2919        return journalDiskSyncStrategy != null && JournalDiskSyncStrategy.ALWAYS.name().equals(
2920                journalDiskSyncStrategy.trim().toUpperCase());
2921    }
2922
2923    /**
2924     * @deprecated use {@link #setEnableJournalDiskSyncs} instead
2925     * @param syncWrites
2926     */
2927    public void setEnableJournalDiskSyncs(boolean syncWrites) {
2928        if (syncWrites) {
2929            journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS.name();
2930        } else {
2931            journalDiskSyncStrategy = JournalDiskSyncStrategy.NEVER.name();
2932        }
2933    }
2934
    /**
     * @return the configured journal disk sync strategy name, exactly as it was set;
     *         {@code createJournal()} trims and upper-cases it before resolving the enum constant
     */
    public String getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }
2938
    /**
     * Stores the strategy name without validating it; {@code createJournal()} later resolves it
     * with {@code Journal.JournalDiskSyncStrategy.valueOf}.
     *
     * @param journalDiskSyncStrategy name of the journal disk sync strategy
     */
    public void setJournalDiskSyncStrategy(String journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }
2942
    /**
     * @return the configured journal disk sync interval
     *         (NOTE(review): unit is not visible here, presumably milliseconds; confirm)
     */
    public long getJournalDiskSyncInterval() {
        return journalDiskSyncInterval;
    }
2946
    /**
     * @param journalDiskSyncInterval the journal disk sync interval
     *        (NOTE(review): unit is not visible here, presumably milliseconds; confirm)
     */
    public void setJournalDiskSyncInterval(long journalDiskSyncInterval) {
        this.journalDiskSyncInterval = journalDiskSyncInterval;
    }
2950
    /**
     * @return the configured checkpoint interval
     *         (NOTE(review): unit is not visible here, presumably milliseconds; confirm)
     */
    public long getCheckpointInterval() {
        return checkpointInterval;
    }
2954
    /**
     * @param checkpointInterval the checkpoint interval
     *        (NOTE(review): unit is not visible here, presumably milliseconds; confirm)
     */
    public void setCheckpointInterval(long checkpointInterval) {
        this.checkpointInterval = checkpointInterval;
    }
2958
    /**
     * @return the configured cleanup interval
     *         (NOTE(review): unit is not visible here, presumably milliseconds; confirm)
     */
    public long getCleanupInterval() {
        return cleanupInterval;
    }
2962
    /**
     * @param cleanupInterval the cleanup interval
     *        (NOTE(review): unit is not visible here, presumably milliseconds; confirm)
     */
    public void setCleanupInterval(long cleanupInterval) {
        this.cleanupInterval = cleanupInterval;
    }
2966
    /**
     * @param journalMaxFileLength the value {@code createJournal()} passes to
     *        {@code Journal.setMaxFileLength}
     */
    public void setJournalMaxFileLength(int journalMaxFileLength) {
        this.journalMaxFileLength = journalMaxFileLength;
    }
2970
    /**
     * @return the value {@code createJournal()} passes to {@code Journal.setMaxFileLength}
     */
    public int getJournalMaxFileLength() {
        return journalMaxFileLength;
    }
2974
    /**
     * Applies the limit directly to the producer sequence tracker of the current metadata;
     * no separate field is kept.
     *
     * @param maxFailoverProducersToTrack maximum number of producers the tracker should track
     */
    public void setMaxFailoverProducersToTrack(int maxFailoverProducersToTrack) {
        this.metadata.producerSequenceIdTracker.setMaximumNumberOfProducersToTrack(maxFailoverProducersToTrack);
    }
2978
    /**
     * @return the maximum number of producers to track, read from the producer sequence
     *         tracker of the current metadata
     */
    public int getMaxFailoverProducersToTrack() {
        return this.metadata.producerSequenceIdTracker.getMaximumNumberOfProducersToTrack();
    }
2982
    /**
     * Applies the audit depth directly to the producer sequence tracker of the current
     * metadata; no separate field is kept.
     *
     * @param failoverProducersAuditDepth audit depth for the tracker
     */
    public void setFailoverProducersAuditDepth(int failoverProducersAuditDepth) {
        this.metadata.producerSequenceIdTracker.setAuditDepth(failoverProducersAuditDepth);
    }
2986
    /**
     * @return the audit depth read from the producer sequence tracker of the current metadata
     */
    public int getFailoverProducersAuditDepth() {
        return this.metadata.producerSequenceIdTracker.getAuditDepth();
    }
2990
2991    public PageFile getPageFile() throws IOException {
2992        if (pageFile == null) {
2993            pageFile = createPageFile();
2994        }
2995        return pageFile;
2996    }
2997
2998    public Journal getJournal() throws IOException {
2999        if (journal == null) {
3000            journal = createJournal();
3001        }
3002        return journal;
3003    }
3004
    /**
     * @return the metadata instance currently held by this store
     */
    protected Metadata getMetadata() {
        return metadata;
    }
3008
    /**
     * @return the current value of the {@code failIfDatabaseIsLocked} flag
     */
    public boolean isFailIfDatabaseIsLocked() {
        return failIfDatabaseIsLocked;
    }
3012
    /**
     * @param failIfDatabaseIsLocked new value of the {@code failIfDatabaseIsLocked} flag
     */
    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
    }
3016
    /**
     * @return the current value of the {@code ignoreMissingJournalfiles} flag
     */
    public boolean isIgnoreMissingJournalfiles() {
        return ignoreMissingJournalfiles;
    }
3020
    /**
     * @param ignoreMissingJournalfiles new value of the {@code ignoreMissingJournalfiles} flag
     */
    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) {
        this.ignoreMissingJournalfiles = ignoreMissingJournalfiles;
    }
3024
    /**
     * @return the value {@code createPageFile()} passes to {@code PageFile.setPageCacheSize}
     */
    public int getIndexCacheSize() {
        return indexCacheSize;
    }
3028
    /**
     * @param indexCacheSize the value {@code createPageFile()} passes to
     *        {@code PageFile.setPageCacheSize}
     */
    public void setIndexCacheSize(int indexCacheSize) {
        this.indexCacheSize = indexCacheSize;
    }
3032
    /**
     * @return the flag {@code createJournal()} passes to
     *         {@code Journal.setCheckForCorruptionOnStartup}; when set it also turns journal
     *         checksums on
     */
    public boolean isCheckForCorruptJournalFiles() {
        return checkForCorruptJournalFiles;
    }
3036
    /**
     * @param checkForCorruptJournalFiles flag {@code createJournal()} passes to
     *        {@code Journal.setCheckForCorruptionOnStartup}; when set it also turns journal
     *        checksums on
     */
    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
    }
3040
    /**
     * @return the checksum flag; {@code createJournal()} enables journal checksums when either
     *         this flag or {@code checkForCorruptJournalFiles} is set
     */
    public boolean isChecksumJournalFiles() {
        return checksumJournalFiles;
    }
3044
    /**
     * @param checksumJournalFiles checksum flag; {@code createJournal()} enables journal
     *        checksums when either this flag or {@code checkForCorruptJournalFiles} is set
     */
    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
        this.checksumJournalFiles = checksumJournalFiles;
    }
3048
    /**
     * Stores the broker service reference in the {@code brokerService} field.
     *
     * @param brokerService the broker service to remember
     */
    @Override
    public void setBrokerService(BrokerService brokerService) {
        this.brokerService = brokerService;
    }
3053
3054    /**
3055     * @return the archiveDataLogs
3056     */
3057    public boolean isArchiveDataLogs() {
3058        return this.archiveDataLogs;
3059    }
3060
3061    /**
3062     * @param archiveDataLogs the archiveDataLogs to set
3063     */
3064    public void setArchiveDataLogs(boolean archiveDataLogs) {
3065        this.archiveDataLogs = archiveDataLogs;
3066    }
3067
3068    /**
3069     * @return the directoryArchive
3070     */
3071    public File getDirectoryArchive() {
3072        return this.directoryArchive;
3073    }
3074
3075    /**
3076     * @param directoryArchive the directoryArchive to set
3077     */
3078    public void setDirectoryArchive(File directoryArchive) {
3079        this.directoryArchive = directoryArchive;
3080    }
3081
    /**
     * @return the current value of the {@code archiveCorruptedIndex} flag
     */
    public boolean isArchiveCorruptedIndex() {
        return archiveCorruptedIndex;
    }
3085
    /**
     * @param archiveCorruptedIndex new value of the {@code archiveCorruptedIndex} flag
     */
    public void setArchiveCorruptedIndex(boolean archiveCorruptedIndex) {
        this.archiveCorruptedIndex = archiveCorruptedIndex;
    }
3089
    /**
     * @return the factor {@code createPageFile()} passes to {@code PageFile.setLFUEvictionFactor}
     */
    public float getIndexLFUEvictionFactor() {
        return indexLFUEvictionFactor;
    }
3093
    /**
     * @param indexLFUEvictionFactor the factor {@code createPageFile()} passes to
     *        {@code PageFile.setLFUEvictionFactor}
     */
    public void setIndexLFUEvictionFactor(float indexLFUEvictionFactor) {
        this.indexLFUEvictionFactor = indexLFUEvictionFactor;
    }
3097
    /**
     * @return the flag {@code createPageFile()} passes to {@code PageFile.setUseLFRUEviction}
     */
    public boolean isUseIndexLFRUEviction() {
        return useIndexLFRUEviction;
    }
3101
    /**
     * @param useIndexLFRUEviction the flag {@code createPageFile()} passes to
     *        {@code PageFile.setUseLFRUEviction}
     */
    public void setUseIndexLFRUEviction(boolean useIndexLFRUEviction) {
        this.useIndexLFRUEviction = useIndexLFRUEviction;
    }
3105
    /**
     * @param enableIndexDiskSyncs the flag {@code createPageFile()} passes to
     *        {@code PageFile.setEnableDiskSyncs}
     */
    public void setEnableIndexDiskSyncs(boolean enableIndexDiskSyncs) {
        this.enableIndexDiskSyncs = enableIndexDiskSyncs;
    }
3109
    /**
     * @param enableIndexRecoveryFile the flag {@code createPageFile()} passes to
     *        {@code PageFile.setEnableRecoveryFile}
     */
    public void setEnableIndexRecoveryFile(boolean enableIndexRecoveryFile) {
        this.enableIndexRecoveryFile = enableIndexRecoveryFile;
    }
3113
    /**
     * @param enableIndexPageCaching the flag {@code createPageFile()} passes to
     *        {@code PageFile.setEnablePageCaching}
     */
    public void setEnableIndexPageCaching(boolean enableIndexPageCaching) {
        this.enableIndexPageCaching = enableIndexPageCaching;
    }
3117
    /**
     * @return the flag {@code createPageFile()} passes to {@code PageFile.setEnableDiskSyncs}
     */
    public boolean isEnableIndexDiskSyncs() {
        return enableIndexDiskSyncs;
    }
3121
    /**
     * @return the flag {@code createPageFile()} passes to {@code PageFile.setEnableRecoveryFile}
     */
    public boolean isEnableIndexRecoveryFile() {
        return enableIndexRecoveryFile;
    }
3125
    /**
     * @return the flag {@code createPageFile()} passes to {@code PageFile.setEnablePageCaching}
     */
    public boolean isEnableIndexPageCaching() {
        return enableIndexPageCaching;
    }
3129
3130    // /////////////////////////////////////////////////////////////////
3131    // Internal conversion methods.
3132    // /////////////////////////////////////////////////////////////////
3133
3134    class MessageOrderCursor{
3135        long defaultCursorPosition;
3136        long lowPriorityCursorPosition;
3137        long highPriorityCursorPosition;
3138        MessageOrderCursor(){
3139        }
3140
3141        MessageOrderCursor(long position){
3142            this.defaultCursorPosition=position;
3143            this.lowPriorityCursorPosition=position;
3144            this.highPriorityCursorPosition=position;
3145        }
3146
3147        MessageOrderCursor(MessageOrderCursor other){
3148            this.defaultCursorPosition=other.defaultCursorPosition;
3149            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3150            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3151        }
3152
3153        MessageOrderCursor copy() {
3154            return new MessageOrderCursor(this);
3155        }
3156
3157        void reset() {
3158            this.defaultCursorPosition=0;
3159            this.highPriorityCursorPosition=0;
3160            this.lowPriorityCursorPosition=0;
3161        }
3162
3163        void increment() {
3164            if (defaultCursorPosition!=0) {
3165                defaultCursorPosition++;
3166            }
3167            if (highPriorityCursorPosition!=0) {
3168                highPriorityCursorPosition++;
3169            }
3170            if (lowPriorityCursorPosition!=0) {
3171                lowPriorityCursorPosition++;
3172            }
3173        }
3174
3175        @Override
3176        public String toString() {
3177           return "MessageOrderCursor:[def:" + defaultCursorPosition
3178                   + ", low:" + lowPriorityCursorPosition
3179                   + ", high:" +  highPriorityCursorPosition + "]";
3180        }
3181
3182        public void sync(MessageOrderCursor other) {
3183            this.defaultCursorPosition=other.defaultCursorPosition;
3184            this.lowPriorityCursorPosition=other.lowPriorityCursorPosition;
3185            this.highPriorityCursorPosition=other.highPriorityCursorPosition;
3186        }
3187    }
3188
3189    class MessageOrderIndex {
        // Priority markers: setBatch(Transaction, LastAck) switches on them and get() records
        // one of them in lastGetPriority.
        static final byte HI = 9;
        static final byte LO = 0;
        static final byte DEF = 4;

        // Next sequence handed out by getNextMessageId(); configureLast() re-seeds it to one
        // past the highest key found in the three indexes.
        long nextMessageId;
        // put() stores entries here when priority == javax.jms.Message.DEFAULT_PRIORITY.
        BTreeIndex<Long, MessageKeys> defaultPriorityIndex;
        // put() stores entries with a priority below the default here; allocate() only
        // creates this index when metadata.version >= 2.
        BTreeIndex<Long, MessageKeys> lowPriorityIndex;
        // put() stores entries with a priority above the default here; allocate() only
        // creates this index when metadata.version >= 2.
        BTreeIndex<Long, MessageKeys> highPriorityIndex;
        // Cursor used by iterator(Transaction) and repositioned by setBatch()/stoppedIterating().
        final MessageOrderCursor cursor = new MessageOrderCursor();
        // Last key per priority band: set by setBatch(), cleared by resetCursorPosition(), and
        // folded into the cursor (key + 1) by stoppedIterating().
        // NOTE(review): presumably also updated while iterating; that code is outside this view.
        Long lastDefaultKey;
        Long lastHighKey;
        Long lastLowKey;
        // Priority marker recorded by the most recent get() call.
        byte lastGetPriority;
        // Sequences registered via trackPendingAdd(); the methods shown here only touch the
        // list while synchronized on it.
        final List<Long> pendingAdditions = new LinkedList<Long>();
3204
3205        MessageKeys remove(Transaction tx, Long key) throws IOException {
3206            MessageKeys result = defaultPriorityIndex.remove(tx, key);
3207            if (result == null && highPriorityIndex!=null) {
3208                result = highPriorityIndex.remove(tx, key);
3209                if (result ==null && lowPriorityIndex!=null) {
3210                    result = lowPriorityIndex.remove(tx, key);
3211                }
3212            }
3213            return result;
3214        }
3215
3216        void load(Transaction tx) throws IOException {
3217            defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3218            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
3219            defaultPriorityIndex.load(tx);
3220            lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3221            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
3222            lowPriorityIndex.load(tx);
3223            highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
3224            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
3225            highPriorityIndex.load(tx);
3226        }
3227
3228        void allocate(Transaction tx) throws IOException {
3229            defaultPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3230            if (metadata.version >= 2) {
3231                lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3232                highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
3233            }
3234        }
3235
3236        void configureLast(Transaction tx) throws IOException {
3237            // Figure out the next key using the last entry in the destination.
3238            TreeSet<Long> orderedSet = new TreeSet<Long>();
3239
3240            addLast(orderedSet, highPriorityIndex, tx);
3241            addLast(orderedSet, defaultPriorityIndex, tx);
3242            addLast(orderedSet, lowPriorityIndex, tx);
3243
3244            if (!orderedSet.isEmpty()) {
3245                nextMessageId = orderedSet.last() + 1;
3246            }
3247        }
3248
3249        private void addLast(TreeSet<Long> orderedSet, BTreeIndex<Long, MessageKeys> index, Transaction tx) throws IOException {
3250            if (index != null) {
3251                Entry<Long, MessageKeys> lastEntry = index.getLast(tx);
3252                if (lastEntry != null) {
3253                    orderedSet.add(lastEntry.getKey());
3254                }
3255            }
3256        }
3257
        /**
         * Rebuilds the order index from scratch: frees the existing indexes, resets the cursor,
         * allocates and loads fresh indexes, then re-derives {@code nextMessageId}.
         *
         * @param tx the index transaction
         * @throws IOException propagated from any of the steps
         */
        void clear(Transaction tx) throws IOException {
            this.remove(tx);
            this.resetCursorPosition();
            this.allocate(tx);
            this.load(tx);
            // With freshly allocated (empty) indexes this leaves nextMessageId unchanged.
            this.configureLast(tx);
        }
3265
3266        void remove(Transaction tx) throws IOException {
3267            defaultPriorityIndex.clear(tx);
3268            defaultPriorityIndex.unload(tx);
3269            tx.free(defaultPriorityIndex.getPageId());
3270            if (lowPriorityIndex != null) {
3271                lowPriorityIndex.clear(tx);
3272                lowPriorityIndex.unload(tx);
3273
3274                tx.free(lowPriorityIndex.getPageId());
3275            }
3276            if (highPriorityIndex != null) {
3277                highPriorityIndex.clear(tx);
3278                highPriorityIndex.unload(tx);
3279                tx.free(highPriorityIndex.getPageId());
3280            }
3281        }
3282
3283        void resetCursorPosition() {
3284            this.cursor.reset();
3285            lastDefaultKey = null;
3286            lastHighKey = null;
3287            lastLowKey = null;
3288        }
3289
3290        void setBatch(Transaction tx, Long sequence) throws IOException {
3291            if (sequence != null) {
3292                Long nextPosition = new Long(sequence.longValue() + 1);
3293                lastDefaultKey = sequence;
3294                cursor.defaultCursorPosition = nextPosition.longValue();
3295                lastHighKey = sequence;
3296                cursor.highPriorityCursorPosition = nextPosition.longValue();
3297                lastLowKey = sequence;
3298                cursor.lowPriorityCursorPosition = nextPosition.longValue();
3299            }
3300        }
3301
3302        void setBatch(Transaction tx, LastAck last) throws IOException {
3303            setBatch(tx, last.lastAckedSequence);
3304            if (cursor.defaultCursorPosition == 0
3305                    && cursor.highPriorityCursorPosition == 0
3306                    && cursor.lowPriorityCursorPosition == 0) {
3307                long next = last.lastAckedSequence + 1;
3308                switch (last.priority) {
3309                    case DEF:
3310                        cursor.defaultCursorPosition = next;
3311                        cursor.highPriorityCursorPosition = next;
3312                        break;
3313                    case HI:
3314                        cursor.highPriorityCursorPosition = next;
3315                        break;
3316                    case LO:
3317                        cursor.lowPriorityCursorPosition = next;
3318                        cursor.defaultCursorPosition = next;
3319                        cursor.highPriorityCursorPosition = next;
3320                        break;
3321                }
3322            }
3323        }
3324
3325        void stoppedIterating() {
3326            if (lastDefaultKey!=null) {
3327                cursor.defaultCursorPosition=lastDefaultKey.longValue()+1;
3328            }
3329            if (lastHighKey!=null) {
3330                cursor.highPriorityCursorPosition=lastHighKey.longValue()+1;
3331            }
3332            if (lastLowKey!=null) {
3333                cursor.lowPriorityCursorPosition=lastLowKey.longValue()+1;
3334            }
3335            lastDefaultKey = null;
3336            lastHighKey = null;
3337            lastLowKey = null;
3338        }
3339
3340        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes, Long sequenceId)
3341                throws IOException {
3342            if (defaultPriorityIndex.containsKey(tx, sequenceId)) {
3343                getDeleteList(tx, deletes, defaultPriorityIndex, sequenceId);
3344            } else if (highPriorityIndex != null && highPriorityIndex.containsKey(tx, sequenceId)) {
3345                getDeleteList(tx, deletes, highPriorityIndex, sequenceId);
3346            } else if (lowPriorityIndex != null && lowPriorityIndex.containsKey(tx, sequenceId)) {
3347                getDeleteList(tx, deletes, lowPriorityIndex, sequenceId);
3348            }
3349        }
3350
3351        void getDeleteList(Transaction tx, ArrayList<Entry<Long, MessageKeys>> deletes,
3352                BTreeIndex<Long, MessageKeys> index, Long sequenceId) throws IOException {
3353
3354            Iterator<Entry<Long, MessageKeys>> iterator = index.iterator(tx, sequenceId, null);
3355            deletes.add(iterator.next());
3356        }
3357
3358        long getNextMessageId(int priority) {
3359            return nextMessageId++;
3360        }
3361
3362        MessageKeys get(Transaction tx, Long key) throws IOException {
3363            MessageKeys result = defaultPriorityIndex.get(tx, key);
3364            if (result == null) {
3365                result = highPriorityIndex.get(tx, key);
3366                if (result == null) {
3367                    result = lowPriorityIndex.get(tx, key);
3368                    lastGetPriority = LO;
3369                } else {
3370                    lastGetPriority = HI;
3371                }
3372            } else {
3373                lastGetPriority = DEF;
3374            }
3375            return result;
3376        }
3377
3378        MessageKeys put(Transaction tx, int priority, Long key, MessageKeys value) throws IOException {
3379            if (priority == javax.jms.Message.DEFAULT_PRIORITY) {
3380                return defaultPriorityIndex.put(tx, key, value);
3381            } else if (priority > javax.jms.Message.DEFAULT_PRIORITY) {
3382                return highPriorityIndex.put(tx, key, value);
3383            } else {
3384                return lowPriorityIndex.put(tx, key, value);
3385            }
3386        }
3387
        /**
         * Creates a {@code MessageOrderIterator} that starts from this index's own cursor.
         *
         * @param tx the index transaction
         * @return a new iterator over the entries
         * @throws IOException propagated from creating the iterator
         */
        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx) throws IOException{
            return new MessageOrderIterator(tx,cursor,this);
        }
3391
        /**
         * Creates a {@code MessageOrderIterator} that starts from the supplied cursor instead
         * of this index's own cursor.
         *
         * @param tx the index transaction
         * @param m the cursor supplying the start positions
         * @return a new iterator over the entries
         * @throws IOException propagated from creating the iterator
         */
        Iterator<Entry<Long, MessageKeys>> iterator(Transaction tx, MessageOrderCursor m) throws IOException{
            return new MessageOrderIterator(tx,m,this);
        }
3395
        /**
         * @return the priority marker ({@code DEF}, {@code HI} or {@code LO}) recorded by the
         *         most recent {@code get(Transaction, Long)} call
         */
        public byte lastGetPriority() {
            return lastGetPriority;
        }
3399
3400        public boolean alreadyDispatched(Long sequence) {
3401            return (cursor.highPriorityCursorPosition > 0 && cursor.highPriorityCursorPosition >= sequence) ||
3402                    (cursor.defaultCursorPosition > 0 && cursor.defaultCursorPosition >= sequence) ||
3403                    (cursor.lowPriorityCursorPosition > 0 && cursor.lowPriorityCursorPosition >= sequence);
3404        }
3405
        /**
         * Appends {@code seq} to the pending-additions list while synchronized on that list.
         *
         * @param seq the sequence to register as pending
         */
        public void trackPendingAdd(Long seq) {
            synchronized (pendingAdditions) {
                pendingAdditions.add(seq);
            }
        }
3411
        /**
         * Removes {@code seq} from the pending-additions list while synchronized on that list.
         *
         * @param seq the sequence whose addition has completed
         */
        public void trackPendingAddComplete(Long seq) {
            synchronized (pendingAdditions) {
                // seq is a Long, so this resolves to remove(Object), not remove(int index).
                pendingAdditions.remove(seq);
            }
        }
3417
3418        public Long minPendingAdd() {
3419            synchronized (pendingAdditions) {
3420                if (!pendingAdditions.isEmpty()) {
3421                    return pendingAdditions.get(0);
3422                } else {
3423                    return null;
3424                }
3425            }
3426        }
3427
3428
3429        class MessageOrderIterator implements Iterator<Entry<Long, MessageKeys>>{
3430            Iterator<Entry<Long, MessageKeys>>currentIterator;
3431            final Iterator<Entry<Long, MessageKeys>>highIterator;
3432            final Iterator<Entry<Long, MessageKeys>>defaultIterator;
3433            final Iterator<Entry<Long, MessageKeys>>lowIterator;
3434
3435            MessageOrderIterator(Transaction tx, MessageOrderCursor m, MessageOrderIndex messageOrderIndex) throws IOException {
3436                Long pendingAddLimiter = messageOrderIndex.minPendingAdd();
3437                this.defaultIterator = defaultPriorityIndex.iterator(tx, m.defaultCursorPosition, pendingAddLimiter);
3438                if (highPriorityIndex != null) {
3439                    this.highIterator = highPriorityIndex.iterator(tx, m.highPriorityCursorPosition, pendingAddLimiter);
3440                } else {
3441                    this.highIterator = null;
3442                }
3443                if (lowPriorityIndex != null) {
3444                    this.lowIterator = lowPriorityIndex.iterator(tx, m.lowPriorityCursorPosition, pendingAddLimiter);
3445                } else {
3446                    this.lowIterator = null;
3447                }
3448            }
3449
3450            @Override
3451            public boolean hasNext() {
3452                if (currentIterator == null) {
3453                    if (highIterator != null) {
3454                        if (highIterator.hasNext()) {
3455                            currentIterator = highIterator;
3456                            return currentIterator.hasNext();
3457                        }
3458                        if (defaultIterator.hasNext()) {
3459                            currentIterator = defaultIterator;
3460                            return currentIterator.hasNext();
3461                        }
3462                        if (lowIterator.hasNext()) {
3463                            currentIterator = lowIterator;
3464                            return currentIterator.hasNext();
3465                        }
3466                        return false;
3467                    } else {
3468                        currentIterator = defaultIterator;
3469                        return currentIterator.hasNext();
3470                    }
3471                }
3472                if (highIterator != null) {
3473                    if (currentIterator.hasNext()) {
3474                        return true;
3475                    }
3476                    if (currentIterator == highIterator) {
3477                        if (defaultIterator.hasNext()) {
3478                            currentIterator = defaultIterator;
3479                            return currentIterator.hasNext();
3480                        }
3481                        if (lowIterator.hasNext()) {
3482                            currentIterator = lowIterator;
3483                            return currentIterator.hasNext();
3484                        }
3485                        return false;
3486                    }
3487
3488                    if (currentIterator == defaultIterator) {
3489                        if (lowIterator.hasNext()) {
3490                            currentIterator = lowIterator;
3491                            return currentIterator.hasNext();
3492                        }
3493                        return false;
3494                    }
3495                }
3496                return currentIterator.hasNext();
3497            }
3498
3499            @Override
3500            public Entry<Long, MessageKeys> next() {
3501                Entry<Long, MessageKeys> result = currentIterator.next();
3502                if (result != null) {
3503                    Long key = result.getKey();
3504                    if (highIterator != null) {
3505                        if (currentIterator == defaultIterator) {
3506                            lastDefaultKey = key;
3507                        } else if (currentIterator == highIterator) {
3508                            lastHighKey = key;
3509                        } else {
3510                            lastLowKey = key;
3511                        }
3512                    } else {
3513                        lastDefaultKey = key;
3514                    }
3515                }
3516                return result;
3517            }
3518
3519            @Override
3520            public void remove() {
3521                throw new UnsupportedOperationException();
3522            }
3523
3524        }
3525    }
3526
3527    private static class HashSetStringMarshaller extends VariableMarshaller<HashSet<String>> {
3528        final static HashSetStringMarshaller INSTANCE = new HashSetStringMarshaller();
3529
3530        @Override
3531        public void writePayload(HashSet<String> object, DataOutput dataOut) throws IOException {
3532            ByteArrayOutputStream baos = new ByteArrayOutputStream();
3533            ObjectOutputStream oout = new ObjectOutputStream(baos);
3534            oout.writeObject(object);
3535            oout.flush();
3536            oout.close();
3537            byte[] data = baos.toByteArray();
3538            dataOut.writeInt(data.length);
3539            dataOut.write(data);
3540        }
3541
3542        @Override
3543        @SuppressWarnings("unchecked")
3544        public HashSet<String> readPayload(DataInput dataIn) throws IOException {
3545            int dataLen = dataIn.readInt();
3546            byte[] data = new byte[dataLen];
3547            dataIn.readFully(data);
3548            ByteArrayInputStream bais = new ByteArrayInputStream(data);
3549            ObjectInputStream oin = new ObjectInputStream(bais);
3550            try {
3551                return (HashSet<String>) oin.readObject();
3552            } catch (ClassNotFoundException cfe) {
3553                IOException ioe = new IOException("Failed to read HashSet<String>: " + cfe);
3554                ioe.initCause(cfe);
3555                throw ioe;
3556            }
3557        }
3558    }
3559
3560    public File getIndexDirectory() {
3561        return indexDirectory;
3562    }
3563
    /**
     * Sets the index directory.
     * <p>
     * The value is stored as given; this setter performs no validation and does
     * not touch the file system.
     *
     * @param indexDirectory the index directory to use
     */
    public void setIndexDirectory(File indexDirectory) {
        this.indexDirectory = indexDirectory;
    }
3567
3568    interface IndexAware {
3569        public void sequenceAssignedWithIndexLocked(long index);
3570    }
3571
3572    public String getPreallocationScope() {
3573        return preallocationScope;
3574    }
3575
    /**
     * Sets the preallocation scope.
     * <p>
     * The string is stored as given; no validation happens in this setter.
     * NOTE(review): presumably this must name one of the journal's
     * preallocation scopes - confirm where the value is consumed.
     *
     * @param preallocationScope the preallocation scope to use
     */
    public void setPreallocationScope(String preallocationScope) {
        this.preallocationScope = preallocationScope;
    }
3579
3580    public String getPreallocationStrategy() {
3581        return preallocationStrategy;
3582    }
3583
    /**
     * Sets the preallocation strategy.
     * <p>
     * The string is stored as given; no validation happens in this setter.
     * NOTE(review): presumably this must name one of the journal's
     * preallocation strategies - confirm where the value is consumed.
     *
     * @param preallocationStrategy the preallocation strategy to use
     */
    public void setPreallocationStrategy(String preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }
3587
3588    public int getCompactAcksAfterNoGC() {
3589        return compactAcksAfterNoGC;
3590    }
3591
3592    /**
3593     * Sets the number of GC cycles where no journal logs were removed before an attempt to
3594     * move forward all the acks in the last log that contains them and is otherwise unreferenced.
3595     * <p>
3596     * A value of -1 will disable this feature.
3597     *
3598     * @param compactAcksAfterNoGC
3599     *      Number of empty GC cycles before we rewrite old ACKS.
3600     */
3601    public void setCompactAcksAfterNoGC(int compactAcksAfterNoGC) {
3602        this.compactAcksAfterNoGC = compactAcksAfterNoGC;
3603    }
3604
3605    /**
3606     * Returns whether Ack compaction will ignore that the store is still growing
3607     * and run more often.
3608     *
3609     * @return the compactAcksIgnoresStoreGrowth current value.
3610     */
3611    public boolean isCompactAcksIgnoresStoreGrowth() {
3612        return compactAcksIgnoresStoreGrowth;
3613    }
3614
3615    /**
3616     * Configure if Ack compaction will occur regardless of continued growth of the
3617     * journal logs meaning that the store has not run out of space yet.  Because the
3618     * compaction operation can be costly this value is defaulted to off and the Ack
3619     * compaction is only done when it seems that the store cannot grow and larger.
3620     *
3621     * @param compactAcksIgnoresStoreGrowth the compactAcksIgnoresStoreGrowth to set
3622     */
3623    public void setCompactAcksIgnoresStoreGrowth(boolean compactAcksIgnoresStoreGrowth) {
3624        this.compactAcksIgnoresStoreGrowth = compactAcksIgnoresStoreGrowth;
3625    }
3626
3627    /**
3628     * Returns whether Ack compaction is enabled
3629     *
3630     * @return enableAckCompaction
3631     */
3632    public boolean isEnableAckCompaction() {
3633        return enableAckCompaction;
3634    }
3635
3636    /**
3637     * Configure if the Ack compaction task should be enabled to run
3638     *
3639     * @param enableAckCompaction
3640     */
3641    public void setEnableAckCompaction(boolean enableAckCompaction) {
3642        this.enableAckCompaction = enableAckCompaction;
3643    }
3644}