/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.disk.journal;

import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.apache.activemq.util.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Manages the set of data files that make up the journal: creation, rotation,
 * preallocation, recovery scanning, archiving and removal.
 *
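 * <p>A minimal usage sketch (the directory name and file size below are
 * illustrative only; the surrounding store normally drives this lifecycle):
 * <pre>{@code
 * Journal journal = new Journal();
 * journal.setDirectory(new File("journal-dir"));
 * journal.setMaxFileLength(1024 * 1024);
 * journal.start();
 * Location loc = journal.write(new ByteSequence("hello".getBytes(StandardCharsets.UTF_8)), true);
 * ByteSequence data = journal.read(loc);
 * journal.close();
 * }</pre>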
 */
public class Journal {
    public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    public static final boolean callerBufferAppender = Boolean.parseBoolean(System.getProperty(CALLER_BUFFER_APPENDER, "false"));

    private static final int PREALLOC_CHUNK_SIZE = 1024*1024;

    // RECORD_HEAD_SPACE = 4 byte record length + 1 byte record type
    public static final int RECORD_HEAD_SPACE = 4 + 1;

    public static final byte USER_RECORD_TYPE = 1;
    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
    // The batch control record holds a 4 byte size of the batch and an 8 byte checksum of the batch.
    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE + BATCH_CONTROL_RECORD_MAGIC.length + 4 + 8;
    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
    public static final byte[] EMPTY_BATCH_CONTROL_RECORD = createEmptyBatchControlRecordHeader();
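    // The EOF record is a record whose length field is EOF_INT and whose type is
    // EOF_EOT; getNextLocation treats it (and a legacy 0,0 pair) as end of data.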
    public static final int EOF_INT = ByteBuffer.wrap(new byte[]{'-', 'q', 'M', 'a'}).getInt();
    public static final byte EOF_EOT = '4';
    public static final byte[] EOF_RECORD = createEofBatchAndLocationRecord();

    private ScheduledExecutorService scheduler;

    /**
     * Tackles corruption found at a recovery position when the checksum is
     * disabled, or when a batch was corrupted with zeros, minimizing data loss.
     */
    public void corruptRecoveryLocation(Location recoveryPosition) throws IOException {
        DataFile dataFile = getDataFile(recoveryPosition);
        // with corruption on recovery we have no faith in the content - skip to the next batch record or eof
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
            randomAccessFile.seek(recoveryPosition.getOffset() + 1);
            byte[] data = new byte[getWriteBatchSize()];
            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));
            int nextOffset = 0;
            if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
                nextOffset = (int) randomAccessFile.getFilePointer() - bs.remaining();
            } else {
                nextOffset = (int) randomAccessFile.length();
            }
            Sequence sequence = new Sequence(recoveryPosition.getOffset(), nextOffset - 1);
            LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);

            // skip corruption on getNextLocation
            recoveryPosition.setOffset(nextOffset);
            recoveryPosition.setSize(-1);

            dataFile.corruptedBlocks.add(sequence);
        } catch (IOException e) {
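            // best effort: a read failure here simply ends the corruption scan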
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
    }

    public DataFileAccessorPool getAccessorPool() {
        return accessorPool;
    }

    public void allowIOResumption() {
        if (appender instanceof DataFileAppender) {
            DataFileAppender dataFileAppender = (DataFileAppender)appender;
            dataFileAppender.shutdown = false;
        }
    }

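    /**
     * How journal data files are preallocated to their full length. SPARSE_FILE
     * writes only the EOF markers and lets the filesystem back the gap lazily;
     * OS_KERNEL_COPY transfers a prebuilt template file via the kernel; ZEROS
     * writes the whole file from a single direct buffer; CHUNKED_ZEROS writes
     * zeros in PREALLOC_CHUNK_SIZE chunks.
     */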
    public enum PreallocationStrategy {
        SPARSE_FILE,
        OS_KERNEL_COPY,
        ZEROS,
        CHUNKED_ZEROS;
    }

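    /**
     * When preallocation happens. ENTIRE_JOURNAL preallocates the new file
     * synchronously on rotation; ENTIRE_JOURNAL_ASYNC preallocates the next
     * file ahead of time on the scheduler thread; NONE disables preallocation.
     */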
    public enum PreallocationScope {
        ENTIRE_JOURNAL,
        ENTIRE_JOURNAL_ASYNC,
        NONE;
    }

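    /**
     * How writes reach the disk: ALWAYS forces each write batch to disk,
     * PERIODIC syncs on a timer, and NEVER leaves flushing to the OS. The
     * appender implementations interpret the chosen strategy.
     */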
    public enum JournalDiskSyncStrategy {
        ALWAYS,
        PERIODIC,
        NEVER;
    }

    private static byte[] createBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create batch control record header.", e);
        }
    }

    private static byte[] createEmptyBatchControlRecordHeader() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
            os.write(BATCH_CONTROL_RECORD_MAGIC);
            os.writeInt(0);
            os.writeLong(0L);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create empty batch control record header.", e);
        }
    }

    private static byte[] createEofBatchAndLocationRecord() {
        try (DataByteArrayOutputStream os = new DataByteArrayOutputStream()) {
            os.writeInt(EOF_INT);
            os.writeByte(EOF_EOT);
            ByteSequence sequence = os.toByteSequence();
            sequence.compact();
            return sequence.getData();
        } catch (IOException e) {
            throw new RuntimeException("Could not create eof header.", e);
        }
    }

    public static final String DEFAULT_DIRECTORY = ".";
    public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
    public static final String DEFAULT_FILE_PREFIX = "db-";
    public static final String DEFAULT_FILE_SUFFIX = ".log";
    public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
    public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
    public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;

    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);

    protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();

    protected File directory = new File(DEFAULT_DIRECTORY);
    protected File directoryArchive;
    private boolean directoryArchiveOverridden = false;

    protected String filePrefix = DEFAULT_FILE_PREFIX;
    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
    protected boolean started;

    protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
    protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;

    protected FileAppender appender;
    protected DataFileAccessorPool accessorPool;

    protected Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
    protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
    protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();

    protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
    protected ScheduledFuture<?> cleanupTask;
    protected AtomicLong totalLength = new AtomicLong();
    protected boolean archiveDataLogs;
    private ReplicationTarget replicationTarget;
    protected boolean checksum;
    protected boolean checkForCorruptionOnStartup;
    protected boolean enableAsyncDiskSync = true;
    private int nextDataFileId = 1;
    private final Object dataFileIdLock = new Object();
    private final AtomicReference<DataFile> currentDataFile = new AtomicReference<>(null);
    private volatile DataFile nextDataFile;

    protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
    protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
    private File osKernelCopyTemplateFile = null;
    private ByteBuffer preAllocateDirectBuffer = null;

    protected JournalDiskSyncStrategy journalDiskSyncStrategy = JournalDiskSyncStrategy.ALWAYS;

    public interface DataFileRemovedListener {
        void fileRemoved(DataFile datafile);
    }

    private DataFileRemovedListener dataFileRemovedListener;

    public synchronized void start() throws IOException {
        if (started) {
            return;
        }

        long start = System.currentTimeMillis();
        accessorPool = new DataFileAccessorPool(this);
        started = true;

        appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);

        File[] files = directory.listFiles(new FilenameFilter() {
            @Override
            public boolean accept(File dir, String n) {
                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
            }
        });

        if (files != null) {
            for (File file : files) {
                try {
                    String n = file.getName();
                    String numStr = n.substring(filePrefix.length(), n.length() - fileSuffix.length());
                    int num = Integer.parseInt(numStr);
                    DataFile dataFile = new DataFile(file, num);
                    fileMap.put(dataFile.getDataFileId(), dataFile);
                    totalLength.addAndGet(dataFile.getLength());
                } catch (NumberFormatException e) {
                    // Ignore files that do not match the pattern.
                }
            }

            // Sort the list so that we can link the DataFiles together in the
            // right order.
            LinkedList<DataFile> l = new LinkedList<>(fileMap.values());
            Collections.sort(l);
            for (DataFile df : l) {
                if (df.getLength() == 0) {
                    // possibly the result of a previous failed write
                    LOG.info("ignoring zero length, partially initialised journal data file: {}", df);
                    continue;
                } else if (l.getLast().equals(df) && isUnusedPreallocated(df)) {
                    continue;
                }
                dataFiles.addLast(df);
                fileByFileMap.put(df.getFile(), df);

                if (isCheckForCorruptionOnStartup()) {
                    lastAppendLocation.set(recoveryCheck(df));
                }
            }
        }

        if (preallocationScope != PreallocationScope.NONE) {
            switch (preallocationStrategy) {
                case SPARSE_FILE:
                    break;
                case OS_KERNEL_COPY: {
                    osKernelCopyTemplateFile = createJournalTemplateFile();
                }
                break;
                case CHUNKED_ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(PREALLOC_CHUNK_SIZE);
                }
                break;
                case ZEROS: {
                    preAllocateDirectBuffer = allocateDirectBuffer(getMaxFileLength());
                }
                break;
            }
        }
        scheduler = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread schedulerThread = new Thread(r);
                schedulerThread.setName("ActiveMQ Journal Scheduled executor");
                schedulerThread.setDaemon(true);
                return schedulerThread;
            }
        });

        // init current write file
        if (dataFiles.isEmpty()) {
            nextDataFileId = 1;
            rotateWriteFile();
        } else {
            currentDataFile.set(dataFiles.getTail());
            nextDataFileId = currentDataFile.get().dataFileId + 1;
        }

        if (lastAppendLocation.get() == null) {
            DataFile df = dataFiles.getTail();
            lastAppendLocation.set(recoveryCheck(df));
        }

        // ensure we don't report unused space of last journal file in size metric
        if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) {
            totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
        }

        cleanupTask = scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                cleanup();
            }
        }, DEFAULT_CLEANUP_INTERVAL, DEFAULT_CLEANUP_INTERVAL, TimeUnit.MILLISECONDS);

        long end = System.currentTimeMillis();
        LOG.trace("Startup took: {} ms", (end - start));
    }

    private ByteBuffer allocateDirectBuffer(int size) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(size);
        buffer.put(EOF_RECORD);
        return buffer;
    }

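    /**
     * Preallocates the given file to the configured length using the configured
     * strategy. Failures are logged and swallowed: preallocation is an
     * optimization, not a correctness requirement.
     */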
    public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {

        if (PreallocationScope.NONE != preallocationScope) {

            try {
                if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
                    doPreallocationKernelCopy(file);
                } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
                    doPreallocationZeros(file);
                } else if (PreallocationStrategy.CHUNKED_ZEROS == preallocationStrategy) {
                    doPreallocationChunkedZeros(file);
                } else {
                    doPreallocationSparseFile(file);
                }
            } catch (Throwable continueWithNoPrealloc) {
                // error on preallocation is non fatal, and we don't want to leak the journal handle
                LOG.error("could not preallocate journal data file", continueWithNoPrealloc);
            }
        }
    }

    private void doPreallocationSparseFile(RecoverableRandomAccessFile file) {
        final ByteBuffer journalEof = ByteBuffer.wrap(EOF_RECORD);
        try {
            FileChannel channel = file.getChannel();
            channel.position(0);
            channel.write(journalEof);
            channel.position(maxFileLength - EOF_RECORD.length);
            journalEof.rewind();
            channel.write(journalEof);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with sparse file", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with sparse file", e);
        }
    }

    private void doPreallocationZeros(RecoverableRandomAccessFile file) {
        preAllocateDirectBuffer.rewind();
        try {
            FileChannel channel = file.getChannel();
            channel.write(preAllocateDirectBuffer);
            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with zeros", e);
        }
    }

    private void doPreallocationKernelCopy(RecoverableRandomAccessFile file) {
        // try-with-resources so the template file handle is closed even when the transfer fails
        try (RandomAccessFile templateRaf = new RandomAccessFile(osKernelCopyTemplateFile, "rw")) {
            templateRaf.getChannel().transferTo(0, getMaxFileLength(), file.getChannel());
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with kernel copy", ignored);
        } catch (FileNotFoundException e) {
            LOG.error("Could not find the template file on disk at " + osKernelCopyTemplateFile.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not transfer the template file to journal, transferFile=" + osKernelCopyTemplateFile.getAbsolutePath(), e);
        }
    }

    private File createJournalTemplateFile() {
        String fileName = "db-log.template";
        File rc = new File(directory, fileName);
        try (RandomAccessFile templateRaf = new RandomAccessFile(rc, "rw")) {
            templateRaf.getChannel().write(ByteBuffer.wrap(EOF_RECORD));
            templateRaf.setLength(maxFileLength);
            templateRaf.getChannel().force(true);
        } catch (FileNotFoundException e) {
            LOG.error("Could not create the template file on disk at " + rc.getAbsolutePath(), e);
        } catch (IOException e) {
            LOG.error("Could not write the template file at " + rc.getAbsolutePath(), e);
        }
        return rc;
    }

    private void doPreallocationChunkedZeros(RecoverableRandomAccessFile file) {
        preAllocateDirectBuffer.limit(preAllocateDirectBuffer.capacity());
        preAllocateDirectBuffer.rewind();
        try {
            FileChannel channel = file.getChannel();

            int remLen = maxFileLength;
            while (remLen > 0) {
                if (remLen < preAllocateDirectBuffer.remaining()) {
                    preAllocateDirectBuffer.limit(remLen);
                }
                int writeLen = channel.write(preAllocateDirectBuffer);
                remLen -= writeLen;
                preAllocateDirectBuffer.rewind();
            }

            channel.force(false);
            channel.position(0);
        } catch (ClosedByInterruptException ignored) {
            LOG.trace("Could not preallocate journal file with chunked zeros", ignored);
        } catch (IOException e) {
            LOG.error("Could not preallocate journal file with chunked zeros! Will continue without preallocation", e);
        }
    }

    private static byte[] bytes(String string) {
        try {
            return string.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        }
    }

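    /**
     * Returns true if the given file looks like a preallocated-but-never-used
     * file from a previous ENTIRE_JOURNAL_ASYNC run, i.e. it still starts with
     * the EOF record, so startup can skip linking it in.
     */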
    public boolean isUnusedPreallocated(DataFile dataFile) throws IOException {
        if (preallocationScope == PreallocationScope.ENTIRE_JOURNAL_ASYNC) {
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                byte[] firstFewBytes = new byte[BATCH_CONTROL_RECORD_HEADER.length];
                reader.readFully(0, firstFewBytes);
                ByteSequence bs = new ByteSequence(firstFewBytes);
                return bs.startsWith(EOF_RECORD);
            } catch (Exception ignored) {
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }
        }
        return false;
    }

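    /**
     * Scans a data file batch by batch to find the last good append position,
     * recording any corrupted ranges on the DataFile and trimming its length
     * to the recovered offset.
     */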
    protected Location recoveryCheck(DataFile dataFile) throws IOException {
        Location location = new Location();
        location.setDataFileId(dataFile.getDataFileId());
        location.setOffset(0);

        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        try {
            RandomAccessFile randomAccessFile = reader.getRaf().getRaf();
            randomAccessFile.seek(0);
            final long totalFileLength = randomAccessFile.length();
            byte[] data = new byte[getWriteBatchSize()];
            ByteSequence bs = new ByteSequence(data, 0, randomAccessFile.read(data));

            while (true) {
                int size = checkBatchRecord(bs, randomAccessFile);
                if (size >= 0 && location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size <= totalFileLength) {
                    if (size == 0) {
                        // eof batch record
                        break;
                    }
                    location.setOffset(location.getOffset() + BATCH_CONTROL_RECORD_SIZE + size);
                } else {

                    // Perhaps it's just some corruption... scan through the
                    // file to find the next valid batch record. We
                    // may have subsequent valid batch records.
                    if (findNextBatchRecord(bs, randomAccessFile) >= 0) {
                        int nextOffset = (int) randomAccessFile.getFilePointer() - bs.remaining();
                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
                        LOG.warn("Corrupt journal records found in '{}' between offsets: {}", dataFile.getFile(), sequence);
                        dataFile.corruptedBlocks.add(sequence);
                        location.setOffset(nextOffset);
                    } else {
                        break;
                    }
                }
            }

        } catch (IOException e) {
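            // a read failure ends the scan; the location recovered so far is returned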
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }

        int existingLen = dataFile.getLength();
        dataFile.setLength(location.getOffset());
        if (existingLen > dataFile.getLength()) {
            totalLength.addAndGet(dataFile.getLength() - existingLen);
        }

        if (!dataFile.corruptedBlocks.isEmpty()) {
            // Is the end of the data file corrupted?
            if (dataFile.corruptedBlocks.getTail().getLast() + 1 == location.getOffset()) {
                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
            }
        }

        return location;
    }

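    /**
     * Advances the byte sequence (refilling from the file as needed) to the
     * start of the next batch control record header. Returns the match position
     * within the current buffer, or -1 when the end of the file is reached.
     */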
    private int findNextBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
        final ByteSequence header = new ByteSequence(BATCH_CONTROL_RECORD_HEADER);
        int pos = 0;
        while (true) {
            pos = bs.indexOf(header, 0);
            if (pos >= 0) {
                bs.setOffset(bs.offset + pos);
                return pos;
            } else {
                // need to load the next data chunk in..
                if (bs.length != bs.data.length) {
                    // If we had a short read then we were at EOF
                    return -1;
                }
                bs.setOffset(bs.length - BATCH_CONTROL_RECORD_HEADER.length);
                bs.reset();
                bs.setLength(bs.length + reader.read(bs.data, bs.length, bs.data.length - BATCH_CONTROL_RECORD_HEADER.length));
            }
        }
    }

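    /**
     * Validates the batch record at the current buffer position. Returns the
     * batch payload size (0 for the EOF record), or a negative code on failure:
     * -1 bad magic, -2 implausible size, -3 payload extends past the readable
     * data, -4 checksum mismatch.
     */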
    private int checkBatchRecord(ByteSequence bs, RandomAccessFile reader) throws IOException {
        ensureAvailable(bs, reader, EOF_RECORD.length);
        if (bs.startsWith(EOF_RECORD)) {
            return 0; // eof
        }
        ensureAvailable(bs, reader, BATCH_CONTROL_RECORD_SIZE);
        try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(bs)) {

            // Assert that it's a batch record.
            for (int i = 0; i < BATCH_CONTROL_RECORD_HEADER.length; i++) {
                if (controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i]) {
                    return -1;
                }
            }

            int size = controlIs.readInt();
            if (size < 0 || size > Integer.MAX_VALUE - (BATCH_CONTROL_RECORD_SIZE + EOF_RECORD.length)) {
                return -2;
            }

            long expectedChecksum = controlIs.readLong();
            Checksum checksum = null;
            if (isChecksum() && expectedChecksum > 0) {
                checksum = new Adler32();
            }

            // revert to bs to consume data
            bs.setOffset(controlIs.position());
            int toRead = size;
            while (toRead > 0) {
                if (bs.remaining() >= toRead) {
                    if (checksum != null) {
                        checksum.update(bs.getData(), bs.getOffset(), toRead);
                    }
                    bs.setOffset(bs.offset + toRead);
                    toRead = 0;
                } else {
                    if (bs.length != bs.data.length) {
                        // buffer exhausted
                        return -3;
                    }

                    toRead -= bs.remaining();
                    if (checksum != null) {
                        checksum.update(bs.getData(), bs.getOffset(), bs.remaining());
                    }
                    bs.setLength(reader.read(bs.data));
                    bs.setOffset(0);
                }
            }
            if (checksum != null && expectedChecksum != checksum.getValue()) {
                return -4;
            }

            return size;
        }
    }

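    /**
     * Refills the buffer from the file until at least {@code required} bytes
     * are available, throwing EOFException if the file ends with nothing left
     * to consume.
     */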
    private void ensureAvailable(ByteSequence bs, RandomAccessFile reader, int required) throws IOException {
        if (bs.remaining() < required) {
            bs.reset();
            int read = reader.read(bs.data, bs.length, bs.data.length - bs.length);
            if (read < 0) {
                if (bs.remaining() == 0) {
                    throw new EOFException("request for " + required + " bytes reached EOF");
                }
                // at EOF but some buffered bytes remain; don't let the negative
                // read count shrink the sequence length
                return;
            }
            bs.setLength(bs.length + read);
        }
    }

    void addToTotalLength(int size) {
        totalLength.addAndGet(size);
    }

    public long length() {
        return totalLength.get();
    }

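    /**
     * Installs the next data file as the current write file (creating one if
     * background preallocation has not produced it yet) and, with
     * ENTIRE_JOURNAL_ASYNC, schedules preallocation of the file after that.
     */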
    private void rotateWriteFile() throws IOException {
        synchronized (dataFileIdLock) {
            DataFile dataFile = nextDataFile;
            if (dataFile == null) {
                dataFile = newDataFile();
            }
            synchronized (currentDataFile) {
                fileMap.put(dataFile.getDataFileId(), dataFile);
                fileByFileMap.put(dataFile.getFile(), dataFile);
                dataFiles.addLast(dataFile);
                currentDataFile.set(dataFile);
            }
            nextDataFile = null;
        }
        if (PreallocationScope.ENTIRE_JOURNAL_ASYNC == preallocationScope) {
            preAllocateNextDataFileFuture = scheduler.submit(preAllocateNextDataFileTask);
        }
    }

    private Runnable preAllocateNextDataFileTask = new Runnable() {
        @Override
        public void run() {
            if (nextDataFile == null) {
                synchronized (dataFileIdLock) {
                    try {
                        nextDataFile = newDataFile();
                    } catch (IOException e) {
                        LOG.warn("Failed to proactively allocate data file", e);
                    }
                }
            }
        }
    };

    private volatile Future<?> preAllocateNextDataFileFuture;

    private DataFile newDataFile() throws IOException {
        int nextNum = nextDataFileId++;
        File file = getFile(nextNum);
        DataFile nextWriteFile = new DataFile(file, nextNum);
        preallocateEntireJournalDataFile(nextWriteFile.appendRandomAccessFile());
        return nextWriteFile;
    }

    public DataFile reserveDataFile() {
        synchronized (dataFileIdLock) {
            int nextNum = nextDataFileId++;
            File file = getFile(nextNum);
            DataFile reservedDataFile = new DataFile(file, nextNum);
            synchronized (currentDataFile) {
                fileMap.put(reservedDataFile.getDataFileId(), reservedDataFile);
                fileByFileMap.put(file, reservedDataFile);
                if (dataFiles.isEmpty()) {
                    dataFiles.addLast(reservedDataFile);
                } else {
                    dataFiles.getTail().linkBefore(reservedDataFile);
                }
            }
            return reservedDataFile;
        }
    }

    public File getFile(int nextNum) {
        String fileName = filePrefix + nextNum + fileSuffix;
        File file = new File(directory, fileName);
        return file;
    }

    DataFile getDataFile(Location item) throws IOException {
        Integer key = Integer.valueOf(item.getDataFileId());
        DataFile dataFile = null;
        synchronized (currentDataFile) {
            dataFile = fileMap.get(key);
        }
        if (dataFile == null) {
            LOG.error("Looking for key {} but not found in fileMap: {}", key, fileMap);
            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
        }
        return dataFile;
    }

    public void close() throws IOException {
        synchronized (this) {
            if (!started) {
                return;
            }
            cleanupTask.cancel(true);
            if (preAllocateNextDataFileFuture != null) {
                preAllocateNextDataFileFuture.cancel(true);
            }
            ThreadPoolUtils.shutdownGraceful(scheduler, 4000);
            accessorPool.close();
        }
        // the appender can call back into the journal, blocking close - see AMQ-5620
        appender.close();
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            dataFiles.clear();
            lastAppendLocation.set(null);
            started = false;
        }
    }

    public synchronized void cleanup() {
        if (accessorPool != null) {
            accessorPool.disposeUnused();
        }
    }

    public synchronized boolean delete() throws IOException {
        // Close all open file handles...
        appender.close();
        accessorPool.close();

        boolean result = true;
        for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
            DataFile dataFile = i.next();
            result &= dataFile.delete();
        }

        if (preAllocateNextDataFileFuture != null) {
            preAllocateNextDataFileFuture.cancel(true);
        }
        synchronized (dataFileIdLock) {
            if (nextDataFile != null) {
                nextDataFile.delete();
                nextDataFile = null;
            }
        }

        totalLength.set(0);
        synchronized (currentDataFile) {
            fileMap.clear();
            fileByFileMap.clear();
            lastAppendLocation.set(null);
            dataFiles = new LinkedNodeList<DataFile>();
        }
        // reopen file handles...
        accessorPool = new DataFileAccessorPool(this);
        appender = new DataFileAppender(this);
        return result;
    }

    public void removeDataFiles(Set<Integer> files) throws IOException {
        for (Integer key : files) {
            // Can't remove the data file (or subsequent files) that is currently being written to.
            if (key >= lastAppendLocation.get().getDataFileId()) {
                continue;
            }
            DataFile dataFile = null;
            synchronized (currentDataFile) {
                dataFile = fileMap.remove(key);
                if (dataFile != null) {
                    fileByFileMap.remove(dataFile.getFile());
                    dataFile.unlink();
                }
            }
            if (dataFile != null) {
                forceRemoveDataFile(dataFile);
            }
        }
    }

    private void forceRemoveDataFile(DataFile dataFile) throws IOException {
        accessorPool.disposeDataFileAccessors(dataFile);
        totalLength.addAndGet(-dataFile.getLength());
        if (archiveDataLogs) {
            File directoryArchive = getDirectoryArchive();
            if (directoryArchive.exists()) {
                LOG.debug("Archive directory exists: {}", directoryArchive);
            } else {
                LOG.debug("Archive directory [{}] does not exist - creating it now",
                        directoryArchive.getAbsolutePath());
                IOHelper.mkdirs(directoryArchive);
            }
            LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath());
            dataFile.move(directoryArchive);
            LOG.debug("Successfully moved data file");
        } else {
            LOG.debug("Deleting data file: {}", dataFile);
            if (dataFile.delete()) {
                LOG.debug("Discarded data file: {}", dataFile);
            } else {
                LOG.warn("Failed to discard data file: {}", dataFile.getFile());
            }
        }
        if (dataFileRemovedListener != null) {
            dataFileRemovedListener.fileRemoved(dataFile);
        }
    }

    /**
     * @return the maxFileLength
     */
    public int getMaxFileLength() {
        return maxFileLength;
    }

    /**
     * @param maxFileLength the maxFileLength to set
     */
    public void setMaxFileLength(int maxFileLength) {
        this.maxFileLength = maxFileLength;
    }

    @Override
    public String toString() {
        return directory.toString();
    }

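    /**
     * Returns the location of the next user record after {@code location}, or
     * the first user record when {@code location} is null. Corrupted ranges and
     * EOF markers are skipped; returns null when the end of the journal is
     * reached.
     */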
    public Location getNextLocation(Location location) throws IOException, IllegalStateException {
        return getNextLocation(location, null);
    }

    public Location getNextLocation(Location location, Location limit) throws IOException, IllegalStateException {
        Location cur = null;
        while (true) {
            if (cur == null) {
                if (location == null) {
                    DataFile head = null;
                    synchronized (currentDataFile) {
                        head = dataFiles.getHead();
                    }
                    if (head == null) {
                        return null;
                    }
                    cur = new Location();
                    cur.setDataFileId(head.getDataFileId());
                    cur.setOffset(0);
                } else {
                    // Set to the next offset..
                    cur = new Location(location);
                    if (location.getSize() != -1) {
                        cur.setOffset(location.getOffset() + location.getSize());
                    }
                }
            } else {
                cur.setOffset(cur.getOffset() + cur.getSize());
            }

            DataFile dataFile = getDataFile(cur);

            // Did it go into the next file?
            if (dataFile.getLength() <= cur.getOffset()) {
                synchronized (currentDataFile) {
                    dataFile = dataFile.getNext();
                }
                if (dataFile == null) {
                    return null;
                } else {
                    cur.setDataFileId(dataFile.getDataFileId().intValue());
                    cur.setOffset(0);
                    if (limit != null && cur.compareTo(limit) >= 0) {
                        LOG.trace("reached limit: {} at: {}", limit, cur);
                        return null;
                    }
                }
            }

            // Load in location size and type.
            DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
            try {
                reader.readLocationDetails(cur);
            } catch (EOFException eof) {
                LOG.trace("EOF on next: {}, cur: {}", location, cur);
                throw eof;
            } finally {
                accessorPool.closeDataFileAccessor(reader);
            }

            Sequence corruptedRange = dataFile.corruptedBlocks.get(cur.getOffset());
            if (corruptedRange != null) {
                // skip corruption
                cur.setSize((int) corruptedRange.range());
            } else if ((cur.getSize() == EOF_INT && cur.getType() == EOF_EOT) ||
                    (cur.getType() == 0 && cur.getSize() == 0)) {
                // eof - jump to next datafile
                // EOF_INT and EOF_EOT replace 0,0 - we need to react to both for
                // replay of existing journals
                // possibly journal is larger than maxFileLength after config change
                cur.setSize(EOF_RECORD.length);
                cur.setOffset(Math.max(maxFileLength, dataFile.getLength()));
            } else if (cur.getType() == USER_RECORD_TYPE) {
                // Only return user records.
                return cur;
            }
        }
    }

    public ByteSequence read(Location location) throws IOException, IllegalStateException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
        ByteSequence rc = null;
        try {
            rc = reader.readRecord(location);
        } finally {
            accessorPool.closeDataFileAccessor(reader);
        }
        return rc;
    }

    public Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
        return loc;
    }

    public Location write(ByteSequence data, Runnable onComplete) throws IOException, IllegalStateException {
        Location loc = appender.storeItem(data, Location.USER_TYPE, onComplete);
        return loc;
    }

    public void update(Location location, ByteSequence data, boolean sync) throws IOException {
        DataFile dataFile = getDataFile(location);
        DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
        try {
            updater.updateRecord(location, data, sync);
        } finally {
            accessorPool.closeDataFileAccessor(updater);
        }
    }

    public PreallocationStrategy getPreallocationStrategy() {
        return preallocationStrategy;
    }

    public void setPreallocationStrategy(PreallocationStrategy preallocationStrategy) {
        this.preallocationStrategy = preallocationStrategy;
    }

    public PreallocationScope getPreallocationScope() {
        return preallocationScope;
    }

    public void setPreallocationScope(PreallocationScope preallocationScope) {
        this.preallocationScope = preallocationScope;
    }

    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public String getFilePrefix() {
        return filePrefix;
    }

    public void setFilePrefix(String filePrefix) {
        this.filePrefix = filePrefix;
    }

    public Map<WriteKey, WriteCommand> getInflightWrites() {
        return inflightWrites;
    }

    public Location getLastAppendLocation() {
        return lastAppendLocation.get();
    }

    public void setLastAppendLocation(Location lastSyncedLocation) {
        this.lastAppendLocation.set(lastSyncedLocation);
    }

    public File getDirectoryArchive() {
        if (!directoryArchiveOverridden && (directoryArchive == null)) {
            // create the directoryArchive relative to the journal location
            directoryArchive = new File(directory.getAbsolutePath() +
                    File.separator + DEFAULT_ARCHIVE_DIRECTORY);
        }
        return directoryArchive;
    }

    public void setDirectoryArchive(File directoryArchive) {
        directoryArchiveOverridden = true;
        this.directoryArchive = directoryArchive;
    }

    public boolean isArchiveDataLogs() {
        return archiveDataLogs;
    }

    public void setArchiveDataLogs(boolean archiveDataLogs) {
        this.archiveDataLogs = archiveDataLogs;
    }

    public DataFile getDataFileById(int dataFileId) {
        synchronized (currentDataFile) {
            return fileMap.get(Integer.valueOf(dataFileId));
        }
    }

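    /**
     * Returns the data file to append to, first rotating to a new file when the
     * pending write of {@code capacity} bytes would push the current file past
     * maxFileLength.
     */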
    public DataFile getCurrentDataFile(int capacity) throws IOException {
        //First just acquire the currentDataFile lock and return if no rotation needed
        synchronized (currentDataFile) {
            if (currentDataFile.get().getLength() + capacity < maxFileLength) {
                return currentDataFile.get();
            }
        }

        //AMQ-6545 - if rotation needed, acquire dataFileIdLock first to prevent deadlocks
        //then re-check if rotation is needed
        synchronized (dataFileIdLock) {
            synchronized (currentDataFile) {
                if (currentDataFile.get().getLength() + capacity >= maxFileLength) {
                    rotateWriteFile();
                }
                return currentDataFile.get();
            }
        }
    }

    public Integer getCurrentDataFileId() {
        synchronized (currentDataFile) {
            return currentDataFile.get().getDataFileId();
        }
    }

    /**
     * Get a set of files - only valid after start()
     *
     * @return files currently being used
     */
    public Set<File> getFiles() {
        synchronized (currentDataFile) {
            return fileByFileMap.keySet();
        }
    }

    public Map<Integer, DataFile> getFileMap() {
        synchronized (currentDataFile) {
            return new TreeMap<Integer, DataFile>(fileMap);
        }
    }

    public long getDiskSize() {
        return totalLength.get();
    }

    public void setReplicationTarget(ReplicationTarget replicationTarget) {
        this.replicationTarget = replicationTarget;
    }

    public ReplicationTarget getReplicationTarget() {
        return replicationTarget;
    }

    public String getFileSuffix() {
        return fileSuffix;
    }

    public void setFileSuffix(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    public boolean isChecksum() {
        return checksum;
    }

    public void setChecksum(boolean checksumWrites) {
        this.checksum = checksumWrites;
    }

    public boolean isCheckForCorruptionOnStartup() {
        return checkForCorruptionOnStartup;
    }

    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
    }

    public void setWriteBatchSize(int writeBatchSize) {
        this.writeBatchSize = writeBatchSize;
    }

    public int getWriteBatchSize() {
        return writeBatchSize;
    }

    public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
        this.totalLength = storeSizeAccumulator;
    }

    public void setEnableAsyncDiskSync(boolean val) {
        this.enableAsyncDiskSync = val;
    }

    public boolean isEnableAsyncDiskSync() {
        return enableAsyncDiskSync;
    }

    public JournalDiskSyncStrategy getJournalDiskSyncStrategy() {
        return journalDiskSyncStrategy;
    }

    public void setJournalDiskSyncStrategy(JournalDiskSyncStrategy journalDiskSyncStrategy) {
        this.journalDiskSyncStrategy = journalDiskSyncStrategy;
    }

    public boolean isJournalDiskSyncPeriodic() {
        return JournalDiskSyncStrategy.PERIODIC.equals(journalDiskSyncStrategy);
    }

    public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
        this.dataFileRemovedListener = dataFileRemovedListener;
    }

    public static class WriteCommand extends LinkedNode<WriteCommand> {
        public final Location location;
        public final ByteSequence data;
        final boolean sync;
        public final Runnable onComplete;

        public WriteCommand(Location location, ByteSequence data, boolean sync) {
            this.location = location;
            this.data = data;
            this.sync = sync;
            this.onComplete = null;
        }

        public WriteCommand(Location location, ByteSequence data, Runnable onComplete) {
            this.location = location;
            this.data = data;
            this.onComplete = onComplete;
            this.sync = false;
        }
    }

    public static class WriteKey {
        private final int file;
        private final long offset;
        private final int hash;

        public WriteKey(Location item) {
            file = item.getDataFileId();
            offset = item.getOffset();
            // TODO: see if we can build a better hash
            hash = (int)(file ^ offset);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof WriteKey) {
                WriteKey di = (WriteKey)obj;
                return di.file == file && di.offset == offset;
            }
            return false;
        }
    }
}