001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.Connection; 021import java.sql.PreparedStatement; 022import java.sql.SQLException; 023import java.sql.SQLFeatureNotSupportedException; 024import org.apache.activemq.util.Handler; 025import org.apache.activemq.util.ServiceStopper; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029/** 030 * Represents an exclusive lock on a database to avoid multiple brokers running 031 * against the same logical database. 032 * 033 * @org.apache.xbean.XBean element="database-locker" 034 * 035 */ 036public class DefaultDatabaseLocker extends AbstractJDBCLocker { 037 private static final Logger LOG = LoggerFactory.getLogger(DefaultDatabaseLocker.class); 038 039 protected volatile PreparedStatement lockCreateStatement; 040 protected volatile PreparedStatement lockUpdateStatement; 041 protected volatile Connection connection; 042 protected Handler<Exception> exceptionHandler; 043 044 public void doStart() throws Exception { 045 046 LOG.info("Attempting to acquire the exclusive lock to become the Master broker"); 047 String sql = getStatements().getLockCreateStatement(); 048 LOG.debug("Locking Query is "+sql); 049 050 while (true) { 051 try { 052 connection = dataSource.getConnection(); 053 connection.setAutoCommit(false); 054 lockCreateStatement = connection.prepareStatement(sql); 055 lockCreateStatement.execute(); 056 break; 057 } catch (Exception e) { 058 try { 059 if (isStopping()) { 060 throw new Exception( 061 "Cannot start broker as being asked to shut down. " 062 + "Interrupted attempt to acquire lock: " 063 + e, e); 064 } 065 if (exceptionHandler != null) { 066 try { 067 exceptionHandler.handle(e); 068 } catch (Throwable handlerException) { 069 LOG.error( "The exception handler " 070 + exceptionHandler.getClass().getCanonicalName() 071 + " threw this exception: " 072 + handlerException 073 + " while trying to handle this exception: " 074 + e, handlerException); 075 } 076 077 } else { 078 LOG.debug("Lock failure: "+ e, e); 079 } 080 } finally { 081 // Let's make sure the database connection is properly 082 // closed when an error occurs so that we're not leaking 083 // connections 084 if (null != connection) { 085 try { 086 connection.rollback(); 087 } catch (SQLException e1) { 088 LOG.debug("Caught exception during rollback on connection: " + e1, e1); 089 } 090 try { 091 connection.close(); 092 } catch (SQLException e1) { 093 LOG.debug("Caught exception while closing connection: " + e1, e1); 094 } 095 096 connection = null; 097 } 098 } 099 } finally { 100 if (null != lockCreateStatement) { 101 try { 102 lockCreateStatement.close(); 103 } catch (SQLException e1) { 104 LOG.debug("Caught while closing statement: " + e1, e1); 105 } 106 lockCreateStatement = null; 107 } 108 } 109 110 LOG.info("Failed to acquire lock. Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again..."); 111 try { 112 Thread.sleep(lockAcquireSleepInterval); 113 } catch (InterruptedException ie) { 114 LOG.warn("Master lock retry sleep interrupted", ie); 115 } 116 } 117 118 LOG.info("Becoming the master on dataSource: " + dataSource); 119 } 120 121 public void doStop(ServiceStopper stopper) throws Exception { 122 try { 123 if (lockCreateStatement != null) { 124 lockCreateStatement.cancel(); 125 } 126 } catch (SQLFeatureNotSupportedException e) { 127 LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e); 128 } 129 try { 130 if (lockUpdateStatement != null) { 131 lockUpdateStatement.cancel(); 132 } 133 } catch (SQLFeatureNotSupportedException e) { 134 LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e); 135 } 136 137 // when the connection is closed from an outside source (lost TCP 138 // connection, db server, etc) and this connection is managed by a pool 139 // it is important to close the connection so that we don't leak 140 // connections 141 142 if (connection != null) { 143 try { 144 connection.rollback(); 145 } catch (SQLException sqle) { 146 LOG.debug("Exception while rollbacking the connection on shutdown. This exception is ignored.", sqle); 147 } finally { 148 try { 149 connection.close(); 150 } catch (SQLException ignored) { 151 LOG.debug("Exception while closing connection on shutdown. This exception is ignored.", ignored); 152 } 153 lockCreateStatement = null; 154 } 155 } 156 } 157 158 public boolean keepAlive() throws IOException { 159 boolean result = false; 160 try { 161 lockUpdateStatement = connection.prepareStatement(getStatements().getLockUpdateStatement()); 162 lockUpdateStatement.setLong(1, System.currentTimeMillis()); 163 setQueryTimeout(lockUpdateStatement); 164 int rows = lockUpdateStatement.executeUpdate(); 165 if (rows == 1) { 166 result=true; 167 } 168 } catch (Exception e) { 169 LOG.error("Failed to update database lock: " + e, e); 170 } finally { 171 if (lockUpdateStatement != null) { 172 try { 173 lockUpdateStatement.close(); 174 } catch (SQLException e) { 175 LOG.error("Failed to close statement",e); 176 } 177 lockUpdateStatement = null; 178 } 179 } 180 return result; 181 } 182 183 public long getLockAcquireSleepInterval() { 184 return lockAcquireSleepInterval; 185 } 186 187 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) { 188 this.lockAcquireSleepInterval = lockAcquireSleepInterval; 189 } 190 191 public Handler getExceptionHandler() { 192 return exceptionHandler; 193 } 194 195 public void setExceptionHandler(Handler exceptionHandler) { 196 this.exceptionHandler = exceptionHandler; 197 } 198 199}