001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc.adapter; 018 019import static javax.xml.bind.DatatypeConverter.parseBase64Binary; 020import static javax.xml.bind.DatatypeConverter.printBase64Binary; 021 022import java.io.IOException; 023import java.sql.Connection; 024import java.sql.PreparedStatement; 025import java.sql.ResultSet; 026import java.sql.SQLException; 027import java.sql.Statement; 028import java.util.ArrayList; 029import java.util.HashSet; 030import java.util.LinkedList; 031import java.util.Set; 032import java.util.concurrent.locks.ReadWriteLock; 033import java.util.concurrent.locks.ReentrantReadWriteLock; 034 035import org.apache.activemq.command.ActiveMQDestination; 036import org.apache.activemq.command.MessageId; 037import org.apache.activemq.command.ProducerId; 038import org.apache.activemq.command.SubscriptionInfo; 039import org.apache.activemq.command.XATransactionId; 040import org.apache.activemq.store.jdbc.JDBCAdapter; 041import org.apache.activemq.store.jdbc.JDBCMessageIdScanListener; 042import org.apache.activemq.store.jdbc.JDBCMessageRecoveryListener; 043import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; 
044import org.apache.activemq.store.jdbc.JdbcMemoryTransactionStore; 045import org.apache.activemq.store.jdbc.Statements; 046import org.apache.activemq.store.jdbc.TransactionContext; 047import org.apache.activemq.util.DataByteArrayOutputStream; 048import org.slf4j.Logger; 049import org.slf4j.LoggerFactory; 050 051/** 052 * Implements all the default JDBC operations that are used by the JDBCPersistenceAdapter. <p/> sub-classing is 053 * encouraged to override the default implementation of methods to account for differences in JDBC Driver 054 * implementations. <p/> The JDBCAdapter inserts and extracts BLOB data using the getBytes()/setBytes() operations. <p/> 055 * The databases/JDBC drivers that use this adapter are: 056 * <ul> 057 * <li></li> 058 * </ul> 059 * 060 * @org.apache.xbean.XBean element="defaultJDBCAdapter" 061 * 062 * 063 */ 064public class DefaultJDBCAdapter implements JDBCAdapter { 065 private static final Logger LOG = LoggerFactory.getLogger(DefaultJDBCAdapter.class); 066 public static final int MAX_ROWS = org.apache.activemq.ActiveMQPrefetchPolicy.MAX_PREFETCH_SIZE; 067 private static final String FAILURE_MESSAGE = "Failure was: %s Message: %s SQLState: %s Vendor code: %s"; 068 protected Statements statements; 069 private boolean batchStatements = true; 070 //This is deprecated and should be removed in a future release 071 protected boolean batchStatments = true; 072 protected boolean prioritizedMessages; 073 protected int maxRows = MAX_ROWS; 074 075 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException { 076 s.setBytes(index, data); 077 } 078 079 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException { 080 return rs.getBytes(index); 081 } 082 083 @Override 084 public void doCreateTables(TransactionContext transactionContext) throws SQLException, IOException { 085 // Check to see if the table already exists. If it does, then don't log warnings during startup. 
086 // Need to run the scripts anyways since they may contain ALTER statements that upgrade a previous version of the table 087 boolean messageTableAlreadyExists = messageTableAlreadyExists(transactionContext); 088 089 for (String createStatement : this.statements.getCreateSchemaStatements()) { 090 // This will fail usually since the tables will be 091 // created already. 092 executeStatement(transactionContext, createStatement, messageTableAlreadyExists); 093 } 094 } 095 096 private boolean messageTableAlreadyExists(TransactionContext transactionContext) { 097 boolean alreadyExists = false; 098 ResultSet rs = null; 099 try { 100 rs = transactionContext.getConnection().getMetaData().getTables(null, null, this.statements.getFullMessageTableName(), new String[] { "TABLE" }); 101 alreadyExists = rs.next(); 102 } catch (Throwable ignore) { 103 } finally { 104 close(rs); 105 } 106 return alreadyExists; 107 } 108 109 private void executeStatement(TransactionContext transactionContext, String createStatement, boolean ignoreStatementExecutionFailure) throws IOException { 110 Statement statement = null; 111 try { 112 LOG.debug("Executing SQL: " + createStatement); 113 statement = transactionContext.getConnection().createStatement(); 114 statement.execute(createStatement); 115 116 commitIfAutoCommitIsDisabled(transactionContext); 117 } catch (SQLException e) { 118 if (ignoreStatementExecutionFailure) { 119 LOG.debug("Could not create JDBC tables; The message table already existed. " + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), e.getSQLState(), e.getErrorCode())); 120 } else { 121 LOG.warn("Could not create JDBC tables; they could already exist. 
" + String.format(FAILURE_MESSAGE, createStatement, e.getMessage(), e.getSQLState(), e.getErrorCode())); 122 JDBCPersistenceAdapter.log("Failure details: ", e); 123 } 124 } finally { 125 closeStatement(statement); 126 } 127 } 128 129 private void closeStatement(Statement statement) { 130 try { 131 if (statement != null) { 132 statement.close(); 133 } 134 } catch (SQLException ignored) {} 135 } 136 137 private void commitIfAutoCommitIsDisabled(TransactionContext c) throws SQLException, IOException { 138 if (!c.getConnection().getAutoCommit()) { 139 c.getConnection().commit(); 140 } 141 } 142 143 public void doDropTables(TransactionContext c) throws SQLException, IOException { 144 Statement s = null; 145 try { 146 s = c.getConnection().createStatement(); 147 String[] dropStatments = this.statements.getDropSchemaStatements(); 148 for (int i = 0; i < dropStatments.length; i++) { 149 // This will fail usually since the tables will be 150 // created already. 151 try { 152 LOG.debug("Executing SQL: " + dropStatments[i]); 153 s.execute(dropStatments[i]); 154 } catch (SQLException e) { 155 LOG.warn("Could not drop JDBC tables; they may not exist." 
+ " Failure was: " + dropStatments[i] 156 + " Message: " + e.getMessage() + " SQLState: " + e.getSQLState() + " Vendor code: " 157 + e.getErrorCode()); 158 JDBCPersistenceAdapter.log("Failure details: ", e); 159 } 160 } 161 commitIfAutoCommitIsDisabled(c); 162 } finally { 163 try { 164 s.close(); 165 } catch (Throwable e) { 166 } 167 } 168 } 169 170 public long doGetLastMessageStoreSequenceId(TransactionContext c) throws SQLException, IOException { 171 PreparedStatement s = null; 172 ResultSet rs = null; 173 try { 174 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); 175 rs = s.executeQuery(); 176 long seq1 = 0; 177 if (rs.next()) { 178 seq1 = rs.getLong(1); 179 } 180 rs.close(); 181 s.close(); 182 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInAcksStatement()); 183 rs = s.executeQuery(); 184 long seq2 = 0; 185 if (rs.next()) { 186 seq2 = rs.getLong(1); 187 } 188 long seq = Math.max(seq1, seq2); 189 return seq; 190 } finally { 191 close(rs); 192 close(s); 193 } 194 } 195 196 public byte[] doGetMessageById(TransactionContext c, long storeSequenceId) throws SQLException, IOException { 197 PreparedStatement s = null; 198 ResultSet rs = null; 199 try { 200 s = c.getConnection().prepareStatement( 201 this.statements.getFindMessageByIdStatement()); 202 s.setLong(1, storeSequenceId); 203 rs = s.executeQuery(); 204 if (!rs.next()) { 205 return null; 206 } 207 return getBinaryData(rs, 1); 208 } finally { 209 close(rs); 210 close(s); 211 } 212 } 213 214 215 /** 216 * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome 217 */ 218 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data, 219 long expiration, byte priority, XATransactionId xid) throws SQLException, IOException { 220 PreparedStatement s = c.getAddMessageStatement(); 221 try { 222 if (s == null) { 223 s = 
c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); 224 if (this.batchStatements) { 225 c.setAddMessageStatement(s); 226 } 227 } 228 s.setLong(1, sequence); 229 s.setString(2, messageID.getProducerId().toString()); 230 s.setLong(3, messageID.getProducerSequenceId()); 231 s.setString(4, destination.getQualifiedName()); 232 s.setLong(5, expiration); 233 s.setLong(6, priority); 234 setBinaryData(s, 7, data); 235 if (xid != null) { 236 byte[] xidVal = xid.getEncodedXidBytes(); 237 xidVal[0] = '+'; 238 String xidString = printBase64Binary(xidVal); 239 s.setString(8, xidString); 240 } else { 241 s.setString(8, null); 242 } 243 if (this.batchStatements) { 244 s.addBatch(); 245 } else if (s.executeUpdate() != 1) { 246 throw new SQLException("Failed add a message"); 247 } 248 } finally { 249 if (!this.batchStatements) { 250 if (s != null) { 251 s.close(); 252 } 253 } 254 } 255 } 256 257 @Override 258 public void doUpdateMessage(TransactionContext c, ActiveMQDestination destination, MessageId id, byte[] data) throws SQLException, IOException { 259 PreparedStatement s = null; 260 try { 261 s = c.getConnection().prepareStatement(this.statements.getUpdateMessageStatement()); 262 setBinaryData(s, 1, data); 263 s.setString(2, id.getProducerId().toString()); 264 s.setLong(3, id.getProducerSequenceId()); 265 s.setString(4, destination.getQualifiedName()); 266 if (s.executeUpdate() != 1) { 267 throw new IOException("Could not update message: " + id + " in " + destination); 268 } 269 } finally { 270 close(s); 271 } 272 } 273 274 275 public void doAddMessageReference(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, 276 long expirationTime, String messageRef) throws SQLException, IOException { 277 PreparedStatement s = c.getAddMessageStatement(); 278 try { 279 if (s == null) { 280 s = c.getConnection().prepareStatement(this.statements.getAddMessageStatement()); 281 if (this.batchStatements) { 282 
c.setAddMessageStatement(s); 283 } 284 } 285 s.setLong(1, messageID.getBrokerSequenceId()); 286 s.setString(2, messageID.getProducerId().toString()); 287 s.setLong(3, messageID.getProducerSequenceId()); 288 s.setString(4, destination.getQualifiedName()); 289 s.setLong(5, expirationTime); 290 s.setString(6, messageRef); 291 if (this.batchStatements) { 292 s.addBatch(); 293 } else if (s.executeUpdate() != 1) { 294 throw new SQLException("Failed add a message"); 295 } 296 } finally { 297 if (!this.batchStatements) { 298 s.close(); 299 } 300 } 301 } 302 303 public long[] getStoreSequenceId(TransactionContext c, ActiveMQDestination destination, MessageId messageID) throws SQLException, IOException { 304 PreparedStatement s = null; 305 ResultSet rs = null; 306 try { 307 s = c.getConnection().prepareStatement(this.statements.getFindMessageSequenceIdStatement()); 308 s.setString(1, messageID.getProducerId().toString()); 309 s.setLong(2, messageID.getProducerSequenceId()); 310 s.setString(3, destination.getQualifiedName()); 311 rs = s.executeQuery(); 312 if (!rs.next()) { 313 return new long[]{0,0}; 314 } 315 return new long[]{rs.getLong(1), rs.getLong(2)}; 316 } finally { 317 close(rs); 318 close(s); 319 } 320 } 321 322 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException { 323 PreparedStatement s = null; 324 ResultSet rs = null; 325 try { 326 s = c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); 327 s.setString(1, id.getProducerId().toString()); 328 s.setLong(2, id.getProducerSequenceId()); 329 rs = s.executeQuery(); 330 if (!rs.next()) { 331 return null; 332 } 333 return getBinaryData(rs, 1); 334 } finally { 335 close(rs); 336 close(s); 337 } 338 } 339 340 public String doGetMessageReference(TransactionContext c, long seq) throws SQLException, IOException { 341 PreparedStatement s = null; 342 ResultSet rs = null; 343 try { 344 s = 
c.getConnection().prepareStatement(this.statements.getFindMessageStatement()); 345 s.setLong(1, seq); 346 rs = s.executeQuery(); 347 if (!rs.next()) { 348 return null; 349 } 350 return rs.getString(1); 351 } finally { 352 close(rs); 353 close(s); 354 } 355 } 356 357 /** 358 * A non null xid indicated the op is part of 2pc prepare, so ops are flagged pending outcome 359 */ 360 public void doRemoveMessage(TransactionContext c, long seq, XATransactionId xid) throws SQLException, IOException { 361 PreparedStatement s = c.getRemovedMessageStatement(); 362 try { 363 if (s == null) { 364 s = c.getConnection().prepareStatement(xid == null ? 365 this.statements.getRemoveMessageStatement() : this.statements.getUpdateXidFlagStatement()); 366 if (this.batchStatements) { 367 c.setRemovedMessageStatement(s); 368 } 369 } 370 if (xid == null) { 371 s.setLong(1, seq); 372 } else { 373 byte[] xidVal = xid.getEncodedXidBytes(); 374 xidVal[0] = '-'; 375 String xidString = printBase64Binary(xidVal); 376 s.setString(1, xidString); 377 s.setLong(2, seq); 378 } 379 if (this.batchStatements) { 380 s.addBatch(); 381 } else if (s.executeUpdate() != 1) { 382 throw new SQLException("Failed to remove message seq: " + seq); 383 } 384 } finally { 385 if (!this.batchStatements && s != null) { 386 s.close(); 387 } 388 } 389 } 390 391 public void doRecover(TransactionContext c, ActiveMQDestination destination, JDBCMessageRecoveryListener listener) 392 throws Exception { 393 PreparedStatement s = null; 394 ResultSet rs = null; 395 try { 396 s = c.getConnection().prepareStatement(this.statements.getFindAllMessagesStatement()); 397 s.setString(1, destination.getQualifiedName()); 398 rs = s.executeQuery(); 399 if (this.statements.isUseExternalMessageReferences()) { 400 while (rs.next()) { 401 if (!listener.recoverMessageReference(rs.getString(2))) { 402 break; 403 } 404 } 405 } else { 406 while (rs.next()) { 407 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 408 break; 409 } 410 } 
411 } 412 } finally { 413 close(rs); 414 close(s); 415 } 416 } 417 418 public void doMessageIdScan(TransactionContext c, int limit, 419 JDBCMessageIdScanListener listener) throws SQLException, IOException { 420 PreparedStatement s = null; 421 ResultSet rs = null; 422 try { 423 s = c.getConnection().prepareStatement(this.statements.getFindAllMessageIdsStatement()); 424 s.setMaxRows(limit); 425 rs = s.executeQuery(); 426 // jdbc scrollable cursor requires jdbc ver > 1.0 and is often implemented locally so avoid 427 LinkedList<MessageId> reverseOrderIds = new LinkedList<MessageId>(); 428 while (rs.next()) { 429 reverseOrderIds.addFirst(new MessageId(rs.getString(2), rs.getLong(3))); 430 } 431 if (LOG.isDebugEnabled()) { 432 LOG.debug("messageIdScan with limit (" + limit + "), resulted in: " + reverseOrderIds.size() + " ids"); 433 } 434 for (MessageId id : reverseOrderIds) { 435 listener.messageId(id); 436 } 437 } finally { 438 close(rs); 439 close(s); 440 } 441 } 442 443 public void doSetLastAckWithPriority(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId, 444 String subscriptionName, long seq, long priority) throws SQLException, IOException { 445 PreparedStatement s = c.getUpdateLastAckStatement(); 446 try { 447 if (s == null) { 448 s = c.getConnection().prepareStatement(xid == null ? 
449 this.statements.getUpdateDurableLastAckWithPriorityStatement() : 450 this.statements.getUpdateDurableLastAckWithPriorityInTxStatement()); 451 if (this.batchStatements) { 452 c.setUpdateLastAckStatement(s); 453 } 454 } 455 if (xid != null) { 456 byte[] xidVal = encodeXid(xid, seq, priority); 457 String xidString = printBase64Binary(xidVal); 458 s.setString(1, xidString); 459 } else { 460 s.setLong(1, seq); 461 } 462 s.setString(2, destination.getQualifiedName()); 463 s.setString(3, clientId); 464 s.setString(4, subscriptionName); 465 s.setLong(5, priority); 466 if (this.batchStatements) { 467 s.addBatch(); 468 } else if (s.executeUpdate() != 1) { 469 throw new SQLException("Failed update last ack with priority: " + priority + ", for sub: " + subscriptionName); 470 } 471 } finally { 472 if (!this.batchStatements) { 473 close(s); 474 } 475 } 476 } 477 478 479 public void doSetLastAck(TransactionContext c, ActiveMQDestination destination, XATransactionId xid, String clientId, 480 String subscriptionName, long seq, long priority) throws SQLException, IOException { 481 PreparedStatement s = c.getUpdateLastAckStatement(); 482 try { 483 if (s == null) { 484 s = c.getConnection().prepareStatement(xid == null ? 
485 this.statements.getUpdateDurableLastAckStatement() : 486 this.statements.getUpdateDurableLastAckInTxStatement()); 487 if (this.batchStatements) { 488 c.setUpdateLastAckStatement(s); 489 } 490 } 491 if (xid != null) { 492 byte[] xidVal = encodeXid(xid, seq, priority); 493 String xidString = printBase64Binary(xidVal); 494 s.setString(1, xidString); 495 } else { 496 s.setLong(1, seq); 497 } 498 s.setString(2, destination.getQualifiedName()); 499 s.setString(3, clientId); 500 s.setString(4, subscriptionName); 501 502 if (this.batchStatements) { 503 s.addBatch(); 504 } else if (s.executeUpdate() != 1) { 505 throw new IOException("Could not update last ack seq : " 506 + seq + ", for sub: " + subscriptionName); 507 } 508 } finally { 509 if (!this.batchStatements) { 510 close(s); 511 } 512 } 513 } 514 515 private byte[] encodeXid(XATransactionId xid, long seq, long priority) { 516 byte[] xidVal = xid.getEncodedXidBytes(); 517 // encode the update 518 DataByteArrayOutputStream outputStream = xid.internalOutputStream(); 519 outputStream.position(1); 520 outputStream.writeLong(seq); 521 outputStream.writeByte(Long.valueOf(priority).byteValue()); 522 return xidVal; 523 } 524 525 @Override 526 public void doClearLastAck(TransactionContext c, ActiveMQDestination destination, byte priority, String clientId, String subName) throws SQLException, IOException { 527 PreparedStatement s = null; 528 try { 529 s = c.getConnection().prepareStatement(this.statements.getClearDurableLastAckInTxStatement()); 530 s.setString(1, destination.getQualifiedName()); 531 s.setString(2, clientId); 532 s.setString(3, subName); 533 s.setLong(4, priority); 534 if (s.executeUpdate() != 1) { 535 throw new IOException("Could not remove prepared transaction state from message ack for: " + clientId + ":" + subName); 536 } 537 } finally { 538 close(s); 539 } 540 } 541 542 public void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, 543 String subscriptionName, 
JDBCMessageRecoveryListener listener) throws Exception { 544 // dumpTables(c, 545 // destination.getQualifiedName(),clientId,subscriptionName); 546 PreparedStatement s = null; 547 ResultSet rs = null; 548 try { 549 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubMessagesStatement()); 550 s.setString(1, destination.getQualifiedName()); 551 s.setString(2, clientId); 552 s.setString(3, subscriptionName); 553 rs = s.executeQuery(); 554 if (this.statements.isUseExternalMessageReferences()) { 555 while (rs.next()) { 556 if (!listener.recoverMessageReference(rs.getString(2))) { 557 break; 558 } 559 } 560 } else { 561 while (rs.next()) { 562 if (!listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 563 break; 564 } 565 } 566 } 567 } finally { 568 close(rs); 569 close(s); 570 } 571 } 572 573 public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, String clientId, 574 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { 575 576 PreparedStatement s = null; 577 ResultSet rs = null; 578 try { 579 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesStatement()); 580 s.setMaxRows(Math.min(maxReturned * 2, maxRows)); 581 s.setString(1, destination.getQualifiedName()); 582 s.setString(2, clientId); 583 s.setString(3, subscriptionName); 584 s.setLong(4, seq); 585 rs = s.executeQuery(); 586 int count = 0; 587 if (this.statements.isUseExternalMessageReferences()) { 588 while (rs.next() && count < maxReturned) { 589 if (listener.recoverMessageReference(rs.getString(1))) { 590 count++; 591 } 592 } 593 } else { 594 while (rs.next() && count < maxReturned) { 595 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 596 count++; 597 } 598 } 599 } 600 } finally { 601 close(rs); 602 close(s); 603 } 604 } 605 606 public void doRecoverNextMessagesWithPriority(TransactionContext c, ActiveMQDestination destination, 
String clientId, 607 String subscriptionName, long seq, long priority, int maxReturned, JDBCMessageRecoveryListener listener) throws Exception { 608 609 PreparedStatement s = null; 610 ResultSet rs = null; 611 try { 612 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubMessagesByPriorityStatement()); 613 s.setMaxRows(Math.min(maxReturned * 2, maxRows)); 614 s.setString(1, destination.getQualifiedName()); 615 s.setString(2, clientId); 616 s.setString(3, subscriptionName); 617 s.setLong(4, seq); 618 s.setLong(5, priority); 619 rs = s.executeQuery(); 620 int count = 0; 621 if (this.statements.isUseExternalMessageReferences()) { 622 while (rs.next() && count < maxReturned) { 623 if (listener.recoverMessageReference(rs.getString(1))) { 624 count++; 625 } 626 } 627 } else { 628 while (rs.next() && count < maxReturned) { 629 if (listener.recoverMessage(rs.getLong(1), getBinaryData(rs, 2))) { 630 count++; 631 } 632 } 633 } 634 } finally { 635 close(rs); 636 close(s); 637 } 638 } 639 640 public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination, 641 String clientId, String subscriptionName, boolean isPrioritizedMessages) throws SQLException, IOException { 642 PreparedStatement s = null; 643 ResultSet rs = null; 644 int result = 0; 645 try { 646 if (isPrioritizedMessages) { 647 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatementWithPriority()); 648 } else { 649 s = c.getConnection().prepareStatement(this.statements.getDurableSubscriberMessageCountStatement()); 650 } 651 s.setString(1, destination.getQualifiedName()); 652 s.setString(2, clientId); 653 s.setString(3, subscriptionName); 654 rs = s.executeQuery(); 655 if (rs.next()) { 656 result = rs.getInt(1); 657 } 658 } finally { 659 close(rs); 660 close(s); 661 } 662 return result; 663 } 664 665 /** 666 * @param c 667 * @param info 668 * @param retroactive 669 * @throws SQLException 670 * @throws IOException 671 */ 
672 public void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo info, boolean retroactive, boolean isPrioritizedMessages) 673 throws SQLException, IOException { 674 // dumpTables(c, destination.getQualifiedName(), clientId, 675 // subscriptionName); 676 PreparedStatement s = null; 677 try { 678 long lastMessageId = -1; 679 if (!retroactive) { 680 s = c.getConnection().prepareStatement(this.statements.getFindLastSequenceIdInMsgsStatement()); 681 ResultSet rs = null; 682 try { 683 rs = s.executeQuery(); 684 if (rs.next()) { 685 lastMessageId = rs.getLong(1); 686 } 687 } finally { 688 close(rs); 689 close(s); 690 } 691 } 692 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); 693 int maxPriority = 1; 694 if (isPrioritizedMessages) { 695 maxPriority = 10; 696 } 697 698 for (int priority = 0; priority < maxPriority; priority++) { 699 s.setString(1, info.getDestination().getQualifiedName()); 700 s.setString(2, info.getClientId()); 701 s.setString(3, info.getSubscriptionName()); 702 s.setString(4, info.getSelector()); 703 s.setLong(5, lastMessageId); 704 s.setString(6, info.getSubscribedDestination().getQualifiedName()); 705 s.setLong(7, priority); 706 707 if (s.executeUpdate() != 1) { 708 throw new IOException("Could not create durable subscription for: " + info.getClientId()); 709 } 710 } 711 712 } finally { 713 close(s); 714 } 715 } 716 717 public SubscriptionInfo doGetSubscriberEntry(TransactionContext c, ActiveMQDestination destination, 718 String clientId, String subscriptionName) throws SQLException, IOException { 719 PreparedStatement s = null; 720 ResultSet rs = null; 721 try { 722 s = c.getConnection().prepareStatement(this.statements.getFindDurableSubStatement()); 723 s.setString(1, destination.getQualifiedName()); 724 s.setString(2, clientId); 725 s.setString(3, subscriptionName); 726 rs = s.executeQuery(); 727 if (!rs.next()) { 728 return null; 729 } 730 SubscriptionInfo subscription = new SubscriptionInfo(); 
731 subscription.setDestination(destination); 732 subscription.setClientId(clientId); 733 subscription.setSubscriptionName(subscriptionName); 734 subscription.setSelector(rs.getString(1)); 735 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(2), 736 ActiveMQDestination.QUEUE_TYPE)); 737 return subscription; 738 } finally { 739 close(rs); 740 close(s); 741 } 742 } 743 744 public SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c, ActiveMQDestination destination) 745 throws SQLException, IOException { 746 PreparedStatement s = null; 747 ResultSet rs = null; 748 try { 749 s = c.getConnection().prepareStatement(this.statements.getFindAllDurableSubsStatement()); 750 s.setString(1, destination.getQualifiedName()); 751 rs = s.executeQuery(); 752 ArrayList<SubscriptionInfo> rc = new ArrayList<SubscriptionInfo>(); 753 while (rs.next()) { 754 SubscriptionInfo subscription = new SubscriptionInfo(); 755 subscription.setDestination(destination); 756 subscription.setSelector(rs.getString(1)); 757 subscription.setSubscriptionName(rs.getString(2)); 758 subscription.setClientId(rs.getString(3)); 759 subscription.setSubscribedDestination(ActiveMQDestination.createDestination(rs.getString(4), 760 ActiveMQDestination.QUEUE_TYPE)); 761 rc.add(subscription); 762 } 763 return rc.toArray(new SubscriptionInfo[rc.size()]); 764 } finally { 765 close(rs); 766 close(s); 767 } 768 } 769 770 public void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName) throws SQLException, 771 IOException { 772 PreparedStatement s = null; 773 try { 774 s = c.getConnection().prepareStatement(this.statements.getRemoveAllMessagesStatement()); 775 s.setString(1, destinationName.getQualifiedName()); 776 s.executeUpdate(); 777 s.close(); 778 s = c.getConnection().prepareStatement(this.statements.getRemoveAllSubscriptionsStatement()); 779 s.setString(1, destinationName.getQualifiedName()); 780 s.executeUpdate(); 781 } finally { 782 
close(s); 783 } 784 } 785 786 public void doDeleteSubscription(TransactionContext c, ActiveMQDestination destination, String clientId, 787 String subscriptionName) throws SQLException, IOException { 788 PreparedStatement s = null; 789 try { 790 s = c.getConnection().prepareStatement(this.statements.getDeleteSubscriptionStatement()); 791 s.setString(1, destination.getQualifiedName()); 792 s.setString(2, clientId); 793 s.setString(3, subscriptionName); 794 s.executeUpdate(); 795 } finally { 796 close(s); 797 } 798 } 799 800 char priorityIterator = 0; // unsigned 801 public void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException { 802 PreparedStatement s = null; 803 try { 804 LOG.debug("Executing SQL: " + this.statements.getDeleteOldMessagesStatementWithPriority()); 805 s = c.getExclusiveConnection().prepareStatement(this.statements.getDeleteOldMessagesStatementWithPriority()); 806 int priority = priorityIterator++%10; 807 s.setInt(1, priority); 808 s.setInt(2, priority); 809 int i = s.executeUpdate(); 810 LOG.debug("Deleted " + i + " old message(s) at priority: " + priority); 811 } finally { 812 close(s); 813 } 814 } 815 816 public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c, ActiveMQDestination destination, 817 String clientId, String subscriberName) throws SQLException, IOException { 818 PreparedStatement s = null; 819 ResultSet rs = null; 820 long result = -1; 821 try { 822 s = c.getConnection().prepareStatement(this.statements.getLastAckedDurableSubscriberMessageStatement()); 823 s.setString(1, destination.getQualifiedName()); 824 s.setString(2, clientId); 825 s.setString(3, subscriberName); 826 rs = s.executeQuery(); 827 if (rs.next()) { 828 result = rs.getLong(1); 829 if (result == 0 && rs.wasNull()) { 830 result = -1; 831 } 832 } 833 } finally { 834 close(rs); 835 close(s); 836 } 837 return result; 838 } 839 840 protected static void close(PreparedStatement s) { 841 try { 842 s.close(); 843 } catch (Throwable e) { 
844 } 845 } 846 847 protected static void close(ResultSet rs) { 848 try { 849 rs.close(); 850 } catch (Throwable e) { 851 } 852 } 853 854 public Set<ActiveMQDestination> doGetDestinations(TransactionContext c) throws SQLException, IOException { 855 HashSet<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>(); 856 PreparedStatement s = null; 857 ResultSet rs = null; 858 try { 859 s = c.getConnection().prepareStatement(this.statements.getFindAllDestinationsStatement()); 860 rs = s.executeQuery(); 861 while (rs.next()) { 862 rc.add(ActiveMQDestination.createDestination(rs.getString(1), ActiveMQDestination.QUEUE_TYPE)); 863 } 864 } finally { 865 close(rs); 866 close(s); 867 } 868 return rc; 869 } 870 871 /** 872 * @return true if batchStatements 873 */ 874 public boolean isBatchStatements() { 875 return batchStatements; 876 } 877 878 /** 879 * Set the number of statements to process as a single batch DB update 880 * @param batchStatements 881 */ 882 public void setBatchStatements(boolean batchStatements) { 883 this.batchStatements = batchStatements; 884 // The next lines are deprecated and should be removed in a future release 885 // and is here in case someone created their own 886 // this.batchStatments = batchStatements; 887 } 888 889 // Note - remove batchStatment in future distributions. Here for backward compatibility 890 /** 891 * @return true if batchStements 892 */ 893 public boolean isBatchStatments() { 894 return this.batchStatements; 895 } 896 897 /** 898 * This value batchStatments is deprecated and will be removed in a future release. Use batchStatements instead (Note the 'e' in Statement)" 899 * @deprecated 900 * @param batchStatments 901 */ 902 public void setBatchStatments(boolean batchStatments) { 903 LOG.warn("batchStatments is deprecated and will be removed in a future release. 
Use batchStatements instead (Note the 'e' in Statement)"); 904 this.batchStatements = batchStatments; 905 this.batchStatments = batchStatments; 906 } 907 908 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 909 this.statements.setUseExternalMessageReferences(useExternalMessageReferences); 910 } 911 912 /** 913 * @return the statements 914 */ 915 public Statements getStatements() { 916 return this.statements; 917 } 918 919 public void setStatements(Statements statements) { 920 this.statements = statements; 921 } 922 923 public int getMaxRows() { 924 return maxRows; 925 } 926 927 /** 928 * the max value for statement maxRows, used to limit jdbc queries 929 */ 930 public void setMaxRows(int maxRows) { 931 this.maxRows = maxRows; 932 } 933 934 @Override 935 public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException { 936 PreparedStatement s = null; 937 try { 938 s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); 939 s.setString(1, destination.getQualifiedName()); 940 s.setString(2, destination.getQualifiedName()); 941 s.setString(3, destination.getQualifiedName()); 942 s.setString(4, null); 943 s.setLong(5, 0); 944 s.setString(6, destination.getQualifiedName()); 945 s.setLong(7, 11); // entry out of priority range 946 947 if (s.executeUpdate() != 1) { 948 throw new IOException("Could not create ack record for destination: " + destination); 949 } 950 } finally { 951 close(s); 952 } 953 } 954 955 @Override 956 public void doRecoverPreparedOps(TransactionContext c, JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws SQLException, IOException { 957 PreparedStatement s = null; 958 ResultSet rs = null; 959 try { 960 s = c.getConnection().prepareStatement(this.statements.getFindOpsPendingOutcomeStatement()); 961 rs = s.executeQuery(); 962 while (rs.next()) { 963 long id = rs.getLong(1); 964 String encodedString = rs.getString(2); 965 
byte[] encodedXid = parseBase64Binary(encodedString); 966 if (encodedXid[0] == '+') { 967 jdbcMemoryTransactionStore.recoverAdd(id, getBinaryData(rs, 3)); 968 } else { 969 jdbcMemoryTransactionStore.recoverAck(id, encodedXid, getBinaryData(rs, 3)); 970 } 971 } 972 973 close(rs); 974 close(s); 975 976 s = c.getConnection().prepareStatement(this.statements.getFindAcksPendingOutcomeStatement()); 977 rs = s.executeQuery(); 978 while (rs.next()) { 979 String encodedString = rs.getString(1); 980 byte[] encodedXid = parseBase64Binary(encodedString); 981 String destination = rs.getString(2); 982 String subId = rs.getString(3); 983 String subName = rs.getString(4); 984 jdbcMemoryTransactionStore.recoverLastAck(encodedXid, 985 ActiveMQDestination.createDestination(destination, ActiveMQDestination.TOPIC_TYPE), 986 subName, subId); 987 } 988 } finally { 989 close(rs); 990 close(s); 991 } 992 } 993 994 @Override 995 public void doCommitAddOp(TransactionContext c, long preparedSequence, long sequence) throws SQLException, IOException { 996 PreparedStatement s = null; 997 try { 998 s = c.getConnection().prepareStatement(this.statements.getClearXidFlagStatement()); 999 s.setLong(1, sequence); 1000 s.setLong(2, preparedSequence); 1001 if (s.executeUpdate() != 1) { 1002 throw new IOException("Could not remove prepared transaction state from message add for sequenceId: " + sequence); 1003 } 1004 } finally { 1005 close(s); 1006 } 1007 } 1008 1009 1010 public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, 1011 IOException { 1012 PreparedStatement s = null; 1013 ResultSet rs = null; 1014 int result = 0; 1015 try { 1016 s = c.getConnection().prepareStatement(this.statements.getDestinationMessageCountStatement()); 1017 s.setString(1, destination.getQualifiedName()); 1018 rs = s.executeQuery(); 1019 if (rs.next()) { 1020 result = rs.getInt(1); 1021 } 1022 } finally { 1023 close(rs); 1024 close(s); 1025 } 1026 return result; 1027 } 1028 
/**
 * Queries the next batch of messages for a destination and feeds each row to the listener.
 * <p/>
 * The SQL comes from the "find next messages" statement (or its by-priority variant when
 * {@code isPrioritizedMessages} is set) after being passed through {@link #limitQuery(String)}. The
 * statement's max rows is the smaller of {@code maxReturned} and this adapter's {@code maxRows}.
 * Rows are delivered until the result set is exhausted, {@code maxReturned} rows have been accepted,
 * or the listener returns {@code false}.
 * <p/>
 * Any exception raised while querying or delivering is logged at WARN level and not rethrown.
 *
 * @param c the transaction context supplying the JDBC connection
 * @param destination the destination whose qualified name is bound as parameter 1
 * @param lastRecoveredEntries entries {@code [9]} down to {@code [0]} are bound from parameter 3 onwards
 *        in the prioritized case; otherwise only entry {@code [0]} is bound as parameter 3
 * @param maxSeq bound as parameter 2
 * @param maxReturned upper bound on the number of rows accepted by the listener
 * @param isPrioritizedMessages selects the by-priority statement and parameter layout
 * @param listener receives message references (column 1 as a string) when the statements are
 *        configured for external message references, otherwise the column 1 id and column 2 binary data
 * @throws Exception declared by the signature
 */
