/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LFUCache;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A PageFile provides random access to fixed-size disk pages. This object is not thread safe and therefore access to it should
 * be externally synchronized.
 * <p/>
 * The file has 3 parts:
 * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
 * Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent.
 * Page Space: The pages in the page file.
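 * <p/>
 * Typical lifecycle (a minimal sketch; the directory and file name below are examples):
 * <pre>
 * PageFile pageFile = new PageFile(new File("target/test-data"), "index");
 * pageFile.load();                  // allocates OS resources; creates the files on first use
 * Transaction tx = pageFile.tx();   // page reads, allocations and updates go through a Transaction
 * // ... work with pages via the transaction ...
 * pageFile.flush();                 // push buffered writes to disk
 * pageFile.unload();                // release file handles
 * </pre>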
 */
public class PageFile {

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";

    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4);
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);

    private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
    private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;

    // Recovery header is (long offset)
    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);

    // A PageFile will use a couple of files in this directory.
    private final File directory;
    // And the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages.
    private RecoverableRandomAccessFile readFile;
    // File handle used for writing pages.
    private RecoverableRandomAccessFile writeFile;
    // File handle used for writing to the recovery file.
    private RecoverableRandomAccessFile recoveryFile;

    // The size of pages.
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum amount of space allocated to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The maximum size that we let the recovery file grow to. Writes may exceed the max,
    // but the file is resized back down to this max as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer.
    private int recoveryPageCount;

    private final AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // The cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is the read page cache enabled?
    private boolean enablePageCaching = true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;

    // Should we first log page writes to the recovery buffer? Avoids partial
    // page writes corrupting the file.
    private boolean enableRecoveryFile = true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs = true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread = false;

    // These are used if enabledWriteThread==true.
    private final AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private final TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private AtomicReference<SequenceSet> recoveredFreeList = new AtomicReference<SequenceSet>();
    private AtomicReference<SequenceSet> trackingFreeDuringRecovery = new AtomicReference<SequenceSet>();

    private final AtomicLong nextTxid = new AtomicLong();

    // Persistent settings stored in the page file.
    private MetaData metaData;

    private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();

    private boolean useLFRUEviction = false;
    private float LFUEvictionFactor = 0.2f;

    /**
     * Used to keep track of updated pages which have not yet been committed.
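     * A write carries the latest ("current") page data; begin() moves that data into
     * the "diskBound" slot while it is being flushed, so the page can be updated again
     * without blocking on the in-flight disk write. Long transactions spill page data
     * to a temp file and track it by file location instead of holding the bytes in memory.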
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;
        long currentLocation = -1;
        long diskBoundLocation = -1;
        File tmpFile;
        int length;

        public PageWrite(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
            this.page = page;
            this.currentLocation = currentLocation;
            this.tmpFile = tmpFile;
            this.length = length;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page = page;
            current = data;
            currentLocation = -1;
            diskBoundLocation = -1;
        }

        public void setCurrentLocation(Page page, long location, int length) {
            this.page = page;
            this.currentLocation = location;
            this.length = length;
            this.current = null;
        }

        @Override
        public String toString() {
            return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

        public byte[] getDiskBound() throws IOException {
            if (diskBound == null && diskBoundLocation != -1) {
                diskBound = new byte[length];
                try (RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) {
                    file.seek(diskBoundLocation);
                    // readFully so a short read cannot leave the buffer partially filled
                    file.readFully(diskBound);
                }
                diskBoundLocation = -1;
            }
            return diskBound;
        }

        void begin() {
            if (currentLocation != -1) {
                diskBoundLocation = currentLocation;
            } else {
                diskBound = current;
            }
            current = null;
            currentLocation = -1;
        }

        /**
         * @return true if there are no pending writes to do.
         */
        boolean done() {
            diskBoundLocation = -1;
            diskBound = null;
            return current == null && currentLocation == -1;
        }

        boolean isDone() {
            return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
        }
    }

    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId = -1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }

        public void setFileType(String fileType) {
            this.fileType = fileType;
        }

        public String getFileTypeVersion() {
            return fileTypeVersion;
        }

        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }

        public long getMetaDataTxId() {
            return metaDataTxId;
        }

        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }

        public int getPageSize() {
            return pageSize;
        }

        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }

        public boolean isCleanShutdown() {
            return cleanShutdown;
        }

        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }

        public long getLastTxId() {
            return lastTxId;
        }

        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }

        public long getFreePages() {
            return freePages;
        }

        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }

    /**
     * Creates a PageFile in the specified directory whose data files are named using the supplied name.
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
     *
     * @throws IOException           if the files cannot be deleted.
     * @throws IllegalStateException if this PageFile is loaded
     */
    public void delete() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    public void archive() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot archive page file data when the page file is loaded");
        }
        long timestamp = System.currentTimeMillis();
        archive(getMainPageFile(), String.valueOf(timestamp));
        archive(getFreeFile(), String.valueOf(timestamp));
        archive(getRecoveryFile(), String.valueOf(timestamp));
    }

    /**
     * @param file the file to delete
     * @throws IOException if the file exists but could not be deleted
     */
    private void delete(File file) throws IOException {
        if (file.exists() && !file.delete()) {
            throw new IOException("Could not delete: " + file.getPath());
        }
    }

    private void archive(File file, String suffix) throws IOException {
        if (file.exists()) {
            File archive = new File(file.getPath() + "-" + suffix);
            if (!file.renameTo(archive)) {
                throw new IOException("Could not archive: " + file.getPath() + " to " + archive.getPath());
            }
        }
    }

    /**
     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the
     * first time the page file is loaded, then this creates the page file in the file system.
     *
     * @throws IOException           If the page file cannot be loaded. This could be because the existing page file is corrupt,
     *                               is of an unexpected version, or because there was a disk error.
     * @throws IllegalStateException If the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if (enablePageCaching) {
                if (isUseLFRUEviction()) {
                    pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
                } else {
                    pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
                }
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RecoverableRandomAccessFile(file, "rw", false);
            readFile = new RecoverableRandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting because it can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting because it can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if (enableRecoveryFile) {
                recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw");
            }

            if (metaData.isCleanShutdown()) {
                nextTxid.set(metaData.getLastTxId() + 1);
                if (metaData.getFreePages() > 0) {
                    loadFreeList();
                }
            } else {
                LOG.debug(toString() + ", Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());
                trackingFreeDuringRecovery.set(new SequenceSet());
            }

            if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();
            startWriter();
            if (trackingFreeDuringRecovery.get() != null) {
                asyncFreePageRecovery(nextFreePageId.get());
            }
        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }

    private void asyncFreePageRecovery(final long lastRecoveryPage) {
        Thread thread = new Thread("KahaDB Index Free Page Recovery") {
            @Override
            public void run() {
                try {
                    recoverFreePages(lastRecoveryPage);
                } catch (Throwable e) {
                    if (loaded.get()) {
                        LOG.warn("Error recovering index free page list", e);
                    }
                }
            }
        };
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setDaemon(true);
        thread.start();
    }

    private void recoverFreePages(final long lastRecoveryPage) throws Exception {
        LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown...");
        SequenceSet newFreePages = new SequenceSet();
        // Need a new PageFile instance to get an unshared readFile.
        PageFile recoveryPageFile = new PageFile(directory, name);
        recoveryPageFile.loadForRecovery(nextFreePageId.get());
        try {
            for (Iterator<Page> i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) {
                Page page = i.next();

                if (page.getPageId() >= lastRecoveryPage) {
                    break;
                }

                if (page.getType() == Page.PAGE_FREE_TYPE) {
                    newFreePages.add(page.getPageId());
                }
            }
        } finally {
            recoveryPageFile.readFile.close();
        }

        LOG.info(toString() + ". Recovered pageFile free list of size: " + newFreePages.rangeSize());
        if (!newFreePages.isEmpty()) {

            // allow flush (with index lock held) to merge eventually
            recoveredFreeList.lazySet(newFreePages);
        }
    }

    private void loadForRecovery(long nextFreePageIdSnap) throws Exception {
        loaded.set(true);
        enablePageCaching = false;
        File file = getMainPageFile();
        readFile = new RecoverableRandomAccessFile(file, "r");
        loadMetaData();
        pageSize = metaData.getPageSize();
        enableRecoveryFile = false;
        nextFreePageId.set(nextFreePageIdSnap);
    }

    /**
     * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException           if a disk error occurred while closing down the page file.
     * @throws IllegalStateException if the PageFile is not loaded
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if (freeList.isEmpty()) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId(nextTxid.get() - 1);
            if (trackingFreeDuringRecovery.get() != null) {
                // async recovery incomplete, will have to try again
                metaData.setCleanShutdown(false);
            } else {
                metaData.setCleanShutdown(true);
            }
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile = null;
                if (enableRecoveryFile) {
                    recoveryFile.close();
                    recoveryFile = null;
                }
                freeList.clear();
                if (pageCache != null) {
                    pageCache = null;
                }
                synchronized (writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    public void allowIOResumption() {
        loaded.set(true);
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException If a disk error occurs.
     */
    public void flush() throws IOException {

        if (enabledWriteThread && stopWriter.get()) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        SequenceSet recovered = recoveredFreeList.get();
        if (recovered != null) {
            recoveredFreeList.lazySet(null);
            SequenceSet inUse = trackingFreeDuringRecovery.get();
            recovered.remove(inUse);
            freeList.merge(recovered);

            // all set for clean shutdown
            trackingFreeDuringRecovery.set(null);
            inUse.clear();
        }

        // Set up a latch that gets notified when all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized (writes) {
            if (writes.isEmpty()) {
                return;
            }
            if (enabledWriteThread) {
                if (this.checkpointLatch == null) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            checkpointLatch.await();
        } catch (InterruptedException e) {
            InterruptedIOException ioe = new InterruptedIOException();
            ioe.initCause(e);
            throw ioe;
        }
    }

    @Override
    public String toString() {
        return "Page File: " + getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
    }

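    /**
     * Maps a logical page id to its byte offset in the main data file: pages start
     * right after the 4k header. For example, with the default 4k page size, page 3
     * lives at 4096 + 3 * 4096 = 16384.
     */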
    public long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
    }

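    /**
     * Loads the metadata header. The 4k header holds two sequentially written copies
     * of the same Properties block, so a crash part-way through a header update can
     * corrupt at most one copy; when the copies disagree, the second one is trusted,
     * since the first is the one that may hold a partial write.
     */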
    private void loadMetaData() throws IOException {

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }

        if (v1 == null && v2 == null) {
            throw new IOException("Could not load page file meta data");
        }

        if (v1 == null || v1.metaDataTxId < 0) {
            metaData = v2;
        } else if (v2 == null || v2.metaDataTxId < 0) {
            metaData = v1;
        } else if (v1.metaDataTxId == v2.metaDataTxId) {
            metaData = v1; // use the first since the 2nd could be a partial..
        } else {
            metaData = v2; // use the second because the first is probably a partial.
        }
    }

    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
            throw new IOException("Configuration is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
        }
        // Fill the rest with spaces...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
        Arrays.fill(filler, (byte) ' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it, write it 2 times...
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
        writeFile.write(d);
        writeFile.sync();
    }

    private void storeFreeList() throws IOException {
        FileOutputStream os = new FileOutputStream(getFreeFile());
        DataOutputStream dos = new DataOutputStream(os);
        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        dos.close();
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        FileInputStream is = new FileInputStream(getFreeFile());
        DataInputStream dis = new DataInputStream(is);
        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        dis.close();
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double buffer page writes?  Enabled by default.
     *
     * @return true if the recovery buffer is enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes.  Enabled by default.  Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize - Page.PAGE_HEADER_SIZE;
    }

    /**
     * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    public boolean isFreePage(long pageId) {
        return freeList.contains(pageId);
    }

    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public long getFreePageCount() {
        assertLoaded();
        return freeList.rangeSize();
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public float getLFUEvictionFactor() {
        return LFUEvictionFactor;
    }

    public void setLFUEvictionFactor(float LFUEvictionFactor) {
        this.LFUEvictionFactor = LFUEvictionFactor;
    }

    public boolean isUseLFRUEviction() {
        return useLFRUEviction;
    }

    public void setUseLFRUEviction(boolean useLFRUEviction) {
        this.useLFRUEviction = useLFRUEviction;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if (!loaded.get()) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if (loaded.get()) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException           If a disk error occurs.
     * @throws IllegalStateException if the PageFile is not loaded
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;

            // Reserve the page ids and write transaction ids in one step...
            long pageId = nextFreePageId.getAndAdd(count);
            long writeTxnId = nextTxid.getAndAdd(count);

            while (c-- > 0) {
                Page<T> page = new Page<T>(pageId++);
                page.makeFree(writeTxnId++);

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    synchronized void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        removeFromCache(pageId);

        SequenceSet trackFreeDuringRecovery = trackingFreeDuringRecovery.get();
        if (trackFreeDuringRecovery != null) {
            trackFreeDuringRecovery.add(pageId);
        }
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
            @Override
            public Long getKey() {
                return write.getPage().getPageId();
            }

            @Override
            public PageWrite getValue() {
                return write;
            }

            @Override
            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized (writes) {
            if (enabledWriteThread) {
                while (writes.size() >= writeBatchSize && !stopWriter.get()) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            boolean longTx = false;

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if (write == null) {
                    writes.put(key, value);
                } else {
                    if (value.currentLocation != -1) {
                        write.setCurrentLocation(value.page, value.currentLocation, value.length);
                        write.tmpFile = value.tmpFile;
                        longTx = true;
                    } else {
                        write.setCurrent(value.page, value.current);
                    }
                }
            }

            // Once we start approaching capacity, notify the writer to start writing.
            // Sync immediately for long txs.
            if (longTx || canStartWriteBatch()) {

                if (enabledWriteThread) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }

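    // With the default writeBatchSize of 1000, capacityUsed below is a percentage:
    // the async writer thread starts flushing once ~100 writes (10%) are queued,
    // while the synchronous path batches up to ~800 writes (80%) before writing.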
    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100) / writeBatchSize);
        if (enabledWriteThread) {
            // The constant 10 here controls how soon write batches start going to disk..
            // would be nice to figure out how to auto tune that value.  Make it too small and
            // we reduce throughput because we lock the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch != null;
        } else {
            return capacityUsed >= 80 || checkpointLatch != null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////
    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized (writes) {
            PageWrite pageWrite = writes.get(pageId);
            if (pageWrite != null) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(long pageId) {
        if (enablePageCaching) {
            pageCache.remove(pageId);
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal Double write implementation follows...
    ///////////////////////////////////////////////////////////////////

    private void pollWrites() {
        try {
            while (!stopWriter.get()) {
                // Wait for a notification...
                synchronized (writes) {
                    writes.notifyAll();

                    // If there is not enough to write, wait for a notification...
                    while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
                        writes.wait(100);
                    }

                    if (writes.isEmpty()) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            LOG.info("An exception was raised while performing poll writes", e);
        } finally {
            releaseCheckpointWaiter();
        }
    }
    private void writeBatch() throws IOException {

        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized (writes) {
            batch = new ArrayList<PageWrite>(writes.size());
            // Build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write, this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null && write.diskBoundLocation == -1) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch = null;
        }

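        // Recovery file layout, as written below and replayed by redoRecoveryUpdates():
        //   header (offset 0): nextTxId (long), Adler32 checksum (long), page count (int)
        //   records (from RECOVERY_FILE_HEADER_SIZE): repeated [pageId (long), page bytes (pageSize)]
        // The checksum lets recovery detect a batch that only partially reached disk.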
        // First land the writes in the recovery file.
        if (enableRecoveryFile) {
            Checksum checksum = new Adler32();

            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);

            for (PageWrite w : batch) {
                try {
                    checksum.update(w.getDiskBound(), 0, pageSize);
                } catch (Throwable t) {
                    throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
                }
                recoveryFile.writeLong(w.page.getPageId());
                recoveryFile.write(w.getDiskBound(), 0, pageSize);
            }

            // Can we shrink the recovery buffer??
            if (recoveryPageCount > recoveryFileMaxPageCount) {
                int t = Math.max(recoveryFileMinPageCount, batch.size());
                recoveryFile.setLength(recoveryFileSizeForPages(t));
            }

            // Record the page writes in the recovery buffer.
            recoveryFile.seek(0);
            // Store the next tx id...
            recoveryFile.writeLong(nextTxid.get());
            // Store the checksum for the write batch so that on recovery we
            // know if we have a consistent write batch on disk.
            recoveryFile.writeLong(checksum.getValue());
            // Write the # of pages that will follow.
            recoveryFile.writeInt(batch.size());

            if (enableDiskSyncs) {
                recoveryFile.sync();
            }
        }

        try {
            for (PageWrite w : batch) {
                writeFile.seek(toOffset(w.page.getPageId()));
                writeFile.write(w.getDiskBound(), 0, pageSize);
                w.done();
            }

            if (enableDiskSyncs) {
                writeFile.sync();
            }

        } catch (IOException ioError) {
            LOG.info("Unexpected io error on pagefile write of " + batch.size() + " pages.", ioError);
            // any subsequent write needs to be prefaced with a considered call to redoRecoveryUpdates
            // to ensure disk image is self consistent
            loaded.set(false);
            throw ioError;
        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                        if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
                            if (!w.tmpFile.delete()) {
                                throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
                            }
                            tmpFilesForRemoval.remove(w.tmpFile);
                        }
                    }
                }
            }

            if (checkpointLatch != null) {
                checkpointLatch.countDown();
            }
        }
    }

    public void removeTmpFile(File file) {
        tmpFilesForRemoval.add(file);
    }

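    /**
     * Size of the recovery file needed to hold pageCount records: the fixed 4k
     * header plus, per page, an 8-byte page id followed by the raw page bytes.
     */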
    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
    }

    private void releaseCheckpointWaiter() {
        if (checkpointLatch != null) {
            checkpointLatch.countDown();
            checkpointLatch = null;
        }
    }
    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
     *
     * @return the next transaction id that can be used.
     */
    private long redoRecoveryUpdates() throws IOException {
        if (!enableRecoveryFile) {
            return 0;
        }
        recoveryPageCount = 0;

        // Are we initializing the recovery file?
        if (recoveryFile.length() == 0) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }

        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long pageId = recoveryFile.readLong();
                byte[] data = new byte[pageSize];
                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
                    // Invalid recovery record: could not fully read the data. Probably due to a partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(pageId, data);
            }
        } catch (Exception e) {
            // If an error occurred it was because the redo buffer was not fully written out correctly,
            // so don't redo it; the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid then the recovery buffer was partially written to disk.
        if (checksum.getValue() != expectedChecksum) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk.
        writeFile.sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized (writes) {
            if (enabledWriteThread) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if (enabledWriteThread) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

    public File getDirectory() {
        return directory;
    }
}