001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.apache.activemq.store.kahadb.scheduler;
019
020import java.io.DataInput;
021import java.io.DataOutput;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Iterator;
025import java.util.List;
026import java.util.Map;
027import java.util.Map.Entry;
028import java.util.UUID;
029
030import org.apache.activemq.store.kahadb.AbstractKahaDBMetaData;
031import org.apache.activemq.store.kahadb.disk.index.BTreeIndex;
032import org.apache.activemq.store.kahadb.disk.page.Transaction;
033import org.apache.activemq.store.kahadb.disk.util.IntegerMarshaller;
034import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
035import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
036import org.apache.activemq.store.kahadb.disk.util.VariableMarshaller;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * The KahaDB MetaData used to house the Index data for the KahaDB implementation
042 * of a JobSchedulerStore.
043 */
044public class JobSchedulerKahaDBMetaData extends AbstractKahaDBMetaData<JobSchedulerKahaDBMetaData> {
045
046    static final Logger LOG = LoggerFactory.getLogger(JobSchedulerKahaDBMetaData.class);
047
048    private final JobSchedulerStoreImpl store;
049
050    private UUID token = JobSchedulerStoreImpl.SCHEDULER_STORE_TOKEN;
051    private int version = JobSchedulerStoreImpl.CURRENT_VERSION;
052
053    private BTreeIndex<Integer, List<Integer>> removeLocationTracker;
054    private BTreeIndex<Integer, Integer> journalRC;
055    private BTreeIndex<String, JobSchedulerImpl> storedSchedulers;
056
057    /**
058     * Creates a new instance of this meta data object with the assigned
059     * parent JobSchedulerStore instance.
060     *
061     * @param store
062     *        the store instance that owns this meta data.
063     */
064    public JobSchedulerKahaDBMetaData(JobSchedulerStoreImpl store) {
065        this.store = store;
066    }
067
068    /**
069     * @return the current value of the Scheduler store identification token.
070     */
071    public UUID getToken() {
072        return this.token;
073    }
074
075    /**
076     * @return the current value of the version tag for this meta data instance.
077     */
078    public int getVersion() {
079        return this.version;
080    }
081
082    /**
083     * Gets the index that contains the location tracking information for Jobs
084     * that have been removed from the index but whose add operation has yet
085     * to be removed from the Journal.
086     *
087     * The Journal log file where a remove command is written cannot be released
088     * until the log file with the original add command has also been released,
089     * otherwise on a log replay the scheduled job could reappear in the scheduler
090     * since its corresponding remove might no longer be present.
091     *
092     * @return the remove command location tracker index.
093     */
094    public BTreeIndex<Integer, List<Integer>> getRemoveLocationTracker() {
095        return this.removeLocationTracker;
096    }
097
098    /**
099     * Gets the index used to track the number of reference to a Journal log file.
100     *
101     * A log file in the Journal can only be considered for removal after all the
102     * references to it have been released.
103     *
104     * @return the journal log file reference counter index.
105     */
106    public BTreeIndex<Integer, Integer> getJournalRC() {
107        return this.journalRC;
108    }
109
110    /**
111     * Gets the index of JobScheduler instances that have been created and stored
112     * in the JobSchedulerStore instance.
113     *
114     * @return the index of stored JobScheduler instances.
115     */
116    public BTreeIndex<String, JobSchedulerImpl> getJobSchedulers() {
117        return this.storedSchedulers;
118    }
119
120    @Override
121    public void initialize(Transaction tx) throws IOException {
122        this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(store.getPageFile(), tx.allocate().getPageId());
123        this.journalRC = new BTreeIndex<Integer, Integer>(store.getPageFile(), tx.allocate().getPageId());
124        this.removeLocationTracker = new BTreeIndex<Integer, List<Integer>>(store.getPageFile(), tx.allocate().getPageId());
125    }
126
127    @Override
128    public void load(Transaction tx) throws IOException {
129        this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
130        this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
131        this.storedSchedulers.load(tx);
132        this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
133        this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
134        this.journalRC.load(tx);
135        this.removeLocationTracker.setKeyMarshaller(IntegerMarshaller.INSTANCE);
136        this.removeLocationTracker.setValueMarshaller(new IntegerListMarshaller());
137        this.removeLocationTracker.load(tx);
138    }
139
140    /**
141     * Loads all the stored JobScheduler instances into the provided map.
142     *
143     * @param tx
144     *        the Transaction under which the load operation should be executed.
145     * @param schedulers
146     *        a Map<String, JobSchedulerImpl> into which the loaded schedulers are stored.
147     *
148     * @throws IOException if an error occurs while performing the load operation.
149     */
150    public void loadScheduler(Transaction tx, Map<String, JobSchedulerImpl> schedulers) throws IOException {
151        for (Iterator<Entry<String, JobSchedulerImpl>> i = this.storedSchedulers.iterator(tx); i.hasNext();) {
152            Entry<String, JobSchedulerImpl> entry = i.next();
153            entry.getValue().load(tx);
154            schedulers.put(entry.getKey(), entry.getValue());
155        }
156    }
157
158    @Override
159    public void read(DataInput in) throws IOException {
160        try {
161            long msb = in.readLong();
162            long lsb = in.readLong();
163            this.token = new UUID(msb, lsb);
164        } catch (Exception e) {
165            throw new UnknownStoreVersionException(e);
166        }
167
168        if (!token.equals(JobSchedulerStoreImpl.SCHEDULER_STORE_TOKEN)) {
169            throw new UnknownStoreVersionException(token.toString());
170        }
171        this.version = in.readInt();
172        if (in.readBoolean()) {
173            setLastUpdateLocation(LocationMarshaller.INSTANCE.readPayload(in));
174        } else {
175            setLastUpdateLocation(null);
176        }
177        this.storedSchedulers = new BTreeIndex<String, JobSchedulerImpl>(store.getPageFile(), in.readLong());
178        this.storedSchedulers.setKeyMarshaller(StringMarshaller.INSTANCE);
179        this.storedSchedulers.setValueMarshaller(new JobSchedulerMarshaller(this.store));
180        this.journalRC = new BTreeIndex<Integer, Integer>(store.getPageFile(), in.readLong());
181        this.journalRC.setKeyMarshaller(IntegerMarshaller.INSTANCE);
182        this.journalRC.setValueMarshaller(IntegerMarshaller.INSTANCE);
183        this.removeLocationTracker = new BTreeIndex<Integer, List<Integer>>(store.getPageFile(), in.readLong());
184        this.removeLocationTracker.setKeyMarshaller(IntegerMarshaller.INSTANCE);
185        this.removeLocationTracker.setValueMarshaller(new IntegerListMarshaller());
186
187        LOG.info("Scheduler Store version {} loaded", this.version);
188    }
189
190    @Override
191    public void write(DataOutput out) throws IOException {
192        out.writeLong(this.token.getMostSignificantBits());
193        out.writeLong(this.token.getLeastSignificantBits());
194        out.writeInt(this.version);
195        if (getLastUpdateLocation() != null) {
196            out.writeBoolean(true);
197            LocationMarshaller.INSTANCE.writePayload(getLastUpdateLocation(), out);
198        } else {
199            out.writeBoolean(false);
200        }
201        out.writeLong(this.storedSchedulers.getPageId());
202        out.writeLong(this.journalRC.getPageId());
203        out.writeLong(this.removeLocationTracker.getPageId());
204    }
205
206    private class JobSchedulerMarshaller extends VariableMarshaller<JobSchedulerImpl> {
207        private final JobSchedulerStoreImpl store;
208
209        JobSchedulerMarshaller(JobSchedulerStoreImpl store) {
210            this.store = store;
211        }
212
213        @Override
214        public JobSchedulerImpl readPayload(DataInput dataIn) throws IOException {
215            JobSchedulerImpl result = new JobSchedulerImpl(this.store);
216            result.read(dataIn);
217            return result;
218        }
219
220        @Override
221        public void writePayload(JobSchedulerImpl js, DataOutput dataOut) throws IOException {
222            js.write(dataOut);
223        }
224    }
225
226    private class IntegerListMarshaller extends VariableMarshaller<List<Integer>> {
227
228        @Override
229        public List<Integer> readPayload(DataInput dataIn) throws IOException {
230            List<Integer> result = new ArrayList<Integer>();
231            int size = dataIn.readInt();
232            for (int i = 0; i < size; i++) {
233                result.add(IntegerMarshaller.INSTANCE.readPayload(dataIn));
234            }
235            return result;
236        }
237
238        @Override
239        public void writePayload(List<Integer> value, DataOutput dataOut) throws IOException {
240            dataOut.writeInt(value.size());
241            for (Integer integer : value) {
242                IntegerMarshaller.INSTANCE.writePayload(integer, dataOut);
243            }
244        }
245    }
246}