/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages DataFiles
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int PREALLOC_CHUNK_SIZE = 1024 * 1024;

    // RECORD_HEAD_SPACE = 4 byte length + 1 byte type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // A batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
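    //
    // On-disk layout of a batch control record, as implied by the constants
    // below (widths in bytes; a sketch, not normative):
    //
    //   +------+------+---------------+------------+------------------+
    //   | size | type | "WRITE BATCH" | batch size | Adler32 checksum |
    //   |  4   |  1   |      11       |     4      |        8         |
    //   +------+------+---------------+------------+------------------+
    //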
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
    public static final byte EOF_EOT = '4';
    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();

    private ScheduledExecutorService scheduler;

    // tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - skip to the next batch record or eof
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
            randomAccessFile.seek(recoveryPosition.getOffset() + 1);
            byte[] data = new byte[getWriteBatchSize()];
            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
            int nextOffset = 0;
            if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
                nextOffset = (int) randomAccessFile.getFilePointer() - bs.remaining();
            } else {
                nextOffset = (int) randomAccessFile.length();
            }
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset - 1);
            LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset(nextOffset);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);
        } catch (IOException e) {
            // best effort - the region is already treated as corrupt, so keep going
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public DataFileAccessorPool getAccessorPool() {
        return accessorPool;
    }

    public void allowIOResumption() {
        if (appender instanceof DataFileAppender) {
            DataFileAppender dataFileAppender = (DataFileAppender) appender;
            dataFileAppender.shutdown = false;
        }
    }

    public void setCleanupInterval(long cleanupInterval) {
        this.cleanupInterval = cleanupInterval;
    }

    public long getCleanupInterval() {
        return cleanupInterval;
    }

    public enum PreallocationStrategy {
        SPARSE_FILE,
        OS_KERNEL_COPY,
        ZEROS,
        CHUNKED_ZEROS;
    }

    public enum PreallocationScope {
        ENTIRE_JOURNAL,
        ENTIRE_JOURNAL_ASYNC,
        NONE;
    }

    public enum JournalDiskSyncStrategy {
        ALWAYS,
        PERIODIC,
        NEVER;
    }

    private static byte[] createBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }
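    // The "empty" batch control record below is the standard header followed by
    // a zero batch size and a zero checksum - presumably what a reserved but
    // not-yet-patched batch header slot looks like on disk.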
    private static byte[] createEmptyBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            os.writeInt(0);
            os.writeLong(0L);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create empty batch control record header.", e);
        }
    }

    private static byte[] createEofBatchAndLocationRecord() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(EOF_INT);
            os.writeByte(EOF_EOT);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create eof header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive;
    private boolean directoryArchiveOverridden = false;

    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected ScheduledFuture cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private int nextDataFileId = 1;
    private final Object dataFileIdLock = new Object();
    private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
    private volatile DataFile nextDataFile;

    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
    private File osKernelCopyTemplateFile = null;
    private ByteBuffer preAllocateDirectBuffer = null;
    private long cleanupInterval = DEFAULT_CLEANUP_INTERVAL;

    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;

    public interface DataFileRemovedListener {
        void fileRemoved(DataFile datafile);
    }

    private DataFileRemovedListener dataFileRemovedListener;
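    /**
     * Starts the journal: scans {@link #directory} for existing data files,
     * links them together in id order (running a corruption check when
     * configured), prepares the preallocation strategy, opens the current
     * write file and schedules the periodic cleanup task.
     */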
    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE) {
            switch (preallocationStrategy) {
                case SPARSE_FILE:
                    break;
                case OS_KERNEL_COPY: {
                    osKernelCopyTemplateFile = createJournalTemplateFile();
                }
                break;
                case CHUNKED_ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(PREALLOC_CHUNK_SIZE);
                }
                break;
                case ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(getMaxFileLength());
                }
                break;
            }
        }
        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: " + (end - start) + " ms");
    }

    private ByteBuffer allocateDirectBuffer(int size) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(size);
        buffer.put(EOF_RECORD);
        return buffer;
    }
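    // Preallocation overview (a summary of the strategies implemented below):
    //   SPARSE_FILE    - write EOF markers at the start and near the end of the
    //                    file and let the filesystem leave a hole in between;
    //   ZEROS          - write a single maxFileLength direct buffer of zeros;
    //   CHUNKED_ZEROS  - write zeros in PREALLOC_CHUNK_SIZE (1MB) slices, which
    //                    caps the direct buffer footprint;
    //   OS_KERNEL_COPY - transferTo() from a pre-built template file so the
    //                    copy happens in the kernel.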
    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {

        if (PreallocationScope.NONE != preallocationScope) {

            try {
                if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                    doPreallocationKernelCopy(file);
                } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                    doPreallocationZeros(file);
                } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
                    doPreallocationChunkedZeros(file);
                } else {
                    doPreallocationSparseFile(file);
                }
            } catch (Throwable continueWithNoPrealloc) {
                // error on preallocation is non fatal, and we don't want to leak the journal handle
                LOG.error("could not preallocate journal data file", continueWithNoPrealloc);
            }
        }
    }

    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
        try {
            FileChannel channel = file.getChannel();
            channel.position(0);
            channel.write(journalEof);
            // EOF_RECORD is RECORD_HEAD_SPACE (5) bytes, so this lands it flush with the end of the file
            channel.position(maxFileLength - 5);
            journalEof.rewind();
            channel.write(journalEof);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with sparse file", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        preAllocateDirectBuffer.rewind();
        try {
            FileChannel channel = file.getChannel();
            channel.write(preAllocateDirectBuffer);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
        // try-with-resources so the template handle is not leaked if transferTo fails
        try (RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw")) {
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw")) {
            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
        } catch (FileNotFoundException e) {
            // use rc here: osKernelCopyTemplateFile is not assigned until this method returns
            LOG.error("Could not open the template file on disk at " + rc.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not initialize the template file at " + rc.getAbsolutePath(), e);
        }
        return rc;
    }
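    // CHUNKED_ZEROS exists so preallocation does not need a direct buffer the
    // size of the whole journal file: the same 1MB buffer (PREALLOC_CHUNK_SIZE,
    // allocated once in start()) is written repeatedly until maxFileLength is
    // reached.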
    private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
        preAllocateDirectBuffer.limit(preAllocateDirectBuffer.capacity());
        preAllocateDirectBuffer.rewind();
        try {
            FileChannel channel = file.getChannel();

            int remLen = maxFileLength;
            while (remLen > 0) {
                if (remLen < preAllocateDirectBuffer.remaining()) {
                    preAllocateDirectBuffer.limit(remLen);
                }
                int writeLen = channel.write(preAllocateDirectBuffer);
                remLen -= writeLen;
                preAllocateDirectBuffer.rewind();
            }

            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
        }
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                byte[] firstFewBytes = new byte[BATCH_CONTROL_RECORD_HEADER.length];
                reader.readFully(0, firstFewBytes);
                ByteSequence bs = new ByteSequence(firstFewBytes);
                return bs.startsWith(EOF_RECORD);
            } catch (Exception ignored) {
                // any read failure means we cannot prove the file is unused - treat it as in use
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }
        }
        return false;
    }

    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
            randomAccessFile.seek(0);
            final long totalFileLength = randomAccessFile.length();
            byte[] data = new byte[getWriteBatchSize()];
            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));

            while (true) {
                int size = checkBatchRecord(bs, randomAccessFile);
                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
                    if (size == 0) {
                        // eof batch record
                        break;
                    }
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {

                    // Perhaps it's just some corruption... scan through the
                    // file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
                        int nextOffset = (int) randomAccessFile.getFilePointer() - bs.remaining();
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException e) {
            // stop the scan on I/O error; location already marks the recovery point
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
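        // Trim the journal's logical view of this data file back to the recovery
        // point: anything past the last readable batch is ignored, and totalLength
        // is adjusted below so the size metric matches what will be replayed.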
        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

    private int findNextBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
        final ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, 0);
            if (pos >= 0) {
                bs.setOffset(bs.offset + pos);
                return pos;
            } else {
                // need to load the next data chunk in..
                if (bs.length != bs.data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                // keep the last header-length bytes in case the header straddles the chunk boundary
                bs.setOffset(bs.length - BATCH_CONTROL_RECORD_HEADER.length);
                bs.reset();
                bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - BATCH_CONTROL_RECORD_HEADER.length));
            }
        }
    }

    private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
        ensureAvailable(bs, reader, EOF_RECORD.length);
        if (bs.startsWith(EOF_RECORD)) {
            return 0; // eof
        }
        ensureAvailable(bs, reader, BATCH_CONTROL_RECORD_SIZE);
        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {

            // Assert that it's a batch record.
            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                    return -1; // not a batch control header
                }
            }

            int size = controlIs.readInt();
            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                return -2; // implausible batch size
            }

            long expectedChecksum = controlIs.readLong();
            Checksum checksum = null;
            if (isChecksum() && expectedChecksum > 0) {
                checksum = new Adler32();
            }

            // revert to bs to consume data
            bs.setOffset(controlIs.position());
            int toRead = size;
            while (toRead > 0) {
                if (bs.remaining() >= toRead) {
                    if (checksum != null) {
                        checksum.update(bs.getData(), bs.getOffset(), toRead);
                    }
                    bs.setOffset(bs.offset + toRead);
                    toRead = 0;
                } else {
                    if (bs.length != bs.data.length) {
                        // buffer exhausted - the file ended before the batch did
                        return -3;
                    }

                    toRead -= bs.remaining();
                    if (checksum != null) {
                        checksum.update(bs.getData(), bs.getOffset(), bs.remaining());
                    }
                    bs.setLength(reader.read(bs.data));
                    bs.setOffset(0);
                }
            }
            if (checksum != null && expectedChecksum != checksum.getValue()) {
                return -4; // checksum mismatch
            }

            return size;
        }
    }

    private void ensureAvailable(ByteSequence bs, RandomAccessFile reader, int required) throws IOException {
        if (bs.remaining() < required) {
            bs.reset();
            int read = reader.read(bs.data, bs.length, bs.data.length - bs.length);
            if (read < 0) {
                if (bs.remaining() == 0) {
                    throw new EOFException("request for " + required + " bytes reached EOF");
                }
                // hit EOF with a partial buffer - leave the length unchanged
                return;
            }
            bs.setLength(bs.length + read);
        }
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }
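    // Rotation always takes dataFileIdLock before currentDataFile - the same
    // order used by getCurrentDataFile() (see the AMQ-6545 note there) - so the
    // two paths cannot deadlock against each other.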
    private void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            DataFile dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();
            }
            synchronized (currentDataFile) {
                fileMap.put(dataFile.getDataFileId(), dataFile);
                fileByFileMap.put(dataFile.getFile(), dataFile);
                dataFiles.addLast(dataFile);
                currentDataFile.set(dataFile);
            }
            nextDataFile = null;
        }
        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
        }
    }

    private Runnable preAllocateNextDataFileTask = new Runnable() {
        @Override
        public void run() {
            if (nextDataFile == null) {
                synchronized (dataFileIdLock) {
                    try {
                        nextDataFile = newDataFile();
                    } catch (IOException e) {
                        LOG.warn("Failed to proactively allocate data file", e);
                    }
                }
            }
        }
    };

    private volatile Future preAllocateNextDataFileFuture;

    private DataFile newDataFile() throws IOException {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
        return nextWriteFile;
    }

    public DataFile reserveDataFile() {
        synchronized (dataFileIdLock) {
            int nextNum = nextDataFileId++;
            File file = getFile(nextNum);
            DataFile reservedDataFile = new DataFile(file, nextNum);
            synchronized (currentDataFile) {
                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
                fileByFileMap.put(file, reservedDataFile);
                if (dataFiles.isEmpty()) {
                    dataFiles.addLast(reservedDataFile);
                } else {
                    // insert before the current write file so appends continue at the tail
                    dataFiles.getTail().linkBefore(reservedDataFile);
                }
            }
            return reservedDataFile;
        }
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = null;
        synchronized (currentDataFile) {
            dataFile = fileMap.get(key);
        }
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            cleanupTask.cancel(true);
            if (preAllocateNextDataFileFuture != null) {
                preAllocateNextDataFileFuture.cancel(true);
            }
            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
            accessorPool.close();
        }
        // the appender can be calling back into the journal, blocking a close (AMQ-5620)
        appender.close();
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    public synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }
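    /**
     * Deletes every journal file and resets the in-memory file maps, then
     * re-creates the accessor pool and appender so the journal can be reused.
     *
     * @return true when all data files were deleted successfully
     */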
    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            result &= dataFile.delete();
        }

        if (preAllocateNextDataFileFuture != null) {
            preAllocateNextDataFileFuture.cancel(true);
        }
        synchronized (dataFileIdLock) {
            if (nextDataFile != null) {
                nextDataFile.delete();
                nextDataFile = null;
            }
        }

        totalLength.set(0);
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            lastAppendLocation.set(null);
            dataFiles = new LinkedNodeList<DataFile>();
        }
        // reopen fresh file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

    public void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = null;
            synchronized (currentDataFile) {
                dataFile = fileMap.remove(key);
                if (dataFile != null) {
                    fileByFileMap.remove(dataFile.getFile());
                    dataFile.unlink();
                }
            }
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        totalLength.addAndGet(-dataFile.getLength());
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Archive directory [{}] does not exist - creating it now",
                            directoryArchive.getAbsolutePath());
                }
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file : {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }

    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
        return getNextLocation(location, null);
    }
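    /*
     * Typical scan over the journal (a sketch; "process" stands for a
     * hypothetical caller-side callback, not part of this class):
     *
     *   Location pos = journal.getNextLocation(null);  // head of the first data file
     *   while (pos != null) {
     *       ByteSequence payload = journal.read(pos);
     *       process(payload);
     *       pos = journal.getNextLocation(pos);
     *   }
     *
     * Only USER_RECORD_TYPE entries are returned; corrupted ranges and EOF
     * markers are skipped internally.
     */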
    public Location getNextLocation(Location location, Location limit) throws IOException, IllegalStateException {
        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = null;
                    synchronized (currentDataFile) {
                        head = dataFiles.getHead();
                    }
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    if (location.getSize() == -1) {
                        cur = new Location(location);
                    } else {
                        cur = new Location(location);
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                synchronized (currentDataFile) {
                    dataFile = dataFile.getNext();
                }
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                    if (limit != null && cur.compareTo(limit) >= 0) {
                        LOG.trace("reached limit: {} at: {}", limit, cur);
                        return null;
                    }
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } catch (EOFException eof) {
                LOG.trace("EOF on next: " + location + ", cur: " + cur);
                throw eof;
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if (cur.getSize() == EOF_INT && cur.getType() == EOF_EOT ||
                    (cur.getType() == 0 && cur.getSize() == 0)) {
                // eof - jump to next datafile
                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
                // replay of existing journals
                // possibly journal is larger than maxFileLength after config change
                cur.setSize(EOF_RECORD.length);
                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }
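    // A note on the I/O entry points below: read() and update() borrow a pooled
    // DataFileAccessor for the target file, while write() funnels everything
    // through the single FileAppender, which groups records into batches under
    // a batch control record before they hit disk.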
    public ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
        return loc;
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
        return loc;
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() +
                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public DataFile getDataFileById(int dataFileId) {
        synchronized (currentDataFile) {
            return fileMap.get(Integer.valueOf(dataFileId));
        }
    }
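    /**
     * Returns the data file that can absorb a write of {@code capacity} bytes,
     * rotating to a new file first when the current one would overflow
     * {@link #maxFileLength}.
     */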
    public DataFile getCurrentDataFile(int capacity) throws IOException {
        // First acquire only the currentDataFile lock and return if no rotation is needed
        synchronized (currentDataFile) {
            if (currentDataFile.get().getLength() + capacity < maxFileLength) {
                return currentDataFile.get();
            }
        }

        // AMQ-6545 - if rotation is needed, acquire dataFileIdLock first to prevent
        // deadlocks, then re-check whether rotation is still needed
        synchronized (dataFileIdLock) {
            synchronized (currentDataFile) {
                if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
                    rotateWriteFile();
                }
                return currentDataFile.get();
            }
        }
    }

    public Integer getCurrentDataFileId() {
        synchronized (currentDataFile) {
            return currentDataFile.get().getDataFileId();
        }
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        synchronized (currentDataFile) {
            return fileByFileMap.keySet();
        }
    }

    public Map<Integer, DataFile> getFileMap() {
        synchronized (currentDataFile) {
            return new TreeMap<Integer, DataFile>(fileMap);
        }
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }

    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }

    public boolean isJournalDiskSyncPeriodic() {
        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }

    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }
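    // Note on the hash below: (int) (file ^ offset) folds the file id into the
    // low 32 bits of the offset and discards the high bits, so distinct keys can
    // collide - equals() still disambiguates them, and collisions only cost
    // bucket locality in the map.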
    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int) (file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey) obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}