001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.kahadb.scheduler.legacy; 018 019import java.io.File; 020import java.io.IOException; 021import java.util.List; 022import java.util.Set; 023 024import org.apache.activemq.store.kahadb.data.KahaAddScheduledJobCommand; 025import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029/** 030 * Used to upgrade a Legacy Job Scheduler store to the latest version this class 031 * loads a found legacy scheduler store and generates new add commands for all 032 * jobs currently in the store. 033 */ 034public class LegacyStoreReplayer { 035 036 static final Logger LOG = LoggerFactory.getLogger(LegacyStoreReplayer.class); 037 038 private LegacyJobSchedulerStoreImpl store; 039 private final File legacyStoreDirectory; 040 041 /** 042 * Creates a new Legacy Store Replayer with the given target store 043 * @param targetStore 044 * @param directory 045 */ 046 public LegacyStoreReplayer(File directory) { 047 this.legacyStoreDirectory = directory; 048 } 049 050 /** 051 * Loads the legacy store and prepares it for replay into a newer Store instance. 052 * 053 * @throws IOException if an error occurs while reading in the legacy store. 054 */ 055 public void load() throws IOException { 056 057 store = new LegacyJobSchedulerStoreImpl(); 058 store.setDirectory(legacyStoreDirectory); 059 store.setFailIfDatabaseIsLocked(true); 060 061 try { 062 store.start(); 063 } catch (IOException ioe) { 064 LOG.warn("Legacy store load failed: ", ioe); 065 throw ioe; 066 } catch (Exception e) { 067 LOG.warn("Legacy store load failed: ", e); 068 throw new IOException(e); 069 } 070 } 071 072 /** 073 * Unloads a previously loaded legacy store to release any resources associated with it. 074 * 075 * Once a store is unloaded it cannot be replayed again until it has been reloaded. 076 * @throws IOException 077 */ 078 public void unload() throws IOException { 079 080 if (store != null) { 081 try { 082 store.stop(); 083 } catch (Exception e) { 084 LOG.warn("Legacy store unload failed: ", e); 085 throw new IOException(e); 086 } finally { 087 store = null; 088 } 089 } 090 } 091 092 /** 093 * Performs a replay of scheduled jobs into the target JobSchedulerStore. 094 * 095 * @param targetStore 096 * The JobSchedulerStore that will receive the replay events from the legacy store. 097 * 098 * @throws IOException if an error occurs during replay of the legacy store. 099 */ 100 public void startReplay(JobSchedulerStoreImpl targetStore) throws IOException { 101 checkLoaded(); 102 103 if (targetStore == null) { 104 throw new IOException("Cannot replay to a null store"); 105 } 106 107 try { 108 Set<String> schedulers = store.getJobSchedulerNames(); 109 if (!schedulers.isEmpty()) { 110 111 for (String name : schedulers) { 112 LegacyJobSchedulerImpl scheduler = store.getJobScheduler(name); 113 LOG.info("Replay of legacy store {} starting.", name); 114 replayScheduler(scheduler, targetStore); 115 } 116 } 117 118 LOG.info("Replay of legacy store complate."); 119 } catch (IOException ioe) { 120 LOG.warn("Failed during replay of legacy store: ", ioe); 121 throw ioe; 122 } catch (Exception e) { 123 LOG.warn("Failed during replay of legacy store: ", e); 124 throw new IOException(e); 125 } 126 } 127 128 private final void replayScheduler(LegacyJobSchedulerImpl legacy, JobSchedulerStoreImpl target) throws Exception { 129 List<LegacyJobImpl> jobs = legacy.getAllJobs(); 130 131 String schedulerName = legacy.getName(); 132 133 for (LegacyJobImpl job : jobs) { 134 LOG.trace("Storing job from legacy store to new store: {}", job); 135 KahaAddScheduledJobCommand newJob = new KahaAddScheduledJobCommand(); 136 newJob.setScheduler(schedulerName); 137 newJob.setJobId(job.getJobId()); 138 newJob.setStartTime(job.getStartTime()); 139 newJob.setCronEntry(job.getCronEntry()); 140 newJob.setDelay(job.getDelay()); 141 newJob.setPeriod(job.getPeriod()); 142 newJob.setRepeat(job.getRepeat()); 143 newJob.setNextExecutionTime(job.getNextExecutionTime()); 144 newJob.setPayload(job.getPayload()); 145 146 target.store(newJob); 147 } 148 } 149 150 private final void checkLoaded() throws IOException { 151 if (this.store == null) { 152 throw new IOException("Cannot replay until legacy store is loaded."); 153 } 154 } 155}