/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.page;

import org.apache.activemq.store.kahadb.disk.page.PageFile.PageWrite;
import org.apache.activemq.store.kahadb.disk.util.*;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayInputStream;
import org.apache.activemq.store.kahadb.disk.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;

import java.io.*;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.TreeMap;

/**
 * The class used to read from and update a PageFile object. Using a transaction allows you to
 * combine multiple update operations into a single unit of work.
 */
public class Transaction implements Iterable<Page> {

    private RandomAccessFile tmpFile;
    private File txFile;
    private long nextLocation = 0;

    /**
     * The PageOverflowIOException occurs when a page write is requested
     * and its data is larger than what would fit into a single page.
     */
    public class PageOverflowIOException extends IOException {
        private static final long serialVersionUID = 1L;

        public PageOverflowIOException(String message) {
            super(message);
        }
    }

    /**
     * The InvalidPageIOException is thrown if you try to load/store a page
     * with an invalid page id.
     */
    public class InvalidPageIOException extends IOException {
        private static final long serialVersionUID = 1L;

        private final long page;

        public InvalidPageIOException(String message, long page) {
            super(message);
            this.page = page;
        }

        public long getPage() {
            return page;
        }
    }

    /**
     * This closure interface is intended for the end user to implement callbacks for the Transaction.execute() method.
     *
     * @param <T> The type of exception that the operation may throw.
     */
    public interface Closure <T extends Throwable> {
        public void execute(Transaction tx) throws T;
    }

    /**
     * This closure interface is intended for the end user to implement callbacks for the Transaction.execute() method.
     *
     * @param <R> The type of result that the closure produces.
     * @param <T> The type of exception that the operation may throw.
     */
    public interface CallableClosure<R, T extends Throwable> {
        public R execute(Transaction tx) throws T;
    }
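
    // Typical usage (an illustrative sketch, not part of this class): a Transaction is
    // obtained from PageFile.tx() and all updates are performed inside a Closure, so that
    // execute() commits on success and rolls back on failure. The directory, the String
    // payload and the use of StringMarshaller below are assumptions made for the example.
    //
    //   PageFile pageFile = new PageFile(directory, "example");
    //   pageFile.load();
    //   pageFile.tx().execute(new Transaction.Closure<IOException>() {
    //       public void execute(Transaction tx) throws IOException {
    //           Page<String> page = tx.allocate();           // grab a free page
    //           page.set("hello world");                     // set its payload
    //           tx.store(page, StringMarshaller.INSTANCE, false);
    //       }
    //   });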

    // The page file that this Transaction operates against.
    private final PageFile pageFile;
    // If this transaction is performing updates, this is the id of the write transaction.
    private long writeTransactionId = -1;
    // List of pages that this transaction has modified.
    private TreeMap<Long, PageWrite> writes = new TreeMap<Long, PageWrite>();
    // List of pages allocated in this transaction
    private final SequenceSet allocateList = new SequenceSet();
    // List of pages freed in this transaction
    private final SequenceSet freeList = new SequenceSet();

    private long maxTransactionSize = Long.getLong("maxKahaDBTxSize", 10485760L);

    private long size = 0;

    Transaction(PageFile pageFile) {
        this.pageFile = pageFile;
    }

    /**
     * @return the page file that created this Transaction
     */
    public PageFile getPageFile() {
        return this.pageFile;
    }

    /**
     * Allocates a free page that you can write data to.
     *
     * @return a newly allocated page.
     * @throws IOException
     *         If a disk error occurs.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> Page<T> allocate() throws IOException {
        return allocate(1);
    }

    /**
     * Allocates a block of free pages that you can write data to.
     *
     * @param count the number of sequential pages to allocate
     * @return the first page of the sequential set.
     * @throws IOException
     *         If a disk error occurs.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> Page<T> allocate(int count) throws IOException {
        Page<T> rc = pageFile.allocate(count);
        allocateList.add(new Sequence(rc.getPageId(), rc.getPageId() + count - 1));
        return rc;
    }

    /**
     * Frees up a previously allocated page so that it can be re-allocated.
     *
     * @param pageId the page to free up
     * @throws IOException
     *         If a disk error occurs.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public void free(long pageId) throws IOException {
        free(load(pageId, null));
    }

    /**
     * Frees up a previously allocated sequence of pages so that they can be re-allocated.
     *
     * @param pageId the initial page of the sequence that will be getting freed
     * @param count the number of pages in the sequence
     *
     * @throws IOException
     *         If a disk error occurs.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public void free(long pageId, int count) throws IOException {
        free(load(pageId, null), count);
    }

    /**
     * Frees up a previously allocated sequence of pages so that they can be re-allocated.
     *
     * @param page the initial page of the sequence that will be getting freed
     * @param count the number of pages in the sequence
     *
     * @throws IOException
     *         If a disk error occurs.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> void free(Page<T> page, int count) throws IOException {
        pageFile.assertLoaded();
        long offsetPage = page.getPageId();
        while (count-- > 0) {
            if (page == null) {
                page = load(offsetPage, null);
            }
            free(page);
            page = null;
            // Advance to the next page id in the sequence.
            offsetPage++;
        }
    }
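
    // Illustrative sketch: a contiguous run of pages can be reserved with allocate(count)
    // and later released with free(pageId, count). Only the id of the first page needs to
    // be remembered, since the run is sequential. The page count and payload type are
    // assumptions made for the example.
    //
    //   Page<String> first = tx.allocate(10);                // pages id .. id+9
    //   long firstPageId = first.getPageId();
    //   ...
    //   tx.free(firstPageId, 10);                            // release the whole run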

    /**
     * Frees up a previously allocated page so that it can be re-allocated.
     *
     * @param page the page to free up
     * @throws IOException
     *         If a disk error occurs.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> void free(Page<T> page) throws IOException {
        pageFile.assertLoaded();

        // We may need to loop to free up a page chain.
        while (page != null) {

            // Is it already free?
            if (page.getType() == Page.PAGE_FREE_TYPE) {
                return;
            }

            Page<T> next = null;
            if (page.getType() == Page.PAGE_PART_TYPE) {
                next = load(page.getNext(), null);
            }

            page.makeFree(getWriteTransactionId());
            // ensure free page is visible while write is pending
            pageFile.addToCache(page.copy());

            DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
            page.write(out);
            write(page, out.getData());

            freeList.add(page.getPageId());
            page = next;
        }
    }

    /**
     * Stores a page and its data.
     *
     * @param page
     *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
     * @param marshaller
     *        the marshaller to use to store the data portion of the Page, may be null if you do not wish to write the data.
     * @param overflow
     *        If true, then if the page data marshals to a bigger size than can fit in one page, then additional
     *        overflow pages are automatically allocated and chained to this page to store all the data. If false,
     *        and the overflow condition would occur, then the PageOverflowIOException is thrown.
     * @throws IOException
     *         If a disk error occurs.
     * @throws PageOverflowIOException
     *         If the page data marshals to a size larger than the maximum page size and overflow was false.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
        DataByteArrayOutputStream out = (DataByteArrayOutputStream) openOutputStream(page, overflow);
        if (marshaller != null) {
            marshaller.writePayload(page.get(), out);
        }
        out.close();
    }
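
    // A minimal sketch of the overflow flag, assuming a payload that marshals to more than
    // one page (the marshaller below is hypothetical):
    //
    //   tx.store(page, someLargeValueMarshaller, true);      // chains extra overflow pages
    //   tx.store(page, someLargeValueMarshaller, false);     // throws PageOverflowIOException
    //                                                        // if the payload does not fit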

    /**
     * Opens an output stream that writes the page data, chaining overflow pages as needed.
     *
     * @throws IOException
     *         If a disk error occurs.
     */
    public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
        pageFile.assertLoaded();

        // Copy to protect against the end user changing
        // the page instance while we are doing a write.
        final Page copy = page.copy();
        pageFile.addToCache(copy);

        // To support writing VERY large data, we override the output stream so
        // that we do the page writes incrementally while the data is being
        // marshalled.
        DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
            Page current = copy;

            @SuppressWarnings("unchecked")
            @Override
            protected void onWrite() throws IOException {

                // Are we at an overflow condition?
                final int pageSize = pageFile.getPageSize();
                if (pos >= pageSize) {
                    // If overflow is allowed
                    if (overflow) {

                        do {
                            Page next;
                            if (current.getType() == Page.PAGE_PART_TYPE) {
                                next = load(current.getNext(), null);
                            } else {
                                next = allocate();
                            }

                            next.txId = current.txId;

                            // Write the page header
                            int oldPos = pos;
                            pos = 0;

                            current.makePagePart(next.getPageId(), getWriteTransactionId());
                            current.write(this);

                            // Do the page write..
                            byte[] data = new byte[pageSize];
                            System.arraycopy(buf, 0, data, 0, pageSize);
                            Transaction.this.write(current, data);

                            // make the new link visible
                            pageFile.addToCache(current);

                            // Reset for the next page chunk
                            pos = 0;
                            // The page header is marshalled after the data is written.
                            skip(Page.PAGE_HEADER_SIZE);
                            // Move the overflow data after the header.
                            System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
                            pos += oldPos - pageSize;
                            current = next;

                        } while (pos > pageSize);
                    } else {
                        throw new PageOverflowIOException("Page overflow.");
                    }
                }

            }

            @Override
            public void close() throws IOException {
                super.close();

                // We need to free up the rest of the page chain..
                if (current.getType() == Page.PAGE_PART_TYPE) {
                    free(current.getNext());
                }

                current.makePageEnd(pos, getWriteTransactionId());

                // make visible as end page
                pageFile.addToCache(current);

                // Write the header..
                pos = 0;
                current.write(this);

                Transaction.this.write(current, buf);
            }
        };

        // The page header is marshalled after the data is written.
        out.skip(Page.PAGE_HEADER_SIZE);
        return out;
    }
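
    // Illustrative sketch: very large values can also be streamed directly through the
    // overflow-aware stream returned by openOutputStream(), instead of going through
    // store(). The marshaller and value below are hypothetical.
    //
    //   DataOutputStream os = new DataOutputStream(tx.openOutputStream(page, true));
    //   try {
    //       hypotheticalMarshaller.writePayload(largeValue, os);
    //   } finally {
    //       os.close();                                      // close() writes the end page
    //   }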

    /**
     * Loads a page from disk.
     *
     * @param pageId
     *        the id of the page to load
     * @param marshaller
     *        the marshaller to use to load the data portion of the Page, may be null if you do not wish to load the data.
     * @return The page with the given id
     * @throws IOException
     *         If a disk error occurs.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
        pageFile.assertLoaded();
        Page<T> page = new Page<T>(pageId);
        load(page, marshaller);
        return page;
    }

    /**
     * Loads a page from disk.
     *
     * @param page - The pageId field must be properly set
     * @param marshaller
     *        the marshaller to use to load the data portion of the Page, may be null if you do not wish to load the data.
     * @throws IOException
     *         If a disk error occurs.
     * @throws InvalidPageIOException
     *         If the page is not valid.
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    @SuppressWarnings("unchecked")
    public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
        pageFile.assertLoaded();

        // Can't load invalid offsets...
        long pageId = page.getPageId();
        if (pageId < 0) {
            throw new InvalidPageIOException("Page id is not valid", pageId);
        }

        // It might be a page this transaction has modified...
        PageWrite update = writes.get(pageId);
        if (update != null) {
            page.copy(update.getPage());
            return;
        }

        // We may be able to get it from the cache...
        Page<T> t = pageFile.getFromCache(pageId);
        if (t != null) {
            page.copy(t);
            return;
        }

        if (marshaller != null) {
            // Full page read..
            InputStream is = openInputStream(page);
            DataInputStream dataIn = new DataInputStream(is);
            page.set(marshaller.readPayload(dataIn));
            is.close();
        } else {
            // Page header read.
            DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
            pageFile.readPage(pageId, in.getRawData());
            page.read(in);
            page.set(null);
        }

        // Cache it.
        if (marshaller != null) {
            pageFile.addToCache(page);
        }
    }

    /**
     * @see org.apache.activemq.store.kahadb.disk.page.Transaction#load(org.apache.activemq.store.kahadb.disk.page.Page,
     *      org.apache.activemq.store.kahadb.disk.util.Marshaller)
     */
    public InputStream openInputStream(final Page p) throws IOException {

        return new InputStream() {

            private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
            private Page page = readPage(p);
            private int pageCount = 1;

            private Page markPage;
            private ByteSequence markChunk;

            private Page readPage(Page page) throws IOException {
                // Read the page data
                pageFile.readPage(page.getPageId(), chunk.getData());

                chunk.setOffset(0);
                chunk.setLength(pageFile.getPageSize());

                DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
                page.read(in);

                chunk.setOffset(Page.PAGE_HEADER_SIZE);
                if (page.getType() == Page.PAGE_END_TYPE) {
                    chunk.setLength((int) (page.getNext()));
                }

                if (page.getType() == Page.PAGE_FREE_TYPE) {
                    throw new EOFException("Chunk stream does not exist, page: " + page.getPageId() + " is marked free");
                }

                return page;
            }

            public int read() throws IOException {
                if (!atEOF()) {
                    return chunk.data[chunk.offset++] & 0xff;
                } else {
                    return -1;
                }
            }

            private boolean atEOF() throws IOException {
                if (chunk.offset < chunk.length) {
                    return false;
                }
                if (page.getType() == Page.PAGE_END_TYPE) {
                    return true;
                }
                fill();
                return chunk.offset >= chunk.length;
            }

            private void fill() throws IOException {
                page = readPage(new Page(page.getNext()));
                pageCount++;
            }

            public int read(byte[] b) throws IOException {
                return read(b, 0, b.length);
            }

            public int read(byte b[], int off, int len) throws IOException {
                if (!atEOF()) {
                    int rc = 0;
                    while (!atEOF() && rc < len) {
                        len = Math.min(len, chunk.length - chunk.offset);
                        if (len > 0) {
                            System.arraycopy(chunk.data, chunk.offset, b, off, len);
                            chunk.offset += len;
                        }
                        rc += len;
                    }
                    return rc;
                } else {
                    return -1;
                }
            }

            public long skip(long len) throws IOException {
                if (!atEOF()) {
                    int rc = 0;
                    while (!atEOF() && rc < len) {
                        len = Math.min(len, chunk.length - chunk.offset);
                        if (len > 0) {
                            chunk.offset += len;
                        }
                        rc += len;
                    }
                    return rc;
                } else {
                    return -1;
                }
            }

            public int available() {
                return chunk.length - chunk.offset;
            }

            public boolean markSupported() {
                return true;
            }

            public void mark(int markpos) {
                markPage = page;
                byte data[] = new byte[pageFile.getPageSize()];
                System.arraycopy(chunk.getData(), 0, data, 0, pageFile.getPageSize());
                markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
            }

            public void reset() {
                page = markPage;
                chunk = markChunk;
            }

        };
    }
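
    // Illustrative sketch: reading a chunk stream back without a marshaller, for example
    // to copy a large value elsewhere. The destination stream is hypothetical.
    //
    //   InputStream is = tx.openInputStream(page);
    //   try {
    //       byte[] buffer = new byte[4096];
    //       int count;
    //       while ((count = is.read(buffer)) != -1) {
    //           destination.write(buffer, 0, count);         // follows PAGE_PART chains
    //       }
    //   } finally {
    //       is.close();
    //   }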

    /**
     * Allows you to iterate through all active Pages in this object. Pages of type Page.PAGE_FREE_TYPE are
     * not included in this iteration.
     *
     * Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
     *
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public Iterator<Page> iterator() {
        return (Iterator<Page>) iterator(false);
    }

    /**
     * Allows you to iterate through all active Pages in this object. You can optionally include free pages in the
     * iteration.
     *
     * @param includeFreePages - if true, free pages are included in the iteration
     * @throws IllegalStateException
     *         if the PageFile is not loaded
     */
    public Iterator<Page> iterator(final boolean includeFreePages) {

        pageFile.assertLoaded();

        return new Iterator<Page>() {

            long nextId;
            Page nextPage;
            Page lastPage;

            private void findNextPage() {
                if (!pageFile.isLoaded()) {
                    throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
                }

                if (nextPage != null) {
                    return;
                }

                try {
                    while (nextId < pageFile.getPageCount()) {

                        Page page = load(nextId, null);

                        if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
                            nextPage = page;
                            return;
                        } else {
                            nextId++;
                        }
                    }
                } catch (IOException e) {
                    // ignore the error and treat it as the end of the iteration
                }
            }

            public boolean hasNext() {
                findNextPage();
                return nextPage != null;
            }

            public Page next() {
                findNextPage();
                if (nextPage != null) {
                    lastPage = nextPage;
                    nextPage = null;
                    nextId++;
                    return lastPage;
                } else {
                    throw new NoSuchElementException();
                }
            }

            @SuppressWarnings("unchecked")
            public void remove() {
                if (lastPage == null) {
                    throw new IllegalStateException();
                }
                try {
                    free(lastPage);
                    lastPage = null;
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    ///////////////////////////////////////////////////////////////////
    // Commit / Rollback related methods..
    ///////////////////////////////////////////////////////////////////

    /**
     * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
     * with the transaction are written to disk or none will be.
     */
    public void commit() throws IOException {
        if (writeTransactionId != -1) {
            if (tmpFile != null) {
                tmpFile.close();
                pageFile.removeTmpFile(getTempFile());
                tmpFile = null;
                txFile = null;
            }
            // Actually do the page writes...
            pageFile.write(writes.entrySet());
            // Release the pages that were freed up in the transaction..
            freePages(freeList);

            freeList.clear();
            allocateList.clear();
            writes.clear();
            writeTransactionId = -1;
        } else {
            freePages(allocateList);
        }
        size = 0;
    }
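
    // Illustrative sketch of manual transaction management; most callers rely on
    // execute(Closure) further below, which applies exactly this commit-on-success /
    // rollback-on-failure pattern. StringMarshaller and the page id are assumptions.
    //
    //   Transaction tx = pageFile.tx();
    //   try {
    //       Page<String> page = tx.load(pageId, StringMarshaller.INSTANCE);
    //       page.set("updated value");
    //       tx.store(page, StringMarshaller.INSTANCE, true);
    //       tx.commit();                                     // flush queued page writes
    //   } catch (IOException e) {
    //       tx.rollback();                                   // release pages allocated so far
    //       throw e;
    //   }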

    /**
     * Rolls back the transaction.
     */
    public void rollback() throws IOException {
        if (writeTransactionId != -1) {
            if (tmpFile != null) {
                tmpFile.close();
                pageFile.removeTmpFile(getTempFile());
                tmpFile = null;
                txFile = null;
            }
            // Release the pages that were allocated in the transaction...
            freePages(allocateList);

            freeList.clear();
            allocateList.clear();
            writes.clear();
            writeTransactionId = -1;
        } else {
            freePages(allocateList);
        }
        size = 0;
    }

    private long getWriteTransactionId() {
        if (writeTransactionId == -1) {
            writeTransactionId = pageFile.getNextWriteTransactionId();
        }
        return writeTransactionId;
    }

    protected File getTempFile() {
        if (txFile == null) {
            txFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName("tx-" + Long.toString(getWriteTransactionId()) + "-" + Long.toString(System.currentTimeMillis()) + ".tmp"));
        }
        return txFile;
    }

    /**
     * Queues up a page write that should get done when commit() gets called.
     */
    private void write(final Page page, byte[] data) throws IOException {
        Long key = page.getPageId();

        // How much page data this transaction has buffered so far.
        size = writes.size() * (long) pageFile.getPageSize();

        PageWrite write;

        if (size > maxTransactionSize) {
            if (tmpFile == null) {
                tmpFile = new RandomAccessFile(getTempFile(), "rw");
            }
            long location = nextLocation;
            tmpFile.seek(nextLocation);
            tmpFile.write(data);
            nextLocation = location + data.length;
            write = new PageWrite(page, location, data.length, getTempFile());
        } else {
            write = new PageWrite(page, data);
        }
        writes.put(key, write);
    }

    /**
     * @param list
     * @throws RuntimeException
     */
    private void freePages(SequenceSet list) throws RuntimeException {
        Sequence seq = list.getHead();
        while (seq != null) {
            seq.each(new Sequence.Closure<RuntimeException>() {
                public void execute(long value) {
                    pageFile.freePage(value);
                }
            });
            seq = seq.getNext();
        }
    }

    /**
     * @return true if there are no uncommitted page file updates associated with this transaction.
     */
    public boolean isReadOnly() {
        return writeTransactionId == -1;
    }

    ///////////////////////////////////////////////////////////////////
    // Transaction closure helpers...
    ///////////////////////////////////////////////////////////////////

    /**
     * Executes a closure and, if it does not throw any exceptions, commits the transaction.
     * If the closure throws an exception, the transaction is rolled back.
     *
     * @param <T>
     * @param closure - the work to get executed.
     * @throws T if the closure throws it
     * @throws IOException If the commit fails.
     */
    public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
        boolean success = false;
        try {
            closure.execute(this);
            success = true;
        } finally {
            if (success) {
                commit();
            } else {
                rollback();
            }
        }
    }

    /**
     * Executes a closure and, if it does not throw any exceptions, commits the transaction.
     * If the closure throws an exception, the transaction is rolled back.
     *
     * @param <R>
     * @param <T>
     * @param closure - the work to get executed.
     * @throws T if the closure throws it
     * @throws IOException If the commit fails.
     */
    public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
        boolean success = false;
        try {
            R rc = closure.execute(this);
            success = true;
            return rc;
        } finally {
            if (success) {
                commit();
            } else {
                rollback();
            }
        }
    }
}