public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long[] lastRecoveredEntries,
        long maxSeq, int maxReturned, boolean isPrioritizedMessages, JDBCMessageRecoveryListener listener) throws Exception {
    PreparedStatement query = null;
    ResultSet rows = null;
    try {
        Connection connection = c.getConnection();
        String sql = limitQuery(isPrioritizedMessages
                ? this.statements.getFindNextMessagesByPriorityStatement()
                : this.statements.getFindNextMessagesStatement());
        query = connection.prepareStatement(sql);

        query.setMaxRows(Math.min(maxReturned, maxRows));
        query.setString(1, destination.getQualifiedName());
        query.setLong(2, maxSeq);
        if (!isPrioritizedMessages) {
            query.setLong(3, lastRecoveredEntries[0]);
        } else {
            // entries are bound from index 9 down to index 0, starting at parameter 3
            int paramId = 3;
            for (int idx = 9; idx >= 0; idx--) {
                query.setLong(paramId++, lastRecoveredEntries[idx]);
            }
        }

        rows = query.executeQuery();
        int accepted = 0;
        final boolean externalReferences = this.statements.isUseExternalMessageReferences();
        while (rows.next() && accepted < maxReturned) {
            final boolean keepGoing;
            if (externalReferences) {
                keepGoing = listener.recoverMessageReference(rows.getString(1));
            } else {
                keepGoing = listener.recoverMessage(rows.getLong(1), getBinaryData(rows, 2));
            }
            if (!keepGoing) {
                LOG.debug("Stopped recover next messages");
                break;
            }
            accepted++;
        }
    } catch (Exception e) {
        LOG.warn("Exception recovering next messages", e);
    } finally {
        close(rows);
        close(query);
    }
}

