/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.store.kahadb.scheduler;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.activemq.broker.scheduler.JobScheduler;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.kahadb.AbstractKahaDBStore;
import org.apache.activemq.store.kahadb.JournalCommand;
import org.apache.activemq.store.kahadb.KahaDBMetaData;
import org.apache.activemq.store.kahadb.Visitor;
import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaDestroySchedulerCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobCommand;
import org.apache.activemq.store.kahadb.data.KahaRemoveScheduledJobsCommand;
import org.apache.activemq.store.kahadb.data.KahaRescheduleJobCommand;
import org.apache.activemq.store.kahadb.data.KahaTraceCommand;
import org.apache.activemq.store.kahadb.disk.index.BTreeVisitor;
import org.apache.activemq.store.kahadb.disk.journal.DataFile;
import org.apache.activemq.store.kahadb.disk.journal.Location;
import org.apache.activemq.store.kahadb.disk.page.Page;
import org.apache.activemq.store.kahadb.disk.page.PageFile;
import org.apache.activemq.store.kahadb.disk.page.Transaction;
import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
import org.apache.activemq.store.kahadb.scheduler.legacy.LegacyStoreReplayer;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * @org.apache.xbean.XBean element="kahaDBJobScheduler"
 */

public class JobSchedulerStoreImpl extends AbstractKahaDBStore implements JobSchedulerStore {

    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerStoreImpl.class);

    private JobSchedulerKahaDBMetaData metaData = new JobSchedulerKahaDBMetaData(this);
    private final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
    private final Map<String, JobSchedulerImpl> schedulers = new HashMap<String, JobSchedulerImpl>();
    private File legacyStoreArchiveDirectory;

    /**
     * The Scheduler Token is used to identify base revisions of the Scheduler store.  A store
     * based on the initial scheduler design will not have this tag in its meta-data, which
     * indicates that an update is needed.  Later versions of the scheduler can also change this
     * value to indicate incompatible store bases which require complete meta-data and journal
     * rewrites instead of simpler meta-data updates.
     */
    static final UUID SCHEDULER_STORE_TOKEN = UUID.fromString("57ed642b-1ee3-47b3-be6d-b7297d500409");

    /**
     * The default scheduler store version.  All new store instances will be given this version and
     * earlier versions will be updated to this version.
     */
    static final int CURRENT_VERSION = 1;

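    /**
     * {@inheritDoc}
     *
     * A minimal usage sketch; the data directory, the scheduler name and the lifecycle calls
     * shown here are illustrative assumptions based on the inherited {@link AbstractKahaDBStore}
     * API rather than a prescribed configuration:
     * <pre>{@code
     * JobSchedulerStoreImpl store = new JobSchedulerStoreImpl();
     * store.setDirectory(new File("activemq-data/scheduler"));
     * store.start();
     * JobScheduler scheduler = store.getJobScheduler("JMS");
     * }</pre>
     */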
    @Override
    public JobScheduler getJobScheduler(final String name) throws Exception {
        this.indexLock.writeLock().lock();
        try {
            JobSchedulerImpl result = this.schedulers.get(name);
            if (result == null) {
                final JobSchedulerImpl js = new JobSchedulerImpl(this);
                js.setName(name);
                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        js.createIndexes(tx);
                        js.load(tx);
                        metaData.getJobSchedulers().put(tx, name, js);
                    }
                });
                result = js;
                this.schedulers.put(name, js);
                if (isStarted()) {
                    result.start();
                }
                this.pageFile.flush();
            }
            return result;
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    @Override
    public boolean removeJobScheduler(final String name) throws Exception {
        boolean result = false;

        this.indexLock.writeLock().lock();
        try {
            final JobSchedulerImpl js = this.schedulers.remove(name);
            result = js != null;
            if (result) {
                js.stop();
                getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                    @Override
                    public void execute(Transaction tx) throws IOException {
                        metaData.getJobSchedulers().remove(tx, name);
                        js.removeAll(tx);
                    }
                });
            }
        } finally {
            this.indexLock.writeLock().unlock();
        }
        return result;
    }

    /**
     * Sets the directory where the legacy scheduler store files are archived before an
     * update attempt is made.  Both the legacy index files and the journal files are moved
     * to this folder prior to an upgrade attempt.
     *
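     * A configuration sketch; the {@code store} reference and the path are illustrative and
     * simply mirror the default archive location under the store's data directory:
     * <pre>{@code
     * JobSchedulerStoreImpl store = new JobSchedulerStoreImpl();
     * store.setLegacyStoreArchiveDirectory(new File("activemq-data/legacySchedulerStore"));
     * }</pre>
     *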
     * @param directory
     *      The directory to move the legacy Scheduler Store files to.
     */
    public void setLegacyStoreArchiveDirectory(File directory) {
        this.legacyStoreArchiveDirectory = directory;
    }

    /**
     * Gets the directory where the legacy Scheduler Store files will be archived if the
     * broker is started and an existing Job Scheduler Store from an old version is detected.
     *
     * @return the directory where scheduler store legacy files are archived on upgrade.
     */
    public File getLegacyStoreArchiveDirectory() {
        if (this.legacyStoreArchiveDirectory == null) {
            this.legacyStoreArchiveDirectory = new File(getDirectory(), "legacySchedulerStore");
        }

        return this.legacyStoreArchiveDirectory.getAbsoluteFile();
    }

    @Override
    public void load() throws IOException {
        if (opened.compareAndSet(false, true)) {
            getJournal().start();
            try {
                loadPageFile();
            } catch (UnknownStoreVersionException ex) {
                LOG.info("Can't start until store update is performed.");
                upgradeFromLegacy();
                // Restart with the updated store
                getJournal().start();
                loadPageFile();
                LOG.info("Update from legacy Scheduler store completed successfully.");
            } catch (Throwable t) {
                LOG.warn("Index corrupted. Recovering the index through journal replay. Cause: {}", t.toString());
                LOG.debug("Index load failure", t);

                // try to recover index
                try {
                    pageFile.unload();
                } catch (Exception ignore) {
                }
                if (isArchiveCorruptedIndex()) {
                    pageFile.archive();
                } else {
                    pageFile.delete();
                }
                metaData = new JobSchedulerKahaDBMetaData(this);
                pageFile = null;
                loadPageFile();
            }
            startCheckpoint();
            recover();
        }
        LOG.info("{} started.", this);
    }

    @Override
    public void unload() throws IOException {
        if (opened.compareAndSet(true, false)) {
            for (JobSchedulerImpl js : this.schedulers.values()) {
                try {
                    js.stop();
                } catch (Exception e) {
                    throw new IOException(e);
                }
            }
            this.indexLock.writeLock().lock();
            try {
                if (pageFile != null && pageFile.isLoaded()) {
                    metaData.setState(KahaDBMetaData.CLOSED_STATE);

                    if (metaData.getPage() != null) {
                        pageFile.tx().execute(new Transaction.Closure<IOException>() {
                            @Override
                            public void execute(Transaction tx) throws IOException {
                                tx.store(metaData.getPage(), metaDataMarshaller, true);
                            }
                        });
                    }
                }
            } finally {
                this.indexLock.writeLock().unlock();
            }

            checkpointLock.writeLock().lock();
            try {
                if (metaData.getPage() != null) {
                    checkpointUpdate(getCleanupOnStop());
                }
            } finally {
                checkpointLock.writeLock().unlock();
            }
            synchronized (checkpointThreadLock) {
                if (checkpointThread != null) {
                    try {
                        checkpointThread.join();
                        checkpointThread = null;
                    } catch (InterruptedException e) {
                    }
                }
            }

            if (pageFile != null) {
                pageFile.unload();
                pageFile = null;
            }
            if (this.journal != null) {
                journal.close();
                journal = null;
            }

            metaData = new JobSchedulerKahaDBMetaData(this);
        }
        LOG.info("{} stopped.", this);
    }

    private void loadPageFile() throws IOException {
        this.indexLock.writeLock().lock();
        try {
            final PageFile pageFile = getPageFile();
            pageFile.load();
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    if (pageFile.getPageCount() == 0) {
                        Page<JobSchedulerKahaDBMetaData> page = tx.allocate();
                        assert page.getPageId() == 0;
                        page.set(metaData);
                        metaData.setPage(page);
                        metaData.setState(KahaDBMetaData.CLOSED_STATE);
                        metaData.initialize(tx);
                        tx.store(metaData.getPage(), metaDataMarshaller, true);
                    } else {
                        Page<JobSchedulerKahaDBMetaData> page = tx.load(0, metaDataMarshaller);
                        metaData = page.get();
                        metaData.setPage(page);
                    }
                    metaData.load(tx);
                    metaData.loadScheduler(tx, schedulers);
                    for (JobSchedulerImpl js : schedulers.values()) {
                        try {
                            js.start();
                        } catch (Exception e) {
                            JobSchedulerStoreImpl.LOG.error("Failed to load " + js.getName(), e);
                        }
                    }
                }
            });

            pageFile.flush();
        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    private void upgradeFromLegacy() throws IOException {

        journal.close();
        journal = null;
        try {
            pageFile.unload();
            pageFile = null;
        } catch (Exception ignore) {}

        File storeDir = getDirectory().getAbsoluteFile();
        File storeArchiveDir = getLegacyStoreArchiveDirectory();

        LOG.info("Attempting to move old store files from {} to {}", storeDir, storeArchiveDir);

        // Move only the known store files; locks and other items are left in place.
        IOHelper.moveFiles(storeDir, storeArchiveDir, new FilenameFilter() {

            @Override
            public boolean accept(File dir, String name) {
                return name.endsWith(".data") || name.endsWith(".redo") || name.endsWith(".log");
            }
        });

        // We reset everything to a clean state, then we can read from the old
        // scheduler store and replay the scheduled jobs into this one as adds.
        getJournal().start();
        metaData = new JobSchedulerKahaDBMetaData(this);
        pageFile = null;
        loadPageFile();

        LegacyStoreReplayer replayer = new LegacyStoreReplayer(getLegacyStoreArchiveDirectory());
        replayer.load();
        replayer.startReplay(this);

        // Cleanup after replay and store what we've done.
        pageFile.tx().execute(new Transaction.Closure<IOException>() {
            @Override
            public void execute(Transaction tx) throws IOException {
                tx.store(metaData.getPage(), metaDataMarshaller, true);
            }
        });

        checkpointUpdate(true);
        getJournal().close();
        getPageFile().unload();
    }

    @Override
    protected void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
        LOG.debug("Job Scheduler Store Checkpoint started.");

        // reflect last update exclusive of current checkpoint
        Location lastUpdate = metaData.getLastUpdateLocation();
        metaData.setState(KahaDBMetaData.OPEN_STATE);
        tx.store(metaData.getPage(), metaDataMarshaller, true);
        pageFile.flush();

        if (cleanup) {
            final TreeSet<Integer> completeFileSet = new TreeSet<Integer>(journal.getFileMap().keySet());
            final TreeSet<Integer> gcCandidateSet = new TreeSet<Integer>(completeFileSet);

            LOG.trace("Last update: {}, full gc candidates set: {}", lastUpdate, gcCandidateSet);

            if (lastUpdate != null) {
                gcCandidateSet.remove(lastUpdate.getDataFileId());
            }

            this.metaData.getJournalRC().visit(tx, new BTreeVisitor<Integer, Integer>() {

                @Override
                public void visit(List<Integer> keys, List<Integer> values) {
                    for (Integer key : keys) {
                        if (gcCandidateSet.remove(key)) {
                            LOG.trace("Removed referenced file: {} from GC set", key);
                        }
                    }
                }

                @Override
                public boolean isInterestedInKeysBetween(Integer first, Integer second) {
                    return true;
                }
            });

            LOG.trace("gc candidates after reference check: {}", gcCandidateSet);

            // If there are GC candidates then check the remove command locations to see
            // if any of them can go, or if they must stay in order to ensure proper recovery.
            //
            // A log containing any remove commands must be kept until all the logs with the
            // add commands for all the removed jobs have been dropped.
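            //
            // For example (file ids are illustrative): if a job's add command lives in data file 3
            // and its remove command in data file 5, file 5 must be retained until file 3 has been
            // dropped; only then is the remove considered orphaned and file 5 eligible for GC.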
            if (!gcCandidateSet.isEmpty()) {
                Iterator<Entry<Integer, List<Integer>>> removals = metaData.getRemoveLocationTracker().iterator(tx);
                List<Integer> orphans = new ArrayList<Integer>();
                while (removals.hasNext()) {
                    boolean orphanedRemove = true;
                    Entry<Integer, List<Integer>> entry = removals.next();

                    // If this log is not a GC candidate then there's no need to do a check to rule it out
                    if (gcCandidateSet.contains(entry.getKey())) {
                        for (Integer addLocation : entry.getValue()) {
                            if (completeFileSet.contains(addLocation)) {
                                LOG.trace("A remove in log {} has an add still in existence in {}.", entry.getKey(), addLocation);
                                orphanedRemove = false;
                                break;
                            }
                        }

                        // If it's not orphaned then we can't GC the log; otherwise we stop
                        // tracking it and its log can be removed.
                        if (!orphanedRemove) {
                            gcCandidateSet.remove(entry.getKey());
                        } else {
                            LOG.trace("All removes in log {} are orphaned, file can be GC'd", entry.getKey());
                            orphans.add(entry.getKey());
                        }
                    }
                }

                // Drop all orphaned removes from the tracker.
                for (Integer orphan : orphans) {
                    metaData.getRemoveLocationTracker().remove(tx, orphan);
                }
            }

            LOG.trace("gc candidates after removals check: {}", gcCandidateSet);
            if (!gcCandidateSet.isEmpty()) {
                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
                journal.removeDataFiles(gcCandidateSet);
            }
        }

        LOG.debug("Job Scheduler Store Checkpoint complete.");
    }

    /**
     * Adds a reference for the journal log file pointed to by the given Location value.
     *
     * Journal log files that still contain valid data needed for recovery must not be garbage
     * collected, so such logs must hold active references.  Each Job Scheduler should ensure
     * that the logs it depends on are accurately referenced.
     *
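     * As a sketch of the counting semantics (file ids are illustrative): three scheduled jobs
     * whose add commands live in data file 42 raise that file's reference count to three, and
     * the file only becomes a GC candidate once all three references have been decremented back
     * to zero.
     *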
     * @param tx
     *      The TX under which the update is to be performed.
     * @param location
     *      The location value to update the reference count of.
     *
     * @throws IOException if an error occurs while updating the journal references table.
     */
    protected void incrementJournalCount(Transaction tx, Location location) throws IOException {
        int logId = location.getDataFileId();
        Integer val = metaData.getJournalRC().get(tx, logId);
        int refCount = val != null ? val.intValue() + 1 : 1;
        metaData.getJournalRC().put(tx, logId, refCount);
    }

    /**
     * Removes one reference for the Journal log file indicated in the given Location value.
     *
     * The references are used to track which log files cannot be GC'd.  When the reference count
     * on a log file reaches zero the file id is removed from the tracker and the log will be
     * removed on the next checkpoint update.
     *
     * @param tx
     *      The TX under which the update is to be performed.
     * @param location
     *      The location value to update the reference count of.
     *
     * @throws IOException if an error occurs while updating the journal references table.
     */
    protected void decrementJournalCount(Transaction tx, Location location) throws IOException {
        int logId = location.getDataFileId();
        Integer refCount = metaData.getJournalRC().get(tx, logId);
        if (refCount != null) {
            int refCountValue = refCount;
            refCountValue--;
            if (refCountValue <= 0) {
                metaData.getJournalRC().remove(tx, logId);
            } else {
                metaData.getJournalRC().put(tx, logId, refCountValue);
            }
        }
    }

    /**
     * Updates the Job removal tracking index with the location of a remove command and the
     * original JobLocation entry.
     *
     * The JobLocation holds the locations in the logs where the add and update commands for
     * a job are stored.  The log file containing the remove command can only be discarded after
     * both the add and latest update log files have also been discarded.
     *
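     * As an illustrative sketch: a remove command journaled in data file 7 for a job whose add
     * command lives in data file 3 records the mapping {@code 7 -> [3]}; during checkpoint
     * cleanup, data file 7 can only be GC'd once data file 3 is no longer present in the journal.
     *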
     * @param tx
     *      The TX under which the update is to be performed.
     * @param location
     *      The location value to reference a remove command.
     * @param removedJob
     *      The original JobLocation instance that holds the add and update locations
     *
     * @throws IOException if an error occurs while updating the remove location tracker.
     */
    protected void referenceRemovedLocation(Transaction tx, Location location, JobLocation removedJob) throws IOException {
        int logId = location.getDataFileId();
        List<Integer> removed = this.metaData.getRemoveLocationTracker().get(tx, logId);
        if (removed == null) {
            removed = new ArrayList<Integer>();
        }
        removed.add(removedJob.getLocation().getDataFileId());
        this.metaData.getRemoveLocationTracker().put(tx, logId, removed);
    }

    /**
     * Retrieve the scheduled Job's byte blob from the journal.
     *
     * @param location
     *      The location of the KahaAddScheduledJobCommand that originated the Job.
     *
     * @return a ByteSequence containing the payload of the scheduled Job.
     *
     * @throws IOException if an error occurs while reading the payload value.
     */
    protected ByteSequence getPayload(Location location) throws IOException {
        KahaAddScheduledJobCommand job = (KahaAddScheduledJobCommand) this.load(location);
        Buffer payload = job.getPayload();
        return new ByteSequence(payload.getData(), payload.getOffset(), payload.getLength());
    }

    public void readLockIndex() {
        this.indexLock.readLock().lock();
    }

    public void readUnlockIndex() {
        this.indexLock.readLock().unlock();
    }

    public void writeLockIndex() {
        this.indexLock.writeLock().lock();
    }

    public void writeUnlockIndex() {
        this.indexLock.writeLock().unlock();
    }

    @Override
    public String toString() {
        return "JobSchedulerStore: " + getDirectory();
    }

    @Override
    protected String getPageFileName() {
        return "scheduleDB";
    }

    @Override
    protected File getDefaultDataDirectory() {
        return new File(IOHelper.getDefaultDataDirectory(), "delayedDB");
    }

    private class MetaDataMarshaller extends VariableMarshaller<JobSchedulerKahaDBMetaData> {

        private final JobSchedulerStoreImpl store;

        MetaDataMarshaller(JobSchedulerStoreImpl store) {
            this.store = store;
        }

        @Override
        public JobSchedulerKahaDBMetaData readPayload(DataInput dataIn) throws IOException {
            JobSchedulerKahaDBMetaData rc = new JobSchedulerKahaDBMetaData(store);
            rc.read(dataIn);
            return rc;
        }

        @Override
        public void writePayload(JobSchedulerKahaDBMetaData object, DataOutput dataOut) throws IOException {
            object.write(dataOut);
        }
    }

    /**
     * Called during index recovery to rebuild the index from the last known good location.
     * Entries that occur before the last known good position are simply ignored.
     *
     * @param data
     *        the command read from the Journal which should be used to update the index.
     * @param location
     *        the location in the Journal where the command was read.
     * @param inDoubtlocation
     *        the last location known to be reflected in the index; commands at or after it are replayed.
     *
     * @throws IOException if an error occurs while recovering the index.
     */
    protected void doRecover(JournalCommand<?> data, final Location location, final Location inDoubtlocation) throws IOException {
        if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >= 0) {
            process(data, location);
        }
    }

    /**
     * Called during recovery to allow the store to rebuild from scratch.
     *
     * @param data
     *      The command to process, which was read from the Journal.
     * @param location
     *      The location of the command in the Journal.
     *
     * @throws IOException if an error occurs during command processing.
     */
    @Override
    protected void process(JournalCommand<?> data, final Location location) throws IOException {
        data.visit(new Visitor() {
            @Override
            public void visit(final KahaAddScheduledJobCommand command) throws IOException {
                final JobSchedulerImpl scheduler;

                indexLock.writeLock().lock();
                try {
                    try {
                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            scheduler.process(tx, command, location);
                        }
                    });

                    processLocation(location);
                } finally {
                    indexLock.writeLock().unlock();
                }
            }

            @Override
            public void visit(final KahaRemoveScheduledJobCommand command) throws IOException {
                final JobSchedulerImpl scheduler;

                indexLock.writeLock().lock();
                try {
                    try {
                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            scheduler.process(tx, command, location);
                        }
                    });

                    processLocation(location);
                } finally {
                    indexLock.writeLock().unlock();
                }
            }

            @Override
            public void visit(final KahaRemoveScheduledJobsCommand command) throws IOException {
                final JobSchedulerImpl scheduler;

                indexLock.writeLock().lock();
                try {
                    try {
                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            scheduler.process(tx, command, location);
                        }
                    });

                    processLocation(location);
                } finally {
                    indexLock.writeLock().unlock();
                }
            }

            @Override
            public void visit(final KahaRescheduleJobCommand command) throws IOException {
                final JobSchedulerImpl scheduler;

                indexLock.writeLock().lock();
                try {
                    try {
                        scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
                    } catch (Exception e) {
                        throw new IOException(e);
                    }
                    getPageFile().tx().execute(new Transaction.Closure<IOException>() {
                        @Override
                        public void execute(Transaction tx) throws IOException {
                            scheduler.process(tx, command, location);
                        }
                    });

                    processLocation(location);
                } finally {
                    indexLock.writeLock().unlock();
                }
            }

            @Override
            public void visit(final KahaDestroySchedulerCommand command) {
                try {
                    removeJobScheduler(command.getScheduler());
                } catch (Exception e) {
                    LOG.warn("Failed to remove scheduler: {}", command.getScheduler());
                }

                processLocation(location);
            }

            @Override
            public void visit(KahaTraceCommand command) {
                processLocation(location);
            }
        });
    }

    protected void processLocation(final Location location) {
        indexLock.writeLock().lock();
        try {
            this.metaData.setLastUpdateLocation(location);
        } finally {
            indexLock.writeLock().unlock();
        }
    }

    /**
     * We recover from the Journal logs as needed to restore the index.
     *
     * @throws IllegalStateException
     * @throws IOException
     */
    private void recover() throws IllegalStateException, IOException {
        this.indexLock.writeLock().lock();
        try {
            long start = System.currentTimeMillis();
            Location lastIndoubtPosition = getRecoveryPosition();
            Location recoveryPosition = lastIndoubtPosition;

            if (recoveryPosition != null) {
                int redoCounter = 0;
                LOG.info("Recovering from the scheduled job journal @{}", recoveryPosition);
                while (recoveryPosition != null) {
                    try {
                        JournalCommand<?> message = load(recoveryPosition);
                        metaData.setLastUpdateLocation(recoveryPosition);
                        doRecover(message, recoveryPosition, lastIndoubtPosition);
                        redoCounter++;
                    } catch (IOException failedRecovery) {
                        if (isIgnoreMissingJournalfiles()) {
                            LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                            // track this dud location
                            journal.corruptRecoveryLocation(recoveryPosition);
                        } else {
                            throw new IOException("Failed to recover data at position:" + recoveryPosition, failedRecovery);
                        }
                    }
                    recoveryPosition = journal.getNextLocation(recoveryPosition);
                    if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
                        LOG.info("@ {}, {} entries recovered ..", recoveryPosition, redoCounter);
                    }
                }
                long end = System.currentTimeMillis();
                LOG.info("Recovery replayed {} operations from the journal in {} seconds.",
                         redoCounter, ((end - start) / 1000.0f));
            }

            // We may have to undo some index updates.
            pageFile.tx().execute(new Transaction.Closure<IOException>() {
                @Override
                public void execute(Transaction tx) throws IOException {
                    recoverIndex(tx);
                }
            });

        } finally {
            this.indexLock.writeLock().unlock();
        }
    }

    private Location getRecoveryPosition() throws IOException {
        // This loads the first position and we completely rebuild the index if we
        // do not override it with some known recovery start location.
        Location result = null;

        if (!isForceRecoverIndex()) {
            if (metaData.getLastUpdateLocation() != null) {
                result = metaData.getLastUpdateLocation();
            }
        }

        return journal.getNextLocation(result);
    }

    private void recoverIndex(Transaction tx) throws IOException {
        long start = System.currentTimeMillis();

        // It is possible that index updates were applied before the journal updates;
        // in that case we need to remove references to Jobs that are not in the journal.
        final Location lastAppendLocation = journal.getLastAppendLocation();
        long undoCounter = 0;

        // Go through all the jobs in each scheduler and check if any are added after
        // the last appended location and remove those.  For now we ignore the update
        // location since the scheduled job will update itself after the next fire and
        // a new update will replace any existing update.
        for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
            Map.Entry<String, JobSchedulerImpl> entry = i.next();
            JobSchedulerImpl scheduler = entry.getValue();

            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
            for (JobLocation job : jobs) {
                if (job.getLocation().compareTo(lastAppendLocation) >= 0) {
                    if (scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime())) {
                        LOG.trace("Removed Job past last append position in the journal: {}", job.getJobId());
                        undoCounter++;
                    }
                }
            }
        }

        if (undoCounter > 0) {
            // The rolled back operations are basically in-flight journal writes.  To avoid
            // these, the end user should do sync writes to the journal.
            long end = System.currentTimeMillis();
            LOG.info("Rolled back {} messages from the index in {} seconds.", undoCounter, ((end - start) / 1000.0f));
            undoCounter = 0;
        }

        // Now we check for missing and corrupt journal files.

        // 1. Collect the set of all referenced journal files based on the Location of
        //    the scheduled jobs and the marked last update field.
        HashSet<Integer> missingJournalFiles = new HashSet<Integer>();
        for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
            Map.Entry<String, JobSchedulerImpl> entry = i.next();
            JobSchedulerImpl scheduler = entry.getValue();

            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
            for (JobLocation job : jobs) {
                missingJournalFiles.add(job.getLocation().getDataFileId());
                if (job.getLastUpdate() != null) {
                    missingJournalFiles.add(job.getLastUpdate().getDataFileId());
                }
            }
        }

        // 2. Remove from that set all data file ids known to the journal; what's left is
        //    the set of missing files.
        missingJournalFiles.removeAll(journal.getFileMap().keySet());
        if (!missingJournalFiles.isEmpty()) {
            LOG.info("Some journal files are missing: {}", missingJournalFiles);
        }

        // 3. Now check the journal logs for corrupted blocks and collect the locations of
        //    any corrupted entries.
        HashSet<Location> corruptedLocations = new HashSet<Location>();

        if (isCheckForCorruptJournalFiles()) {
            Collection<DataFile> dataFiles = journal.getFileMap().values();
            for (DataFile dataFile : dataFiles) {
                int id = dataFile.getDataFileId();
                for (long offset : dataFile.getCorruptedBlocks()) {
                    corruptedLocations.add(new Location(id, (int) offset));
                }
            }

            if (!corruptedLocations.isEmpty()) {
                LOG.debug("Found some corrupted data blocks in the journal: {}", corruptedLocations.size());
            }
        }

        // 4. Now we either fail or we remove all references to missing or corrupt journal
        //    files from the various JobSchedulerImpl instances.  When the ignore option is set
        //    we only remove a Job if its initial Add operation is missing; the updates could be
        //    lost, but that's the price you pay for ignoring the missing logs.
        if (!missingJournalFiles.isEmpty() || !corruptedLocations.isEmpty()) {
            if (!isIgnoreMissingJournalfiles()) {
                throw new IOException("Detected missing/corrupt journal files.");
            }

            // Remove all Jobs that reference a Location that is either missing or corrupt.
            undoCounter = removeJobsInMissingOrCorruptJournalFiles(tx, missingJournalFiles, corruptedLocations);

            // Clean up the Journal Reference count Map.
            removeJournalRCForMissingFiles(tx, missingJournalFiles);
        }

        if (undoCounter > 0) {
            long end = System.currentTimeMillis();
            LOG.info("Detected missing/corrupt journal files.  Dropped {} jobs from the " +
                     "index in {} seconds.", undoCounter, ((end - start) / 1000.0f));
        }
    }

    private void removeJournalRCForMissingFiles(Transaction tx, Set<Integer> missing) throws IOException {
        List<Integer> matches = new ArrayList<Integer>();

        Iterator<Entry<Integer, Integer>> references = metaData.getJournalRC().iterator(tx);
        while (references.hasNext()) {
            int dataFileId = references.next().getKey();
            if (missing.contains(dataFileId)) {
                matches.add(dataFileId);
            }
        }

        for (Integer match : matches) {
            metaData.getJournalRC().remove(tx, match);
        }
    }

    private int removeJobsInMissingOrCorruptJournalFiles(Transaction tx, Set<Integer> missing, Set<Location> corrupted) throws IOException {
        int removed = 0;

        // Remove Jobs that reference missing or corrupt journal files; the journal reference
        // counts for those files are cleaned up afterwards in removeJournalRCForMissingFiles.
        for (Iterator<Map.Entry<String, JobSchedulerImpl>> i = metaData.getJobSchedulers().iterator(tx); i.hasNext();) {
            Map.Entry<String, JobSchedulerImpl> entry = i.next();
            JobSchedulerImpl scheduler = entry.getValue();

            List<JobLocation> jobs = scheduler.getAllScheduledJobs(tx);
            for (JobLocation job : jobs) {

                // Remove all jobs in missing log files.
                if (missing.contains(job.getLocation().getDataFileId())) {
                    scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime());
                    removed++;
                    continue;
                }

                // Remove all jobs in corrupted parts of log files.
                if (corrupted.contains(job.getLocation())) {
                    scheduler.removeJobAtTime(tx, job.getJobId(), job.getNextTime());
                    removed++;
                }
            }
        }

        return removed;
    }
}