/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages DataFiles
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int PREALLOC_CHUNK_SIZE = 1024 * 1024;

    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // The batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
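    //
    // A sketch of the resulting on-disk layout, derived from the constants below
    // (BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + magic + 4 + 8 = 28 bytes):
    //
    //   [ 4 bytes] record length = BATCH_CONTROL_RECORD_SIZE
    //   [ 1 byte ] record type   = BATCH_CONTROL_RECORD_TYPE
    //   [11 bytes] magic "WRITE BATCH" (UTF-8)
    //   [ 4 bytes] batch payload size
    //   [ 8 bytes] Adler32 checksum of the payload (written as 0 when checksums are disabled)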
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
    public static final byte EOF_EOT = '4';
    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();

    private ScheduledExecutorService scheduler;

    // Tackles corruption found when the checksum is disabled or a record is corrupted
    // with zeros; aims to minimize data loss.
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - slip to the next batch record or eof
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
            LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset((int) sequence.getLast() + 1);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);
        } catch (IOException e) {
            // ignore - the corruption scan is best effort
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public DataFileAccessorPool getAccessorPool() {
        return accessorPool;
    }

    public void allowIOResumption() {
        if (appender instanceof DataFileAppender) {
            DataFileAppender dataFileAppender = (DataFileAppender) appender;
            dataFileAppender.shutdown = false;
        }
    }

    public enum PreallocationStrategy {
        SPARSE_FILE,
        OS_KERNEL_COPY,
        ZEROS,
        CHUNKED_ZEROS;
    }

    public enum PreallocationScope {
        ENTIRE_JOURNAL,
        ENTIRE_JOURNAL_ASYNC,
        NONE;
    }

    public enum JournalDiskSyncStrategy {
        ALWAYS,
        PERIODIC,
        NEVER;
    }

    private static byte[] createBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    private static byte[] createEmptyBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            os.writeInt(0);
            os.writeLong(0L);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create empty batch control record header.", e);
        }
    }
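
    // The EOF record is the 4 byte EOF_INT marker followed by the single EOF_EOT
    // byte, 5 bytes in total. Preallocation writes it at the head (and, for sparse
    // files, the tail) of a fresh data file so that recovery and getNextLocation()
    // can tell where the valid data ends.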
    private static byte[] createEofBatchAndLocationRecord() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(EOF_INT);
            os.writeByte(EOF_EOT);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create eof header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive;
    private boolean directoryArchiveOverridden = false;

    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected ScheduledFuture<?> cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private int nextDataFileId = 1;
    private final Object dataFileIdLock = new Object();
    private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
    private volatile DataFile nextDataFile;

    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
    private File osKernelCopyTemplateFile = null;
    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;

    public interface DataFileRemovedListener {
        void fileRemoved(DataFile datafile);
    }

    private DataFileRemovedListener dataFileRemovedListener;

    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ?
                new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) {
            // create a template file that will be used to pre-allocate the journal files
            if (osKernelCopyTemplateFile == null) {
                osKernelCopyTemplateFile = createJournalTemplateFile();
            }
        }

        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: " + (end - start) + " ms");
    }
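
    /*
     * Minimal lifecycle sketch (hypothetical caller code, not part of this class;
     * assumes a writable journal directory):
     *
     *   Journal journal = new Journal();
     *   journal.setDirectory(new File("journal-data"));
     *   journal.start();
     *   Location loc = journal.write(new ByteSequence("hello".getBytes()), true);
     *   ByteSequence back = journal.read(loc);
     *   journal.close();
     */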
    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
        if (PreallocationScope.NONE != preallocationScope) {
            if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                doPreallocationKernelCopy(file);
            } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                doPreallocationZeros(file);
            } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
                doPreallocationChunkedZeros(file);
            } else {
                doPreallocationSparseFile(file);
            }
        }
    }
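
    // Strategy notes, as implemented below: SPARSE_FILE writes only the EOF record
    // at the head and tail of the file, leaving a hole in between; ZEROS writes the
    // full maxFileLength of zeros in a single buffer; CHUNKED_ZEROS writes the zeros
    // in PREALLOC_CHUNK_SIZE (1MB) chunks; and OS_KERNEL_COPY clones a prewritten
    // template file with FileChannel.transferTo so the copy can stay in the kernel.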
    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
        try {
            FileChannel channel = file.getChannel();
            channel.position(0);
            channel.write(journalEof);
            // seek to where the trailing EOF record begins (EOF_RECORD.length == 5)
            channel.position(maxFileLength - 5);
            journalEof.rewind();
            channel.write(journalEof);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with sparse file", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
        buffer.put(EOF_RECORD);
        buffer.rewind();
        try {
            FileChannel channel = file.getChannel();
            channel.write(buffer);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
        try (RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw")) {
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw")) {
            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
        } catch (FileNotFoundException e) {
            // log against rc: osKernelCopyTemplateFile is still null while the template is being created
            LOG.error("Could not create the template file on disk at " + rc.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not write the template file at " + rc.getAbsolutePath(), e);
        }
        return rc;
    }

    private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
        ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE);
        buffer.put(EOF_RECORD);
        buffer.rewind();

        try {
            FileChannel channel = file.getChannel();

            int remLen = maxFileLength;
            while (remLen > 0) {
                if (remLen < buffer.remaining()) {
                    buffer.limit(remLen);
                }
                int writeLen = channel.write(buffer);
                remLen -= writeLen;
                buffer.rewind();
            }

            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
        }
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }
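
    // A data file preallocated by ENTIRE_JOURNAL_ASYNC that was never written to
    // still starts with the EOF record, so checkBatchRecord(...) reports a zero-sized
    // batch at offset 0; start() uses this signal to skip the trailing, still-empty
    // file when linking recovered data files.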
    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
        int firstBatchRecordSize = -1;
        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
            Location location = new Location();
            location.setDataFileId(dataFile.getDataFileId());
            location.setOffset(0);

            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
            } catch (Exception ignored) {
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }
        }
        return firstBatchRecordSize == 0;
    }
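
    // Recovery walks a data file batch by batch: each valid batch control record
    // gives the size of the payload that follows, so the scan jumps from batch to
    // batch until it hits the EOF record or runs off the end of the file. On a
    // mismatch it searches forward for the next control record header and records
    // the skipped range in dataFile.corruptedBlocks.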
    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            while (true) {
                int size = checkBatchRecord(reader, location.getOffset());
                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
                    if (size == 0) {
                        // eof batch record
                        break;
                    }
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {
                    // Perhaps it's just some corruption... scan through the
                    // file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
                    if (nextOffset >= 0) {
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }
        } catch (IOException e) {
            // ignore - stop the scan at the last good offset
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        byte[] data = new byte[1024 * 4];
        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));

        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, pos);
            if (pos >= 0) {
                return offset + pos;
            } else {
                // need to load the next data chunk in..
                if (bs.length != data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
                bs = new ByteSequence(data, 0, reader.read(offset, data));
                pos = 0;
            }
        }
    }

    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        byte[] controlRecord = new byte[BATCH_CONTROL_RECORD_SIZE];

        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord)) {

            reader.readFully(offset, controlRecord);

            // check for journal eof
            if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
                // eof batch
                return 0;
            }

            // Assert that it's a batch record.
            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                    return -1;
                }
            }

            int size = controlIs.readInt();
            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                return -1;
            }

            if (isChecksum()) {

                long expectedChecksum = controlIs.readLong();
                if (expectedChecksum == 0) {
                    // Checksumming was not enabled when the record was stored.
                    // we can't validate the record :(
                    return size;
                }

                byte[] data = new byte[size];
                reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);

                Checksum checksum = new Adler32();
                checksum.update(data, 0, data.length);

                if (expectedChecksum != checksum.getValue()) {
                    return -1;
                }
            }
            return size;
        }
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }

    private void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            DataFile dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();
            }
            synchronized (currentDataFile) {
                fileMap.put(dataFile.getDataFileId(), dataFile);
                fileByFileMap.put(dataFile.getFile(), dataFile);
                dataFiles.addLast(dataFile);
                currentDataFile.set(dataFile);
            }
            nextDataFile = null;
        }
        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
        }
    }

    private Runnable preAllocateNextDataFileTask = new Runnable() {
        @Override
        public void run() {
            if (nextDataFile == null) {
                synchronized (dataFileIdLock) {
                    try {
                        nextDataFile = newDataFile();
                    } catch (IOException e) {
                        LOG.warn("Failed to proactively allocate data file", e);
                    }
                }
            }
        }
    };

    private volatile Future<?> preAllocateNextDataFileFuture;

    private DataFile newDataFile() throws IOException {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
        return nextWriteFile;
    }

    public DataFile reserveDataFile() {
        synchronized (dataFileIdLock) {
            int nextNum = nextDataFileId++;
            File file = getFile(nextNum);
            DataFile reservedDataFile = new DataFile(file, nextNum);
            synchronized (currentDataFile) {
                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
                fileByFileMap.put(file, reservedDataFile);
                if (dataFiles.isEmpty()) {
                    dataFiles.addLast(reservedDataFile);
                } else {
                    // link in before the tail, which is the current write file
                    dataFiles.getTail().linkBefore(reservedDataFile);
                }
            }
            return reservedDataFile;
        }
    }
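
    // With the default prefix/suffix, data file ids map to names like
    // "db-1.log", "db-2.log", ... inside the journal directory.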
    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = null;
        synchronized (currentDataFile) {
            dataFile = fileMap.get(key);
        }
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            cleanupTask.cancel(true);
            if (preAllocateNextDataFileFuture != null) {
                preAllocateNextDataFileFuture.cancel(true);
            }
            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
            accessorPool.close();
        }
        // the appender can be calling back into the journal, blocking the close - AMQ-5620
        appender.close();
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    public synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {
        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (DataFile dataFile : fileMap.values()) {
            result &= dataFile.delete();
        }

        if (preAllocateNextDataFileFuture != null) {
            preAllocateNextDataFileFuture.cancel(true);
        }
        synchronized (dataFileIdLock) {
            if (nextDataFile != null) {
                nextDataFile.delete();
                nextDataFile = null;
            }
        }

        totalLength.set(0);
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            lastAppendLocation.set(null);
            dataFiles = new LinkedNodeList<DataFile>();
        }
        // reopen file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

    public void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = null;
            synchronized (currentDataFile) {
                dataFile = fileMap.remove(key);
                if (dataFile != null) {
                    fileByFileMap.remove(dataFile.getFile());
                    dataFile.unlink();
                }
            }
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }
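
    // Removal honours the archive setting: with archiveDataLogs enabled the file is
    // moved into the archive directory, otherwise it is deleted outright; either way
    // any registered DataFileRemovedListener is notified afterwards.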
    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        totalLength.addAndGet(-dataFile.getLength());
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Archive directory [{}] does not exist - creating it now",
                            directoryArchive.getAbsolutePath());
                }
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {}", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file: {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }
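
    // Replay sketch (hypothetical caller): passing null starts at the head of the
    // journal, and each call returns the next user record, skipping batch control
    // records, corrupted ranges and EOF markers:
    //
    //   Location pos = null;
    //   while ((pos = journal.getNextLocation(pos)) != null) {
    //       ByteSequence record = journal.read(pos);
    //       // process record ...
    //   }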
    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = null;
                    synchronized (currentDataFile) {
                        head = dataFiles.getHead();
                    }
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    cur = new Location(location);
                    if (location.getSize() != -1) {
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                synchronized (currentDataFile) {
                    dataFile = dataFile.getNext();
                }
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if ((cur.getSize() == EOF_INT && cur.getType() == EOF_EOT) ||
                       (cur.getType() == 0 && cur.getSize() == 0)) {
                // eof - jump to next datafile
                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
                // replay of existing journals
                // possibly journal is larger than maxFileLength after config change
                cur.setSize(EOF_RECORD.length);
                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

    public ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }
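
    // Write sketch (hypothetical caller): with the default ALWAYS sync strategy a
    // sync write blocks until the record is on disk, while an async write returns
    // once the record is queued and runs the callback after its batch is written:
    //
    //   Location synced = journal.write(payload, true);
    //   Location queued = journal.write(payload, new Runnable() {
    //       public void run() { /* batch containing payload is on disk */ }
    //   });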
    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        return appender.storeItem(data, Location.USER_TYPE, sync);
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        return appender.storeItem(data, Location.USER_TYPE, onComplete);
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() +
                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public DataFile getDataFileById(int dataFileId) {
        synchronized (currentDataFile) {
            return fileMap.get(Integer.valueOf(dataFileId));
        }
    }

    public DataFile getCurrentDataFile(int capacity) throws IOException {
        // First just acquire the currentDataFile lock and return if no rotation is needed
        synchronized (currentDataFile) {
            if (currentDataFile.get().getLength() + capacity < maxFileLength) {
                return currentDataFile.get();
            }
        }

        // AMQ-6545 - if rotation is needed, acquire dataFileIdLock first to prevent
        // deadlocks, then re-check whether rotation is still needed
        synchronized (dataFileIdLock) {
            synchronized (currentDataFile) {
                if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
                    rotateWriteFile();
                }
                return currentDataFile.get();
            }
        }
    }

    public Integer getCurrentDataFileId() {
        synchronized (currentDataFile) {
            return currentDataFile.get().getDataFileId();
        }
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        synchronized (currentDataFile) {
            return fileByFileMap.keySet();
        }
    }

    public Map<Integer, DataFile> getFileMap() {
        synchronized (currentDataFile) {
            return new TreeMap<Integer, DataFile>(fileMap);
        }
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }

    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }

    public boolean isJournalDiskSyncPeriodic() {
        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }
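
    // WriteCommand describes one queued append (target location, payload and
    // completion mode); WriteKey identifies a record by data file id and offset
    // so in-flight appends can be looked up in the inflightWrites map.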
    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }

    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int) (file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey) obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}