/**
 * Runs the "last producer sequence id" statement for the given producer.
 *
 * @param c the transaction context supplying the JDBC connection
 * @param id the producer whose {@code toString()} value is bound as the only parameter
 * @return the first column of the first row, or {@code -1} when the query returns no row
 * @throws SQLException if preparing or executing the statement fails
 * @throws IOException declared by the signature; not thrown directly by this implementation
 */
public long doGetLastProducerSequenceId(TransactionContext c, ProducerId id)
        throws SQLException, IOException {
    PreparedStatement query = null;
    ResultSet rows = null;
    try {
        query = c.getConnection().prepareStatement(this.statements.getLastProducerSequenceIdStatement());
        query.setString(1, id.toString());
        rows = query.executeQuery();
        if (!rows.next()) {
            return -1;
        }
        return rows.getLong(1);
    } finally {
        close(rows);
        close(query);
    }
}

/**
 * Debug helper: prints the full ACTIVEMQ_MSGS and ACTIVEMQ_ACKS tables to {@code System.out}, followed
 * by the message ids that are greater than the last acked id of the given subscription.
 *
 * @param c the connection to query
 * @param destinationName bound to {@code D.CONTAINER}
 * @param clientId bound to {@code D.CLIENT_ID}
 * @param subscriptionName bound to {@code D.SUB_NAME}
 * @throws SQLException if any of the queries fails
 */
