/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LFUCache;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A PageFile provides random access to fixed size disk pages. This object is not thread safe and therefore access to it should
 * be externally synchronized.
 * <p/>
 * The file has 3 parts:
 * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
 * Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent.
 * Page Space: The pages in the page file.
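 * <p/>
 * A minimal usage sketch (the directory and name below are only illustrative):
 * <pre>
 * PageFile pageFile = new PageFile(new File("data"), "index");
 * pageFile.load();                   // allocates OS resources; creates the files on first use
 * Transaction tx = pageFile.tx();    // page reads and writes go through a transaction
 * pageFile.flush();                  // sync buffered writes to disk
 * pageFile.unload();                 // release the file handles
 * </pre>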
 */
public class PageFile {

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";
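    // For a PageFile constructed with the name "index", the suffixes above yield
    // index.data, index.redo and index.free in the configured directory (see
    // getMainPageFile(), getRecoveryFile() and getFreeFile() below).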

    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4);
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);

    private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
    private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;

    // Recovery header is (long offset)
    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);

    // A PageFile will use a couple of files in this directory
    private final File directory;
    // And the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages..
    private RecoverableRandomAccessFile readFile;
    // File handle used for writing pages..
    private RecoverableRandomAccessFile writeFile;
    // File handle used for writing to the recovery file..
    private RecoverableRandomAccessFile recoveryFile;

    // The size of pages
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum amount of space allocated to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The max size that we let the recovery file grow to. It may temporarily exceed the max, but the file will be
    // resized back to this max size as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer
    private int recoveryPageCount;

    private final AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // The cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is the page cache enabled?
    private boolean enablePageCaching = true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;
    // Should we first log the page write to the recovery buffer? Avoids partial
    // page write failures..
    private boolean enableRecoveryFile = true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs = true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread = false;

    // These are used if enabledWriteThread == true
    private final AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private final TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private AtomicReference<SequenceSet> recoveredFreeList = new AtomicReference<SequenceSet>();
    private AtomicReference<SequenceSet> trackingFreeDuringRecovery = new AtomicReference<SequenceSet>();

    private final AtomicLong nextTxid = new AtomicLong();

    // Persistent settings stored in the page file.
    private MetaData metaData;

    private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();

    private boolean useLFRUEviction = false;
    private float LFUEvictionFactor = 0.2f;

    /**
     * Used to keep track of updated pages which have not yet been committed.
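     * <p/>
     * A PageWrite double-buffers a page: {@code current} holds the latest update while
     * {@code diskBound} holds the snapshot being written to disk, so the page can be
     * updated again without blocking on the in-flight write. For long transactions the
     * pending data may live in a temp file instead, referenced by
     * {@code currentLocation}/{@code diskBoundLocation}.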
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;
        long currentLocation = -1;
        long diskBoundLocation = -1;
        File tmpFile;
        int length;

        public PageWrite(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
            this.page = page;
            this.currentLocation = currentLocation;
            this.tmpFile = tmpFile;
            this.length = length;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page = page;
            current = data;
            currentLocation = -1;
        }

        public void setCurrentLocation(Page page, long location, int length) {
            this.page = page;
            this.currentLocation = location;
            this.length = length;
            this.current = null;
        }

        @Override
        public String toString() {
            return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

        public byte[] getDiskBound() throws IOException {
            if (diskBound == null && diskBoundLocation != -1) {
                diskBound = new byte[length];
                try (RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) {
                    file.seek(diskBoundLocation);
                    file.read(diskBound);
                }
                diskBoundLocation = -1;
            }
            return diskBound;
        }

        void begin() {
            if (currentLocation != -1) {
                diskBoundLocation = currentLocation;
            } else {
                diskBound = current;
            }
            current = null;
            currentLocation = -1;
        }

        /**
         * @return true if there are no pending writes to do.
         */
        boolean done() {
            diskBoundLocation = -1;
            diskBound = null;
            return current == null && currentLocation == -1;
        }

        boolean isDone() {
            return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
        }
    }

    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId = -1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }

        public void setFileType(String fileType) {
            this.fileType = fileType;
        }

        public String getFileTypeVersion() {
            return fileTypeVersion;
        }

        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }

        public long getMetaDataTxId() {
            return metaDataTxId;
        }

        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }

        public int getPageSize() {
            return pageSize;
        }

        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }

        public boolean isCleanShutdown() {
            return cleanShutdown;
        }

        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }

        public long getLastTxId() {
            return lastTxId;
        }

        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }

        public long getFreePages() {
            return freePages;
        }

        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

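    /**
     * Starts a new transaction against this page file.  A minimal sketch of typical use
     * (the allocation below is illustrative):
     * <pre>
     * Transaction tx = pageFile.tx();
     * Page page = tx.allocate();   // grab a free page
     * // ... write the page contents via the transaction ...
     * tx.commit();                 // make the updates durable
     * </pre>
     */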
    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }

    /**
     * Creates a PageFile in the specified directory whose data files are named by the given name.
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object.  This method can only be used when this object is not loaded.
     *
     * @throws IOException           if the files cannot be deleted.
     * @throws IllegalStateException if this PageFile is loaded
     */
    public void delete() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    public void archive() throws IOException {
        if (loaded.get()) {
336            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        long timestamp = System.currentTimeMillis();
        archive(getMainPageFile(), String.valueOf(timestamp));
        archive(getFreeFile(), String.valueOf(timestamp));
        archive(getRecoveryFile(), String.valueOf(timestamp));
    }

    /**
     * Deletes the given file if it exists.
     *
     * @throws IOException if the file exists but cannot be deleted.
     */
    private void delete(File file) throws IOException {
        if (file.exists() && !file.delete()) {
            throw new IOException("Could not delete: " + file.getPath());
        }
    }

    private void archive(File file, String suffix) throws IOException {
        if (file.exists()) {
            File archive = new File(file.getPath() + "-" + suffix);
            if (!file.renameTo(archive)) {
358                throw new IOException("Could not archive: " + file.getPath() + " to " + file.getPath());
            }
        }
    }

    /**
     * Loads the page file so that it can be accessed for read/write purposes.  This allocates OS resources.  If this is the
     * first time the page file is loaded, then this creates the page file in the file system.
     *
     * @throws IOException           If the page file cannot be loaded. This could be because the existing page file is corrupt, is a bad version, or
     *                               there was a disk error.
     * @throws IllegalStateException If the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if (enablePageCaching) {
                if (isUseLFRUEviction()) {
                    pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
                } else {
                    pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
                }
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RecoverableRandomAccessFile(file, "rw", false);
            readFile = new RecoverableRandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting because it can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting because it can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if (enableRecoveryFile) {
                recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw");
            }

            if (metaData.isCleanShutdown()) {
                nextTxid.set(metaData.getLastTxId() + 1);
                if (metaData.getFreePages() > 0) {
                    loadFreeList();
                }
            } else {
                LOG.debug(toString() + ", Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());
                trackingFreeDuringRecovery.set(new SequenceSet());
            }

            if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();
            startWriter();
            if (trackingFreeDuringRecovery.get() != null) {
                asyncFreePageRecovery(nextFreePageId.get());
            }
        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }

    private void asyncFreePageRecovery(final long lastRecoveryPage) {
        Thread thread = new Thread("KahaDB Index Free Page Recovery") {
            @Override
            public void run() {
                try {
                    recoverFreePages(lastRecoveryPage);
                } catch (Throwable e) {
                    if (loaded.get()) {
                        LOG.warn("Error recovering index free page list", e);
                    }
                }
            }
        };
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setDaemon(true);
        thread.start();
    }

    private void recoverFreePages(final long lastRecoveryPage) throws Exception {
        LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown..");
        SequenceSet newFreePages = new SequenceSet();
        // need new pageFile instance to get unshared readFile
        PageFile recoveryPageFile = new PageFile(directory, name);
        recoveryPageFile.loadForRecovery(nextFreePageId.get());
        try {
            for (Iterator<Page> i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) {
                Page page = i.next();

                if (page.getPageId() >= lastRecoveryPage) {
                    break;
                }

                if (page.getType() == Page.PAGE_FREE_TYPE) {
                    newFreePages.add(page.getPageId());
                }
            }
        } finally {
            recoveryPageFile.readFile.close();
        }

        LOG.info(toString() + ". Recovered pageFile free list of size: " + newFreePages.rangeSize());
        if (!newFreePages.isEmpty()) {

            // allow flush (with index lock held) to merge eventually
            recoveredFreeList.lazySet(newFreePages);
        } else {
            // If there are no free pages, clear trackingFreeDuringRecovery to allow the broker to have a clean shutdown
            trackingFreeDuringRecovery.set(null);
        }
    }

    private void loadForRecovery(long nextFreePageIdSnap) throws Exception {
        loaded.set(true);
        enablePageCaching = false;
        File file = getMainPageFile();
        readFile = new RecoverableRandomAccessFile(file, "r");
        loadMetaData();
        pageSize = metaData.getPageSize();
        enableRecoveryFile = false;
        nextFreePageId.set(nextFreePageIdSnap);
    }

    /**
     * Unloads a previously loaded PageFile.  This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException           if a disk error occurred while closing down the page file.
     * @throws IllegalStateException if the PageFile is not loaded
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if (freeList.isEmpty()) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId(nextTxid.get() - 1);
            if (trackingFreeDuringRecovery.get() != null) {
                // async recovery incomplete, will have to try again
                metaData.setCleanShutdown(false);
            } else {
                metaData.setCleanShutdown(true);
            }
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile = null;
                if (enableRecoveryFile) {
                    recoveryFile.close();
                    recoveryFile = null;
                }
                freeList.clear();
                if (pageCache != null) {
                    pageCache = null;
                }
                synchronized (writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    public boolean isCleanShutdown() {
        return metaData != null && metaData.isCleanShutdown();
    }

    public void allowIOResumption() {
        loaded.set(true);
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException If a disk error occurred.
     */
    public void flush() throws IOException {

        if (enabledWriteThread && stopWriter.get()) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        SequenceSet recovered = recoveredFreeList.get();
        if (recovered != null) {
            recoveredFreeList.lazySet(null);
            SequenceSet inUse = trackingFreeDuringRecovery.get();
            recovered.remove(inUse);
            freeList.merge(recovered);

            // all set for clean shutdown
            trackingFreeDuringRecovery.set(null);
            inUse.clear();
        }

        // Set up a latch that gets notified when all buffered writes hit the disk.
        CountDownLatch checkpointLatch;
        synchronized (writes) {
            if (writes.isEmpty()) {
                return;
            }
            if (enabledWriteThread) {
                if (this.checkpointLatch == null) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            checkpointLatch.await();
        } catch (InterruptedException e) {
            InterruptedIOException ioe = new InterruptedIOException();
            ioe.initCause(e);
            throw ioe;
        }
    }


    @Override
    public String toString() {
        return "Page File: " + getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
    }

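    /**
     * Maps a page id to its byte offset in the main page file: pages start right after the
     * 4k header.  For example, with the default 4k page size, page 2 starts at offset
     * 4096 + 2 * 4096 = 12288.
     */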
    public long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
    }

    private void loadMetaData() throws IOException {
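        // The metadata is stored twice (see storeMetaData()), once in each half of the 4k
        // header.  Read both copies and pick a valid one, so a torn write of one copy can
        // never lose the metadata.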

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }

        if (v1 == null && v2 == null) {
            throw new IOException("Could not load page file meta data");
        }

671        if (v1 == null || v1.metaDataTxId < 0) {
672            metaData = v2;
673        } else if (v2 == null || v1.metaDataTxId < 0) {
674            metaData = v1;
675        } else if (v1.metaDataTxId == v2.metaDataTxId) {
676            metaData = v1; // use the first since the 2nd could be a partial..
677        } else {
678            metaData = v2; // use the second cause the first is probably a partial.
679        }
    }

    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
            throw new IOException("Configuration is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
        }
        // Fill the rest with space...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
        Arrays.fill(filler, (byte) ' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it.. write it 2 times...
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
        writeFile.write(d);
        writeFile.sync();
    }

    private void storeFreeList() throws IOException {
        FileOutputStream os = new FileOutputStream(getFreeFile());
        DataOutputStream dos = new DataOutputStream(os);
        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        dos.close();
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        FileInputStream is = new FileInputStream(getFreeFile());
        DataInputStream dis = new DataInputStream(is);
        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        dis.close();
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double buffer page writes?  Enabled by default.
     *
     * @return is the recovery buffer enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes.  Enabled by default.  Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize - Page.PAGE_HEADER_SIZE;
    }

    /**
     * Configures the page size used by the page file.  By default it is 4k.  Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize.  Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    public boolean isFreePage(long pageId) {
        return freeList.contains(pageId);
    }

    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public long getFreePageCount() {
        assertLoaded();
        return freeList.rangeSize();
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public float getLFUEvictionFactor() {
        return LFUEvictionFactor;
    }

    public void setLFUEvictionFactor(float LFUEvictionFactor) {
        this.LFUEvictionFactor = LFUEvictionFactor;
    }

    public boolean isUseLFRUEviction() {
        return useLFRUEviction;
    }

    public void setUseLFRUEviction(boolean useLFRUEviction) {
        this.useLFRUEviction = useLFRUEviction;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if (!loaded.get()) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if (loaded.get()) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException           If a disk error occurred.
     * @throws IllegalStateException if the PageFile is not loaded
     */
916    <T> Page<T> allocate(int count) throws IOException {
917        assertLoaded();
918        if (count <= 0) {
919            throw new IllegalArgumentException("The allocation count must be larger than zero");
920        }
921
922        Sequence seq = freeList.removeFirstSequence(count);
923
924        // We may need to create new free pages...
925        if (seq == null) {
926
927            Page<T> first = null;
928            int c = count;
929
            // Reserve the ids only once....
            long pageId = nextFreePageId.getAndAdd(count);
            long writeTxnId = nextTxid.getAndAdd(count);

            while (c-- > 0) {
                Page<T> page = new Page<T>(pageId++);
                page.makeFree(writeTxnId++);

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    synchronized void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        removeFromCache(pageId);

        SequenceSet trackFreeDuringRecovery = trackingFreeDuringRecovery.get();
        if (trackFreeDuringRecovery != null) {
            trackFreeDuringRecovery.add(pageId);
        }
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
            @Override
            public Long getKey() {
                return write.getPage().getPageId();
            }

            @Override
            public PageWrite getValue() {
                return write;
            }

            @Override
            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized (writes) {
            if (enabledWriteThread) {
                while (writes.size() >= writeBatchSize && !stopWriter.get()) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            boolean longTx = false;

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if (write == null) {
                    writes.put(key, value);
                } else {
                    if (value.currentLocation != -1) {
                        write.setCurrentLocation(value.page, value.currentLocation, value.length);
                        write.tmpFile = value.tmpFile;
                        longTx = true;
                    } else {
                        write.setCurrent(value.page, value.current);
                    }
                }
            }

            // Once we start approaching capacity, notify the writer to start writing
            // sync immediately for long txs
            if (longTx || canStartWriteBatch()) {

                if (enabledWriteThread) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }

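    // With the default write batch size of 1000, the async writer thread starts flushing
    // once 100 writes (10%) are queued, while the synchronous path flushes once 800 (80%)
    // are queued.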
    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100) / writeBatchSize);
        if (enabledWriteThread) {
            // The constant 10 here controls how soon write batches start going to disk..
            // would be nice to figure out how to auto tune that value.  Make it too small and
            // we reduce throughput because we lock the write mutex too often doing writes.
1052            return capacityUsed >= 10 || checkpointLatch != null;
1053        } else {
1054            return capacityUsed >= 80 || checkpointLatch != null;
1055        }
1056    }
1057
1058    ///////////////////////////////////////////////////////////////////
1059    // Cache Related operations
1060    ///////////////////////////////////////////////////////////////////
1061    @SuppressWarnings("unchecked")
1062    <T> Page<T> getFromCache(long pageId) {
1063        synchronized (writes) {
1064            PageWrite pageWrite = writes.get(pageId);
1065            if (pageWrite != null) {
1066                return pageWrite.page;
1067            }
1068        }
1069
1070        Page<T> result = null;
1071        if (enablePageCaching) {
1072            result = pageCache.get(pageId);
1073        }
1074        return result;
1075    }
1076
1077    void addToCache(Page page) {
1078        if (enablePageCaching) {
1079            pageCache.put(page.getPageId(), page);
1080        }
1081    }
1082
1083    void removeFromCache(long pageId) {
1084        if (enablePageCaching) {
1085            pageCache.remove(pageId);
1086        }
1087    }
1088
1089    ///////////////////////////////////////////////////////////////////
1090    // Internal Double write implementation follows...
1091    ///////////////////////////////////////////////////////////////////
1092
1093    private void pollWrites() {
1094        try {
1095            while (!stopWriter.get()) {
1096                // Wait for a notification...
1097                synchronized (writes) {
1098                    writes.notifyAll();
1099
1100                    // If there is not enough to write, wait for a notification...
1101                    while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
1102                        writes.wait(100);
1103                    }
1104
1105                    if (writes.isEmpty()) {
1106                        releaseCheckpointWaiter();
1107                    }
1108                }
1109                writeBatch();
1110            }
1111        } catch (Throwable e) {
1112            LOG.info("An exception was raised while performing poll writes", e);
1113        } finally {
1114            releaseCheckpointWaiter();
1115        }
1116    }
1117
1118    private void writeBatch() throws IOException {
1119
1120        CountDownLatch checkpointLatch;
1121        ArrayList<PageWrite> batch;
1122        synchronized (writes) {
            batch = new ArrayList<PageWrite>(writes.size());
            // build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write, this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null && write.diskBoundLocation == -1) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch = null;
        }

        try {

            // First land the writes in the recovery file
            if (enableRecoveryFile) {
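                // Recovery file layout: a 4k header holding (long nextTxId, long checksum,
                // int pageCount), followed by pageCount records of (long pageId, one page of data).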
                Checksum checksum = new Adler32();

                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);

                for (PageWrite w : batch) {
                    try {
                        checksum.update(w.getDiskBound(), 0, pageSize);
                    } catch (Throwable t) {
                        throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
                    }
                    recoveryFile.writeLong(w.page.getPageId());
                    recoveryFile.write(w.getDiskBound(), 0, pageSize);
                }

                // Can we shrink the recovery buffer?
                if (recoveryPageCount > recoveryFileMaxPageCount) {
                    int t = Math.max(recoveryFileMinPageCount, batch.size());
                    recoveryFile.setLength(recoveryFileSizeForPages(t));
                }

                // Record the page writes in the recovery buffer.
                recoveryFile.seek(0);
                // Store the next tx id...
                recoveryFile.writeLong(nextTxid.get());
                // Store the checksum for the write batch so that on recovery we
                // know if we have a consistent write batch on disk.
                recoveryFile.writeLong(checksum.getValue());
                // Write the # of pages that will follow
                recoveryFile.writeInt(batch.size());

                if (enableDiskSyncs) {
                    recoveryFile.sync();
                }
            }

            for (PageWrite w : batch) {
                writeFile.seek(toOffset(w.page.getPageId()));
                writeFile.write(w.getDiskBound(), 0, pageSize);
                w.done();
            }

            if (enableDiskSyncs) {
                writeFile.sync();
            }

        } catch (IOException ioError) {
            LOG.info("Unexpected io error on pagefile write of " + batch.size() + " pages.", ioError);
            // any subsequent write needs to be prefaced with a considered call to redoRecoveryUpdates
            // to ensure disk image is self consistent
            loaded.set(false);
            throw ioError;
        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                        if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
                            if (!w.tmpFile.delete()) {
                                throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
                            }
                            tmpFilesForRemoval.remove(w.tmpFile);
                        }
                    }
                }
            }

            if (checkpointLatch != null) {
                checkpointLatch.countDown();
            }
        }
    }

    public void removeTmpFile(File file) {
        tmpFilesForRemoval.add(file);
    }

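    /**
     * The recovery file must hold the 4k header plus {@code pageCount} records, where each
     * record is an 8 byte page id followed by one page of data.
     */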
    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8L) * pageCount);
    }

    private void releaseCheckpointWaiter() {
        if (checkpointLatch != null) {
            checkpointLatch.countDown();
            checkpointLatch = null;
        }
    }

    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
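     * The recovery buffer header stores the next transaction id, an Adler-32 checksum of
     * the batch, and the number of page records that follow; if the stored checksum does
     * not match the recomputed one, the batch was only partially written and is ignored.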
     *
     * @return the next transaction id that can be used.
     */
    private long redoRecoveryUpdates() throws IOException {
        if (!enableRecoveryFile) {
            return 0;
        }
        recoveryPageCount = 0;

        // Are we initializing the recovery file?
        if (recoveryFile.length() == 0) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }

        // How many recovery pages do we have in the recovery buffer?
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
1268            for (int i = 0; i < pageCounter; i++) {
1269                long offset = recoveryFile.readLong();
1270                byte[] data = new byte[pageSize];
1271                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
1272                    // Invalid recovery record, Could not fully read the data". Probably due to a partial write to the recovery buffer
1273                    return nextTxId;
1274                }
1275                checksum.update(data, 0, pageSize);
1276                batch.put(offset, data);
1277            }
1278        } catch (Exception e) {
1279            // If an error occurred it was cause the redo buffer was not full written out correctly.. so don't redo it.
1280            // as the pages should still be consistent.
1281            LOG.debug("Redo buffer was not fully intact: ", e);
1282            return nextTxId;
1283        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid then the recovery buffer was partially written to disk.
        if (checksum.getValue() != expectedChecksum) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk
        writeFile.sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized (writes) {
            if (enabledWriteThread) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if (enabledWriteThread) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

    public File getDirectory() {
        return directory;
    }
}