/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages DataFiles
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int PREALLOC_CHUNK_SIZE = 1024 * 1024;

    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // Batch control records hold a 4 byte size of the batch and an 8 byte checksum of the batch.
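    // On-disk layout implied by the constants below (sizes in bytes):
    //   4   record length   (BATCH_CONTROL_RECORD_SIZE)
    //   1   record type     (BATCH_CONTROL_RECORD_TYPE)
    //   11  magic           ("WRITE BATCH")
    //   4   batch size
    //   8   Adler32 checksum of the batch payload (see checkBatchRecord)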
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
    public static final byte EOF_EOT = '4';
    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();

    private ScheduledExecutorService scheduler;

    // tackle corruption when checksum is disabled or corrupt with zeros, minimize data loss
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - slip to the next batch record or eof
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
            randomAccessFile.seek(recoveryPosition.getOffset() + 1);
            byte[] data = new byte[getWriteBatchSize()];
            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
            int nextOffset = 0;
            if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
                nextOffset = (int) randomAccessFile.getFilePointer() - bs.remaining();
            } else {
                nextOffset = (int) randomAccessFile.length();
            }
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset - 1);
            LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset(nextOffset);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);
        } catch (IOException e) {
            // ignored - the content is already suspect and this scan is best effort
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public DataFileAccessorPool getAccessorPool() {
        return accessorPool;
    }

    public void allowIOResumption() {
        if (appender instanceof DataFileAppender) {
            DataFileAppender dataFileAppender = (DataFileAppender) appender;
            dataFileAppender.shutdown = false;
        }
    }

    public enum PreallocationStrategy {
        SPARSE_FILE,
        OS_KERNEL_COPY,
        ZEROS,
        CHUNKED_ZEROS;
    }

    public enum PreallocationScope {
        ENTIRE_JOURNAL,
        ENTIRE_JOURNAL_ASYNC,
        NONE;
    }

    public enum JournalDiskSyncStrategy {
        ALWAYS,
        PERIODIC,
        NEVER;
    }

    private static byte[] createBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    private static byte[] createEmptyBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            os.writeInt(0);
            os.writeLong(0L);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create empty batch control record header.", e);
        }
    }
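
    // The EOF record is the 4 byte EOF_INT marker followed by the single EOF_EOT byte
    // (5 bytes total). Preallocation stamps it to mark the logical end of a data file,
    // and getNextLocation() reacts to it by jumping to the next file.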
    private static byte[] createEofBatchAndLocationRecord() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(EOF_INT);
            os.writeByte(EOF_EOT);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create eof header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive;
    private boolean directoryArchiveOverridden = false;

    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected ScheduledFuture<?> cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private int nextDataFileId = 1;
    private final Object dataFileIdLock = new Object();
    private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
    private volatile DataFile nextDataFile;

    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
    private File osKernelCopyTemplateFile = null;
    private ByteBuffer preAllocateDirectBuffer = null;

    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;

    public interface DataFileRemovedListener {
        void fileRemoved(DataFile datafile);
    }

    private DataFileRemovedListener dataFileRemovedListener;

    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
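
        // Scan the journal directory for existing data files named
        // <filePrefix><id><fileSuffix>; anything else is ignored.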
        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE) {
            switch (preallocationStrategy) {
                case SPARSE_FILE:
                    break;
                case OS_KERNEL_COPY: {
                    osKernelCopyTemplateFile = createJournalTemplateFile();
                }
                break;
                case CHUNKED_ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(PREALLOC_CHUNK_SIZE);
                }
                break;
                case ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(getMaxFileLength());
                }
                break;
            }
        }

        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: " + (end - start) + " ms");
    }

    private ByteBuffer allocateDirectBuffer(int size) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(size);
        buffer.put(EOF_RECORD);
        return buffer;
    }

    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {

        if (PreallocationScope.NONE != preallocationScope) {

            try {
                if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                    doPreallocationKernelCopy(file);
                } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                    doPreallocationZeros(file);
                } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
                    doPreallocationChunkedZeros(file);
                } else {
                    doPreallocationSparseFile(file);
                }
            } catch (Throwable continueWithNoPrealloc) {
                // error on preallocation is non fatal, and we don't want to leak the journal handle
                LOG.error("could not preallocate journal data file", continueWithNoPrealloc);
            }
        }
    }
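
    // Strategy implementations: SPARSE_FILE stamps EOF markers at the head and tail of
    // the file, ZEROS writes a single maxFileLength-sized direct buffer, CHUNKED_ZEROS
    // writes PREALLOC_CHUNK_SIZE slices in a loop, and OS_KERNEL_COPY clones a pre-built
    // template file via FileChannel.transferTo so the copy can stay in the kernel.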
    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
        try {
            FileChannel channel = file.getChannel();
            channel.position(0);
            channel.write(journalEof);
            channel.position(maxFileLength - EOF_RECORD.length);
            journalEof.rewind();
            channel.write(journalEof);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with sparse file", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        preAllocateDirectBuffer.rewind();
        try {
            FileChannel channel = file.getChannel();
            channel.write(preAllocateDirectBuffer);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
        // try-with-resources so the template handle is not leaked if transferTo throws
        try (RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw")) {
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw")) {
            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
        } catch (FileNotFoundException e) {
            // note: osKernelCopyTemplateFile is still null here, so log rc instead
            LOG.error("Could not create the template file on disk at " + rc.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not write the template file at " + rc.getAbsolutePath(), e);
        }
        return rc;
    }

    private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
        preAllocateDirectBuffer.limit(preAllocateDirectBuffer.capacity());
        preAllocateDirectBuffer.rewind();
        try {
            FileChannel channel = file.getChannel();

            int remLen = maxFileLength;
            while (remLen > 0) {
                if (remLen < preAllocateDirectBuffer.remaining()) {
                    preAllocateDirectBuffer.limit(remLen);
                }
                int writeLen = channel.write(preAllocateDirectBuffer);
                remLen -= writeLen;
                preAllocateDirectBuffer.rewind();
            }

            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
        }
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
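
    /**
     * Detects a data file that was preallocated (ENTIRE_JOURNAL_ASYNC creates the next
     * file ahead of use) but never written to: such a file still starts with the EOF
     * record stamped at preallocation time.
     */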
    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                byte[] firstFewBytes = new byte[BATCH_CONTROL_RECORD_HEADER.length];
                reader.readFully(0, firstFewBytes);
                ByteSequence bs = new ByteSequence(firstFewBytes);
                return bs.startsWith(EOF_RECORD);
            } catch (Exception ignored) {
                // a file we cannot read is not treated as unused
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }
        }
        return false;
    }

    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
            randomAccessFile.seek(0);
            final long totalFileLength = randomAccessFile.length();
            byte[] data = new byte[getWriteBatchSize()];
            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));

            while (true) {
                int size = checkBatchRecord(bs, randomAccessFile);
                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
                    if (size == 0) {
                        // eof batch record
                        break;
                    }
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {

                    // Perhaps it's just some corruption... scan through the
                    // file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
                        int nextOffset = (int) randomAccessFile.getFilePointer() - bs.remaining();
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException e) {
            // best effort: treat an unreadable tail as the end of valid data
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }
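
    /**
     * Scans forward for the next batch control header, refilling the buffer from the
     * file as needed; the trailing BATCH_CONTROL_RECORD_HEADER.length bytes are carried
     * over between refills so a header straddling two reads is still found. Returns the
     * non-negative match position, or -1 when EOF is reached without a match.
     */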
    private int findNextBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
        final ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, 0);
            if (pos >= 0) {
                bs.setOffset(bs.offset + pos);
                return pos;
            } else {
                // need to load the next data chunk in..
                if (bs.length != bs.data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                bs.setOffset(bs.length - BATCH_CONTROL_RECORD_HEADER.length);
                bs.reset();
                bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - BATCH_CONTROL_RECORD_HEADER.length));
            }
        }
    }

    private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
        ensureAvailable(bs, reader, EOF_RECORD.length);
        if (bs.startsWith(EOF_RECORD)) {
            return 0; // eof
        }
        ensureAvailable(bs, reader, BATCH_CONTROL_RECORD_SIZE);
        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {

            // Assert that it's a batch record.
            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                    return -1; // not a batch control record
                }
            }

            int size = controlIs.readInt();
            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                return -2; // implausible batch size
            }

            long expectedChecksum = controlIs.readLong();
            Checksum checksum = null;
            if (isChecksum() && expectedChecksum > 0) {
                checksum = new Adler32();
            }

            // revert to bs to consume data
            bs.setOffset(controlIs.position());
            int toRead = size;
            while (toRead > 0) {
                if (bs.remaining() >= toRead) {
                    if (checksum != null) {
                        checksum.update(bs.getData(), bs.getOffset(), toRead);
                    }
                    bs.setOffset(bs.offset + toRead);
                    toRead = 0;
                } else {
                    if (bs.length != bs.data.length) {
                        // buffer exhausted by a short read - the batch is truncated
                        return -3;
                    }

                    toRead -= bs.remaining();
                    if (checksum != null) {
                        checksum.update(bs.getData(), bs.getOffset(), bs.remaining());
                    }
                    bs.setLength(reader.read(bs.data));
                    bs.setOffset(0);
                }
            }
            if (checksum != null && expectedChecksum != checksum.getValue()) {
                return -4; // checksum mismatch
            }

            return size;
        }
    }

    private void ensureAvailable(ByteSequence bs, RandomAccessFile reader, int required) throws IOException {
        if (bs.remaining() < required) {
            bs.reset();
            int read = reader.read(bs.data, bs.length, bs.data.length - bs.length);
            if (read < 0) {
                if (bs.remaining() == 0) {
                    throw new EOFException("request for " + required + " bytes reached EOF");
                }
            } else {
                // only grow the sequence on a successful read; a -1 EOF result
                // must not shrink the bytes we already have
                bs.setLength(bs.length + read);
            }
        }
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }

    private void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            DataFile dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();
            }
            synchronized (currentDataFile) {
                fileMap.put(dataFile.getDataFileId(), dataFile);
                fileByFileMap.put(dataFile.getFile(), dataFile);
                dataFiles.addLast(dataFile);
                currentDataFile.set(dataFile);
            }
            nextDataFile = null;
        }
        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
        }
    }
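
    // With ENTIRE_JOURNAL_ASYNC, the file after the current one is preallocated in the
    // background so rotation does not stall appends while the new file is initialised.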
    private Runnable preAllocateNextDataFileTask = new Runnable() {
        @Override
        public void run() {
            if (nextDataFile == null) {
                synchronized (dataFileIdLock) {
                    try {
                        nextDataFile = newDataFile();
                    } catch (IOException e) {
                        LOG.warn("Failed to proactively allocate data file", e);
                    }
                }
            }
        }
    };

    private volatile Future<?> preAllocateNextDataFileFuture;

    private DataFile newDataFile() throws IOException {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
        return nextWriteFile;
    }

    public DataFile reserveDataFile() {
        synchronized (dataFileIdLock) {
            int nextNum = nextDataFileId++;
            File file = getFile(nextNum);
            DataFile reservedDataFile = new DataFile(file, nextNum);
            synchronized (currentDataFile) {
                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
                fileByFileMap.put(file, reservedDataFile);
                if (dataFiles.isEmpty()) {
                    dataFiles.addLast(reservedDataFile);
                } else {
                    dataFiles.getTail().linkBefore(reservedDataFile);
                }
            }
            return reservedDataFile;
        }
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = null;
        synchronized (currentDataFile) {
            dataFile = fileMap.get(key);
        }
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            cleanupTask.cancel(true);
            if (preAllocateNextDataFileFuture != null) {
                preAllocateNextDataFileFuture.cancel(true);
            }
            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
            accessorPool.close();
        }
        // the appender can be calling back to the journal blocking a close AMQ-5620
        appender.close();
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    public synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            result &= dataFile.delete();
        }

        if (preAllocateNextDataFileFuture != null) {
            preAllocateNextDataFileFuture.cancel(true);
        }
        synchronized (dataFileIdLock) {
            if (nextDataFile != null) {
                nextDataFile.delete();
                nextDataFile = null;
            }
        }

        totalLength.set(0);
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            lastAppendLocation.set(null);
            dataFiles = new LinkedNodeList<DataFile>();
        }
        // reopen file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }
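
    /**
     * Removes the given data files from the journal, archiving or deleting them
     * according to isArchiveDataLogs(). The file currently being appended to, and any
     * later file, is never removed.
     */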
    public void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = null;
            synchronized (currentDataFile) {
                dataFile = fileMap.remove(key);
                if (dataFile != null) {
                    fileByFileMap.remove(dataFile.getFile());
                    dataFile.unlink();
                }
            }
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        totalLength.addAndGet(-dataFile.getLength());
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                if (directoryArchive.isAbsolute()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Archive directory [{}] does not exist - creating it now",
                                directoryArchive.getAbsolutePath());
                    }
                }
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {}", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file: {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }
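
    /**
     * Advances to the next user record after the given location, skipping batch control
     * records, known corrupted ranges and EOF markers, and crossing into the next data
     * file when needed. A null location starts from the head of the journal; returns
     * null when no further user record exists (or the optional limit is reached).
     */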
    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
        return getNextLocation(location, null);
    }

    public Location getNextLocation(Location location, Location limit) throws IOException, IllegalStateException {
        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = null;
                    synchronized (currentDataFile) {
                        head = dataFiles.getHead();
                    }
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    if (location.getSize() == -1) {
                        // size unknown (e.g. after skipping corruption) - re-read at the same offset
                        cur = new Location(location);
                    } else {
                        cur = new Location(location);
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file??
            if (dataFile.getLength() <= cur.getOffset()) {
                synchronized (currentDataFile) {
                    dataFile = dataFile.getNext();
                }
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                    if (limit != null && cur.compareTo(limit) >= 0) {
                        LOG.trace("reached limit: {} at: {}", limit, cur);
                        return null;
                    }
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } catch (EOFException eof) {
                LOG.trace("EOF on next: " + location + ", cur: " + cur);
                throw eof;
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if (cur.getSize() == EOF_INT && cur.getType() == EOF_EOT ||
                    (cur.getType() == 0 && cur.getSize() == 0)) {
                // eof - jump to next datafile
                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
                // replay of existing journals
                // possibly journal is larger than maxFileLength after config change
                cur.setSize(EOF_RECORD.length);
                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }
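
    /**
     * Reads the record stored at the given location, using a pooled accessor for the
     * backing data file.
     */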
    public ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
        return loc;
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
        return loc;
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() +
                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public DataFile getDataFileById(int dataFileId) {
        synchronized (currentDataFile) {
            return fileMap.get(Integer.valueOf(dataFileId));
        }
    }
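
    /**
     * Returns the data file to append to, rotating to a new one first when the requested
     * capacity would push the current file past maxFileLength. The fast path takes only
     * the currentDataFile lock; rotation re-checks under dataFileIdLock and then
     * currentDataFile, in that order, to avoid the AMQ-6545 lock-ordering deadlock.
     */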
    public DataFile getCurrentDataFile(int capacity) throws IOException {
        //First just acquire the currentDataFile lock and return if no rotation needed
        synchronized (currentDataFile) {
            if (currentDataFile.get().getLength() + capacity < maxFileLength) {
                return currentDataFile.get();
            }
        }

        //AMQ-6545 - if rotation needed, acquire dataFileIdLock first to prevent deadlocks
        //then re-check if rotation is needed
        synchronized (dataFileIdLock) {
            synchronized (currentDataFile) {
                if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
                    rotateWriteFile();
                }
                return currentDataFile.get();
            }
        }
    }

    public Integer getCurrentDataFileId() {
        synchronized (currentDataFile) {
            return currentDataFile.get().getDataFileId();
        }
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        synchronized (currentDataFile) {
            return fileByFileMap.keySet();
        }
    }

    public Map<Integer, DataFile> getFileMap() {
        synchronized (currentDataFile) {
            return new TreeMap<Integer, DataFile>(fileMap);
        }
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }

    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }

    public boolean isJournalDiskSyncPeriodic() {
        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }

    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }
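
    /**
     * Identifies an in-flight write by its (data file id, offset) pair; used as the key
     * of the inflightWrites map of writes that are queued but not yet flushed to disk.
     */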
    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int) (file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey) obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}