/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages DataFiles
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int PREALLOC_CHUNK_SIZE = 1024 * 1024;

    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // A batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
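    // On disk a batch is laid out as:
    //   [4 byte size][1 byte type][11 byte magic "WRITE BATCH"][4 byte batch length][8 byte checksum][batch data...]
    // The checksum (Adler32, see checkBatchRecord) covers only the batch data.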
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
    public static final byte EOF_EOT = '4';
    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();

    private ScheduledExecutorService scheduler;

    // tackle corruption when the checksum is disabled or the data is corrupted with zeros; minimize data loss
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - skip to the next batch record or eof
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
            randomAccessFile.seek(recoveryPosition.getOffset() + 1);
            byte[] data = new byte[getWriteBatchSize()];
            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
            int nextOffset = 0;
            if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
                nextOffset = (int) randomAccessFile.getFilePointer() - bs.remaining();
            } else {
                nextOffset = (int) randomAccessFile.length();
            }
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset - 1);
            LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset(nextOffset);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);
        } catch (IOException e) {
            // best effort scan; log rather than swallow silently
            LOG.trace("exception on corrupt recovery scan of: " + dataFile + ", at " + recoveryPosition, e);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public DataFileAccessorPool getAccessorPool() {
        return accessorPool;
    }

    public void allowIOResumption() {
        if (appender instanceof DataFileAppender) {
            DataFileAppender dataFileAppender = (DataFileAppender) appender;
            dataFileAppender.shutdown = false;
        }
    }

    public void setCleanupInterval(long cleanupInterval) {
        this.cleanupInterval = cleanupInterval;
    }

    public long getCleanupInterval() {
        return cleanupInterval;
    }

    public enum PreallocationStrategy {
        SPARSE_FILE,
        OS_KERNEL_COPY,
        ZEROS,
        CHUNKED_ZEROS;
    }

    public enum PreallocationScope {
        ENTIRE_JOURNAL,
        ENTIRE_JOURNAL_ASYNC,
        NONE;
    }

    public enum JournalDiskSyncStrategy {
        ALWAYS,
        PERIODIC,
        NEVER;
    }

    private static byte[] createBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }
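    // An "empty" batch control record carries a zero batch length and zero
    // checksum; it serves as a placeholder that reserves header space on disk
    // until a batch's real size and checksum can be written back.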
    private static byte[] createEmptyBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            os.writeInt(0);
            os.writeLong(0L);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create empty batch control record header.", e);
        }
    }

    private static byte[] createEofBatchAndLocationRecord() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(EOF_INT);
            os.writeByte(EOF_EOT);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create eof header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive;
    private boolean directoryArchiveOverridden = false;

    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected ScheduledFuture<?> cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private int nextDataFileId = 1;
    private final Object dataFileIdLock = new Object();
    private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
    private volatile DataFile nextDataFile;

    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
    private File osKernelCopyTemplateFile = null;
    private ByteBuffer preAllocateDirectBuffer = null;
    private long cleanupInterval = DEFAULT_CLEANUP_INTERVAL;

    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;

    public interface DataFileRemovedListener {
        void fileRemoved(DataFile datafile);
    }

    private DataFileRemovedListener dataFileRemovedListener;

    public synchronized void start() throws IOException {
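        // Startup summary: recover existing data files from the directory,
        // verify them for corruption if configured, set up the preallocation
        // strategy, and schedule the periodic cleanup task.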
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE) {
            switch (preallocationStrategy) {
                case SPARSE_FILE:
                    break;
                case OS_KERNEL_COPY: {
                    osKernelCopyTemplateFile = createJournalTemplateFile();
                }
                break;
                case CHUNKED_ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(PREALLOC_CHUNK_SIZE);
                }
                break;
                case ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(getMaxFileLength());
                }
                break;
            }
        }
        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of the last journal file in the size metric
        if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: " + (end - start) + " ms");
    }

    private ByteBuffer allocateDirectBuffer(int size) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(size);
        buffer.put(EOF_RECORD);
        return buffer;
    }
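    /**
     * Preallocates a freshly created journal file according to the configured
     * {@link PreallocationStrategy}. Failure here is non-fatal; the journal
     * simply grows on demand. A configuration sketch (illustrative only):
     * <pre>
     * Journal journal = new Journal();
     * journal.setDirectory(new File("data"));
     * journal.setPreallocationScope(Journal.PreallocationScope.ENTIRE_JOURNAL);
     * journal.setPreallocationStrategy(Journal.PreallocationStrategy.CHUNKED_ZEROS);
     * journal.start();
     * </pre>
     */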
    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {

        if (PreallocationScope.NONE != preallocationScope) {

            try {
                if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                    doPreallocationKernelCopy(file);
                } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                    doPreallocationZeros(file);
                } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
                    doPreallocationChunkedZeros(file);
                } else {
                    doPreallocationSparseFile(file);
                }
            } catch (Throwable continueWithNoPrealloc) {
                // an error on preallocation is non-fatal, and we don't want to leak the journal handle
                LOG.error("could not preallocate journal data file", continueWithNoPrealloc);
            }
        }
    }

    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
        try {
            FileChannel channel = file.getChannel();
            channel.position(0);
            channel.write(journalEof);
            channel.position(maxFileLength - EOF_RECORD.length);
            journalEof.rewind();
            channel.write(journalEof);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with sparse file", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        preAllocateDirectBuffer.rewind();
        try {
            FileChannel channel = file.getChannel();
            channel.write(preAllocateDirectBuffer);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
        // try-with-resources ensures the template handle is closed even when the transfer fails
        try (RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw")) {
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw")) {
            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
        } catch (IOException e) {
            // use rc here; osKernelCopyTemplateFile is not yet assigned at this point
            LOG.error("Could not create the template file on disk at " + rc.getAbsolutePath(), e);
        }
        return rc;
    }
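    // Chunked preallocation writes the file PREALLOC_CHUNK_SIZE bytes at a
    // time from one reusable direct buffer, so an in-memory allocation the
    // size of the whole journal file is never needed.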
    private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
        preAllocateDirectBuffer.limit(preAllocateDirectBuffer.capacity());
        preAllocateDirectBuffer.rewind();
        try {
            FileChannel channel = file.getChannel();

            int remLen = maxFileLength;
            while (remLen > 0) {
                if (remLen < preAllocateDirectBuffer.remaining()) {
                    preAllocateDirectBuffer.limit(remLen);
                }
                int writeLen = channel.write(preAllocateDirectBuffer);
                remLen -= writeLen;
                preAllocateDirectBuffer.rewind();
            }

            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
        }
    }

    private static byte[] bytes(String string) {
        return string.getBytes(StandardCharsets.UTF_8);
    }

    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                byte[] firstFewBytes = new byte[BATCH_CONTROL_RECORD_HEADER.length];
                reader.readFully(0, firstFewBytes);
                ByteSequence bs = new ByteSequence(firstFewBytes);
                return bs.startsWith(EOF_RECORD);
            } catch (Exception ignored) {
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }
        }
        return false;
    }
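    /**
     * Scans a data file batch by batch, validating sizes (and Adler32
     * checksums when enabled), recording any corrupt ranges on the DataFile,
     * and returning the last valid append location in the file.
     */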
    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
            randomAccessFile.seek(0);
            final long totalFileLength = randomAccessFile.length();
            byte[] data = new byte[getWriteBatchSize()];
            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));

            while (true) {
                int size = checkBatchRecord(bs, randomAccessFile);
                if (size > 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else if (size == 0 && location.getOffset() + EOF_RECORD.length + size <= totalFileLength) {
                    // eof batch record
                    break;
                } else {
                    // track corruption and skip if possible
                    Sequence sequence = new Sequence(location.getOffset());
                    if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
                        int nextOffset = (int) randomAccessFile.getFilePointer() - bs.remaining();
                        sequence.setLast(nextOffset - 1);
                        dataFile.corruptedBlocks.add(sequence);
                        LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
                        location.setOffset(nextOffset);
                    } else {
                        // corruption to eof, don't lose track of this corruption, don't truncate
                        sequence.setLast(randomAccessFile.getFilePointer());
                        dataFile.corruptedBlocks.add(sequence);
                        LOG.warn("Corrupt journal records found in '{}' from offset: {} to EOF", dataFile.getFile(), sequence);
                        break;
                    }
                }
            }

        } catch (IOException e) {
            LOG.trace("exception on recovery check of: " + dataFile + ", at " + location, e);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }
        return location;
    }

    private int findNextBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
        final ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, 0);
            if (pos >= 0) {
                bs.setOffset(bs.offset + pos);
                return pos;
            } else {
                // need to load the next data chunk in..
                if (bs.length != bs.data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                bs.setOffset(bs.length - BATCH_CONTROL_RECORD_HEADER.length);
                bs.reset();
                bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - BATCH_CONTROL_RECORD_HEADER.length));
            }
        }
    }

    private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
        ensureAvailable(bs, reader, EOF_RECORD.length);
        if (bs.startsWith(EOF_RECORD)) {
            return 0; // eof
        }
        ensureAvailable(bs, reader, BATCH_CONTROL_RECORD_SIZE);
        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {

            // Assert that it's a batch record.
            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                    return -1;
                }
            }

            int size = controlIs.readInt();
            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                return -2;
            }

            long expectedChecksum = controlIs.readLong();
            Checksum checksum = null;
            if (isChecksum() && expectedChecksum > 0) {
                checksum = new Adler32();
            }

            // revert to bs to consume data
            bs.setOffset(controlIs.position());
            int toRead = size;
            while (toRead > 0) {
                if (bs.remaining() >= toRead) {
                    if (checksum != null) {
                        checksum.update(bs.getData(), bs.getOffset(), toRead);
                    }
                    bs.setOffset(bs.offset + toRead);
                    toRead = 0;
                } else {
                    if (bs.length != bs.data.length) {
                        // buffer exhausted
                        return -3;
                    }

                    toRead -= bs.remaining();
                    if (checksum != null) {
                        checksum.update(bs.getData(), bs.getOffset(), bs.remaining());
                    }
                    bs.setLength(reader.read(bs.data));
                    bs.setOffset(0);
                }
            }
            if (checksum != null && expectedChecksum != checksum.getValue()) {
                return -4;
            }

            return size;
        }
    }

    private void ensureAvailable(ByteSequence bs, RandomAccessFile reader, int required) throws IOException {
        if (bs.remaining() < required) {
            bs.reset();
            int read = reader.read(bs.data, bs.length, bs.data.length - bs.length);
            if (read < 0) {
                if (bs.remaining() == 0) {
                    throw new EOFException("request for " + required + " bytes reached EOF");
                }
                // at EOF with some bytes still buffered - don't shrink the sequence by the -1 read result
                return;
            }
            bs.setLength(bs.length + read);
        }
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }
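    /**
     * Installs the next data file as the current write file, preferring a file
     * preallocated in the background when one is ready, and kicks off
     * preallocation of the following file when the async scope is enabled.
     */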
    public void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            DataFile dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();
            }
            synchronized (currentDataFile) {
                fileMap.put(dataFile.getDataFileId(), dataFile);
                fileByFileMap.put(dataFile.getFile(), dataFile);
                dataFiles.addLast(dataFile);
                currentDataFile.set(dataFile);
            }
            nextDataFile = null;
        }
        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
        }
    }

    private Runnable preAllocateNextDataFileTask = new Runnable() {
        @Override
        public void run() {
            if (nextDataFile == null) {
                synchronized (dataFileIdLock) {
                    try {
                        nextDataFile = newDataFile();
                    } catch (IOException e) {
                        LOG.warn("Failed to proactively allocate data file", e);
                    }
                }
            }
        }
    };

    private volatile Future<?> preAllocateNextDataFileFuture;

    private DataFile newDataFile() throws IOException {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
        return nextWriteFile;
    }

    public DataFile reserveDataFile() {
        synchronized (dataFileIdLock) {
            int nextNum = nextDataFileId++;
            File file = getFile(nextNum);
            DataFile reservedDataFile = new DataFile(file, nextNum);
            synchronized (currentDataFile) {
                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
                fileByFileMap.put(file, reservedDataFile);
                if (dataFiles.isEmpty()) {
                    dataFiles.addLast(reservedDataFile);
                } else {
                    dataFiles.getTail().linkBefore(reservedDataFile);
                }
            }
            return reservedDataFile;
        }
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = null;
        synchronized (currentDataFile) {
            dataFile = fileMap.get(key);
        }
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            cleanupTask.cancel(true);
            if (preAllocateNextDataFileFuture != null) {
                preAllocateNextDataFileFuture.cancel(true);
            }
            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
            accessorPool.close();
        }
        // the appender can be calling back to the journal, blocking a close - AMQ-5620
        appender.close();
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    public synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }
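    /**
     * Deletes all journal data files and resets in-memory state, then
     * recreates the accessor pool and appender so the journal remains usable.
     *
     * @return true if every data file was deleted successfully
     */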
    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            result &= dataFile.delete();
        }

        if (preAllocateNextDataFileFuture != null) {
            preAllocateNextDataFileFuture.cancel(true);
        }
        synchronized (dataFileIdLock) {
            if (nextDataFile != null) {
                nextDataFile.delete();
                nextDataFile = null;
            }
        }

        totalLength.set(0);
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            lastAppendLocation.set(null);
            dataFiles = new LinkedNodeList<DataFile>();
        }
        // reopen file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

    public void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = null;
            synchronized (currentDataFile) {
                dataFile = fileMap.remove(key);
                if (dataFile != null) {
                    fileByFileMap.remove(dataFile.getFile());
                    dataFile.unlink();
                }
            }
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        totalLength.addAndGet(-dataFile.getLength());
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Archive directory [{}] does not exist - creating it now",
                            directoryArchive.getAbsolutePath());
                }
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {}", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file: {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }
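    /**
     * Walks the journal record by record, skipping batch headers, EOF markers
     * and known-corrupt ranges, returning only user records. A typical replay
     * loop (illustrative sketch):
     * <pre>
     * Location next = journal.getNextLocation(null);
     * while (next != null) {
     *     ByteSequence record = journal.read(next);
     *     // ... replay the record ...
     *     next = journal.getNextLocation(next);
     * }
     * </pre>
     */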
    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
        return getNextLocation(location, null);
    }

    public Location getNextLocation(Location location, Location limit) throws IOException, IllegalStateException {
        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = null;
                    synchronized (currentDataFile) {
                        head = dataFiles.getHead();
                    }
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    cur = new Location(location);
                    if (location.getSize() != -1) {
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                synchronized (currentDataFile) {
                    dataFile = dataFile.getNext();
                }
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                    if (limit != null && cur.compareTo(limit) >= 0) {
                        LOG.trace("reached limit: {} at: {}", limit, cur);
                        return null;
                    }
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } catch (EOFException eof) {
                LOG.trace("EOF on next: " + location + ", cur: " + cur);
                throw eof;
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if ((cur.getSize() == EOF_INT && cur.getType() == EOF_EOT) ||
                    (cur.getType() == 0 && cur.getSize() == 0)) {
                // eof - jump to the next datafile
                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
                // replay of existing journals
                // possibly the journal is larger than maxFileLength after a config change
                cur.setSize(EOF_RECORD.length);
                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

    public ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        return appender.storeItem(data, Location.USER_TYPE, sync);
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        return appender.storeItem(data, Location.USER_TYPE, onComplete);
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }
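    /**
     * Writes that have been handed to the appender but not yet flushed to
     * disk; readers consult this map so in-flight records remain visible.
     */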
    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() +
                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public DataFile getDataFileById(int dataFileId) {
        synchronized (currentDataFile) {
            return fileMap.get(Integer.valueOf(dataFileId));
        }
    }

    public DataFile getCurrentDataFile(int capacity) throws IOException {
        // First just acquire the currentDataFile lock and return if no rotation is needed
        synchronized (currentDataFile) {
            if (currentDataFile.get().getLength() + capacity < maxFileLength) {
                return currentDataFile.get();
            }
        }

        // AMQ-6545 - if rotation is needed, acquire dataFileIdLock first to prevent deadlocks,
        // then re-check whether rotation is still needed
        synchronized (dataFileIdLock) {
            synchronized (currentDataFile) {
                if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
                    rotateWriteFile();
                }
                return currentDataFile.get();
            }
        }
    }

    public Integer getCurrentDataFileId() {
        synchronized (currentDataFile) {
            return currentDataFile.get().getDataFileId();
        }
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        synchronized (currentDataFile) {
            return fileByFileMap.keySet();
        }
    }

    public Map<Integer, DataFile> getFileMap() {
        synchronized (currentDataFile) {
            return new TreeMap<Integer, DataFile>(fileMap);
        }
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }
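    /**
     * Lets the owning store share a single accumulator for the journal size
     * metric; the journal then updates the caller's counter in place.
     */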
    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }

    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }

    public boolean isJournalDiskSyncPeriodic() {
        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }

    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }

    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int) (file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey) obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}