public static void dumpTables(Connection c, String destinationName, String clientId, String
        subscriptionName) throws SQLException {
    printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
    printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);

    PreparedStatement pending = c.prepareStatement(
            "SELECT M.ID, D.LAST_ACKED_ID FROM ACTIVEMQ_MSGS M, ACTIVEMQ_ACKS D "
            + "WHERE D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
            + " AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
            + " ORDER BY M.ID");
    pending.setString(1, destinationName);
    pending.setString(2, clientId);
    pending.setString(3, subscriptionName);
    printQuery(pending, System.out);
}

/**
 * Debug helper: prints the row count of ACTIVEMQ_MSGS to {@code System.out}.
 *
 * @param c the connection to query
 * @throws SQLException if the query fails
 */
public static void dumpTables(java.sql.Connection c) throws SQLException {
    printQuery(c, "SELECT COUNT(*) from ACTIVEMQ_MSGS", System.out);
}

/**
 * Prepares {@code query} on the connection and prints its result via
 * {@link #printQuery(java.sql.PreparedStatement, java.io.PrintStream)}.
 *
 * @param c the connection to prepare the statement on
 * @param query the SQL text to run
 * @param out the stream receiving the formatted result
 * @throws SQLException if preparing or executing the statement fails
 */
public static void printQuery(java.sql.Connection c, String query, java.io.PrintStream out)
        throws SQLException {
    java.sql.PreparedStatement prepared = c.prepareStatement(query);
    printQuery(prepared, out);
}

