001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.*; 021import java.util.LinkedList; 022import java.util.Map; 023import java.util.Properties; 024import java.util.concurrent.Executor; 025import java.util.concurrent.locks.Lock; 026import java.util.concurrent.locks.ReentrantReadWriteLock; 027 028import javax.sql.DataSource; 029 030import org.apache.activemq.util.IOExceptionSupport; 031import org.slf4j.Logger; 032import org.slf4j.LoggerFactory; 033 034/** 035 * Helps keep track of the current transaction/JDBC connection. 036 */ 037public class TransactionContext { 038 039 private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); 040 041 private final DataSource dataSource; 042 private final JDBCPersistenceAdapter persistenceAdapter; 043 private Connection connection; 044 private boolean inTx; 045 private PreparedStatement addMessageStatement; 046 private PreparedStatement removedMessageStatement; 047 private PreparedStatement updateLastAckStatement; 048 // a cheap dirty level that we can live with 049 private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED; 050 private LinkedList<Runnable> completions = new LinkedList<Runnable>(); 051 private ReentrantReadWriteLock exclusiveConnectionLock = new ReentrantReadWriteLock(); 052 053 public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException { 054 this.persistenceAdapter = persistenceAdapter; 055 this.dataSource = persistenceAdapter.getDataSource(); 056 } 057 058 public Connection getExclusiveConnection() throws IOException { 059 return lockAndWrapped(exclusiveConnectionLock.writeLock()); 060 } 061 062 public Connection getConnection() throws IOException { 063 return lockAndWrapped(exclusiveConnectionLock.readLock()); 064 } 065 066 private Connection lockAndWrapped(Lock toLock) throws IOException { 067 if (connection == null) { 068 toLock.lock(); 069 try { 070 connection = dataSource.getConnection(); 071 if (persistenceAdapter.isChangeAutoCommitAllowed()) { 072 boolean autoCommit = !inTx; 073 if (connection.getAutoCommit() != autoCommit) { 074 LOG.trace("Setting auto commit to {} on connection {}", autoCommit, connection); 075 connection.setAutoCommit(autoCommit); 076 } 077 } 078 connection = new UnlockOnCloseConnection(connection, toLock); 079 } catch (SQLException e) { 080 JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e); 081 inTx = false; 082 try { 083 toLock.unlock(); 084 } catch (IllegalMonitorStateException oops) { 085 LOG.error("Thread does not hold the context lock on close of:" + connection, oops); 086 } 087 silentClose(); 088 IOException ioe = IOExceptionSupport.create(e); 089 if (persistenceAdapter.getBrokerService() != null) { 090 persistenceAdapter.getBrokerService().handleIOException(ioe); 091 } 092 throw ioe; 093 } 094 095 try { 096 connection.setTransactionIsolation(transactionIsolation); 097 } catch (Throwable e) { 098 // ignore 099 LOG.trace("Cannot set transaction isolation to " + transactionIsolation + " due " + e.getMessage() 100 + ". This exception is ignored.", e); 101 } 102 } 103 return connection; 104 } 105 106 public void executeBatch() throws SQLException { 107 try { 108 executeBatch(addMessageStatement, "Failed add a message"); 109 } finally { 110 addMessageStatement = null; 111 try { 112 executeBatch(removedMessageStatement, "Failed to remove a message"); 113 } finally { 114 removedMessageStatement = null; 115 try { 116 executeBatch(updateLastAckStatement, "Failed to ack a message"); 117 } finally { 118 updateLastAckStatement = null; 119 } 120 } 121 } 122 } 123 124 private void executeBatch(PreparedStatement p, String message) throws SQLException { 125 if (p == null) { 126 return; 127 } 128 129 try { 130 int[] rc = p.executeBatch(); 131 for (int i = 0; i < rc.length; i++) { 132 int code = rc[i]; 133 if (code < 0 && code != Statement.SUCCESS_NO_INFO) { 134 throw new SQLException(message + ". Response code: " + code); 135 } 136 } 137 } finally { 138 try { 139 p.close(); 140 } catch (Throwable ignored) {} 141 } 142 } 143 144 private void silentClose() { 145 silentClosePreparedStatements(); 146 if (connection != null) { 147 try { 148 connection.close(); 149 } catch (Throwable ignored) {} 150 connection = null; 151 } 152 } 153 154 155 public void close() throws IOException { 156 if (!inTx) { 157 try { 158 // can be null for topic ops that bypass the store via existing cursor state 159 if (connection != null) { 160 final boolean needsCommit = !connection.getAutoCommit(); 161 executeBatch(); 162 if (needsCommit) { 163 connection.commit(); 164 } 165 } 166 } catch (SQLException e) { 167 JDBCPersistenceAdapter.log("Error while closing connection: ", e); 168 IOException ioe = IOExceptionSupport.create(e); 169 persistenceAdapter.getBrokerService().handleIOException(ioe); 170 throw ioe; 171 } finally { 172 silentClose(); 173 for (Runnable completion: completions) { 174 completion.run(); 175 } 176 completions.clear(); 177 } 178 } 179 } 180 181 public void begin() throws IOException { 182 if (inTx) { 183 throw new IOException("Already started."); 184 } 185 inTx = true; 186 connection = getConnection(); 187 } 188 189 public void commit() throws IOException { 190 if (!inTx) { 191 throw new IOException("Not started."); 192 } 193 try { 194 final boolean needsCommit = !connection.getAutoCommit(); 195 executeBatch(); 196 if (needsCommit) { 197 connection.commit(); 198 } 199 } catch (SQLException e) { 200 JDBCPersistenceAdapter.log("Commit failed: ", e); 201 try { 202 doRollback(); 203 } catch (Exception ignored) {} 204 IOException ioe = IOExceptionSupport.create(e); 205 persistenceAdapter.getBrokerService().handleIOException(ioe); 206 throw ioe; 207 } finally { 208 inTx = false; 209 close(); 210 } 211 } 212 213 public void rollback() throws IOException { 214 if (!inTx) { 215 throw new IOException("Not started."); 216 } 217 try { 218 doRollback(); 219 } catch (SQLException e) { 220 JDBCPersistenceAdapter.log("Rollback failed: ", e); 221 throw IOExceptionSupport.create(e); 222 } finally { 223 inTx = false; 224 close(); 225 } 226 } 227 228 private PreparedStatement silentClosePreparedStatement(PreparedStatement preparedStatement) { 229 if (preparedStatement != null) { 230 try { 231 preparedStatement.close(); 232 } catch (Throwable ignored) {} 233 } 234 return null; 235 } 236 237 private void silentClosePreparedStatements() { 238 addMessageStatement = silentClosePreparedStatement(addMessageStatement); 239 removedMessageStatement = silentClosePreparedStatement(removedMessageStatement); 240 updateLastAckStatement = silentClosePreparedStatement(updateLastAckStatement); 241 } 242 243 private void doRollback() throws SQLException { 244 silentClosePreparedStatements(); 245 completions.clear(); 246 connection.rollback(); 247 } 248 249 public PreparedStatement getAddMessageStatement() { 250 return addMessageStatement; 251 } 252 253 public void setAddMessageStatement(PreparedStatement addMessageStatement) { 254 this.addMessageStatement = addMessageStatement; 255 } 256 257 public PreparedStatement getUpdateLastAckStatement() { 258 return updateLastAckStatement; 259 } 260 261 public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) { 262 this.updateLastAckStatement = ackMessageStatement; 263 } 264 265 public PreparedStatement getRemovedMessageStatement() { 266 return removedMessageStatement; 267 } 268 269 public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) { 270 this.removedMessageStatement = removedMessageStatement; 271 } 272 273 public void setTransactionIsolation(int transactionIsolation) { 274 this.transactionIsolation = transactionIsolation; 275 } 276 277 public void onCompletion(Runnable runnable) { 278 completions.add(runnable); 279 } 280 281 final private class UnlockOnCloseConnection implements Connection { 282 283 private final Connection delegate; 284 private final Lock lock; 285 286 UnlockOnCloseConnection(Connection delegate, Lock toUnlockOnClose) { 287 this.delegate = delegate; 288 this.lock = toUnlockOnClose; 289 } 290 291 @Override 292 public void close() throws SQLException { 293 try { 294 delegate.close(); 295 } finally { 296 lock.unlock(); 297 } 298 } 299 300 // simple delegate for the rest of the impl.. 301 @Override 302 public Statement createStatement() throws SQLException { 303 return delegate.createStatement(); 304 } 305 306 @Override 307 public PreparedStatement prepareStatement(String sql) throws SQLException { 308 return delegate.prepareStatement(sql); 309 } 310 311 @Override 312 public CallableStatement prepareCall(String sql) throws SQLException { 313 return delegate.prepareCall(sql); 314 } 315 316 @Override 317 public String nativeSQL(String sql) throws SQLException { 318 return delegate.nativeSQL(sql); 319 } 320 321 @Override 322 public void setAutoCommit(boolean autoCommit) throws SQLException { 323 delegate.setAutoCommit(autoCommit); 324 } 325 326 @Override 327 public boolean getAutoCommit() throws SQLException { 328 return delegate.getAutoCommit(); 329 } 330 331 @Override 332 public void commit() throws SQLException { 333 delegate.commit(); 334 } 335 336 @Override 337 public void rollback() throws SQLException { 338 delegate.rollback(); 339 } 340 341 @Override 342 public boolean isClosed() throws SQLException { 343 return delegate.isClosed(); 344 } 345 346 @Override 347 public DatabaseMetaData getMetaData() throws SQLException { 348 return delegate.getMetaData(); 349 } 350 351 @Override 352 public void setReadOnly(boolean readOnly) throws SQLException { 353 delegate.setReadOnly(readOnly); 354 } 355 356 @Override 357 public boolean isReadOnly() throws SQLException { 358 return delegate.isReadOnly(); 359 } 360 361 @Override 362 public void setCatalog(String catalog) throws SQLException { 363 delegate.setCatalog(catalog); 364 } 365 366 @Override 367 public String getCatalog() throws SQLException { 368 return delegate.getCatalog(); 369 } 370 371 @Override 372 public void setTransactionIsolation(int level) throws SQLException { 373 delegate.setTransactionIsolation(level); 374 } 375 376 @Override 377 public int getTransactionIsolation() throws SQLException { 378 return delegate.getTransactionIsolation(); 379 } 380 381 @Override 382 public SQLWarning getWarnings() throws SQLException { 383 return delegate.getWarnings(); 384 } 385 386 @Override 387 public void clearWarnings() throws SQLException { 388 delegate.clearWarnings(); 389 } 390 391 @Override 392 public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { 393 return delegate.createStatement(resultSetType, resultSetConcurrency); 394 } 395 396 @Override 397 public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { 398 return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency); 399 } 400 401 @Override 402 public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException { 403 return delegate.prepareCall(sql, resultSetType, resultSetConcurrency); 404 } 405 406 @Override 407 public Map<String, Class<?>> getTypeMap() throws SQLException { 408 return delegate.getTypeMap(); 409 } 410 411 @Override 412 public void setTypeMap(Map<String, Class<?>> map) throws SQLException { 413 delegate.setTypeMap(map); 414 } 415 416 @Override 417 public void setHoldability(int holdability) throws SQLException { 418 delegate.setHoldability(holdability); 419 } 420 421 @Override 422 public int getHoldability() throws SQLException { 423 return delegate.getHoldability(); 424 } 425 426 @Override 427 public Savepoint setSavepoint() throws SQLException { 428 return delegate.setSavepoint(); 429 } 430 431 @Override 432 public Savepoint setSavepoint(String name) throws SQLException { 433 return delegate.setSavepoint(name); 434 } 435 436 @Override 437 public void rollback(Savepoint savepoint) throws SQLException { 438 delegate.rollback(savepoint); 439 } 440 441 @Override 442 public void releaseSavepoint(Savepoint savepoint) throws SQLException { 443 delegate.releaseSavepoint(savepoint); 444 } 445 446 @Override 447 public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { 448 return delegate.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); 449 } 450 451 @Override 452 public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { 453 return delegate.prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability); 454 } 455 456 @Override 457 public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { 458 return delegate.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability); 459 } 460 461 @Override 462 public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { 463 return delegate.prepareStatement(sql, autoGeneratedKeys); 464 } 465 466 @Override 467 public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException { 468 return delegate.prepareStatement(sql, columnIndexes); 469 } 470 471 @Override 472 public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException { 473 return delegate.prepareStatement(sql, columnNames); 474 } 475 476 @Override 477 public Clob createClob() throws SQLException { 478 return delegate.createClob(); 479 } 480 481 @Override 482 public Blob createBlob() throws SQLException { 483 return delegate.createBlob(); 484 } 485 486 @Override 487 public NClob createNClob() throws SQLException { 488 return delegate.createNClob(); 489 } 490 491 @Override 492 public SQLXML createSQLXML() throws SQLException { 493 return delegate.createSQLXML(); 494 } 495 496 @Override 497 public boolean isValid(int timeout) throws SQLException { 498 return delegate.isValid(timeout); 499 } 500 501 @Override 502 public void setClientInfo(String name, String value) throws SQLClientInfoException { 503 delegate.setClientInfo(name, value); 504 } 505 506 @Override 507 public void setClientInfo(Properties properties) throws SQLClientInfoException { 508 delegate.setClientInfo(properties); 509 } 510 511 @Override 512 public String getClientInfo(String name) throws SQLException { 513 return delegate.getClientInfo(name); 514 } 515 516 @Override 517 public Properties getClientInfo() throws SQLException { 518 return delegate.getClientInfo(); 519 } 520 521 @Override 522 public Array createArrayOf(String typeName, Object[] elements) throws SQLException { 523 return delegate.createArrayOf(typeName, elements); 524 } 525 526 @Override 527 public Struct createStruct(String typeName, Object[] attributes) throws SQLException { 528 return delegate.createStruct(typeName, attributes); 529 } 530 531 @Override 532 public void setSchema(String schema) throws SQLException { 533 delegate.setSchema(schema); 534 } 535 536 @Override 537 public String getSchema() throws SQLException { 538 return delegate.getSchema(); 539 } 540 541 @Override 542 public void abort(Executor executor) throws SQLException { 543 delegate.abort(executor); 544 } 545 546 @Override 547 public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException { 548 delegate.setNetworkTimeout(executor, milliseconds); 549 } 550 551 @Override 552 public int getNetworkTimeout() throws SQLException { 553 return delegate.getNetworkTimeout(); 554 } 555 556 @Override 557 public <T> T unwrap(Class<T> iface) throws SQLException { 558 return delegate.unwrap(iface); 559 } 560 561 @Override 562 public boolean isWrapperFor(Class<?> iface) throws SQLException { 563 return delegate.isWrapperFor(iface); 564 } 565 } 566}