001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.store.kahadb.plist;
018
019import org.apache.activemq.broker.BrokerService;
020import org.apache.activemq.broker.BrokerServiceAware;
021import org.apache.activemq.openwire.OpenWireFormat;
022import org.apache.activemq.store.JournaledStore;
023import org.apache.activemq.store.PList;
024import org.apache.activemq.store.PListStore;
025import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
026import org.apache.activemq.store.kahadb.disk.journal.Journal;
027import org.apache.activemq.store.kahadb.disk.journal.Location;
028import org.apache.activemq.store.kahadb.disk.page.Page;
029import org.apache.activemq.store.kahadb.disk.page.PageFile;
030import org.apache.activemq.store.kahadb.disk.page.Transaction;
031import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
032import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
033import org.apache.activemq.thread.Scheduler;
034import org.apache.activemq.util.*;
035import org.apache.activemq.wireformat.WireFormat;
036import org.slf4j.Logger;
037import org.slf4j.LoggerFactory;
038
039import java.io.DataInput;
040import java.io.DataOutput;
041import java.io.File;
042import java.io.IOException;
043import java.util.*;
044import java.util.Map.Entry;
045
046/**
047 * @org.apache.xbean.XBean
048 */
049public class PListStoreImpl extends ServiceSupport implements BrokerServiceAware, Runnable, PListStore, JournaledStore {
050    static final Logger LOG = LoggerFactory.getLogger(PListStoreImpl.class);
051    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
052
053    static final int CLOSED_STATE = 1;
054    static final int OPEN_STATE = 2;
055
056    private File directory;
057    private File indexDirectory;
058    PageFile pageFile;
059    private Journal journal;
060    private LockFile lockFile;
061    private boolean failIfDatabaseIsLocked;
062    private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
063    private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
064    private boolean enableIndexWriteAsync = false;
065    private boolean initialized = false;
066    private boolean lazyInit = true;
067    // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
068    MetaData metaData = new MetaData(this);
069    final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
070    Map<String, PListImpl> persistentLists = new HashMap<String, PListImpl>();
071    final Object indexLock = new Object();
072    private Scheduler scheduler;
073    private long cleanupInterval = 30000;
074
075    private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE;
076    private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE;
077    private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
078    private boolean indexEnablePageCaching = true;
079
080    public Object getIndexLock() {
081        return indexLock;
082    }
083
084    @Override
085    public void setBrokerService(BrokerService brokerService) {
086        this.scheduler = brokerService.getScheduler();
087    }
088
089    public int getIndexPageSize() {
090        return indexPageSize;
091    }
092
093    public int getIndexCacheSize() {
094        return indexCacheSize;
095    }
096
097    public int getIndexWriteBatchSize() {
098        return indexWriteBatchSize;
099    }
100
101    public void setIndexPageSize(int indexPageSize) {
102        this.indexPageSize = indexPageSize;
103    }
104
105    public void setIndexCacheSize(int indexCacheSize) {
106        this.indexCacheSize = indexCacheSize;
107    }
108
109    public void setIndexWriteBatchSize(int indexWriteBatchSize) {
110        this.indexWriteBatchSize = indexWriteBatchSize;
111    }
112
113    public boolean getIndexEnablePageCaching() {
114        return indexEnablePageCaching;
115    }
116
117    public void setIndexEnablePageCaching(boolean indexEnablePageCaching) {
118        this.indexEnablePageCaching = indexEnablePageCaching;
119    }
120
121    protected class MetaData {
122        protected MetaData(PListStoreImpl store) {
123            this.store = store;
124        }
125
126        private final PListStoreImpl store;
127        Page<MetaData> page;
128        BTreeIndex<String, PListImpl> lists;
129
130        void createIndexes(Transaction tx) throws IOException {
131            this.lists = new BTreeIndex<String, PListImpl>(pageFile, tx.allocate().getPageId());
132        }
133
134        void load(Transaction tx) throws IOException {
135            this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
136            this.lists.setValueMarshaller(new PListMarshaller(this.store));
137            this.lists.load(tx);
138        }
139
140        void loadLists(Transaction tx, Map<String, PListImpl> lists) throws IOException {
141            for (Iterator<Entry<String, PListImpl>> i = this.lists.iterator(tx); i.hasNext();) {
142                Entry<String, PListImpl> entry = i.next();
143                entry.getValue().load(tx);
144                lists.put(entry.getKey(), entry.getValue());
145            }
146        }
147
148        public void read(DataInput is) throws IOException {
149            this.lists = new BTreeIndex<String, PListImpl>(pageFile, is.readLong());
150            this.lists.setKeyMarshaller(StringMarshaller.INSTANCE);
151            this.lists.setValueMarshaller(new PListMarshaller(this.store));
152        }
153
154        public void write(DataOutput os) throws IOException {
155            os.writeLong(this.lists.getPageId());
156        }
157    }
158
159    class MetaDataMarshaller extends VariableMarshaller<MetaData> {
160        private final PListStoreImpl store;
161
162        MetaDataMarshaller(PListStoreImpl store) {
163            this.store = store;
164        }
165        public MetaData readPayload(DataInput dataIn) throws IOException {
166            MetaData rc = new MetaData(this.store);
167            rc.read(dataIn);
168            return rc;
169        }
170
171        public void writePayload(MetaData object, DataOutput dataOut) throws IOException {
172            object.write(dataOut);
173        }
174    }
175
176    class PListMarshaller extends VariableMarshaller<PListImpl> {
177        private final PListStoreImpl store;
178        PListMarshaller(PListStoreImpl store) {
179            this.store = store;
180        }
181        public PListImpl readPayload(DataInput dataIn) throws IOException {
182            PListImpl result = new PListImpl(this.store);
183            result.read(dataIn);
184            return result;
185        }
186
187        public void writePayload(PListImpl list, DataOutput dataOut) throws IOException {
188            list.write(dataOut);
189        }
190    }
191
192    public Journal getJournal() {
193        return this.journal;
194    }
195
196    @Override
197    public File getDirectory() {
198        return directory;
199    }
200
201    @Override
202    public void setDirectory(File directory) {
203        this.directory = directory;
204    }
205
206    public File getIndexDirectory() {
207        return indexDirectory != null ? indexDirectory : directory;
208    }
209
210    public void setIndexDirectory(File indexDirectory) {
211        this.indexDirectory = indexDirectory;
212    }
213
214    public long size() {
215        synchronized (this) {
216            if (!initialized) {
217                return 0;
218            }
219        }
220        try {
221            return journal.getDiskSize() + pageFile.getDiskSize();
222        } catch (IOException e) {
223            throw new RuntimeException(e);
224        }
225    }
226
227    @Override
228    public PListImpl getPList(final String name) throws Exception {
229        if (!isStarted()) {
230            throw new IllegalStateException("Not started");
231        }
232        intialize();
233        synchronized (indexLock) {
234            synchronized (this) {
235                PListImpl result = this.persistentLists.get(name);
236                if (result == null) {
237                    final PListImpl pl = new PListImpl(this);
238                    pl.setName(name);
239                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
240                        public void execute(Transaction tx) throws IOException {
241                            pl.setHeadPageId(tx.allocate().getPageId());
242                            pl.load(tx);
243                            metaData.lists.put(tx, name, pl);
244                        }
245                    });
246                    result = pl;
247                    this.persistentLists.put(name, pl);
248                }
249                final PListImpl toLoad = result;
250                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
251                    public void execute(Transaction tx) throws IOException {
252                        toLoad.load(tx);
253                    }
254                });
255
256                return result;
257            }
258        }
259    }
260
261    @Override
262    public boolean removePList(final String name) throws Exception {
263        boolean result = false;
264        synchronized (indexLock) {
265            synchronized (this) {
266                final PList pl = this.persistentLists.remove(name);
267                result = pl != null;
268                if (result) {
269                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
270                        public void execute(Transaction tx) throws IOException {
271                            metaData.lists.remove(tx, name);
272                            pl.destroy();
273                        }
274                    });
275                }
276            }
277        }
278        return result;
279    }
280
281    protected synchronized void intialize() throws Exception {
282        if (isStarted()) {
283            if (this.initialized == false) {
284                if (this.directory == null) {
285                    this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB");
286                }
287                IOHelper.mkdirs(this.directory);
288                IOHelper.deleteChildren(this.directory);
289                if (this.indexDirectory != null) {
290                    IOHelper.mkdirs(this.indexDirectory);
291                    IOHelper.deleteChildren(this.indexDirectory);
292                }
293                lock();
294                this.journal = new Journal();
295                this.journal.setDirectory(directory);
296                this.journal.setMaxFileLength(getJournalMaxFileLength());
297                this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize());
298                this.journal.start();
299                this.pageFile = new PageFile(getIndexDirectory(), "tmpDB");
300                this.pageFile.setEnablePageCaching(getIndexEnablePageCaching());
301                this.pageFile.setPageSize(getIndexPageSize());
302                this.pageFile.setWriteBatchSize(getIndexWriteBatchSize());
303                this.pageFile.setPageCacheSize(getIndexCacheSize());
304                this.pageFile.load();
305
306                this.pageFile.tx().execute(new Transaction.Closure<IOException>() {
307                    public void execute(Transaction tx) throws IOException {
308                        if (pageFile.getPageCount() == 0) {
309                            Page<MetaData> page = tx.allocate();
310                            assert page.getPageId() == 0;
311                            page.set(metaData);
312                            metaData.page = page;
313                            metaData.createIndexes(tx);
314                            tx.store(metaData.page, metaDataMarshaller, true);
315
316                        } else {
317                            Page<MetaData> page = tx.load(0, metaDataMarshaller);
318                            metaData = page.get();
319                            metaData.page = page;
320                        }
321                        metaData.load(tx);
322                        metaData.loadLists(tx, persistentLists);
323                    }
324                });
325                this.pageFile.flush();
326
327                if (cleanupInterval > 0) {
328                    if (scheduler == null) {
329                        scheduler = new Scheduler(PListStoreImpl.class.getSimpleName());
330                        scheduler.start();
331                    }
332                    scheduler.executePeriodically(this, cleanupInterval);
333                }
334                this.initialized = true;
335                LOG.info(this + " initialized");
336            }
337        }
338    }
339
340    @Override
341    protected synchronized void doStart() throws Exception {
342        if (!lazyInit) {
343            intialize();
344        }
345        LOG.info(this + " started");
346    }
347
348    @Override
349    protected synchronized void doStop(ServiceStopper stopper) throws Exception {
350        if (scheduler != null) {
351            if (PListStoreImpl.class.getSimpleName().equals(scheduler.getName())) {
352                scheduler.stop();
353                scheduler = null;
354            }
355        }
356        for (PListImpl pl : this.persistentLists.values()) {
357            pl.unload(null);
358        }
359        if (this.pageFile != null) {
360            this.pageFile.unload();
361        }
362        if (this.journal != null) {
363            journal.close();
364        }
365        if (this.lockFile != null) {
366            this.lockFile.unlock();
367        }
368        this.lockFile = null;
369        this.initialized = false;
370        LOG.info(this + " stopped");
371
372    }
373
374    public void run() {
375        try {
376            if (isStopping()) {
377                return;
378            }
379            final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId();
380            final Set<Integer> candidates = journal.getFileMap().keySet();
381            LOG.trace("Full gc candidate set:" + candidates);
382            if (candidates.size() > 1) {
383                // prune current write
384                for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) {
385                    if (iterator.next() >= lastJournalFileId) {
386                        iterator.remove();
387                    }
388                }
389                List<PListImpl> plists = null;
390                synchronized (indexLock) {
391                    synchronized (this) {
392                        plists = new ArrayList<PListImpl>(persistentLists.values());
393                    }
394                }
395                for (PListImpl list : plists) {
396                    list.claimFileLocations(candidates);
397                    if (isStopping()) {
398                        return;
399                    }
400                    LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates);
401                }
402                LOG.trace("GC Candidate set:" + candidates);
403                this.journal.removeDataFiles(candidates);
404            }
405        } catch (IOException e) {
406            LOG.error("Exception on periodic cleanup: " + e, e);
407        }
408    }
409
410    ByteSequence getPayload(Location location) throws IllegalStateException, IOException {
411        ByteSequence result = null;
412        result = this.journal.read(location);
413        return result;
414    }
415
416    Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException {
417        return this.journal.write(payload, sync);
418    }
419
420    private void lock() throws IOException {
421        if (lockFile == null) {
422            File lockFileName = new File(directory, "lock");
423            lockFile = new LockFile(lockFileName, true);
424            if (failIfDatabaseIsLocked) {
425                lockFile.lock();
426            } else {
427                while (true) {
428                    try {
429                        lockFile.lock();
430                        break;
431                    } catch (IOException e) {
432                        LOG.info("Database " + lockFileName + " is locked... waiting "
433                                + (DATABASE_LOCKED_WAIT_DELAY / 1000)
434                                + " seconds for the database to be unlocked. Reason: " + e);
435                        try {
436                            Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
437                        } catch (InterruptedException e1) {
438                        }
439                    }
440                }
441            }
442        }
443    }
444
445    PageFile getPageFile() {
446        this.pageFile.isLoaded();
447        return this.pageFile;
448    }
449
450    public boolean isFailIfDatabaseIsLocked() {
451        return failIfDatabaseIsLocked;
452    }
453
454    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
455        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
456    }
457
458    public int getJournalMaxFileLength() {
459        return journalMaxFileLength;
460    }
461
462    public void setJournalMaxFileLength(int journalMaxFileLength) {
463        this.journalMaxFileLength = journalMaxFileLength;
464    }
465
466    public int getJournalMaxWriteBatchSize() {
467        return journalMaxWriteBatchSize;
468    }
469
470    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) {
471        this.journalMaxWriteBatchSize = journalMaxWriteBatchSize;
472    }
473
474    public boolean isEnableIndexWriteAsync() {
475        return enableIndexWriteAsync;
476    }
477
478    public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) {
479        this.enableIndexWriteAsync = enableIndexWriteAsync;
480    }
481
482    public long getCleanupInterval() {
483        return cleanupInterval;
484    }
485
486    public void setCleanupInterval(long cleanupInterval) {
487        this.cleanupInterval = cleanupInterval;
488    }
489
490    public boolean isLazyInit() {
491        return lazyInit;
492    }
493
494    public void setLazyInit(boolean lazyInit) {
495        this.lazyInit = lazyInit;
496    }
497
498    @Override
499    public String toString() {
500        String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
501        if (indexDirectory != null) {
502            path += "|" + indexDirectory.getAbsolutePath();
503        }
504        return "PListStore:[" + path + "]";
505    }
506}