/**
 * Executes the statement and prints the result as a simple table: one header line with the column
 * names delimited by {@code ||}, then one line per row with the values delimited by {@code |}.
 * <p/>
 * Both the result set and the statement are closed before returning; failures while closing are
 * ignored.
 *
 * @param s the statement to execute; it is closed by this method
 * @param out the stream receiving the formatted result
 * @throws SQLException if executing the statement or reading the result fails
 */
public static void printQuery(java.sql.PreparedStatement s, java.io.PrintStream out)
        throws SQLException {
    ResultSet rows = null;
    try {
        rows = s.executeQuery();
        java.sql.ResultSetMetaData meta = rows.getMetaData();

        // header line
        for (int col = 1; col <= meta.getColumnCount(); col++) {
            if (col == 1) {
                out.print("||");
            }
            out.print(meta.getColumnName(col) + "||");
        }
        out.println();

        // one line per row
        while (rows.next()) {
            for (int col = 1; col <= meta.getColumnCount(); col++) {
                if (col == 1) {
                    out.print("|");
                }
                out.print(rows.getString(col) + "|");
            }
            out.println();
        }
    } finally {
        // close(ResultSet) discards anything thrown while closing
        close(rows);
        try {
            s.close();
        } catch (Throwable ignore) {
        }
    }
}

/**
 * Hook applied to the "find next messages" SQL before it is prepared. This implementation returns
 * the query unchanged.
 *
 * @param query the SQL text
 * @return {@code query}, unmodified
 */
@Override
public String limitQuery(String query) {
    return query;
}
}