/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.scheduler;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.kahadb.AbstractKahaDBStore;
import org.apache.activemq.store.kahadb.JournalCommand;
import org.apache.activemq.store.kahadb.KahaDBMetaData;
import org.apache.activemq.store.kahadb.Visitor;
import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.store.kahadb.scheduler.legacy.LegacyStoreReplayer;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * @org.apache.xbean.XBean element="kahaDBJobScheduler"
 */
public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSchedulerStore {

    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);

    private JobSchedulerKahaDBMetaData metaData = new JobSchedulerKahaDBMetaData(this);
    private final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
    private final Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
    private File legacyStoreArchiveDirectory;

    /**
     * The Scheduler Token is used to identify base revisions of the Scheduler store. A store
     * based on the initial scheduler design will not have this tag in its meta-data and will
     * indicate an update is needed. Later versions of the scheduler can also change this value
     * to indicate incompatible store bases which require complete meta-data and journal rewrites
     * instead of simpler meta-data updates.
     */
    static final UUID SCHEDULER_STORE_TOKEN = UUID.fromString("57ed642b-1ee3-47b3-be6d-b7297d500409");

    /**
     * The default scheduler store version. All new store instances will be given this version and
     * earlier versions will be updated to this version.
     */
    static final int CURRENT_VERSION = 1;

    @Override
    public JobScheduler getJobScheduler(final String name) throws Exception {
        this.indexLock.writeLock().lock();
        try {
            JobSchedulerImpl result = this.schedulers.get(name);
            if (result == null) {
                final JobSchedulerImpl js = new JobSchedulerImpl(this);
                js.setName(name);
                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        js.createIndexes(tx);
                        js.load(tx);
                        metaData.getJobSchedulers().put(tx, name, js);
                    }
                });
                result = js;
                this.schedulers.put(name, js);
                if (isStarted()) {
                    result.start();
                }
                this.pageFile.flush();
            }
            return result;
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    @Override
    public boolean removeJobScheduler(final String name) throws Exception {
        boolean result = false;

        this.indexLock.writeLock().lock();
        try {
            final JobSchedulerImpl js = this.schedulers.remove(name);
            result = js != null;
            if (result) {
                js.stop();
                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        metaData.getJobSchedulers().remove(tx, name);
                        js.removeAll(tx);
                    }
                });
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
        return result;
    }

    /**
     * Sets the directory where the legacy scheduler store files are archived before an
     * update attempt is made. Both the legacy index files and the journal files are moved
     * to this folder prior to an upgrade attempt.
     *
     * @param directory
     *        The directory to move the legacy Scheduler Store files to.
     */
    public void setLegacyStoreArchiveDirectory(File directory) {
        this.legacyStoreArchiveDirectory = directory;
    }

    /**
     * Gets the directory where the legacy Scheduler Store files will be archived if the
     * broker is started and an existing Job Scheduler Store from an old version is detected.
     *
     * @return the directory where scheduler store legacy files are archived on upgrade.
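     *         When not explicitly configured this defaults to a "legacySchedulerStore"
     *         sub-directory of the scheduler store directory.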
     */
    public File getLegacyStoreArchiveDirectory() {
        if (this.legacyStoreArchiveDirectory == null) {
            this.legacyStoreArchiveDirectory = new File(getDirectory(), "legacySchedulerStore");
        }

        return this.legacyStoreArchiveDirectory.getAbsoluteFile();
    }

    @Override
    public void load() throws IOException {
        if (opened.compareAndSet(false, true)) {
            getJournal().start();
            try {
                loadPageFile();
            } catch (UnknownStoreVersionException ex) {
                LOG.info("Can't start until store update is performed.");
                upgradeFromLegacy();
                // Restart with the updated store
                getJournal().start();
                loadPageFile();
                LOG.info("Update from legacy Scheduler store completed successfully.");
            } catch (Throwable t) {
                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause: {}", t.toString());
                LOG.debug("Index load failure", t);

                // try to recover index
                try {
                    pageFile.unload();
                } catch (Exception ignore) {
                }
                if (isArchiveCorruptedIndex()) {
                    pageFile.archive();
                } else {
                    pageFile.delete();
                }
                metaData = new JobSchedulerKahaDBMetaData(this);
                pageFile = null;
                loadPageFile();
            }
            startCheckpoint();
            recover();
        }
        LOG.info("{} started.", this);
    }

    @Override
    public void unload() throws IOException {
        if (opened.compareAndSet(true, false)) {
            for (JobSchedulerImpl js : this.schedulers.values()) {
                try {
                    js.stop();
                } catch (Exception e) {
                    throw new IOException(e);
                }
            }
            this.indexLock.writeLock().lock();
            try {
                if (pageFile != null && pageFile.isLoaded()) {
                    metaData.setState(KahaDBMetaData.CLOSED_STATE);

                    if (metaData.getPage() != null) {
                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
                            @Override
                            public void execute(Transaction tx) throws IOException {
                                tx.store(metaData.getPage(), metaDataMarshaller, true);
                            }
                        });
                    }
                }
            } finally {
                this.indexLock.writeLock().unlock();
            }

            checkpointLock.writeLock().lock();
            try {
                if (metaData.getPage() != null) {
                    checkpointUpdate(getCleanupOnStop());
                }
            } finally {
                checkpointLock.writeLock().unlock();
            }
            synchronized (checkpointThreadLock) {
                if (checkpointThread != null) {
                    try {
                        checkpointThread.join();
                        checkpointThread = null;
                    } catch (InterruptedException e) {
                    }
                }
            }

            if (pageFile != null) {
                pageFile.unload();
                pageFile = null;
            }
            if (this.journal != null) {
                journal.close();
                journal = null;
            }

            metaData = new JobSchedulerKahaDBMetaData(this);
        }
        LOG.info("{} stopped.", this);
    }

    private void loadPageFile() throws IOException {
        this.indexLock.writeLock().lock();
        try {
            final PageFile pageFile = getPageFile();
            pageFile.load();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    if (pageFile.getPageCount() == 0) {
                        Page<JobSchedulerKahaDBMetaData> page = tx.allocate();
                        assert page.getPageId() == 0;
                        page.set(metaData);
                        metaData.setPage(page);
                        metaData.setState(KahaDBMetaData.CLOSED_STATE);
                        metaData.initialize(tx);
                        tx.store(metaData.getPage(), metaDataMarshaller, true);
                    } else {
                        Page<JobSchedulerKahaDBMetaData> page = null;
                        page = tx.load(0, metaDataMarshaller);
                        metaData = page.get();
                        metaData.setPage(page);
                    }
                    metaData.load(tx);
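                    // Load the schedulers recorded in the meta data index and start each
                    // one so that previously scheduled jobs resume firing.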
                    metaData.loadScheduler(tx, schedulers);
                    for (JobSchedulerImpl js : schedulers.values()) {
                        try {
                            js.start();
                        } catch (Exception e) {
                            JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
                        }
                    }
                }
            });

            pageFile.flush();
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    private void upgradeFromLegacy() throws IOException {

        journal.close();
        journal = null;
        try {
            pageFile.unload();
            pageFile = null;
        } catch (Exception ignore) {}

        File storeDir = getDirectory().getAbsoluteFile();
        File storeArchiveDir = getLegacyStoreArchiveDirectory();

        LOG.info("Attempting to move old store files from {} to {}", storeDir, storeArchiveDir);

        // Move only the known store files; locks and other items are left in place.
        IOHelper.moveFiles(storeDir, storeArchiveDir, new FilenameFilter() {

            @Override
            public boolean accept(File dir, String name) {
                if (name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log")) {
                    return true;
                }
                return false;
            }
        });

        // We reset everything to a clean state, then we can read from the old
        // scheduler store and replay the scheduled jobs into this one as adds.
        getJournal().start();
        metaData = new JobSchedulerKahaDBMetaData(this);
        pageFile = null;
        loadPageFile();

        LegacyStoreReplayer replayer = new LegacyStoreReplayer(getLegacyStoreArchiveDirectory());
        replayer.load();
        replayer.startReplay(this);

        // Cleanup after replay and store what we've done.
        pageFile.tx().execute(new Transaction.Closure<IOException>() {
            @Override
            public void execute(Transaction tx) throws IOException {
                tx.store(metaData.getPage(), metaDataMarshaller, true);
            }
        });

        checkpointUpdate(true);
        getJournal().close();
        getPageFile().unload();
    }

    @Override
    protected void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
        LOG.debug("Job Scheduler Store Checkpoint started.");

        // reflect last update exclusive of current checkpoint
        Location lastUpdate = metaData.getLastUpdateLocation();
        metaData.setState(KahaDBMetaData.OPEN_STATE);
        tx.store(metaData.getPage(), metaDataMarshaller, true);
        pageFile.flush();

        if (cleanup) {
            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);

            LOG.trace("Last update: {}, full gc candidates set: {}", lastUpdate, gcCandidateSet);

            if (lastUpdate != null) {
                gcCandidateSet.remove(lastUpdate.getDataFileId());
            }

            this.metaData.getJournalRC().visit(tx, new BTreeVisitor<Integer, Integer>() {

                @Override
                public void visit(List<Integer> keys, List<Integer> values) {
                    for (Integer key : keys) {
                        if (gcCandidateSet.remove(key)) {
                            LOG.trace("Removed referenced file: {} from GC set", key);
                        }
                    }
                }

                @Override
                public boolean isInterestedInKeysBetween(Integer first, Integer second) {
                    return true;
                }
            });

            LOG.trace("gc candidates after reference check: {}", gcCandidateSet);

            // If there are GC candidates then check the remove command locations to see
            // if any of them can go or if they must stay in order to ensure proper recovery.
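            // Dropping a remove too early would let a replay of a surviving add resurrect the job.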
            //
            // A log containing any remove commands must be kept until all the logs with the
            // add commands for all the removed jobs have been dropped.
            if (!gcCandidateSet.isEmpty()) {
                Iterator<Entry<Integer, List<Integer>>> removals = metaData.getRemoveLocationTracker().iterator(tx);
                List<Integer> orphans = new ArrayList<Integer>();
                while (removals.hasNext()) {
                    boolean orphanedRemove = true;
                    Entry<Integer, List<Integer>> entry = removals.next();

                    // If this log is not a GC candidate then there's no need to do a check to rule it out
                    if (gcCandidateSet.contains(entry.getKey())) {
                        for (Integer addLocation : entry.getValue()) {
                            if (completeFileSet.contains(addLocation)) {
                                LOG.trace("A remove in log {} has an add still in existence in {}.", entry.getKey(), addLocation);
                                orphanedRemove = false;
                                break;
                            }
                        }

                        // If it's not orphaned then we can't remove it; otherwise we stop
                        // tracking it and its log will get deleted on the next checkpoint.
                        if (!orphanedRemove) {
                            gcCandidateSet.remove(entry.getKey());
                        } else {
                            LOG.trace("All removes in log {} are orphaned, file can be GC'd", entry.getKey());
                            orphans.add(entry.getKey());
                        }
                    }
                }

                // Drop all orphaned removes from the tracker.
                for (Integer orphan : orphans) {
                    metaData.getRemoveLocationTracker().remove(tx, orphan);
                }
            }

            LOG.trace("gc candidates after removals check: {}", gcCandidateSet);
            if (!gcCandidateSet.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
                }
                journal.removeDataFiles(gcCandidateSet);
            }
        }

        LOG.debug("Job Scheduler Store Checkpoint complete.");
    }

    /**
     * Adds a reference for the journal log file pointed to by the given Location value.
     *
     * Journal log files that still contain valid data needed for recovery must carry active
     * references so they are not garbage collected. Each Job Scheduler should ensure that
     * the logs it depends on are accurately referenced.
     *
     * @param tx
     *        The TX under which the update is to be performed.
     * @param location
     *        The location value to update the reference count of.
     *
     * @throws IOException if an error occurs while updating the journal references table.
     */
    protected void incrementJournalCount(Transaction tx, Location location) throws IOException {
        int logId = location.getDataFileId();
        Integer val = metaData.getJournalRC().get(tx, logId);
        int refCount = val != null ? val.intValue() + 1 : 1;
        metaData.getJournalRC().put(tx, logId, refCount);
    }

    /**
     * Removes one reference for the Journal log file indicated in the given Location value.
     *
     * The references are used to track which log files cannot be GC'd. When the reference count
     * on a log file reaches zero the file id is removed from the tracker and the log will be
     * removed on the next checkpoint update.
     *
     * @param tx
     *        The TX under which the update is to be performed.
     * @param location
     *        The location value to update the reference count of.
     *
     * @throws IOException if an error occurs while updating the journal references table.
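     *
     * @see #incrementJournalCount(Transaction, Location)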
     */
    protected void decrementJournalCount(Transaction tx, Location location) throws IOException {
        int logId = location.getDataFileId();
        Integer refCount = metaData.getJournalRC().get(tx, logId);
        if (refCount != null) {
            int refCountValue = refCount;
            refCountValue--;
            if (refCountValue <= 0) {
                metaData.getJournalRC().remove(tx, logId);
            } else {
                metaData.getJournalRC().put(tx, logId, refCountValue);
            }
        }
    }

    /**
     * Updates the Job removal tracking index with the location of a remove command and the
     * original JobLocation entry.
     *
     * The JobLocation holds the locations in the logs where the add and update commands for
     * a job are stored. The log file containing the remove command can only be discarded after
     * both the add and latest update log files have also been discarded.
     *
     * @param tx
     *        The TX under which the update is to be performed.
     * @param location
     *        The location value to reference a remove command.
     * @param removedJob
     *        The original JobLocation instance that holds the add and update locations.
     *
     * @throws IOException if an error occurs while updating the remove location tracker.
     */
    protected void referenceRemovedLocation(Transaction tx, Location location, JobLocation removedJob) throws IOException {
        int logId = location.getDataFileId();
        List<Integer> removed = this.metaData.getRemoveLocationTracker().get(tx, logId);
        if (removed == null) {
            removed = new ArrayList<Integer>();
        }
        removed.add(removedJob.getLocation().getDataFileId());
        this.metaData.getRemoveLocationTracker().put(tx, logId, removed);
    }

    /**
     * Retrieve the scheduled Job's byte blob from the journal.
     *
     * @param location
     *        The location of the KahaAddScheduledJobCommand that originated the Job.
     *
     * @return a ByteSequence containing the payload of the scheduled Job.
     *
     * @throws IOException if an error occurs while reading the payload value.
     */
    protected ByteSequence getPayload(Location location) throws IOException {
        KahaAddScheduledJobCommand job = (KahaAddScheduledJobCommand) this.load(location);
        Buffer payload = job.getPayload();
        return new ByteSequence(payload.getData(), payload.getOffset(), payload.getLength());
    }

    public void readLockIndex() {
        this.indexLock.readLock().lock();
    }

    public void readUnlockIndex() {
        this.indexLock.readLock().unlock();
    }

    public void writeLockIndex() {
        this.indexLock.writeLock().lock();
    }

    public void writeUnlockIndex() {
        this.indexLock.writeLock().unlock();
    }

    @Override
    public String toString() {
        return "JobSchedulerStore: " + getDirectory();
    }

    @Override
    protected String getPageFileName() {
        return "scheduleDB";
    }

    @Override
    protected File getDefaultDataDirectory() {
        return new File(IOHelper.getDefaultDataDirectory(), "delayedDB");
    }

    private class MetaDataMarshaller extends VariableMarshaller<JobSchedulerKahaDBMetaData> {

        private final JobSchedulerStoreImpl store;

        MetaDataMarshaller(JobSchedulerStoreImpl store) {
            this.store = store;
        }

        @Override
        public JobSchedulerKahaDBMetaData readPayload(DataInput dataIn) throws IOException {
            JobSchedulerKahaDBMetaData rc = new JobSchedulerKahaDBMetaData(store);
            rc.read(dataIn);
            return rc;
        }

        @Override
        public void writePayload(JobSchedulerKahaDBMetaData object, DataOutput dataOut) throws IOException {
            object.write(dataOut);
        }
    }

    /**
     * Called during index recovery to rebuild the index from the last known good location. For
     * entries that occur before the last known good position we just ignore them and move on.
     *
     * @param data
     *        the command read from the Journal which should be used to update the index.
     * @param location
     *        the location in the Journal where the command was read.
     * @param inDoubtlocation
     *        the last Location at which the index is known to have been valid; commands at or
     *        after this point are replayed into the index.
     *
     * @throws IOException if an error occurs while recovering the index.
     */
    protected void doRecover(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
            process(data, location);
        }
    }

    /**
     * Called during recovery to allow the store to rebuild from scratch.
     *
     * @param data
     *        The command to process, which was read from the Journal.
     * @param location
     *        The location of the command in the Journal.
     *
     * @throws IOException if an error occurs during command processing.
     */
    @Override
    protected void process(JournalCommand<?> data, final Location location) throws IOException {
        data.visit(new Visitor() {
            @Override
            public void visit(final KahaAddScheduledJobCommand command) throws IOException {
                final JobSchedulerImpl scheduler;

                indexLock.writeLock().lock();
                try {
                    try {
                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            scheduler.process(tx, command, location);
                        }
                    });

                    processLocation(location);
                } finally {
                    indexLock.writeLock().unlock();
                }
            }

            @Override
            public void visit(final KahaRemoveScheduledJobCommand command) throws IOException {
                final JobSchedulerImpl scheduler;

                indexLock.writeLock().lock();
                try {
                    try {
                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            scheduler.process(tx, command, location);
                        }
                    });

                    processLocation(location);
                } finally {
                    indexLock.writeLock().unlock();
                }
            }

            @Override
            public void visit(final KahaRemoveScheduledJobsCommand command) throws IOException {
                final JobSchedulerImpl scheduler;

                indexLock.writeLock().lock();
                try {
                    try {
                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            scheduler.process(tx, command, location);
                        }
                    });

                    processLocation(location);
                } finally {
                    indexLock.writeLock().unlock();
                }
            }

            @Override
            public void visit(final KahaRescheduleJobCommand command) throws IOException {
                final JobSchedulerImpl scheduler;

                indexLock.writeLock().lock();
                try {
                    try {
                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            scheduler.process(tx, command, location);
                        }
                    });

                    processLocation(location);
                } finally {
                    indexLock.writeLock().unlock();
                }
            }

            @Override
            public void visit(final KahaDestroySchedulerCommand command) {
                try {
                    removeJobScheduler(command.getScheduler());
                } catch (Exception e) {
                    LOG.warn("Failed to remove scheduler: {}", command.getScheduler());
                }

                processLocation(location);
            }

            @Override
            public void visit(KahaTraceCommand command) {
                processLocation(location);
            }
        });
    }

    protected void processLocation(final Location location) {
        indexLock.writeLock().lock();
        try {
            this.metaData.setLastUpdateLocation(location);
        } finally {
            indexLock.writeLock().unlock();
        }
    }

    /**
     * We recover from the Journal logs as needed to restore the index.
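     * Replay begins just after the last update location recorded in the meta data (or at the
     * start of the journal when a full index rebuild is forced) and applies each journal
     * command read back onto the index.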
     *
     * @throws IllegalStateException
     * @throws IOException
     */
    private void recover() throws IllegalStateException, IOException {
        this.indexLock.writeLock().lock();
        try {
            long start = System.currentTimeMillis();
            Location lastIndoubtPosition = getRecoveryPosition();
            Location recoveryPosition = lastIndoubtPosition;

            if (recoveryPosition != null) {
                int redoCounter = 0;
                LOG.info("Recovering from the scheduled job journal @" + recoveryPosition);
                while (recoveryPosition != null) {
                    try {
                        JournalCommand<?> message = load(recoveryPosition);
                        metaData.setLastUpdateLocation(recoveryPosition);
                        doRecover(message, recoveryPosition, lastIndoubtPosition);
                        redoCounter++;
                    } catch (IOException failedRecovery) {
                        if (isIgnoreMissingJournalfiles()) {
                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                            // track this dud location
                            journal.corruptRecoveryLocation(recoveryPosition);
                        } else {
                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                        }
                    }
                    recoveryPosition = journal.getNextLocation(recoveryPosition);
                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
                        LOG.info("@ {}, {} entries recovered ..", recoveryPosition, redoCounter);
                    }
                }
                long end = System.currentTimeMillis();
                LOG.info("Recovery replayed {} operations from the journal in {} seconds.",
                    redoCounter, ((end - start) / 1000.0f));
            }

            // We may have to undo some index updates.
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    recoverIndex(tx);
                }
            });

        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    private Location getRecoveryPosition() throws IOException {
        // This loads the first position and we completely rebuild the index if we
        // do not override it with some known recovery start location.
        Location result = null;

        if (!isForceRecoverIndex()) {
            if (metaData.getLastUpdateLocation() != null) {
                result = metaData.getLastUpdateLocation();
            }
        }

        return journal.getNextLocation(result);
    }

    private void recoverIndex(Transaction tx) throws IOException {
        long start = System.currentTimeMillis();

        // It is possible index updates got applied before the journal updates, in which
        // case we need to remove references to Jobs that are not in the journal.
        final Location lastAppendLocation = journal.getLastAppendLocation();
        long undoCounter = 0;

        // Go through all the jobs in each scheduler and check if any are added after
        // the last appended location and remove those. For now we ignore the update
        // location since the scheduled job will update itself after the next fire and
        // a new update will replace any existing update.
        for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
            Map.Entry<String, JobSchedulerImpl> entry = i.next();
            JobSchedulerImpl scheduler = entry.getValue();

            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
            for (JobLocation job : jobs) {
                if (job.getLocation().compareTo(lastAppendLocation) >= 0) {
                    if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) {
                        LOG.trace("Removed Job past last appended in the journal: {}", job.getJobId());
                        undoCounter++;
                    }
                }
            }
        }

        if (undoCounter > 0) {
            // The rolled back operations are basically in-flight journal writes. To avoid getting
            // these the end user should do sync writes to the journal.
            long end = System.currentTimeMillis();
            LOG.info("Rolled back {} messages from the index in {} seconds.", undoCounter, ((end - start) / 1000.0f));
            undoCounter = 0;
        }

        // Now we check for missing and corrupt journal files.

        // 1. Collect the set of all referenced journal files based on the Location of
        //    the scheduled jobs and the marked last update field.
        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
        for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
            Map.Entry<String, JobSchedulerImpl> entry = i.next();
            JobSchedulerImpl scheduler = entry.getValue();

            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
            for (JobLocation job : jobs) {
                missingJournalFiles.add(job.getLocation().getDataFileId());
                if (job.getLastUpdate() != null) {
                    missingJournalFiles.add(job.getLastUpdate().getDataFileId());
                }
            }
        }

        // 2. Remove from that set all known data file Id's in the journal and what's left
        //    is the missing set which will soon also contain the corrupted set.
        missingJournalFiles.removeAll(journal.getFileMap().keySet());
        if (!missingJournalFiles.isEmpty()) {
            LOG.info("Some journal files are missing: {}", missingJournalFiles);
        }

        // 3. Now check all references in the journal logs for corruption and add any
        //    corrupt journal files to the missing set.
        HashSet<Location> corruptedLocations = new HashSet<Location>();

        if (isCheckForCorruptJournalFiles()) {
            Collection<DataFile> dataFiles = journal.getFileMap().values();
            for (DataFile dataFile : dataFiles) {
                int id = dataFile.getDataFileId();
                for (long offset : dataFile.getCorruptedBlocks()) {
                    corruptedLocations.add(new Location(id, (int) offset));
                }
            }

            if (!corruptedLocations.isEmpty()) {
                LOG.debug("Found some corrupted data blocks in the journal: {}", corruptedLocations.size());
            }
        }

        // 4. Now we either fail or we remove all references to missing or corrupt journal
        //    files from the various JobSchedulerImpl instances. When the ignore option is
        //    set we only remove a Job if its initial Add operation is missing; any updates
        //    could be lost but that's the price you pay when ignoring the missing logs.
        if (!missingJournalFiles.isEmpty() || !corruptedLocations.isEmpty()) {
            if (!isIgnoreMissingJournalfiles()) {
                throw new IOException("Detected missing/corrupt journal files.");
            }

            // Remove all Jobs that reference a Location that is either missing or corrupt.
            undoCounter = removeJobsInMissingOrCorruptJounralFiles(tx, missingJournalFiles, corruptedLocations);

            // Clean up the Journal Reference count Map.
            removeJournalRCForMissingFiles(tx, missingJournalFiles);
        }

        if (undoCounter > 0) {
            long end = System.currentTimeMillis();
            LOG.info("Detected missing/corrupt journal files. Dropped {} jobs from the " +
                     "index in {} seconds.", undoCounter, ((end - start) / 1000.0f));
        }
    }

    private void removeJournalRCForMissingFiles(Transaction tx, Set<Integer> missing) throws IOException {
        List<Integer> matches = new ArrayList<Integer>();

        Iterator<Entry<Integer, Integer>> references = metaData.getJournalRC().iterator(tx);
        while (references.hasNext()) {
            int dataFileId = references.next().getKey();
            if (missing.contains(dataFileId)) {
                matches.add(dataFileId);
            }
        }

        for (Integer match : matches) {
            metaData.getJournalRC().remove(tx, match);
        }
    }

    private int removeJobsInMissingOrCorruptJounralFiles(Transaction tx, Set<Integer> missing, Set<Location> corrupted) throws IOException {
        int removed = 0;

        // Remove Jobs that reference missing or corrupt files.
        // Remove Reference counts to missing or corrupt files.
        // Remove any remove command markers pointing to missing or corrupt files.
        for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
            Map.Entry<String, JobSchedulerImpl> entry = i.next();
            JobSchedulerImpl scheduler = entry.getValue();

            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
            for (JobLocation job : jobs) {

                // Remove all jobs in missing log files.
                if (missing.contains(job.getLocation().getDataFileId())) {
                    scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime());
                    removed++;
                    continue;
                }

                // Remove all jobs in corrupted parts of log files.
                if (corrupted.contains(job.getLocation())) {
                    scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime());
                    removed++;
                }
            }
        }

        return removed;
    }
}