001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc.adapter; 018 019import java.io.IOException; 020import java.sql.PreparedStatement; 021import java.sql.ResultSet; 022import java.sql.SQLException; 023 024import org.apache.activemq.store.jdbc.DefaultDatabaseLocker; 025import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029/** 030 * Represents an exclusive lock on a database to avoid multiple brokers running 031 * against the same logical database. 032 * 033 * @org.apache.xbean.XBean element="transact-database-locker" 034 * 035 */ 036public class TransactDatabaseLocker extends DefaultDatabaseLocker { 037 private static final Logger LOG = LoggerFactory.getLogger(TransactDatabaseLocker.class); 038 039 @Override 040 public void doStart() throws Exception { 041 042 LOG.info("Attempting to acquire the exclusive lock to become the Master broker"); 043 PreparedStatement statement = null; 044 while (true) { 045 try { 046 connection = dataSource.getConnection(); 047 connection.setAutoCommit(false); 048 String sql = getStatements().getLockCreateStatement(); 049 statement = connection.prepareStatement(sql); 050 if (statement.getMetaData() != null) { 051 ResultSet rs = statement.executeQuery(); 052 // if not already locked the statement below blocks until lock acquired 053 rs.next(); 054 } else { 055 statement.execute(); 056 } 057 break; 058 } catch (Exception e) { 059 if (isStopping()) { 060 throw new Exception("Cannot start broker as being asked to shut down. Interrupted attempt to acquire lock: " + e, e); 061 } 062 063 if (exceptionHandler != null) { 064 try { 065 exceptionHandler.handle(e); 066 } catch (Throwable handlerException) { 067 LOG.error("The exception handler " + exceptionHandler.getClass().getCanonicalName() + " threw this exception: " + handlerException 068 + " while trying to handle this excpetion: " + e, handlerException); 069 } 070 071 } else { 072 LOG.error("Failed to acquire lock: " + e, e); 073 } 074 } finally { 075 076 if (null != statement) { 077 try { 078 statement.close(); 079 } catch (SQLException e1) { 080 LOG.warn("Caught while closing statement: " + e1, e1); 081 } 082 statement = null; 083 } 084 } 085 086 LOG.debug("Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again to get the lock..."); 087 try { 088 Thread.sleep(lockAcquireSleepInterval); 089 } catch (InterruptedException ie) { 090 LOG.warn("Master lock retry sleep interrupted", ie); 091 } 092 } 093 094 LOG.info("Becoming the master on dataSource: " + dataSource); 095 } 096 097}