001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.plist; 018 019import org.apache.activemq.broker.BrokerService; 020import org.apache.activemq.broker.BrokerServiceAware; 021import org.apache.activemq.openwire.OpenWireFormat; 022import org.apache.activemq.store.JournaledStore; 023import org.apache.activemq.store.PList; 024import org.apache.activemq.store.PListStore; 025import org.apache.activemq.store.kahadb.disk.index.BTreeIndex; 026import org.apache.activemq.store.kahadb.disk.journal.Journal; 027import org.apache.activemq.store.kahadb.disk.journal.Location; 028import org.apache.activemq.store.kahadb.disk.page.Page; 029import org.apache.activemq.store.kahadb.disk.page.PageFile; 030import org.apache.activemq.store.kahadb.disk.page.Transaction; 031import org.apache.activemq.store.kahadb.disk.util.StringMarshaller; 032import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller; 033import org.apache.activemq.thread.Scheduler; 034import org.apache.activemq.util.*; 035import org.apache.activemq.wireformat.WireFormat; 036import org.slf4j.Logger; 037import org.slf4j.LoggerFactory; 038 039import java.io.DataInput; 040import java.io.DataOutput; 041import java.io.File; 042import java.io.IOException; 043import java.util.*; 044import java.util.Map.Entry; 045 046/** 047 * @org.apache.xbean.XBean 048 */ 049public class PListStoreImpl extends ServiceSupport implements BrokerServiceAware, Runnable, PListStore, JournaledStore { 050 static final Logger LOG = LoggerFactory.getLogger(PListStoreImpl.class); 051 private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000; 052 053 static final int CLOSED_STATE = 1; 054 static final int OPEN_STATE = 2; 055 056 private File directory; 057 private File indexDirectory; 058 PageFile pageFile; 059 private Journal journal; 060 private LockFile lockFile; 061 private boolean failIfDatabaseIsLocked; 062 private int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; 063 private int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE; 064 private boolean enableIndexWriteAsync = false; 065 private boolean initialized = false; 066 private boolean lazyInit = true; 067 // private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 068 MetaData metaData = new MetaData(this); 069 final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this); 070 Map<String, PListImpl> persistentLists = new HashMap<String, PListImpl>(); 071 final Object indexLock = new Object(); 072 private Scheduler scheduler; 073 private long cleanupInterval = 30000; 074 075 private int indexPageSize = PageFile.DEFAULT_PAGE_SIZE; 076 private int indexCacheSize = PageFile.DEFAULT_PAGE_CACHE_SIZE; 077 private int indexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE; 078 private boolean indexEnablePageCaching = true; 079 080 public Object getIndexLock() { 081 return indexLock; 082 } 083 084 @Override 085 public void setBrokerService(BrokerService brokerService) { 086 this.scheduler = brokerService.getScheduler(); 087 } 088 089 public int getIndexPageSize() { 090 return indexPageSize; 091 } 092 093 public int getIndexCacheSize() { 094 return indexCacheSize; 095 } 096 097 public int getIndexWriteBatchSize() { 098 return indexWriteBatchSize; 099 } 100 101 public void setIndexPageSize(int indexPageSize) { 102 this.indexPageSize = indexPageSize; 103 } 104 105 public void setIndexCacheSize(int indexCacheSize) { 106 this.indexCacheSize = indexCacheSize; 107 } 108 109 public void setIndexWriteBatchSize(int indexWriteBatchSize) { 110 this.indexWriteBatchSize = indexWriteBatchSize; 111 } 112 113 public boolean getIndexEnablePageCaching() { 114 return indexEnablePageCaching; 115 } 116 117 public void setIndexEnablePageCaching(boolean indexEnablePageCaching) { 118 this.indexEnablePageCaching = indexEnablePageCaching; 119 } 120 121 protected class MetaData { 122 protected MetaData(PListStoreImpl store) { 123 this.store = store; 124 } 125 126 private final PListStoreImpl store; 127 Page<MetaData> page; 128 BTreeIndex<String, PListImpl> lists; 129 130 void createIndexes(Transaction tx) throws IOException { 131 this.lists = new BTreeIndex<String, PListImpl>(pageFile, tx.allocate().getPageId()); 132 } 133 134 void load(Transaction tx) throws IOException { 135 this.lists.setKeyMarshaller(StringMarshaller.INSTANCE); 136 this.lists.setValueMarshaller(new PListMarshaller(this.store)); 137 this.lists.load(tx); 138 } 139 140 void loadLists(Transaction tx, Map<String, PListImpl> lists) throws IOException { 141 for (Iterator<Entry<String, PListImpl>> i = this.lists.iterator(tx); i.hasNext();) { 142 Entry<String, PListImpl> entry = i.next(); 143 entry.getValue().load(tx); 144 lists.put(entry.getKey(), entry.getValue()); 145 } 146 } 147 148 public void read(DataInput is) throws IOException { 149 this.lists = new BTreeIndex<String, PListImpl>(pageFile, is.readLong()); 150 this.lists.setKeyMarshaller(StringMarshaller.INSTANCE); 151 this.lists.setValueMarshaller(new PListMarshaller(this.store)); 152 } 153 154 public void write(DataOutput os) throws IOException { 155 os.writeLong(this.lists.getPageId()); 156 } 157 } 158 159 class MetaDataMarshaller extends VariableMarshaller<MetaData> { 160 private final PListStoreImpl store; 161 162 MetaDataMarshaller(PListStoreImpl store) { 163 this.store = store; 164 } 165 public MetaData readPayload(DataInput dataIn) throws IOException { 166 MetaData rc = new MetaData(this.store); 167 rc.read(dataIn); 168 return rc; 169 } 170 171 public void writePayload(MetaData object, DataOutput dataOut) throws IOException { 172 object.write(dataOut); 173 } 174 } 175 176 class PListMarshaller extends VariableMarshaller<PListImpl> { 177 private final PListStoreImpl store; 178 PListMarshaller(PListStoreImpl store) { 179 this.store = store; 180 } 181 public PListImpl readPayload(DataInput dataIn) throws IOException { 182 PListImpl result = new PListImpl(this.store); 183 result.read(dataIn); 184 return result; 185 } 186 187 public void writePayload(PListImpl list, DataOutput dataOut) throws IOException { 188 list.write(dataOut); 189 } 190 } 191 192 public Journal getJournal() { 193 return this.journal; 194 } 195 196 @Override 197 public File getDirectory() { 198 return directory; 199 } 200 201 @Override 202 public void setDirectory(File directory) { 203 this.directory = directory; 204 } 205 206 public File getIndexDirectory() { 207 return indexDirectory != null ? indexDirectory : directory; 208 } 209 210 public void setIndexDirectory(File indexDirectory) { 211 this.indexDirectory = indexDirectory; 212 } 213 214 public long size() { 215 synchronized (this) { 216 if (!initialized) { 217 return 0; 218 } 219 } 220 try { 221 return journal.getDiskSize() + pageFile.getDiskSize(); 222 } catch (IOException e) { 223 throw new RuntimeException(e); 224 } 225 } 226 227 @Override 228 public PListImpl getPList(final String name) throws Exception { 229 if (!isStarted()) { 230 throw new IllegalStateException("Not started"); 231 } 232 intialize(); 233 synchronized (indexLock) { 234 synchronized (this) { 235 PListImpl result = this.persistentLists.get(name); 236 if (result == null) { 237 final PListImpl pl = new PListImpl(this); 238 pl.setName(name); 239 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 240 public void execute(Transaction tx) throws IOException { 241 pl.setHeadPageId(tx.allocate().getPageId()); 242 pl.load(tx); 243 metaData.lists.put(tx, name, pl); 244 } 245 }); 246 result = pl; 247 this.persistentLists.put(name, pl); 248 } 249 final PListImpl toLoad = result; 250 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 251 public void execute(Transaction tx) throws IOException { 252 toLoad.load(tx); 253 } 254 }); 255 256 return result; 257 } 258 } 259 } 260 261 @Override 262 public boolean removePList(final String name) throws Exception { 263 boolean result = false; 264 synchronized (indexLock) { 265 synchronized (this) { 266 final PList pl = this.persistentLists.remove(name); 267 result = pl != null; 268 if (result) { 269 getPageFile().tx().execute(new Transaction.Closure<IOException>() { 270 public void execute(Transaction tx) throws IOException { 271 metaData.lists.remove(tx, name); 272 pl.destroy(); 273 } 274 }); 275 } 276 } 277 } 278 return result; 279 } 280 281 protected synchronized void intialize() throws Exception { 282 if (isStarted()) { 283 if (this.initialized == false) { 284 if (this.directory == null) { 285 this.directory = new File(IOHelper.getDefaultDataDirectory() + File.pathSeparator + "delayedDB"); 286 } 287 IOHelper.mkdirs(this.directory); 288 IOHelper.deleteChildren(this.directory); 289 if (this.indexDirectory != null) { 290 IOHelper.mkdirs(this.indexDirectory); 291 IOHelper.deleteChildren(this.indexDirectory); 292 } 293 lock(); 294 this.journal = new Journal(); 295 this.journal.setDirectory(directory); 296 this.journal.setMaxFileLength(getJournalMaxFileLength()); 297 this.journal.setWriteBatchSize(getJournalMaxWriteBatchSize()); 298 this.journal.start(); 299 this.pageFile = new PageFile(getIndexDirectory(), "tmpDB"); 300 this.pageFile.setEnablePageCaching(getIndexEnablePageCaching()); 301 this.pageFile.setPageSize(getIndexPageSize()); 302 this.pageFile.setWriteBatchSize(getIndexWriteBatchSize()); 303 this.pageFile.setPageCacheSize(getIndexCacheSize()); 304 this.pageFile.load(); 305 306 this.pageFile.tx().execute(new Transaction.Closure<IOException>() { 307 public void execute(Transaction tx) throws IOException { 308 if (pageFile.getPageCount() == 0) { 309 Page<MetaData> page = tx.allocate(); 310 assert page.getPageId() == 0; 311 page.set(metaData); 312 metaData.page = page; 313 metaData.createIndexes(tx); 314 tx.store(metaData.page, metaDataMarshaller, true); 315 316 } else { 317 Page<MetaData> page = tx.load(0, metaDataMarshaller); 318 metaData = page.get(); 319 metaData.page = page; 320 } 321 metaData.load(tx); 322 metaData.loadLists(tx, persistentLists); 323 } 324 }); 325 this.pageFile.flush(); 326 327 if (cleanupInterval > 0) { 328 if (scheduler == null) { 329 scheduler = new Scheduler(PListStoreImpl.class.getSimpleName()); 330 scheduler.start(); 331 } 332 scheduler.executePeriodically(this, cleanupInterval); 333 } 334 this.initialized = true; 335 LOG.info(this + " initialized"); 336 } 337 } 338 } 339 340 @Override 341 protected synchronized void doStart() throws Exception { 342 if (!lazyInit) { 343 intialize(); 344 } 345 LOG.info(this + " started"); 346 } 347 348 @Override 349 protected synchronized void doStop(ServiceStopper stopper) throws Exception { 350 if (scheduler != null) { 351 if (PListStoreImpl.class.getSimpleName().equals(scheduler.getName())) { 352 scheduler.stop(); 353 scheduler = null; 354 } 355 } 356 for (PListImpl pl : this.persistentLists.values()) { 357 pl.unload(null); 358 } 359 if (this.pageFile != null) { 360 this.pageFile.unload(); 361 } 362 if (this.journal != null) { 363 journal.close(); 364 } 365 if (this.lockFile != null) { 366 this.lockFile.unlock(); 367 } 368 this.lockFile = null; 369 this.initialized = false; 370 LOG.info(this + " stopped"); 371 372 } 373 374 public void run() { 375 try { 376 if (isStopping()) { 377 return; 378 } 379 final int lastJournalFileId = journal.getLastAppendLocation().getDataFileId(); 380 final Set<Integer> candidates = journal.getFileMap().keySet(); 381 LOG.trace("Full gc candidate set:" + candidates); 382 if (candidates.size() > 1) { 383 // prune current write 384 for (Iterator<Integer> iterator = candidates.iterator(); iterator.hasNext();) { 385 if (iterator.next() >= lastJournalFileId) { 386 iterator.remove(); 387 } 388 } 389 List<PListImpl> plists = null; 390 synchronized (indexLock) { 391 synchronized (this) { 392 plists = new ArrayList<PListImpl>(persistentLists.values()); 393 } 394 } 395 for (PListImpl list : plists) { 396 list.claimFileLocations(candidates); 397 if (isStopping()) { 398 return; 399 } 400 LOG.trace("Remaining gc candidate set after refs from: " + list.getName() + ":" + candidates); 401 } 402 LOG.trace("GC Candidate set:" + candidates); 403 this.journal.removeDataFiles(candidates); 404 } 405 } catch (IOException e) { 406 LOG.error("Exception on periodic cleanup: " + e, e); 407 } 408 } 409 410 ByteSequence getPayload(Location location) throws IllegalStateException, IOException { 411 ByteSequence result = null; 412 result = this.journal.read(location); 413 return result; 414 } 415 416 Location write(ByteSequence payload, boolean sync) throws IllegalStateException, IOException { 417 return this.journal.write(payload, sync); 418 } 419 420 private void lock() throws IOException { 421 if (lockFile == null) { 422 File lockFileName = new File(directory, "lock"); 423 lockFile = new LockFile(lockFileName, true); 424 if (failIfDatabaseIsLocked) { 425 lockFile.lock(); 426 } else { 427 while (true) { 428 try { 429 lockFile.lock(); 430 break; 431 } catch (IOException e) { 432 LOG.info("Database " + lockFileName + " is locked... waiting " 433 + (DATABASE_LOCKED_WAIT_DELAY / 1000) 434 + " seconds for the database to be unlocked. Reason: " + e); 435 try { 436 Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); 437 } catch (InterruptedException e1) { 438 } 439 } 440 } 441 } 442 } 443 } 444 445 PageFile getPageFile() { 446 this.pageFile.isLoaded(); 447 return this.pageFile; 448 } 449 450 public boolean isFailIfDatabaseIsLocked() { 451 return failIfDatabaseIsLocked; 452 } 453 454 public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { 455 this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; 456 } 457 458 public int getJournalMaxFileLength() { 459 return journalMaxFileLength; 460 } 461 462 public void setJournalMaxFileLength(int journalMaxFileLength) { 463 this.journalMaxFileLength = journalMaxFileLength; 464 } 465 466 public int getJournalMaxWriteBatchSize() { 467 return journalMaxWriteBatchSize; 468 } 469 470 public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize) { 471 this.journalMaxWriteBatchSize = journalMaxWriteBatchSize; 472 } 473 474 public boolean isEnableIndexWriteAsync() { 475 return enableIndexWriteAsync; 476 } 477 478 public void setEnableIndexWriteAsync(boolean enableIndexWriteAsync) { 479 this.enableIndexWriteAsync = enableIndexWriteAsync; 480 } 481 482 public long getCleanupInterval() { 483 return cleanupInterval; 484 } 485 486 public void setCleanupInterval(long cleanupInterval) { 487 this.cleanupInterval = cleanupInterval; 488 } 489 490 public boolean isLazyInit() { 491 return lazyInit; 492 } 493 494 public void setLazyInit(boolean lazyInit) { 495 this.lazyInit = lazyInit; 496 } 497 498 @Override 499 public String toString() { 500 String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; 501 if (indexDirectory != null) { 502 path += "|" + indexDirectory.getAbsolutePath(); 503 } 504 return "PListStore:[" + path + "]"; 505 } 506}