001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.journal; 018 019import java.io.File; 020import java.io.IOException; 021 022import org.apache.activeio.journal.Journal; 023import org.apache.activeio.journal.active.JournalImpl; 024import org.apache.activeio.journal.active.JournalLockedException; 025import org.apache.activemq.broker.Locker; 026import org.apache.activemq.store.PersistenceAdapter; 027import org.apache.activemq.store.PersistenceAdapterFactory; 028import org.apache.activemq.store.jdbc.DataSourceServiceSupport; 029import org.apache.activemq.store.jdbc.JDBCAdapter; 030import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 031import org.apache.activemq.store.jdbc.Statements; 032import org.apache.activemq.thread.TaskRunnerFactory; 033import org.apache.activemq.util.ServiceStopper; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * Factory class that can create PersistenceAdapter objects. 039 * 040 * @org.apache.xbean.XBean 041 * 042 */ 043public class JournalPersistenceAdapterFactory extends DataSourceServiceSupport implements PersistenceAdapterFactory { 044 045 private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000; 046 047 private static final Logger LOG = LoggerFactory.getLogger(JournalPersistenceAdapterFactory.class); 048 049 private long checkpointInterval = 1000 * 60 * 5; 050 private int journalLogFileSize = 1024 * 1024 * 20; 051 private int journalLogFiles = 2; 052 private TaskRunnerFactory taskRunnerFactory; 053 private Journal journal; 054 private boolean useJournal = true; 055 private boolean useQuickJournal; 056 private File journalArchiveDirectory; 057 private boolean failIfJournalIsLocked; 058 private int journalThreadPriority = Thread.MAX_PRIORITY; 059 private JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); 060 private boolean useDedicatedTaskRunner; 061 062 public PersistenceAdapter createPersistenceAdapter() throws IOException { 063 jdbcPersistenceAdapter.setDataSource(getDataSource()); 064 065 if (!useJournal) { 066 return jdbcPersistenceAdapter; 067 } 068 JournalPersistenceAdapter result = new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory()); 069 result.setDirectory(getDataDirectoryFile()); 070 result.setCheckpointInterval(getCheckpointInterval()); 071 return result; 072 073 } 074 075 public int getJournalLogFiles() { 076 return journalLogFiles; 077 } 078 079 /** 080 * Sets the number of journal log files to use 081 */ 082 public void setJournalLogFiles(int journalLogFiles) { 083 this.journalLogFiles = journalLogFiles; 084 } 085 086 public int getJournalLogFileSize() { 087 return journalLogFileSize; 088 } 089 090 /** 091 * Sets the size of the journal log files 092 * When set using Xbean, values of the form "20 Mb", "1024kb", and "1g" can be used 093 * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor" 094 */ 095 public void setJournalLogFileSize(int journalLogFileSize) { 096 this.journalLogFileSize = journalLogFileSize; 097 } 098 099 public JDBCPersistenceAdapter getJdbcAdapter() { 100 return jdbcPersistenceAdapter; 101 } 102 103 public void setJdbcAdapter(JDBCPersistenceAdapter jdbcAdapter) { 104 this.jdbcPersistenceAdapter = jdbcAdapter; 105 } 106 107 public boolean isUseJournal() { 108 return useJournal; 109 } 110 111 public long getCheckpointInterval() { 112 return checkpointInterval; 113 } 114 115 public void setCheckpointInterval(long checkpointInterval) { 116 this.checkpointInterval = checkpointInterval; 117 } 118 119 /** 120 * Enables or disables the use of the journal. The default is to use the 121 * journal 122 * 123 * @param useJournal 124 */ 125 public void setUseJournal(boolean useJournal) { 126 this.useJournal = useJournal; 127 } 128 129 public boolean isUseDedicatedTaskRunner() { 130 return useDedicatedTaskRunner; 131 } 132 133 public void setUseDedicatedTaskRunner(boolean useDedicatedTaskRunner) { 134 this.useDedicatedTaskRunner = useDedicatedTaskRunner; 135 } 136 137 public TaskRunnerFactory getTaskRunnerFactory() { 138 if (taskRunnerFactory == null) { 139 taskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", journalThreadPriority, 140 true, 1000, isUseDedicatedTaskRunner()); 141 } 142 return taskRunnerFactory; 143 } 144 145 public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) { 146 this.taskRunnerFactory = taskRunnerFactory; 147 } 148 149 public Journal getJournal() throws IOException { 150 if (journal == null) { 151 createJournal(); 152 } 153 return journal; 154 } 155 156 public void setJournal(Journal journal) { 157 this.journal = journal; 158 } 159 160 public File getJournalArchiveDirectory() { 161 if (journalArchiveDirectory == null && useQuickJournal) { 162 journalArchiveDirectory = new File(getDataDirectoryFile(), "journal"); 163 } 164 return journalArchiveDirectory; 165 } 166 167 public void setJournalArchiveDirectory(File journalArchiveDirectory) { 168 this.journalArchiveDirectory = journalArchiveDirectory; 169 } 170 171 public boolean isUseQuickJournal() { 172 return useQuickJournal; 173 } 174 175 /** 176 * Enables or disables the use of quick journal, which keeps messages in the 177 * journal and just stores a reference to the messages in JDBC. Defaults to 178 * false so that messages actually reside long term in the JDBC database. 179 */ 180 public void setUseQuickJournal(boolean useQuickJournal) { 181 this.useQuickJournal = useQuickJournal; 182 } 183 184 public JDBCAdapter getAdapter() throws IOException { 185 return jdbcPersistenceAdapter.getAdapter(); 186 } 187 188 public void setAdapter(JDBCAdapter adapter) { 189 jdbcPersistenceAdapter.setAdapter(adapter); 190 } 191 192 public Statements getStatements() { 193 return jdbcPersistenceAdapter.getStatements(); 194 } 195 196 public void setStatements(Statements statements) { 197 jdbcPersistenceAdapter.setStatements(statements); 198 } 199 200 /** 201 * Sets whether or not an exclusive database lock should be used to enable 202 * JDBC Master/Slave. Enabled by default. 203 */ 204 public void setUseDatabaseLock(boolean useDatabaseLock) { 205 jdbcPersistenceAdapter.setUseLock(useDatabaseLock); 206 } 207 208 public boolean isCreateTablesOnStartup() { 209 return jdbcPersistenceAdapter.isCreateTablesOnStartup(); 210 } 211 212 /** 213 * Sets whether or not tables are created on startup 214 */ 215 public void setCreateTablesOnStartup(boolean createTablesOnStartup) { 216 jdbcPersistenceAdapter.setCreateTablesOnStartup(createTablesOnStartup); 217 } 218 219 public int getJournalThreadPriority() { 220 return journalThreadPriority; 221 } 222 223 /** 224 * Sets the thread priority of the journal thread 225 */ 226 public void setJournalThreadPriority(int journalThreadPriority) { 227 this.journalThreadPriority = journalThreadPriority; 228 } 229 230 /** 231 * @throws IOException 232 */ 233 protected void createJournal() throws IOException { 234 File journalDir = new File(getDataDirectoryFile(), "journal").getCanonicalFile(); 235 if (failIfJournalIsLocked) { 236 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, 237 getJournalArchiveDirectory()); 238 } else { 239 while (true) { 240 try { 241 journal = new JournalImpl(journalDir, journalLogFiles, journalLogFileSize, 242 getJournalArchiveDirectory()); 243 break; 244 } catch (JournalLockedException e) { 245 LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) 246 + " seconds for the journal to be unlocked."); 247 try { 248 Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY); 249 } catch (InterruptedException e1) { 250 } 251 } 252 } 253 } 254 } 255 256 @Override 257 public Locker createDefaultLocker() throws IOException { 258 return null; 259 } 260 261 @Override 262 public void init() throws Exception { 263 } 264 265 @Override 266 protected void doStop(ServiceStopper stopper) throws Exception {} 267 268 @Override 269 protected void doStart() throws Exception {} 270}