/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.page;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.LFUCache;
import org.apache.activemq.util.LRUCache;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A PageFile provides random access to fixed-size disk pages. This object is not thread safe and therefore access to it should
 * be externally synchronized.
 * <p/>
 * The file has 3 parts:
 * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file.
 * Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent.
 * Page Space: The pages in the page file.
 */
public class PageFile {

    private static final String PAGEFILE_SUFFIX = ".data";
    private static final String RECOVERY_FILE_SUFFIX = ".redo";
    private static final String FREE_FILE_SUFFIX = ".free";
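
    /*
     * Minimal usage sketch (illustrative only: the directory, file name, and the
     * Transaction allocate()/commit() calls are assumptions about typical caller
     * code, not APIs defined in this file):
     *
     *   PageFile pageFile = new PageFile(new File("data/index"), "index");
     *   pageFile.load();
     *   Transaction tx = pageFile.tx();
     *   Page page = tx.allocate();      // reserve a new page
     *   // ... write the page's content through the Transaction API ...
     *   tx.commit();
     *   pageFile.flush();               // push buffered page writes to disk
     *   pageFile.unload();
     */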

    // 4k default page size.
    public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024 * 4);
    public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000);
    public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100);

    private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;
    private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4;

    // Recovery header is (long offset)
    private static final Logger LOG = LoggerFactory.getLogger(PageFile.class);

    // A PageFile will use a couple of files in this directory.
    private final File directory;
    // And the file names in that directory will be based on this name.
    private final String name;

    // File handle used for reading pages..
    private RecoverableRandomAccessFile readFile;
    // File handle used for writing pages..
    private RecoverableRandomAccessFile writeFile;
    // File handle used for writing to the recovery file..
    private RecoverableRandomAccessFile recoveryFile;

    // The size of pages.
    private int pageSize = DEFAULT_PAGE_SIZE;

    // The minimum amount of space allocated to the recovery file, in pages.
    private int recoveryFileMinPageCount = 1000;
    // The max size that we let the recovery file grow to. It may temporarily exceed this max, but the file will get
    // resized back to this max size as soon as possible.
    private int recoveryFileMaxPageCount = 10000;
    // The number of pages in the current recovery buffer.
    private int recoveryPageCount;

    private final AtomicBoolean loaded = new AtomicBoolean();
    // The number of pages we are aiming to write every time we
    // write to disk.
    int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE;

    // The cache of recently used pages.
    private Map<Long, Page> pageCache;
    // Is read page caching enabled?
    private boolean enablePageCaching = true;
    // How many pages will we keep in the cache?
    private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE;

    // Should we first log the page write to the recovery buffer? Avoids partial
    // page write failures..
    private boolean enableRecoveryFile = true;
    // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint().
    private boolean enableDiskSyncs = true;
    // Will writes be done in an async thread?
    private boolean enabledWriteThread = false;

    // These are used if enabledWriteThread == true.
    private final AtomicBoolean stopWriter = new AtomicBoolean();
    private Thread writerThread;
    private CountDownLatch checkpointLatch;

    // Keeps track of writes that are being written to disk.
    private final TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();

    // Keeps track of free pages.
    private final AtomicLong nextFreePageId = new AtomicLong();
    private SequenceSet freeList = new SequenceSet();

    private AtomicReference<SequenceSet> recoveredFreeList = new AtomicReference<SequenceSet>();
    private AtomicReference<SequenceSet> trackingFreeDuringRecovery = new AtomicReference<SequenceSet>();

    private final AtomicLong nextTxid = new AtomicLong();
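
    /*
     * Header layout note (see storeMetaData()/loadMetaData()): the 4k page file
     * header stores two serialized copies of the MetaData below, one at offset 0
     * and one at offset PAGE_FILE_HEADER_SIZE / 2, so a crash during a metadata
     * update always leaves at least one readable copy.
     */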
    // Persistent settings stored in the page file.
    private MetaData metaData;

    private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();

    private boolean useLFRUEviction = false;
    private float LFUEvictionFactor = 0.2f;

    /**
     * Used to keep track of updated pages which have not yet been committed.
     */
    static class PageWrite {
        Page page;
        byte[] current;
        byte[] diskBound;
        long currentLocation = -1;
        long diskBoundLocation = -1;
        File tmpFile;
        int length;

        public PageWrite(Page page, byte[] data) {
            this.page = page;
            current = data;
        }

        public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
            this.page = page;
            this.currentLocation = currentLocation;
            this.tmpFile = tmpFile;
            this.length = length;
        }

        public void setCurrent(Page page, byte[] data) {
            this.page = page;
            current = data;
            currentLocation = -1;
            diskBoundLocation = -1;
        }

        public void setCurrentLocation(Page page, long location, int length) {
            this.page = page;
            this.currentLocation = location;
            this.length = length;
            this.current = null;
        }

        @Override
        public String toString() {
            return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]";
        }

        @SuppressWarnings("unchecked")
        public Page getPage() {
            return page;
        }

        public byte[] getDiskBound() throws IOException {
            if (diskBound == null && diskBoundLocation != -1) {
                diskBound = new byte[length];
                try (RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) {
                    file.seek(diskBoundLocation);
                    file.read(diskBound);
                }
                diskBoundLocation = -1;
            }
            return diskBound;
        }

        void begin() {
            if (currentLocation != -1) {
                diskBoundLocation = currentLocation;
            } else {
                diskBound = current;
            }
            current = null;
            currentLocation = -1;
        }

        /**
         * @return true if there are no pending writes to do.
         */
        boolean done() {
            diskBoundLocation = -1;
            diskBound = null;
            return current == null || currentLocation == -1;
        }

        boolean isDone() {
            return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
        }
    }

    /**
     * The MetaData object holds the persistent data associated with a PageFile object.
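     * It is serialized into the page file header as a java.util.Properties text block
     * via IntrospectionSupport (see storeMetaData() and loadMetaData()).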
     */
    public static class MetaData {

        String fileType;
        String fileTypeVersion;

        long metaDataTxId = -1;
        int pageSize;
        boolean cleanShutdown;
        long lastTxId;
        long freePages;

        public String getFileType() {
            return fileType;
        }

        public void setFileType(String fileType) {
            this.fileType = fileType;
        }

        public String getFileTypeVersion() {
            return fileTypeVersion;
        }

        public void setFileTypeVersion(String version) {
            this.fileTypeVersion = version;
        }

        public long getMetaDataTxId() {
            return metaDataTxId;
        }

        public void setMetaDataTxId(long metaDataTxId) {
            this.metaDataTxId = metaDataTxId;
        }

        public int getPageSize() {
            return pageSize;
        }

        public void setPageSize(int pageSize) {
            this.pageSize = pageSize;
        }

        public boolean isCleanShutdown() {
            return cleanShutdown;
        }

        public void setCleanShutdown(boolean cleanShutdown) {
            this.cleanShutdown = cleanShutdown;
        }

        public long getLastTxId() {
            return lastTxId;
        }

        public void setLastTxId(long lastTxId) {
            this.lastTxId = lastTxId;
        }

        public long getFreePages() {
            return freePages;
        }

        public void setFreePages(long value) {
            this.freePages = value;
        }
    }

    public Transaction tx() {
        assertLoaded();
        return new Transaction(this);
    }

    /**
     * Creates a PageFile in the specified directory whose data files are named by name.
     */
    public PageFile(File directory, String name) {
        this.directory = directory;
        this.name = name;
    }

    /**
     * Deletes the files used by the PageFile object. This method can only be used when this object is not loaded.
     *
     * @throws IOException if the files cannot be deleted.
     * @throws IllegalStateException if this PageFile is loaded
     */
    public void delete() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
        }
        delete(getMainPageFile());
        delete(getFreeFile());
        delete(getRecoveryFile());
    }

    public void archive() throws IOException {
        if (loaded.get()) {
            throw new IllegalStateException("Cannot archive page file data when the page file is loaded");
        }
        long timestamp = System.currentTimeMillis();
        archive(getMainPageFile(), String.valueOf(timestamp));
        archive(getFreeFile(), String.valueOf(timestamp));
        archive(getRecoveryFile(), String.valueOf(timestamp));
    }

    /**
     * @param file
     * @throws IOException
     */
    private void delete(File file) throws IOException {
        if (file.exists() && !file.delete()) {
            throw new IOException("Could not delete: " + file.getPath());
        }
    }

    private void archive(File file, String suffix) throws IOException {
        if (file.exists()) {
            File archive = new File(file.getPath() + "-" + suffix);
            if (!file.renameTo(archive)) {
                throw new IOException("Could not archive: " + file.getPath() + " to " + archive.getPath());
            }
        }
    }

    /**
     * Loads the page file so that it can be accessed for read/write purposes. This allocates OS resources. If this is the
     * first time the page file is loaded, then this creates the page file in the file system.
     *
     * @throws IOException If the page file cannot be loaded. This could be because the existing page file is corrupt, is a bad version, or
     *                     there was a disk error.
     * @throws IllegalStateException If the page file was already loaded.
     */
    public void load() throws IOException, IllegalStateException {
        if (loaded.compareAndSet(false, true)) {

            if (enablePageCaching) {
                if (isUseLFRUEviction()) {
                    pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor()));
                } else {
                    pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true));
                }
            }

            File file = getMainPageFile();
            IOHelper.mkdirs(file.getParentFile());
            writeFile = new RecoverableRandomAccessFile(file, "rw", false);
            readFile = new RecoverableRandomAccessFile(file, "r");

            if (readFile.length() > 0) {
                // Load the page size setting because it can't change once the file is created.
                loadMetaData();
                pageSize = metaData.getPageSize();
            } else {
                // Store the page size setting because it can't change once the file is created.
                metaData = new MetaData();
                metaData.setFileType(PageFile.class.getName());
                metaData.setFileTypeVersion("1");
                metaData.setPageSize(getPageSize());
                metaData.setCleanShutdown(true);
                metaData.setFreePages(-1);
                metaData.setLastTxId(0);
                storeMetaData();
            }

            if (enableRecoveryFile) {
                recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw");
            }

            if (metaData.isCleanShutdown()) {
                nextTxid.set(metaData.getLastTxId() + 1);
                if (metaData.getFreePages() > 0) {
                    loadFreeList();
                }
            } else {
                LOG.debug(toString() + ", Recovering page file...");
                nextTxid.set(redoRecoveryUpdates());
                trackingFreeDuringRecovery.set(new SequenceSet());
            }

            if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
            }
            nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize);

            metaData.setCleanShutdown(false);
            storeMetaData();
            getFreeFile().delete();
            startWriter();
            if (trackingFreeDuringRecovery.get() != null) {
                asyncFreePageRecovery(nextFreePageId.get());
            }
        } else {
            throw new IllegalStateException("Cannot load the page file when it is already loaded.");
        }
    }

    private void asyncFreePageRecovery(final long lastRecoveryPage) {
        Thread thread = new Thread("KahaDB Index Free Page Recovery") {
            @Override
            public void run() {
                try {
                    recoverFreePages(lastRecoveryPage);
                } catch (Throwable e) {
                    if (loaded.get()) {
                        LOG.warn("Error recovering index free page list", e);
                    }
                }
            }
        };
        thread.setPriority(Thread.NORM_PRIORITY);
        thread.setDaemon(true);
        thread.start();
    }

    private void recoverFreePages(final long lastRecoveryPage) throws Exception {
        LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown..");
        SequenceSet newFreePages = new SequenceSet();
        // Need a new PageFile instance to get an unshared readFile.
        PageFile recoveryPageFile = new PageFile(directory, name);
        recoveryPageFile.loadForRecovery(nextFreePageId.get());
        try {
            for (Iterator<Page> i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) {
                Page page = i.next();

                if (page.getPageId() >= lastRecoveryPage) {
                    break;
                }

                if (page.getType() == Page.PAGE_FREE_TYPE) {
                    newFreePages.add(page.getPageId());
                }
            }
        } finally {
            recoveryPageFile.readFile.close();
        }

        LOG.info(toString() + ". Recovered pageFile free list of size: " + newFreePages.rangeSize());
        if (!newFreePages.isEmpty()) {

            // Allow flush (with the index lock held) to merge eventually.
            recoveredFreeList.lazySet(newFreePages);
        }
    }

    private void loadForRecovery(long nextFreePageIdSnap) throws Exception {
        loaded.set(true);
        enablePageCaching = false;
        File file = getMainPageFile();
        readFile = new RecoverableRandomAccessFile(file, "r");
        loadMetaData();
        pageSize = metaData.getPageSize();
        enableRecoveryFile = false;
        nextFreePageId.set(nextFreePageIdSnap);
    }

    /**
     * Unloads a previously loaded PageFile. This deallocates OS related resources like file handles.
     * Once unloaded, you can no longer use the page file to read or write Pages.
     *
     * @throws IOException if a disk error occurred while closing down the page file.
     * @throws IllegalStateException if the PageFile is not loaded
     */
    public void unload() throws IOException {
        if (loaded.compareAndSet(true, false)) {
            flush();
            try {
                stopWriter();
            } catch (InterruptedException e) {
                throw new InterruptedIOException();
            }

            if (freeList.isEmpty()) {
                metaData.setFreePages(0);
            } else {
                storeFreeList();
                metaData.setFreePages(freeList.size());
            }

            metaData.setLastTxId(nextTxid.get() - 1);
            if (trackingFreeDuringRecovery.get() != null) {
                // Async recovery incomplete, will have to try again.
                metaData.setCleanShutdown(false);
            } else {
                metaData.setCleanShutdown(true);
            }
            storeMetaData();

            if (readFile != null) {
                readFile.close();
                readFile = null;
                writeFile.close();
                writeFile = null;
                if (enableRecoveryFile) {
                    recoveryFile.close();
                    recoveryFile = null;
                }
                freeList.clear();
                if (pageCache != null) {
                    pageCache = null;
                }
                synchronized (writes) {
                    writes.clear();
                }
            }
        } else {
            throw new IllegalStateException("Cannot unload the page file when it is not loaded");
        }
    }

    public boolean isLoaded() {
        return loaded.get();
    }

    public void allowIOResumption() {
        loaded.set(true);
    }

    /**
     * Flush and sync all write buffers to disk.
     *
     * @throws IOException If a disk error occurred.
     */
    public void flush() throws IOException {

        if (enabledWriteThread && stopWriter.get()) {
            throw new IOException("Page file already stopped: checkpointing is not allowed");
        }

        SequenceSet recovered = recoveredFreeList.get();
        if (recovered != null) {
            recoveredFreeList.lazySet(null);
            SequenceSet inUse = trackingFreeDuringRecovery.get();
            recovered.remove(inUse);
            freeList.merge(recovered);

            // All set for clean shutdown.
            trackingFreeDuringRecovery.set(null);
            inUse.clear();
        }

        // Setup a latch that gets notified when all buffered writes hit the disk.
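        // Take a local reference to the latch so this flush() waits only on the batch
        // that existed when it was called, even if the writer thread later resets the field.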
        CountDownLatch checkpointLatch;
        synchronized (writes) {
            if (writes.isEmpty()) {
                return;
            }
            if (enabledWriteThread) {
                if (this.checkpointLatch == null) {
                    this.checkpointLatch = new CountDownLatch(1);
                }
                checkpointLatch = this.checkpointLatch;
                writes.notify();
            } else {
                writeBatch();
                return;
            }
        }
        try {
            checkpointLatch.await();
        } catch (InterruptedException e) {
            InterruptedIOException ioe = new InterruptedIOException();
            ioe.initCause(e);
            throw ioe;
        }
    }

    @Override
    public String toString() {
        return "Page File: " + getMainPageFile();
    }

    ///////////////////////////////////////////////////////////////////
    // Private Implementation Methods
    ///////////////////////////////////////////////////////////////////
    private File getMainPageFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX);
    }

    public File getFreeFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX);
    }

    public File getRecoveryFile() {
        return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX);
    }

    public long toOffset(long pageId) {
        return PAGE_FILE_HEADER_SIZE + (pageId * pageSize);
    }

    private void loadMetaData() throws IOException {

        ByteArrayInputStream is;
        MetaData v1 = new MetaData();
        MetaData v2 = new MetaData();
        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(0);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v1, p);
        } catch (IOException e) {
            v1 = null;
        }

        try {
            Properties p = new Properties();
            byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2];
            readFile.seek(PAGE_FILE_HEADER_SIZE / 2);
            readFile.readFully(d);
            is = new ByteArrayInputStream(d);
            p.load(is);
            IntrospectionSupport.setProperties(v2, p);
        } catch (IOException e) {
            v2 = null;
        }

        if (v1 == null && v2 == null) {
            throw new IOException("Could not load page file meta data");
        }

        if (v1 == null || v1.metaDataTxId < 0) {
            metaData = v2;
        } else if (v2 == null || v2.metaDataTxId < 0) {
            metaData = v1;
        } else if (v1.metaDataTxId == v2.metaDataTxId) {
            metaData = v1; // Use the first since the 2nd could be a partial..
        } else {
            metaData = v2; // Use the second because the first is probably a partial.
        }
    }

    private void storeMetaData() throws IOException {
        // Convert the metadata into a property format.
        metaData.metaDataTxId++;
        Properties p = new Properties();
        IntrospectionSupport.getProperties(metaData, p, null);

        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
        p.store(os, "");
        if (os.size() > PAGE_FILE_HEADER_SIZE / 2) {
            throw new IOException("Configuration is larger than: " + PAGE_FILE_HEADER_SIZE / 2);
        }
        // Fill the rest with space...
        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()];
        Arrays.fill(filler, (byte) ' ');
        os.write(filler);
        os.flush();

        byte[] d = os.toByteArray();

        // So we don't lose it, write it 2 times...
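        // Two copies are written, at offsets 0 and PAGE_FILE_HEADER_SIZE / 2, each followed
        // by a sync, so a crash mid-update leaves at least one intact copy for loadMetaData().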
        writeFile.seek(0);
        writeFile.write(d);
        writeFile.sync();
        writeFile.seek(PAGE_FILE_HEADER_SIZE / 2);
        writeFile.write(d);
        writeFile.sync();
    }

    private void storeFreeList() throws IOException {
        FileOutputStream os = new FileOutputStream(getFreeFile());
        DataOutputStream dos = new DataOutputStream(os);
        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
        dos.close();
    }

    private void loadFreeList() throws IOException {
        freeList.clear();
        FileInputStream is = new FileInputStream(getFreeFile());
        DataInputStream dis = new DataInputStream(is);
        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
        dis.close();
    }

    ///////////////////////////////////////////////////////////////////
    // Property Accessors
    ///////////////////////////////////////////////////////////////////

    /**
     * Is the recovery buffer used to double buffer page writes? Enabled by default.
     *
     * @return true if the recovery buffer is enabled.
     */
    public boolean isEnableRecoveryFile() {
        return enableRecoveryFile;
    }

    /**
     * Sets whether the recovery buffer is used to double buffer page writes. Enabled by default. Disabling this
     * may potentially cause partial page writes which can lead to page file corruption.
     */
    public void setEnableRecoveryFile(boolean doubleBuffer) {
        assertNotLoaded();
        this.enableRecoveryFile = doubleBuffer;
    }

    /**
     * @return Are page writes synced to disk?
     */
    public boolean isEnableDiskSyncs() {
        return enableDiskSyncs;
    }

    /**
     * Allows you to enable syncing writes to disk.
     */
    public void setEnableDiskSyncs(boolean syncWrites) {
        assertNotLoaded();
        this.enableDiskSyncs = syncWrites;
    }

    /**
     * @return the page size
     */
    public int getPageSize() {
        return this.pageSize;
    }

    /**
     * @return the amount of content data that a page can hold.
     */
    public int getPageContentSize() {
        return this.pageSize - Page.PAGE_HEADER_SIZE;
    }

    /**
     * Configures the page size used by the page file. By default it is 4k. Once a page file is created on disk,
     * subsequent loads of that file will use the original pageSize. Once the PageFile is loaded, this setting
     * can no longer be changed.
     *
     * @param pageSize the pageSize to set
     * @throws IllegalStateException once the page file is loaded.
     */
    public void setPageSize(int pageSize) throws IllegalStateException {
        assertNotLoaded();
        this.pageSize = pageSize;
    }

    /**
     * @return true if read page caching is enabled
     */
    public boolean isEnablePageCaching() {
        return this.enablePageCaching;
    }

    /**
     * @param enablePageCaching allows you to enable read page caching
     */
    public void setEnablePageCaching(boolean enablePageCaching) {
        assertNotLoaded();
        this.enablePageCaching = enablePageCaching;
    }

    /**
     * @return the maximum number of pages that will get stored in the read page cache.
     */
    public int getPageCacheSize() {
        return this.pageCacheSize;
    }

    /**
     * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
     */
    public void setPageCacheSize(int pageCacheSize) {
        assertNotLoaded();
        this.pageCacheSize = pageCacheSize;
    }

    public boolean isEnabledWriteThread() {
        return enabledWriteThread;
    }

    public void setEnableWriteThread(boolean enableAsyncWrites) {
        assertNotLoaded();
        this.enabledWriteThread = enableAsyncWrites;
    }

    public long getDiskSize() throws IOException {
        return toOffset(nextFreePageId.get());
    }

    public boolean isFreePage(long pageId) {
        return freeList.contains(pageId);
    }

    /**
     * @return the number of pages allocated in the PageFile
     */
    public long getPageCount() {
        return nextFreePageId.get();
    }

    public int getRecoveryFileMinPageCount() {
        return recoveryFileMinPageCount;
    }

    public long getFreePageCount() {
        assertLoaded();
        return freeList.rangeSize();
    }

    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
        assertNotLoaded();
        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
    }

    public int getRecoveryFileMaxPageCount() {
        return recoveryFileMaxPageCount;
    }

    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
        assertNotLoaded();
        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public float getLFUEvictionFactor() {
        return LFUEvictionFactor;
    }

    public void setLFUEvictionFactor(float LFUEvictionFactor) {
        this.LFUEvictionFactor = LFUEvictionFactor;
    }

    public boolean isUseLFRUEviction() {
        return useLFRUEviction;
    }

    public void setUseLFRUEviction(boolean useLFRUEviction) {
        this.useLFRUEviction = useLFRUEviction;
    }

    ///////////////////////////////////////////////////////////////////
    // Package Protected Methods exposed to Transaction
    ///////////////////////////////////////////////////////////////////

    /**
     * @throws IllegalStateException if the page file is not loaded.
     */
    void assertLoaded() throws IllegalStateException {
        if (!loaded.get()) {
            throw new IllegalStateException("PageFile is not loaded");
        }
    }

    void assertNotLoaded() throws IllegalStateException {
        if (loaded.get()) {
            throw new IllegalStateException("PageFile is loaded");
        }
    }

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException If a disk error occurred.
     * @throws IllegalStateException if the PageFile is not loaded
     */
    <T> Page<T> allocate(int count) throws IOException {
        assertLoaded();
        if (count <= 0) {
            throw new IllegalArgumentException("The allocation count must be larger than zero");
        }

        Sequence seq = freeList.removeFirstSequence(count);

        // We may need to create new free pages...
        if (seq == null) {

            Page<T> first = null;
            int c = count;

            // Reserve the page ids and write transaction ids only once....
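            // A single atomic getAndAdd reserves a contiguous block of ids: e.g. with
            // nextFreePageId at 10 and count == 3, pages 10, 11 and 12 are created and
            // nextFreePageId advances to 13. Write transaction ids are reserved the same way.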
            long pageId = nextFreePageId.getAndAdd(count);
            long writeTxnId = nextTxid.getAndAdd(count);

            while (c-- > 0) {
                Page<T> page = new Page<T>(pageId++);
                page.makeFree(writeTxnId++);

                if (first == null) {
                    first = page;
                }

                addToCache(page);
                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
                page.write(out);
                write(page, out.getData());

                // LOG.debug("allocate writing: "+page.getPageId());
            }

            return first;
        }

        Page<T> page = new Page<T>(seq.getFirst());
        page.makeFree(0);
        // LOG.debug("allocated: "+page.getPageId());
        return page;
    }

    long getNextWriteTransactionId() {
        return nextTxid.incrementAndGet();
    }

    synchronized void readPage(long pageId, byte[] data) throws IOException {
        readFile.seek(toOffset(pageId));
        readFile.readFully(data);
    }

    public void freePage(long pageId) {
        freeList.add(pageId);
        removeFromCache(pageId);

        SequenceSet trackFreeDuringRecovery = trackingFreeDuringRecovery.get();
        if (trackFreeDuringRecovery != null) {
            trackFreeDuringRecovery.add(pageId);
        }
    }

    @SuppressWarnings("unchecked")
    private <T> void write(Page<T> page, byte[] data) throws IOException {
        final PageWrite write = new PageWrite(page, data);
        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() {
            @Override
            public Long getKey() {
                return write.getPage().getPageId();
            }

            @Override
            public PageWrite getValue() {
                return write;
            }

            @Override
            public PageWrite setValue(PageWrite value) {
                return null;
            }
        };
        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
        write(Arrays.asList(entries));
    }

    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
        synchronized (writes) {
            if (enabledWriteThread) {
                while (writes.size() >= writeBatchSize && !stopWriter.get()) {
                    try {
                        writes.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new InterruptedIOException();
                    }
                }
            }

            boolean longTx = false;

            for (Map.Entry<Long, PageWrite> entry : updates) {
                Long key = entry.getKey();
                PageWrite value = entry.getValue();
                PageWrite write = writes.get(key);
                if (write == null) {
                    writes.put(key, value);
                } else {
                    if (value.currentLocation != -1) {
                        write.setCurrentLocation(value.page, value.currentLocation, value.length);
                        write.tmpFile = value.tmpFile;
                        longTx = true;
                    } else {
                        write.setCurrent(value.page, value.current);
                    }
                }
            }

            // Once we start approaching capacity, notify the writer to start writing.
            // Sync immediately for long txs.
            if (longTx || canStartWriteBatch()) {

                if (enabledWriteThread) {
                    writes.notify();
                } else {
                    writeBatch();
                }
            }
        }
    }

    private boolean canStartWriteBatch() {
        int capacityUsed = ((writes.size() * 100) / writeBatchSize);
        if (enabledWriteThread) {
            // The constant 10 here controls how soon write batches start going to disk.
            // It would be nice to figure out how to auto tune that value. Make it too small and
            // we reduce throughput because we lock the write mutex too often doing writes.
            return capacityUsed >= 10 || checkpointLatch != null;
        } else {
            return capacityUsed >= 80 || checkpointLatch != null;
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Cache Related operations
    ///////////////////////////////////////////////////////////////////
    @SuppressWarnings("unchecked")
    <T> Page<T> getFromCache(long pageId) {
        synchronized (writes) {
            PageWrite pageWrite = writes.get(pageId);
            if (pageWrite != null) {
                return pageWrite.page;
            }
        }

        Page<T> result = null;
        if (enablePageCaching) {
            result = pageCache.get(pageId);
        }
        return result;
    }

    void addToCache(Page page) {
        if (enablePageCaching) {
            pageCache.put(page.getPageId(), page);
        }
    }

    void removeFromCache(long pageId) {
        if (enablePageCaching) {
            pageCache.remove(pageId);
        }
    }

    ///////////////////////////////////////////////////////////////////
    // Internal Double write implementation follows...
    ///////////////////////////////////////////////////////////////////

    private void pollWrites() {
        try {
            while (!stopWriter.get()) {
                // Wait for a notification...
                synchronized (writes) {
                    writes.notifyAll();

                    // If there is not enough to write, wait for a notification...
                    while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) {
                        writes.wait(100);
                    }

                    if (writes.isEmpty()) {
                        releaseCheckpointWaiter();
                    }
                }
                writeBatch();
            }
        } catch (Throwable e) {
            LOG.info("An exception was raised while performing poll writes", e);
        } finally {
            releaseCheckpointWaiter();
        }
    }

    private void writeBatch() throws IOException {

        CountDownLatch checkpointLatch;
        ArrayList<PageWrite> batch;
        synchronized (writes) {

            batch = new ArrayList<PageWrite>(writes.size());
            // Build a write batch from the current write cache.
            for (PageWrite write : writes.values()) {
                batch.add(write);
                // Move the current write to the diskBound write; this lets folks update the
                // page again without blocking for this write.
                write.begin();
                if (write.diskBound == null && write.diskBoundLocation == -1) {
                    batch.remove(write);
                }
            }

            // Grab on to the existing checkpoint latch because once we do this write we can
            // release the folks that were waiting for those writes to hit disk.
            checkpointLatch = this.checkpointLatch;
            this.checkpointLatch = null;
        }

        // First land the writes in the recovery file.
        if (enableRecoveryFile) {
            Checksum checksum = new Adler32();

            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);

            for (PageWrite w : batch) {
                try {
                    checksum.update(w.getDiskBound(), 0, pageSize);
                } catch (Throwable t) {
                    throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
                }
                recoveryFile.writeLong(w.page.getPageId());
                recoveryFile.write(w.getDiskBound(), 0, pageSize);
            }

            // Can we shrink the recovery buffer?
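            // recoveryFileSizeForPages(n) = RECOVERY_FILE_HEADER_SIZE + (pageSize + 8) * n;
            // the extra 8 bytes per page hold the long page id written before each page's data.
            // Shrinking to max(recoveryFileMinPageCount, batch.size()) keeps room for the
            // batch that was just written.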
            if (recoveryPageCount > recoveryFileMaxPageCount) {
                int t = Math.max(recoveryFileMinPageCount, batch.size());
                recoveryFile.setLength(recoveryFileSizeForPages(t));
            }

            // Record the write batch header in the recovery buffer.
            recoveryFile.seek(0);
            // Store the next tx id...
            recoveryFile.writeLong(nextTxid.get());
            // Store the checksum for the write batch so that on recovery we
            // know if we have a consistent write batch on disk.
            recoveryFile.writeLong(checksum.getValue());
            // Write the # of pages that will follow.
            recoveryFile.writeInt(batch.size());

            if (enableDiskSyncs) {
                recoveryFile.sync();
            }
        }

        try {
            for (PageWrite w : batch) {
                writeFile.seek(toOffset(w.page.getPageId()));
                writeFile.write(w.getDiskBound(), 0, pageSize);
                w.done();
            }

            if (enableDiskSyncs) {
                writeFile.sync();
            }

        } catch (IOException ioError) {
            LOG.info("Unexpected io error on pagefile write of " + batch.size() + " pages.", ioError);
            // Any subsequent write needs to be prefaced with a considered call to redoRecoveryUpdates
            // to ensure the disk image is self consistent.
            loaded.set(false);
            throw ioError;
        } finally {
            synchronized (writes) {
                for (PageWrite w : batch) {
                    // If there are no more pending writes, then remove it from
                    // the write cache.
                    if (w.isDone()) {
                        writes.remove(w.page.getPageId());
                        if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
                            if (!w.tmpFile.delete()) {
                                throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
                            }
                            tmpFilesForRemoval.remove(w.tmpFile);
                        }
                    }
                }
            }

            if (checkpointLatch != null) {
                checkpointLatch.countDown();
            }
        }
    }

    public void removeTmpFile(File file) {
        tmpFilesForRemoval.add(file);
    }

    private long recoveryFileSizeForPages(int pageCount) {
        return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8) * pageCount);
    }

    private void releaseCheckpointWaiter() {
        if (checkpointLatch != null) {
            checkpointLatch.countDown();
            checkpointLatch = null;
        }
    }

    /**
     * Inspects the recovery buffer and re-applies any
     * partially applied page writes.
     *
     * @return the next transaction id that can be used.
     */
    private long redoRecoveryUpdates() throws IOException {
        if (!enableRecoveryFile) {
            return 0;
        }
        recoveryPageCount = 0;

        // Are we initializing the recovery file?
        if (recoveryFile.length() == 0) {
            // Write an empty header..
            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
            // Preallocate the minimum size for better performance.
            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
            return 0;
        }

        // How many recovery pages do we have in the recovery buffer?
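        // Recovery header layout (written by writeBatch()): a long next tx id, a long
        // Adler32 checksum over the batch's page data, and an int page count, followed at
        // offset RECOVERY_FILE_HEADER_SIZE by (long pageId, page data) records.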
        recoveryFile.seek(0);
        long nextTxId = recoveryFile.readLong();
        long expectedChecksum = recoveryFile.readLong();
        int pageCounter = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        try {
            for (int i = 0; i < pageCounter; i++) {
                long pageId = recoveryFile.readLong();
                byte[] data = new byte[pageSize];
                if (recoveryFile.read(data, 0, pageSize) != pageSize) {
                    // Invalid recovery record: could not fully read the data, probably due to a partial write to the recovery buffer.
                    return nextTxId;
                }
                checksum.update(data, 0, pageSize);
                batch.put(pageId, data);
            }
        } catch (Exception e) {
            // If an error occurred it was because the redo buffer was not fully written out correctly,
            // so don't redo it, as the pages should still be consistent.
            LOG.debug("Redo buffer was not fully intact: ", e);
            return nextTxId;
        }

        recoveryPageCount = pageCounter;

        // If the checksum is not valid then the recovery buffer was partially written to disk.
        if (checksum.getValue() != expectedChecksum) {
            return nextTxId;
        }

        // Re-apply all the writes in the recovery buffer.
        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
            writeFile.seek(toOffset(e.getKey()));
            writeFile.write(e.getValue());
        }

        // And sync it to disk.
        writeFile.sync();
        return nextTxId;
    }

    private void startWriter() {
        synchronized (writes) {
            if (enabledWriteThread) {
                stopWriter.set(false);
                writerThread = new Thread("KahaDB Page Writer") {
                    @Override
                    public void run() {
                        pollWrites();
                    }
                };
                writerThread.setPriority(Thread.MAX_PRIORITY);
                writerThread.setDaemon(true);
                writerThread.start();
            }
        }
    }

    private void stopWriter() throws InterruptedException {
        if (enabledWriteThread) {
            stopWriter.set(true);
            writerThread.join();
        }
    }

    public File getFile() {
        return getMainPageFile();
    }

    public File getDirectory() {
        return directory;
    }
}