001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.disk.page; 018 019import java.io.ByteArrayInputStream; 020import java.io.ByteArrayOutputStream; 021import java.io.DataInputStream; 022import java.io.DataOutputStream; 023import java.io.File; 024import java.io.FileInputStream; 025import java.io.FileOutputStream; 026import java.io.IOException; 027import java.io.InterruptedIOException; 028import java.io.RandomAccessFile; 029import java.util.ArrayList; 030import java.util.Arrays; 031import java.util.Collection; 032import java.util.Collections; 033import java.util.Iterator; 034import java.util.LinkedHashMap; 035import java.util.Map; 036import java.util.Map.Entry; 037import java.util.Properties; 038import java.util.TreeMap; 039import java.util.concurrent.CountDownLatch; 040import java.util.concurrent.atomic.AtomicBoolean; 041import java.util.concurrent.atomic.AtomicLong; 042import java.util.concurrent.atomic.AtomicReference; 043import java.util.zip.Adler32; 044import java.util.zip.Checksum; 045 046import org.apache.activemq.store.kahadb.disk.util.Sequence; 047import org.apache.activemq.store.kahadb.disk.util.SequenceSet; 048import org.apache.activemq.util.DataByteArrayOutputStream; 049import org.apache.activemq.util.IOExceptionSupport; 050import org.apache.activemq.util.IOHelper; 051import org.apache.activemq.util.IntrospectionSupport; 052import org.apache.activemq.util.LFUCache; 053import org.apache.activemq.util.LRUCache; 054import org.apache.activemq.util.RecoverableRandomAccessFile; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058/** 059 * A PageFile provides you random access to fixed sized disk pages. This object is not thread safe and therefore access to it should 060 * be externally synchronized. 061 * <p/> 062 * The file has 3 parts: 063 * Metadata Space: 4k : Reserved metadata area. Used to store persistent config about the file. 064 * Recovery Buffer Space: Page Size * 1000 : This is a redo log used to prevent partial page writes from making the file inconsistent 065 * Page Space: The pages in the page file. 066 */ 067public class PageFile { 068 069 private static final String PAGEFILE_SUFFIX = ".data"; 070 private static final String RECOVERY_FILE_SUFFIX = ".redo"; 071 private static final String FREE_FILE_SUFFIX = ".free"; 072 073 // 4k Default page size. 
074 public static final int DEFAULT_PAGE_SIZE = Integer.getInteger("defaultPageSize", 1024*4); 075 public static final int DEFAULT_WRITE_BATCH_SIZE = Integer.getInteger("defaultWriteBatchSize", 1000); 076 public static final int DEFAULT_PAGE_CACHE_SIZE = Integer.getInteger("defaultPageCacheSize", 100); 077 078 private static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4; 079 private static final int PAGE_FILE_HEADER_SIZE = 1024 * 4; 080 081 // Recovery header is (long offset) 082 private static final Logger LOG = LoggerFactory.getLogger(PageFile.class); 083 084 // A PageFile will use a couple of files in this directory 085 private final File directory; 086 // And the file names in that directory will be based on this name. 087 private final String name; 088 089 // File handle used for reading pages.. 090 private RecoverableRandomAccessFile readFile; 091 // File handle used for writing pages.. 092 private RecoverableRandomAccessFile writeFile; 093 // File handle used for writing to the recovery buffer.. 094 private RecoverableRandomAccessFile recoveryFile; 095 096 // The size of pages 097 private int pageSize = DEFAULT_PAGE_SIZE; 098 099 // The minimum amount of space allocated to the recovery file, in pages. 100 private int recoveryFileMinPageCount = 1000; 101 // The max size that we let the recovery file grow to.. it may exceed the max, but the file will get resized 102 // back to this max size as soon as possible. 103 private int recoveryFileMaxPageCount = 10000; 104 // The number of pages in the current recovery buffer 105 private int recoveryPageCount; 106 107 private final AtomicBoolean loaded = new AtomicBoolean(); 108 // The number of pages we are aiming to write every time we 109 // write to disk. 110 int writeBatchSize = DEFAULT_WRITE_BATCH_SIZE; 111 112 // The cache of recently used pages. 113 private Map<Long, Page> pageCache; 114 // Should we cache recently used pages? 115 private boolean enablePageCaching = true; 116 // How many pages will we keep in the cache? 117 private int pageCacheSize = DEFAULT_PAGE_CACHE_SIZE; 118 119 // Should we first log the page write to the recovery buffer? Avoids partial 120 // page write failures.. 121 private boolean enableRecoveryFile = true; 122 // Will we sync writes to disk? Ensures that data will not be lost after a checkpoint(). 123 private boolean enableDiskSyncs = true; 124 // Will writes be done in an async thread? 125 private boolean enabledWriteThread = false; 126 127 // These are used if enableAsyncWrites==true 128 private final AtomicBoolean stopWriter = new AtomicBoolean(); 129 private Thread writerThread; 130 private CountDownLatch checkpointLatch; 131 132 // Keeps track of writes that are being written to disk. 133 private final TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>(); 134 135 // Keeps track of free pages. 136 private final AtomicLong nextFreePageId = new AtomicLong(); 137 private SequenceSet freeList = new SequenceSet(); 138 139 private AtomicReference<SequenceSet> recoveredFreeList = new AtomicReference<SequenceSet>(); 140 private AtomicReference<SequenceSet> trackingFreeDuringRecovery = new AtomicReference<SequenceSet>(); 141 142 private final AtomicLong nextTxid = new AtomicLong(); 143 144 // Persistent settings stored in the page file.
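// (page size, clean-shutdown flag, last tx id and free page count); see the MetaData class below.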
145 private MetaData metaData; 146 147 private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>(); 148 149 private boolean useLFRUEviction = false; 150 private float LFUEvictionFactor = 0.2f; 151 152 /** 153 * Used to keep track of updated pages which have not yet been committed. 154 */ 155 static class PageWrite { 156 Page page; 157 byte[] current; 158 byte[] diskBound; 159 long currentLocation = -1; 160 long diskBoundLocation = -1; 161 File tmpFile; 162 int length; 163 164 public PageWrite(Page page, byte[] data) { 165 this.page = page; 166 current = data; 167 } 168 169 public PageWrite(Page page, long currentLocation, int length, File tmpFile) { 170 this.page = page; 171 this.currentLocation = currentLocation; 172 this.tmpFile = tmpFile; 173 this.length = length; 174 } 175 176 public void setCurrent(Page page, byte[] data) { 177 this.page = page; 178 current = data; 179 currentLocation = -1; 180 } 181 182 public void setCurrentLocation(Page page, long location, int length) { 183 this.page = page; 184 this.currentLocation = location; 185 this.length = length; 186 this.current = null; 187 } 188 189 @Override 190 public String toString() { 191 return "[PageWrite:" + page.getPageId() + "-" + page.getType() + "]"; 192 } 193 194 @SuppressWarnings("unchecked") 195 public Page getPage() { 196 return page; 197 } 198 199 public byte[] getDiskBound() throws IOException { 200 if (diskBound == null && diskBoundLocation != -1) { 201 diskBound = new byte[length]; 202 try(RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) { 203 file.seek(diskBoundLocation); 204 file.readFully(diskBound); 205 } 206 diskBoundLocation = -1; 207 } 208 return diskBound; 209 } 210 211 void begin() { 212 if (currentLocation != -1) { 213 diskBoundLocation = currentLocation; 214 } else { 215 diskBound = current; 216 } 217 current = null; 218 currentLocation = -1; 219 } 220 221 /** 222 * @return true if there are no pending writes to do. 223 */ 224 boolean done() { 225 diskBoundLocation = -1; 226 diskBound = null; 227 return current == null || currentLocation == -1; 228 } 229 230 boolean isDone() { 231 return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1; 232 } 233 } 234 235 /** 236 * The MetaData object holds the persistent data associated with a PageFile object.
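 * Two copies of it are kept in the 4k file header (see storeMetaData() and loadMetaData()) so that a torn write cannot lose the settings.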
237 */ 238 public static class MetaData { 239 240 String fileType; 241 String fileTypeVersion; 242 243 long metaDataTxId = -1; 244 int pageSize; 245 boolean cleanShutdown; 246 long lastTxId; 247 long freePages; 248 249 public String getFileType() { 250 return fileType; 251 } 252 253 public void setFileType(String fileType) { 254 this.fileType = fileType; 255 } 256 257 public String getFileTypeVersion() { 258 return fileTypeVersion; 259 } 260 261 public void setFileTypeVersion(String version) { 262 this.fileTypeVersion = version; 263 } 264 265 public long getMetaDataTxId() { 266 return metaDataTxId; 267 } 268 269 public void setMetaDataTxId(long metaDataTxId) { 270 this.metaDataTxId = metaDataTxId; 271 } 272 273 public int getPageSize() { 274 return pageSize; 275 } 276 277 public void setPageSize(int pageSize) { 278 this.pageSize = pageSize; 279 } 280 281 public boolean isCleanShutdown() { 282 return cleanShutdown; 283 } 284 285 public void setCleanShutdown(boolean cleanShutdown) { 286 this.cleanShutdown = cleanShutdown; 287 } 288 289 public long getLastTxId() { 290 return lastTxId; 291 } 292 293 public void setLastTxId(long lastTxId) { 294 this.lastTxId = lastTxId; 295 } 296 297 public long getFreePages() { 298 return freePages; 299 } 300 301 public void setFreePages(long value) { 302 this.freePages = value; 303 } 304 } 305 306 public Transaction tx() { 307 assertLoaded(); 308 return new Transaction(this); 309 } 310 311 /** 312 * Creates a PageFile in the specified directory whose data files are named by the given name. 313 */ 314 public PageFile(File directory, String name) { 315 this.directory = directory; 316 this.name = name; 317 } 318 319 /** 320 * Deletes the files used by the PageFile object. This method can only be used when this object is not loaded. 321 * 322 * @throws IOException if the files cannot be deleted. 323 * @throws IllegalStateException if this PageFile is loaded 324 */ 325 public void delete() throws IOException { 326 if (loaded.get()) { 327 throw new IllegalStateException("Cannot delete page file data when the page file is loaded"); 328 } 329 delete(getMainPageFile()); 330 delete(getFreeFile()); 331 delete(getRecoveryFile()); 332 } 333 334 public void archive() throws IOException { 335 if (loaded.get()) { 336 throw new IllegalStateException("Cannot archive page file data when the page file is loaded"); 337 } 338 long timestamp = System.currentTimeMillis(); 339 archive(getMainPageFile(), String.valueOf(timestamp)); 340 archive(getFreeFile(), String.valueOf(timestamp)); 341 archive(getRecoveryFile(), String.valueOf(timestamp)); 342 } 343 344 /** 345 * @param file 346 * @throws IOException 347 */ 348 private void delete(File file) throws IOException { 349 if (file.exists() && !file.delete()) { 350 throw new IOException("Could not delete: " + file.getPath()); 351 } 352 } 353 354 private void archive(File file, String suffix) throws IOException { 355 if (file.exists()) { 356 File archive = new File(file.getPath() + "-" + suffix); 357 if (!file.renameTo(archive)) { 358 throw new IOException("Could not archive: " + file.getPath() + " to " + archive.getPath()); 359 } 360 } 361 } 362 363 /** 364 * Loads the page file so that it can be accessed for read/write purposes. This allocates OS resources. If this is the 365 * first time the page file is loaded, then this creates the page file in the file system. 366 * 367 * @throws IOException If the page file cannot be loaded. This could be because the existing page file is corrupt, is a bad version, or if 368 * there was a disk error.
369 * @throws IllegalStateException If the page file was already loaded. 370 */ 371 public void load() throws IOException, IllegalStateException { 372 if (loaded.compareAndSet(false, true)) { 373 374 if (enablePageCaching) { 375 if (isUseLFRUEviction()) { 376 pageCache = Collections.synchronizedMap(new LFUCache<Long, Page>(pageCacheSize, getLFUEvictionFactor())); 377 } else { 378 pageCache = Collections.synchronizedMap(new LRUCache<Long, Page>(pageCacheSize, pageCacheSize, 0.75f, true)); 379 } 380 } 381 382 File file = getMainPageFile(); 383 IOHelper.mkdirs(file.getParentFile()); 384 writeFile = new RecoverableRandomAccessFile(file, "rw", false); 385 readFile = new RecoverableRandomAccessFile(file, "r"); 386 387 if (readFile.length() > 0) { 388 // Load the page size setting cause that can't change once the file is created. 389 loadMetaData(); 390 pageSize = metaData.getPageSize(); 391 } else { 392 // Store the page size setting cause that can't change once the file is created. 393 metaData = new MetaData(); 394 metaData.setFileType(PageFile.class.getName()); 395 metaData.setFileTypeVersion("1"); 396 metaData.setPageSize(getPageSize()); 397 metaData.setCleanShutdown(true); 398 metaData.setFreePages(-1); 399 metaData.setLastTxId(0); 400 storeMetaData(); 401 } 402 403 if (enableRecoveryFile) { 404 recoveryFile = new RecoverableRandomAccessFile(getRecoveryFile(), "rw"); 405 } 406 407 if (metaData.isCleanShutdown()) { 408 nextTxid.set(metaData.getLastTxId() + 1); 409 if (metaData.getFreePages() > 0) { 410 loadFreeList(); 411 } 412 } else { 413 LOG.debug(toString() + ", Recovering page file..."); 414 nextTxid.set(redoRecoveryUpdates()); 415 trackingFreeDuringRecovery.set(new SequenceSet()); 416 } 417 418 if (writeFile.length() < PAGE_FILE_HEADER_SIZE) { 419 writeFile.setLength(PAGE_FILE_HEADER_SIZE); 420 } 421 nextFreePageId.set((writeFile.length() - PAGE_FILE_HEADER_SIZE) / pageSize); 422 423 metaData.setCleanShutdown(false); 424 storeMetaData(); 425 getFreeFile().delete(); 426 startWriter(); 427 if (trackingFreeDuringRecovery.get() != null) { 428 asyncFreePageRecovery(nextFreePageId.get()); 429 } 430 } else { 431 throw new IllegalStateException("Cannot load the page file when it is already loaded."); 432 } 433 } 434 435 private void asyncFreePageRecovery(final long lastRecoveryPage) { 436 Thread thread = new Thread("KahaDB Index Free Page Recovery") { 437 @Override 438 public void run() { 439 try { 440 recoverFreePages(lastRecoveryPage); 441 } catch (Throwable e) { 442 if (loaded.get()) { 443 LOG.warn("Error recovering index free page list", e); 444 } 445 } 446 } 447 }; 448 thread.setPriority(Thread.NORM_PRIORITY); 449 thread.setDaemon(true); 450 thread.start(); 451 } 452 453 private void recoverFreePages(final long lastRecoveryPage) throws Exception { 454 LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown.."); 455 SequenceSet newFreePages = new SequenceSet(); 456 // need new pageFile instance to get unshared readFile 457 PageFile recoveryPageFile = new PageFile(directory, name); 458 recoveryPageFile.loadForRecovery(nextFreePageId.get()); 459 try { 460 for (Iterator<Page> i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) { 461 Page page = i.next(); 462 463 if (page.getPageId() >= lastRecoveryPage) { 464 break; 465 } 466 467 if (page.getType() == Page.PAGE_FREE_TYPE) { 468 newFreePages.add(page.getPageId()); 469 } 470 } 471 } finally { 472 recoveryPageFile.readFile.close(); 473 } 474 475 LOG.info(toString() + ". 
Recovered pageFile free list of size: " + newFreePages.rangeSize()); 476 if (!newFreePages.isEmpty()) { 477 478 // allow flush (with index lock held) to merge eventually 479 recoveredFreeList.lazySet(newFreePages); 480 } else { 481 // If there are no free pages, reset trackingFreeDuringRecovery to allow the broker to have a clean shutdown 482 trackingFreeDuringRecovery.set(null); 483 } 484 } 485 486 private void loadForRecovery(long nextFreePageIdSnap) throws Exception { 487 loaded.set(true); 488 enablePageCaching = false; 489 File file = getMainPageFile(); 490 readFile = new RecoverableRandomAccessFile(file, "r"); 491 loadMetaData(); 492 pageSize = metaData.getPageSize(); 493 enableRecoveryFile = false; 494 nextFreePageId.set(nextFreePageIdSnap); 495 } 496 497 498 /** 499 * Unloads a previously loaded PageFile. This deallocates OS related resources like file handles. 500 * Once unloaded, you can no longer use the page file to read or write Pages. 501 * 502 * @throws IOException if a disk error occurred while closing down the page file. 503 * @throws IllegalStateException if the PageFile is not loaded 504 */ 505 public void unload() throws IOException { 506 if (loaded.compareAndSet(true, false)) { 507 flush(); 508 try { 509 stopWriter(); 510 } catch (InterruptedException e) { 511 throw new InterruptedIOException(); 512 } 513 514 if (freeList.isEmpty()) { 515 metaData.setFreePages(0); 516 } else { 517 storeFreeList(); 518 metaData.setFreePages(freeList.size()); 519 } 520 521 metaData.setLastTxId(nextTxid.get() - 1); 522 if (trackingFreeDuringRecovery.get() != null) { 523 // async recovery incomplete, will have to try again 524 metaData.setCleanShutdown(false); 525 } else { 526 metaData.setCleanShutdown(true); 527 } 528 storeMetaData(); 529 530 if (readFile != null) { 531 readFile.close(); 532 readFile = null; 533 writeFile.close(); 534 writeFile = null; 535 if (enableRecoveryFile) { 536 recoveryFile.close(); 537 recoveryFile = null; 538 } 539 freeList.clear(); 540 if (pageCache != null) { 541 pageCache = null; 542 } 543 synchronized (writes) { 544 writes.clear(); 545 } 546 } 547 } else { 548 throw new IllegalStateException("Cannot unload the page file when it is not loaded"); 549 } 550 } 551 552 public boolean isLoaded() { 553 return loaded.get(); 554 } 555 556 public boolean isCleanShutdown() { 557 return metaData != null && metaData.isCleanShutdown(); 558 } 559 560 public void allowIOResumption() { 561 loaded.set(true); 562 } 563 564 /** 565 * Flush and sync all write buffers to disk. 566 * 567 * @throws IOException If a disk error occurred. 568 */ 569 public void flush() throws IOException { 570 571 if (enabledWriteThread && stopWriter.get()) { 572 throw new IOException("Page file already stopped: checkpointing is not allowed"); 573 } 574 575 SequenceSet recovered = recoveredFreeList.get(); 576 if (recovered != null) { 577 recoveredFreeList.lazySet(null); 578 SequenceSet inUse = trackingFreeDuringRecovery.get(); 579 recovered.remove(inUse); 580 freeList.merge(recovered); 581 582 // all set for clean shutdown 583 trackingFreeDuringRecovery.set(null); 584 inUse.clear(); 585 } 586 587 // Set up a latch that gets notified when all buffered writes hit the disk.
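// With the write thread enabled we hand the latch to the writer and block on it below; otherwise the batch is written inline on the calling thread.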
588 CountDownLatch checkpointLatch; 589 synchronized (writes) { 590 if (writes.isEmpty()) { 591 return; 592 } 593 if (enabledWriteThread) { 594 if (this.checkpointLatch == null) { 595 this.checkpointLatch = new CountDownLatch(1); 596 } 597 checkpointLatch = this.checkpointLatch; 598 writes.notify(); 599 } else { 600 writeBatch(); 601 return; 602 } 603 } 604 try { 605 checkpointLatch.await(); 606 } catch (InterruptedException e) { 607 InterruptedIOException ioe = new InterruptedIOException(); 608 ioe.initCause(e); 609 throw ioe; 610 } 611 } 612 613 614 @Override 615 public String toString() { 616 return "Page File: " + getMainPageFile(); 617 } 618 619 /////////////////////////////////////////////////////////////////// 620 // Private Implementation Methods 621 /////////////////////////////////////////////////////////////////// 622 private File getMainPageFile() { 623 return new File(directory, IOHelper.toFileSystemSafeName(name) + PAGEFILE_SUFFIX); 624 } 625 626 public File getFreeFile() { 627 return new File(directory, IOHelper.toFileSystemSafeName(name) + FREE_FILE_SUFFIX); 628 } 629 630 public File getRecoveryFile() { 631 return new File(directory, IOHelper.toFileSystemSafeName(name) + RECOVERY_FILE_SUFFIX); 632 } 633 634 public long toOffset(long pageId) { 635 return PAGE_FILE_HEADER_SIZE + (pageId * pageSize); 636 } 637 638 private void loadMetaData() throws IOException { 639 640 ByteArrayInputStream is; 641 MetaData v1 = new MetaData(); 642 MetaData v2 = new MetaData(); 643 try { 644 Properties p = new Properties(); 645 byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2]; 646 readFile.seek(0); 647 readFile.readFully(d); 648 is = new ByteArrayInputStream(d); 649 p.load(is); 650 IntrospectionSupport.setProperties(v1, p); 651 } catch (IOException e) { 652 v1 = null; 653 } 654 655 try { 656 Properties p = new Properties(); 657 byte[] d = new byte[PAGE_FILE_HEADER_SIZE / 2]; 658 readFile.seek(PAGE_FILE_HEADER_SIZE / 2); 659 readFile.readFully(d); 660 is = new ByteArrayInputStream(d); 661 p.load(is); 662 IntrospectionSupport.setProperties(v2, p); 663 } catch (IOException e) { 664 v2 = null; 665 } 666 667 if (v1 == null && v2 == null) { 668 throw new IOException("Could not load page file meta data"); 669 } 670 671 if (v1 == null || v1.metaDataTxId < 0) { 672 metaData = v2; 673 } else if (v2 == null || v2.metaDataTxId < 0) { 674 metaData = v1; 675 } else if (v1.metaDataTxId == v2.metaDataTxId) { 676 metaData = v1; // use the first since the 2nd could be a partial.. 677 } else { 678 metaData = v2; // use the second cause the first is probably a partial. 679 } 680 } 681 682 private void storeMetaData() throws IOException { 683 // Convert the metadata into a property format 684 metaData.metaDataTxId++; 685 Properties p = new Properties(); 686 IntrospectionSupport.getProperties(metaData, p, null); 687 688 ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE); 689 p.store(os, ""); 690 if (os.size() > PAGE_FILE_HEADER_SIZE / 2) { 691 throw new IOException("Configuration is larger than: " + PAGE_FILE_HEADER_SIZE / 2); 692 } 693 // Fill the rest with space... 694 byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE / 2) - os.size()]; 695 Arrays.fill(filler, (byte) ' '); 696 os.write(filler); 697 os.flush(); 698 699 byte[] d = os.toByteArray(); 700 701 // So we don't lose it.. write it 2 times...
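// Each copy goes into its own half of the 4k header; loadMetaData() later picks whichever copy carries a valid metaDataTxId, so a torn write can clobber at most one of them.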
702 writeFile.seek(0); 703 writeFile.write(d); 704 writeFile.sync(); 705 writeFile.seek(PAGE_FILE_HEADER_SIZE / 2); 706 writeFile.write(d); 707 writeFile.sync(); 708 } 709 710 private void storeFreeList() throws IOException { 711 FileOutputStream os = new FileOutputStream(getFreeFile()); 712 DataOutputStream dos = new DataOutputStream(os); 713 SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos); 714 dos.close(); 715 } 716 717 private void loadFreeList() throws IOException { 718 freeList.clear(); 719 FileInputStream is = new FileInputStream(getFreeFile()); 720 DataInputStream dis = new DataInputStream(is); 721 freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis); 722 dis.close(); 723 } 724 725 /////////////////////////////////////////////////////////////////// 726 // Property Accessors 727 /////////////////////////////////////////////////////////////////// 728 729 /** 730 * Is the recovery buffer used to double buffer page writes? Enabled by default. 731 * 732 * @return true if the recovery buffer is enabled. 733 */ 734 public boolean isEnableRecoveryFile() { 735 return enableRecoveryFile; 736 } 737 738 /** 739 * Sets whether the recovery buffer is used to double buffer page writes. Enabled by default. Disabling this 740 * may potentially cause partial page writes which can lead to page file corruption. 741 */ 742 public void setEnableRecoveryFile(boolean doubleBuffer) { 743 assertNotLoaded(); 744 this.enableRecoveryFile = doubleBuffer; 745 } 746 747 /** 748 * @return Are page writes synced to disk? 749 */ 750 public boolean isEnableDiskSyncs() { 751 return enableDiskSyncs; 752 } 753 754 /** 755 * Allows you to enable syncing writes to disk. 756 */ 757 public void setEnableDiskSyncs(boolean syncWrites) { 758 assertNotLoaded(); 759 this.enableDiskSyncs = syncWrites; 760 } 761 762 /** 763 * @return the page size 764 */ 765 public int getPageSize() { 766 return this.pageSize; 767 } 768 769 /** 770 * @return the amount of content data that a page can hold. 771 */ 772 public int getPageContentSize() { 773 return this.pageSize - Page.PAGE_HEADER_SIZE; 774 } 775 776 /** 777 * Configures the page size used by the page file. By default it is 4k. Once a page file is created on disk, 778 * subsequent loads of that file will use the original pageSize. Once the PageFile is loaded, this setting 779 * can no longer be changed. 780 * 781 * @param pageSize the pageSize to set 782 * @throws IllegalStateException once the page file is loaded. 783 */ 784 public void setPageSize(int pageSize) throws IllegalStateException { 785 assertNotLoaded(); 786 this.pageSize = pageSize; 787 } 788 789 /** 790 * @return true if read page caching is enabled 791 */ 792 public boolean isEnablePageCaching() { 793 return this.enablePageCaching; 794 } 795 796 /** 797 * @param enablePageCaching allows you to enable read page caching 798 */ 799 public void setEnablePageCaching(boolean enablePageCaching) { 800 assertNotLoaded(); 801 this.enablePageCaching = enablePageCaching; 802 } 803 804 /** 805 * @return the maximum number of pages that will get stored in the read page cache. 806 */ 807 public int getPageCacheSize() { 808 return this.pageCacheSize; 809 } 810 811 /** 812 * @param pageCacheSize Sets the maximum number of pages that will get stored in the read page cache.
813 */ 814 public void setPageCacheSize(int pageCacheSize) { 815 assertNotLoaded(); 816 this.pageCacheSize = pageCacheSize; 817 } 818 819 public boolean isEnabledWriteThread() { 820 return enabledWriteThread; 821 } 822 823 public void setEnableWriteThread(boolean enableAsyncWrites) { 824 assertNotLoaded(); 825 this.enabledWriteThread = enableAsyncWrites; 826 } 827 828 public long getDiskSize() throws IOException { 829 return toOffset(nextFreePageId.get()); 830 } 831 832 public boolean isFreePage(long pageId) { 833 return freeList.contains(pageId); 834 } 835 /** 836 * @return the number of pages allocated in the PageFile 837 */ 838 public long getPageCount() { 839 return nextFreePageId.get(); 840 } 841 842 public int getRecoveryFileMinPageCount() { 843 return recoveryFileMinPageCount; 844 } 845 846 public long getFreePageCount() { 847 assertLoaded(); 848 return freeList.rangeSize(); 849 } 850 851 public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) { 852 assertNotLoaded(); 853 this.recoveryFileMinPageCount = recoveryFileMinPageCount; 854 } 855 856 public int getRecoveryFileMaxPageCount() { 857 return recoveryFileMaxPageCount; 858 } 859 860 public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) { 861 assertNotLoaded(); 862 this.recoveryFileMaxPageCount = recoveryFileMaxPageCount; 863 } 864 865 public int getWriteBatchSize() { 866 return writeBatchSize; 867 } 868 869 public void setWriteBatchSize(int writeBatchSize) { 870 this.writeBatchSize = writeBatchSize; 871 } 872 873 public float getLFUEvictionFactor() { 874 return LFUEvictionFactor; 875 } 876 877 public void setLFUEvictionFactor(float LFUEvictionFactor) { 878 this.LFUEvictionFactor = LFUEvictionFactor; 879 } 880 881 public boolean isUseLFRUEviction() { 882 return useLFRUEviction; 883 } 884 885 public void setUseLFRUEviction(boolean useLFRUEviction) { 886 this.useLFRUEviction = useLFRUEviction; 887 } 888 889 /////////////////////////////////////////////////////////////////// 890 // Package Protected Methods exposed to Transaction 891 /////////////////////////////////////////////////////////////////// 892 893 /** 894 * @throws IllegalStateException if the page file is not loaded. 895 */ 896 void assertLoaded() throws IllegalStateException { 897 if (!loaded.get()) { 898 throw new IllegalStateException("PageFile is not loaded"); 899 } 900 } 901 902 void assertNotLoaded() throws IllegalStateException { 903 if (loaded.get()) { 904 throw new IllegalStateException("PageFile is loaded"); 905 } 906 } 907 908 /** 909 * Allocates a block of free pages that you can write data to. 910 * 911 * @param count the number of sequential pages to allocate 912 * @return the first page of the sequential set. 913 * @throws IOException If an disk error occurred. 914 * @throws IllegalStateException if the PageFile is not loaded 915 */ 916 <T> Page<T> allocate(int count) throws IOException { 917 assertLoaded(); 918 if (count <= 0) { 919 throw new IllegalArgumentException("The allocation count must be larger than zero"); 920 } 921 922 Sequence seq = freeList.removeFirstSequence(count); 923 924 // We may need to create new free pages... 925 if (seq == null) { 926 927 Page<T> first = null; 928 int c = count; 929 930 // Perform the id's only once.... 
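// getAndAdd reserves a run of `count` consecutive page ids (and matching write tx ids) up front, so each new free page below can be initialized without touching the shared counters again.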
931 long pageId = nextFreePageId.getAndAdd(count); 932 long writeTxnId = nextTxid.getAndAdd(count); 933 934 while (c-- > 0) { 935 Page<T> page = new Page<T>(pageId++); 936 page.makeFree(writeTxnId++); 937 938 if (first == null) { 939 first = page; 940 } 941 942 addToCache(page); 943 DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize); 944 page.write(out); 945 write(page, out.getData()); 946 947 // LOG.debug("allocate writing: "+page.getPageId()); 948 } 949 950 return first; 951 } 952 953 Page<T> page = new Page<T>(seq.getFirst()); 954 page.makeFree(0); 955 // LOG.debug("allocated: "+page.getPageId()); 956 return page; 957 } 958 959 long getNextWriteTransactionId() { 960 return nextTxid.incrementAndGet(); 961 } 962 963 synchronized void readPage(long pageId, byte[] data) throws IOException { 964 readFile.seek(toOffset(pageId)); 965 readFile.readFully(data); 966 } 967 968 public void freePage(long pageId) { 969 freeList.add(pageId); 970 removeFromCache(pageId); 971 972 SequenceSet trackFreeDuringRecovery = trackingFreeDuringRecovery.get(); 973 if (trackFreeDuringRecovery != null) { 974 trackFreeDuringRecovery.add(pageId); 975 } 976 } 977 978 @SuppressWarnings("unchecked") 979 private <T> void write(Page<T> page, byte[] data) throws IOException { 980 final PageWrite write = new PageWrite(page, data); 981 Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>() { 982 @Override 983 public Long getKey() { 984 return write.getPage().getPageId(); 985 } 986 987 @Override 988 public PageWrite getValue() { 989 return write; 990 } 991 992 @Override 993 public PageWrite setValue(PageWrite value) { 994 return null; 995 } 996 }; 997 Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry}; 998 write(Arrays.asList(entries)); 999 } 1000 1001 void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException { 1002 synchronized (writes) { 1003 if (enabledWriteThread) { 1004 while (writes.size() >= writeBatchSize && !stopWriter.get()) { 1005 try { 1006 writes.wait(); 1007 } catch (InterruptedException e) { 1008 Thread.currentThread().interrupt(); 1009 throw new InterruptedIOException(); 1010 } 1011 } 1012 } 1013 1014 boolean longTx = false; 1015 1016 for (Map.Entry<Long, PageWrite> entry : updates) { 1017 Long key = entry.getKey(); 1018 PageWrite value = entry.getValue(); 1019 PageWrite write = writes.get(key); 1020 if (write == null) { 1021 writes.put(key, value); 1022 } else { 1023 if (value.currentLocation != -1) { 1024 write.setCurrentLocation(value.page, value.currentLocation, value.length); 1025 write.tmpFile = value.tmpFile; 1026 longTx = true; 1027 } else { 1028 write.setCurrent(value.page, value.current); 1029 } 1030 } 1031 } 1032 1033 // Once we start approaching capacity, notify the writer to start writing 1034 // sync immediately for long txs 1035 if (longTx || canStartWriteBatch()) { 1036 1037 if (enabledWriteThread) { 1038 writes.notify(); 1039 } else { 1040 writeBatch(); 1041 } 1042 } 1043 } 1044 } 1045 1046 private boolean canStartWriteBatch() { 1047 int capacityUsed = ((writes.size() * 100) / writeBatchSize); 1048 if (enabledWriteThread) { 1049 // The constant 10 here controls how soon write batches start going to disk.. 1050 // would be nice to figure out how to auto tune that value. 
Make to small and 1051 // we reduce through put because we are locking the write mutex too often doing writes 1052 return capacityUsed >= 10 || checkpointLatch != null; 1053 } else { 1054 return capacityUsed >= 80 || checkpointLatch != null; 1055 } 1056 } 1057 1058 /////////////////////////////////////////////////////////////////// 1059 // Cache Related operations 1060 /////////////////////////////////////////////////////////////////// 1061 @SuppressWarnings("unchecked") 1062 <T> Page<T> getFromCache(long pageId) { 1063 synchronized (writes) { 1064 PageWrite pageWrite = writes.get(pageId); 1065 if (pageWrite != null) { 1066 return pageWrite.page; 1067 } 1068 } 1069 1070 Page<T> result = null; 1071 if (enablePageCaching) { 1072 result = pageCache.get(pageId); 1073 } 1074 return result; 1075 } 1076 1077 void addToCache(Page page) { 1078 if (enablePageCaching) { 1079 pageCache.put(page.getPageId(), page); 1080 } 1081 } 1082 1083 void removeFromCache(long pageId) { 1084 if (enablePageCaching) { 1085 pageCache.remove(pageId); 1086 } 1087 } 1088 1089 /////////////////////////////////////////////////////////////////// 1090 // Internal Double write implementation follows... 1091 /////////////////////////////////////////////////////////////////// 1092 1093 private void pollWrites() { 1094 try { 1095 while (!stopWriter.get()) { 1096 // Wait for a notification... 1097 synchronized (writes) { 1098 writes.notifyAll(); 1099 1100 // If there is not enough to write, wait for a notification... 1101 while (writes.isEmpty() && checkpointLatch == null && !stopWriter.get()) { 1102 writes.wait(100); 1103 } 1104 1105 if (writes.isEmpty()) { 1106 releaseCheckpointWaiter(); 1107 } 1108 } 1109 writeBatch(); 1110 } 1111 } catch (Throwable e) { 1112 LOG.info("An exception was raised while performing poll writes", e); 1113 } finally { 1114 releaseCheckpointWaiter(); 1115 } 1116 } 1117 1118 private void writeBatch() throws IOException { 1119 1120 CountDownLatch checkpointLatch; 1121 ArrayList<PageWrite> batch; 1122 synchronized (writes) { 1123 // If there is not enough to write, wait for a notification... 1124 1125 batch = new ArrayList<PageWrite>(writes.size()); 1126 // build a write batch from the current write cache. 1127 for (PageWrite write : writes.values()) { 1128 batch.add(write); 1129 // Move the current write to the diskBound write, this lets folks update the 1130 // page again without blocking for this write. 1131 write.begin(); 1132 if (write.diskBound == null && write.diskBoundLocation == -1) { 1133 batch.remove(write); 1134 } 1135 } 1136 1137 // Grab on to the existing checkpoint latch cause once we do this write we can 1138 // release the folks that were waiting for those writes to hit disk. 1139 checkpointLatch = this.checkpointLatch; 1140 this.checkpointLatch = null; 1141 } 1142 1143 try { 1144 1145 // First land the writes in the recovery file 1146 if (enableRecoveryFile) { 1147 Checksum checksum = new Adler32(); 1148 1149 recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE); 1150 1151 for (PageWrite w : batch) { 1152 try { 1153 checksum.update(w.getDiskBound(), 0, pageSize); 1154 } catch (Throwable t) { 1155 throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t); 1156 } 1157 recoveryFile.writeLong(w.page.getPageId()); 1158 recoveryFile.write(w.getDiskBound(), 0, pageSize); 1159 } 1160 1161 // Can we shrink the recovery buffer?? 
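// If the redo file has grown beyond recoveryFileMaxPageCount, truncate it back to whichever is larger: the configured minimum or the size needed for the current batch.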
1162 if (recoveryPageCount > recoveryFileMaxPageCount) { 1163 int t = Math.max(recoveryFileMinPageCount, batch.size()); 1164 recoveryFile.setLength(recoveryFileSizeForPages(t)); 1165 } 1166 1167 // Now write the recovery buffer header describing the batch. 1168 recoveryFile.seek(0); 1169 // Store the next tx id... 1170 recoveryFile.writeLong(nextTxid.get()); 1171 // Store the checksum for the write batch so that on recovery we 1172 // know if we have a consistent 1173 // write batch on disk. 1174 recoveryFile.writeLong(checksum.getValue()); 1175 // Write the # of pages that will follow 1176 recoveryFile.writeInt(batch.size()); 1177 1178 if (enableDiskSyncs) { 1179 recoveryFile.sync(); 1180 } 1181 } 1182 1183 for (PageWrite w : batch) { 1184 writeFile.seek(toOffset(w.page.getPageId())); 1185 writeFile.write(w.getDiskBound(), 0, pageSize); 1186 w.done(); 1187 } 1188 1189 if (enableDiskSyncs) { 1190 writeFile.sync(); 1191 } 1192 1193 } catch (IOException ioError) { 1194 LOG.info("Unexpected io error on pagefile write of " + batch.size() + " pages.", ioError); 1195 // any subsequent write needs to be prefaced with a considered call to redoRecoveryUpdates 1196 // to ensure disk image is self consistent 1197 loaded.set(false); 1198 throw ioError; 1199 } finally { 1200 synchronized (writes) { 1201 for (PageWrite w : batch) { 1202 // If there are no more pending writes, then remove it from 1203 // the write cache. 1204 if (w.isDone()) { 1205 writes.remove(w.page.getPageId()); 1206 if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) { 1207 if (!w.tmpFile.delete()) { 1208 throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile); 1209 } 1210 tmpFilesForRemoval.remove(w.tmpFile); 1211 } 1212 } 1213 } 1214 } 1215 1216 if (checkpointLatch != null) { 1217 checkpointLatch.countDown(); 1218 } 1219 } 1220 } 1221 1222 public void removeTmpFile(File file) { 1223 tmpFilesForRemoval.add(file); 1224 } 1225 1226 private long recoveryFileSizeForPages(int pageCount) { 1227 return RECOVERY_FILE_HEADER_SIZE + ((pageSize + 8L) * pageCount); 1228 } 1229 1230 private void releaseCheckpointWaiter() { 1231 if (checkpointLatch != null) { 1232 checkpointLatch.countDown(); 1233 checkpointLatch = null; 1234 } 1235 } 1236 1237 /** 1238 * Inspects the recovery buffer and re-applies any 1239 * partially applied page writes. 1240 * 1241 * @return the next transaction id that can be used. 1242 */ 1243 private long redoRecoveryUpdates() throws IOException { 1244 if (!enableRecoveryFile) { 1245 return 0; 1246 } 1247 recoveryPageCount = 0; 1248 1249 // Are we initializing the recovery file? 1250 if (recoveryFile.length() == 0) { 1251 // Write an empty header.. 1252 recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]); 1253 // Preallocate the minimum size for better performance. 1254 recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount)); 1255 return 0; 1256 } 1257 1258 // How many recovery pages do we have in the recovery buffer?
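// Recovery file layout: a header at offset 0 of (long nextTxId, long checksum, int pageCount), followed at RECOVERY_FILE_HEADER_SIZE by one (long pageId, page bytes) record per page.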
1259 recoveryFile.seek(0); 1260 long nextTxId = recoveryFile.readLong(); 1261 long expectedChecksum = recoveryFile.readLong(); 1262 int pageCounter = recoveryFile.readInt(); 1263 1264 recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE); 1265 Checksum checksum = new Adler32(); 1266 LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>(); 1267 try { 1268 for (int i = 0; i < pageCounter; i++) { 1269 long pageId = recoveryFile.readLong(); 1270 byte[] data = new byte[pageSize]; 1271 if (recoveryFile.read(data, 0, pageSize) != pageSize) { 1272 // Invalid recovery record, could not fully read the data. Probably due to a partial write to the recovery buffer 1273 return nextTxId; 1274 } 1275 checksum.update(data, 0, pageSize); 1276 batch.put(pageId, data); 1277 } 1278 } catch (Exception e) { 1279 // If an error occurred it was because the redo buffer was not fully written out correctly.. so don't redo it, 1280 // as the pages should still be consistent. 1281 LOG.debug("Redo buffer was not fully intact: ", e); 1282 return nextTxId; 1283 } 1284 1285 recoveryPageCount = pageCounter; 1286 1287 // If the checksum is not valid then the recovery buffer was partially written to disk. 1288 if (checksum.getValue() != expectedChecksum) { 1289 return nextTxId; 1290 } 1291 1292 // Re-apply all the writes in the recovery buffer. 1293 for (Map.Entry<Long, byte[]> e : batch.entrySet()) { 1294 writeFile.seek(toOffset(e.getKey())); 1295 writeFile.write(e.getValue()); 1296 } 1297 1298 // And sync it to disk 1299 writeFile.sync(); 1300 return nextTxId; 1301 } 1302 1303 private void startWriter() { 1304 synchronized (writes) { 1305 if (enabledWriteThread) { 1306 stopWriter.set(false); 1307 writerThread = new Thread("KahaDB Page Writer") { 1308 @Override 1309 public void run() { 1310 pollWrites(); 1311 } 1312 }; 1313 writerThread.setPriority(Thread.MAX_PRIORITY); 1314 writerThread.setDaemon(true); 1315 writerThread.start(); 1316 } 1317 } 1318 } 1319 1320 private void stopWriter() throws InterruptedException { 1321 if (enabledWriteThread) { 1322 stopWriter.set(true); 1323 writerThread.join(); 1324 } 1325 } 1326 1327 public File getFile() { 1328 return getMainPageFile(); 1329 } 1330 1331 public File getDirectory() { 1332 return directory; 1333 } 1334}
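/*
 * Illustrative usage sketch of the load / transact / flush / unload life cycle above; the
 * allocate(), free() and commit() calls are assumed from the companion Transaction class in
 * this package, and the directory name is just a placeholder:
 *
 *   PageFile pf = new PageFile(new File("page-data"), "example");
 *   pf.load();                       // creates example.data on first use
 *   Transaction tx = pf.tx();
 *   Page<?> page = tx.allocate();    // take a page from the free list, growing the file if needed
 *   tx.free(page);                   // hand it back to the free list
 *   tx.commit();                     // queue the resulting page writes with the PageFile
 *   pf.flush();                      // force the queued writes (and a disk sync) out
 *   pf.unload();                     // store the free list and release the file handles
 */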