/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the set of rolling data files that make up the KahaDB journal.
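 * <p>
 * A minimal usage sketch (the directory path and payload below are
 * illustrative, not taken from this class):
 * <pre>{@code
 * Journal journal = new Journal();
 * journal.setDirectory(new File("target/journal-example"));
 * journal.start();
 * Location loc = journal.write(new ByteSequence("hello".getBytes(StandardCharsets.UTF_8)), true);
 * ByteSequence record = journal.read(loc);
 * journal.close();
 * }</pre>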
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int PREALLOC_CHUNK_SIZE = 1024*1024;

    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // Batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
    public static final byte EOF_EOT = '4';
    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();
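
    // On-disk record layout implied by the constants above (sizes in bytes):
    //   batch control record: [4 size][1 type=2]["WRITE BATCH" magic][4 batch data size][8 Adler32 checksum]
    //   EOF record:           [4 EOF_INT][1 EOF_EOT]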

    private ScheduledExecutorService scheduler;

    // Handle corruption at the recovery position (e.g. checksums disabled or data zeroed out) while minimizing data loss
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - skip to the next batch record or eof
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            int nextOffset = findNextBatchRecord(reader, recoveryPosition.getOffset() + 1);
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset >= 0 ? nextOffset - 1 : dataFile.getLength() - 1);
            LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset((int) sequence.getLast() + 1);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);
        } catch (IOException e) {
            // ignore - the scan is best effort; recovery resumes from the computed offset
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public DataFileAccessorPool getAccessorPool() {
        return accessorPool;
    }

    public void allowIOResumption() {
        if (appender instanceof DataFileAppender) {
            DataFileAppender dataFileAppender = (DataFileAppender)appender;
            dataFileAppender.shutdown = false;
        }
    }

    public enum PreallocationStrategy {
        SPARSE_FILE,
        OS_KERNEL_COPY,
        ZEROS,
        CHUNKED_ZEROS;
    }

    public enum PreallocationScope {
        ENTIRE_JOURNAL,
        ENTIRE_JOURNAL_ASYNC,
        NONE;
    }

    public enum JournalDiskSyncStrategy {
        ALWAYS,    // force every write to disk before returning (safest, slowest)
        PERIODIC,  // force to disk on a timed interval
        NEVER;     // never force; leave flushing to the OS
    }

    private static byte[] createBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    private static byte[] createEmptyBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            os.writeInt(0);
            os.writeLong(0L);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create empty batch control record header.", e);
        }
    }

    private static byte[] createEofBatchAndLocationRecord() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(EOF_INT);
            os.writeByte(EOF_EOT);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create eof header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive;
    private boolean directoryArchiveOverridden = false;

    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected ScheduledFuture<?> cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private int nextDataFileId = 1;
    private final Object dataFileIdLock = new Object();
    private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
    private volatile DataFile nextDataFile;

    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
    private File osKernelCopyTemplateFile = null;
    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;

    public interface DataFileRemovedListener {
        void fileRemoved(DataFile datafile);
    }

    private DataFileRemovedListener dataFileRemovedListener;

    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: " + df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE && preallocationStrategy == PreallocationStrategy.OS_KERNEL_COPY) {
            // create a template file that will be used to pre-allocate the journal files
            if (osKernelCopyTemplateFile == null) {
                osKernelCopyTemplateFile = createJournalTemplateFile();
            }
        }

        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: " + (end - start) + " ms");
    }

    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {

        if (PreallocationScope.NONE != preallocationScope) {

            if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                doPreallocationKernelCopy(file);
            } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                doPreallocationZeros(file);
            } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
                doPreallocationChunkedZeros(file);
            } else {
                doPreallocationSparseFile(file);
            }
        }
    }

    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
        try {
            FileChannel channel = file.getChannel();
            channel.position(0);
            channel.write(journalEof);
            // write the EOF record again at the tail so the sparse file spans exactly maxFileLength
            channel.position(maxFileLength - EOF_RECORD.length);
            journalEof.rewind();
            channel.write(journalEof);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with sparse file", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
        buffer.put(EOF_RECORD);
        buffer.rewind();
        try {
            FileChannel channel = file.getChannel();
            channel.write(buffer);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
        // try-with-resources ensures the template file handle is closed even when the transfer fails
        try (RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw")) {
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw")) {
            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
        } catch (FileNotFoundException e) {
            // reference rc here: osKernelCopyTemplateFile is still null while the template is being created
            LOG.error("Could not find the template file on disk at " + rc.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not initialise the template file " + rc.getAbsolutePath(), e);
        }
        return rc;
    }

    private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {

        ByteBuffer buffer = ByteBuffer.allocate(PREALLOC_CHUNK_SIZE);
        buffer.put(EOF_RECORD);
        buffer.rewind();

        try {
            FileChannel channel = file.getChannel();

            int remLen = maxFileLength;
            while (remLen > 0) {
                if (remLen < buffer.remaining()) {
                    buffer.limit(remLen);
                }
                int writeLen = channel.write(buffer);
                remLen -= writeLen;
                buffer.rewind();
            }

            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros! Will continue without preallocation", e);
        }
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

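    /**
     * Checks whether a data file is still in its freshly preallocated state,
     * i.e. the first record at offset 0 is the journal EOF marker (reported as
     * a zero size batch). Only evaluated for
     * {@link PreallocationScope#ENTIRE_JOURNAL_ASYNC}; for other scopes this
     * returns false.
     */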
    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
        int firstBatchRecordSize = -1;
        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
            Location location = new Location();
            location.setDataFileId(dataFile.getDataFileId());
            location.setOffset(0);

            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                firstBatchRecordSize = checkBatchRecord(reader, location.getOffset());
            } catch (Exception ignored) {
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }
        }
        return firstBatchRecordSize == 0;
    }

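    /**
     * Walks the batch records of a data file from offset 0, noting any
     * corrupted ranges on the file and trimming its reported length, and
     * returns the location at which appends can safely resume.
     */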
    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            while (true) {
                int size = checkBatchRecord(reader, location.getOffset());
                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= dataFile.getLength()) {
                    if (size == 0) {
                        // eof batch record
                        break;
                    }
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {

                    // Perhaps it's just some corruption... scan through the
                    // file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    int nextOffset = findNextBatchRecord(reader, location.getOffset() + 1);
                    if (nextOffset >= 0) {
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.warn("Corrupt journal records found in '" + dataFile.getFile() + "' between offsets: " + sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException e) {
            // ignore - a failed read ends the scan at the last good location
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        byte data[] = new byte[1024 * 4];
        ByteSequence bs = new ByteSequence(data, 0, reader.read(offset, data));

        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, pos);
            if (pos >= 0) {
                return offset + pos;
            } else {
                // need to load the next data chunk in...
                if (bs.length != data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                offset += bs.length - BATCH_CONTROL_RECORD_HEADER.length;
                bs = new ByteSequence(data, 0, reader.read(offset, data));
                pos = 0;
            }
        }
    }

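    /**
     * Validates the batch control record at the given offset.
     *
     * @return the size of the batch data, 0 for the journal EOF marker, or -1
     *         when the record is not a valid batch control record or fails its
     *         Adler32 checksum (when checksums are enabled)
     */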
    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];

        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord)) {

            reader.readFully(offset, controlRecord);

            // check for journal eof
            if (Arrays.equals(EOF_RECORD, Arrays.copyOfRange(controlRecord, 0, EOF_RECORD.length))) {
                // eof batch
                return 0;
            }

            // Assert that it's a batch record.
            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                    return -1;
                }
            }

            int size = controlIs.readInt();
            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                return -1;
            }

            if (isChecksum()) {

                long expectedChecksum = controlIs.readLong();
                if (expectedChecksum == 0) {
                    // Checksumming was not enabled when the record was stored,
                    // so we can't validate the record.
                    return size;
                }

                byte data[] = new byte[size];
                reader.readFully(offset + BATCH_CONTROL_RECORD_SIZE, data);

                Checksum checksum = new Adler32();
                checksum.update(data, 0, data.length);

                if (expectedChecksum != checksum.getValue()) {
                    return -1;
                }
            }
            return size;
        }
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }

    private void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            DataFile dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();
            }
            synchronized (currentDataFile) {
                fileMap.put(dataFile.getDataFileId(), dataFile);
                fileByFileMap.put(dataFile.getFile(), dataFile);
                dataFiles.addLast(dataFile);
                currentDataFile.set(dataFile);
            }
            nextDataFile = null;
        }
        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
        }
    }

    private Runnable preAllocateNextDataFileTask = new Runnable() {
        @Override
        public void run() {
            if (nextDataFile == null) {
                synchronized (dataFileIdLock) {
                    try {
                        nextDataFile = newDataFile();
                    } catch (IOException e) {
                        LOG.warn("Failed to proactively allocate data file", e);
                    }
                }
            }
        }
    };

    private volatile Future<?> preAllocateNextDataFileFuture;

    private DataFile newDataFile() throws IOException {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
        return nextWriteFile;
    }

    public DataFile reserveDataFile() {
        synchronized (dataFileIdLock) {
            int nextNum = nextDataFileId++;
            File file = getFile(nextNum);
            DataFile reservedDataFile = new DataFile(file, nextNum);
            synchronized (currentDataFile) {
                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
                fileByFileMap.put(file, reservedDataFile);
                if (dataFiles.isEmpty()) {
                    dataFiles.addLast(reservedDataFile);
                } else {
                    dataFiles.getTail().linkBefore(reservedDataFile);
                }
            }
            return reservedDataFile;
        }
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = null;
        synchronized (currentDataFile) {
            dataFile = fileMap.get(key);
        }
        if (dataFile == null) {
            LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            cleanupTask.cancel(true);
            if (preAllocateNextDataFileFuture != null) {
                preAllocateNextDataFileFuture.cancel(true);
            }
            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
            accessorPool.close();
        }
        // the appender can be calling back to the journal blocking a close AMQ-5620
        appender.close();
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    public synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {

        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            result &= dataFile.delete();
        }

        if (preAllocateNextDataFileFuture != null) {
            preAllocateNextDataFileFuture.cancel(true);
        }
        synchronized (dataFileIdLock) {
            if (nextDataFile != null) {
                nextDataFile.delete();
                nextDataFile = null;
            }
        }

        totalLength.set(0);
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            lastAppendLocation.set(null);
            dataFiles = new LinkedNodeList<DataFile>();
        }
        // reopen open file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

    public void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = null;
            synchronized (currentDataFile) {
                dataFile = fileMap.remove(key);
                if (dataFile != null) {
                    fileByFileMap.remove(dataFile.getFile());
                    dataFile.unlink();
                }
            }
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        totalLength.addAndGet(-dataFile.getLength());
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Archive directory [{}] does not exist - creating it now",
                            directoryArchive.getAbsolutePath());
                }
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file : {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }

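    /**
     * Returns the location of the next user record after the given location,
     * or null once the end of the journal is reached. Passing null starts the
     * scan from the head of the journal; corrupted ranges and EOF markers are
     * skipped transparently.
     */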
    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = null;
                    synchronized (currentDataFile) {
                        head = dataFiles.getHead();
                    }
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    if (location.getSize() == -1) {
                        cur = new Location(location);
                    } else {
                        cur = new Location(location);
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                synchronized (currentDataFile) {
                    dataFile = dataFile.getNext();
                }
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if ((cur.getSize() == EOF_INT && cur.getType() == EOF_EOT) ||
                    (cur.getType() == 0 && cur.getSize() == 0)) {
                // eof - jump to next datafile
                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
                // replay of existing journals
                // possibly journal is larger than maxFileLength after config change
                cur.setSize(EOF_RECORD.length);
                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

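    /**
     * Reads the record payload stored at the given location.
     */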
    public ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

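    /**
     * Appends a user record to the journal.
     *
     * @param data the record payload
     * @param sync when true, block until the appender has processed the write
     *             (and, depending on the configured sync strategy, forced it
     *             to disk)
     * @return the location assigned to the record
     */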
    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
        return loc;
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
        return loc;
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() +
                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public DataFile getDataFileById(int dataFileId) {
        synchronized (currentDataFile) {
            return fileMap.get(Integer.valueOf(dataFileId));
        }
    }

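    /**
     * Returns the data file to append to, first rotating to a new file when
     * writing {@code capacity} more bytes would reach {@code maxFileLength}.
     */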
    public DataFile getCurrentDataFile(int capacity) throws IOException {
        // First just acquire the currentDataFile lock and return if no rotation is needed
        synchronized (currentDataFile) {
            if (currentDataFile.get().getLength() + capacity < maxFileLength) {
                return currentDataFile.get();
            }
        }

        // AMQ-6545 - if rotation is needed, acquire dataFileIdLock first to prevent deadlocks,
        // then re-check whether rotation is needed
        synchronized (dataFileIdLock) {
            synchronized (currentDataFile) {
                if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
                    rotateWriteFile();
                }
                return currentDataFile.get();
            }
        }
    }

    public Integer getCurrentDataFileId() {
        synchronized (currentDataFile) {
            return currentDataFile.get().getDataFileId();
        }
    }

    /**
     * Get a set of files - only valid after start().
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        synchronized (currentDataFile) {
            return fileByFileMap.keySet();
        }
    }

    public Map<Integer, DataFile> getFileMap() {
        synchronized (currentDataFile) {
            return new TreeMap<Integer, DataFile>(fileMap);
        }
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }

    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }

    public boolean isJournalDiskSyncPeriodic() {
        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }

    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }

    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int)(file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey)obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}