001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.File; 020import java.io.IOException; 021import java.sql.Connection; 022import java.sql.SQLException; 023import java.util.Collections; 024import java.util.HashMap; 025import java.util.Locale; 026import java.util.Set; 027import java.util.concurrent.ScheduledFuture; 028import java.util.concurrent.ScheduledThreadPoolExecutor; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.TimeUnit; 031 032import javax.sql.DataSource; 033 034import org.apache.activemq.ActiveMQMessageAudit; 035import org.apache.activemq.broker.BrokerService; 036import org.apache.activemq.broker.ConnectionContext; 037import org.apache.activemq.broker.Locker; 038import org.apache.activemq.broker.scheduler.JobSchedulerStore; 039import org.apache.activemq.command.ActiveMQDestination; 040import org.apache.activemq.command.ActiveMQQueue; 041import org.apache.activemq.command.ActiveMQTopic; 042import org.apache.activemq.command.Message; 043import org.apache.activemq.command.MessageAck; 044import org.apache.activemq.command.MessageId; 045import org.apache.activemq.command.ProducerId; 046import org.apache.activemq.openwire.OpenWireFormat; 047import org.apache.activemq.store.MessageStore; 048import org.apache.activemq.store.PersistenceAdapter; 049import org.apache.activemq.store.TopicMessageStore; 050import org.apache.activemq.store.TransactionStore; 051import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter; 052import org.apache.activemq.store.memory.MemoryTransactionStore; 053import org.apache.activemq.usage.SystemUsage; 054import org.apache.activemq.util.ByteSequence; 055import org.apache.activemq.util.FactoryFinder; 056import org.apache.activemq.util.IOExceptionSupport; 057import org.apache.activemq.util.LongSequenceGenerator; 058import org.apache.activemq.util.ServiceStopper; 059import org.apache.activemq.util.ThreadPoolUtils; 060import org.apache.activemq.wireformat.WireFormat; 061import org.slf4j.Logger; 062import org.slf4j.LoggerFactory; 063 064/** 065 * A {@link PersistenceAdapter} implementation using JDBC for persistence 066 * storage. 067 * 068 * This persistence adapter will correctly remember prepared XA transactions, 069 * but it will not keep track of local transaction commits so that operations 070 * performed against the Message store are done as a single uow. 071 * 072 * @org.apache.xbean.XBean element="jdbcPersistenceAdapter" 073 * 074 */ 075public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter { 076 077 private static final Logger LOG = LoggerFactory.getLogger(JDBCPersistenceAdapter.class); 078 private static FactoryFinder adapterFactoryFinder = new FactoryFinder( 079 "META-INF/services/org/apache/activemq/store/jdbc/"); 080 private static FactoryFinder lockFactoryFinder = new FactoryFinder( 081 "META-INF/services/org/apache/activemq/store/jdbc/lock/"); 082 083 public static final long DEFAULT_LOCK_KEEP_ALIVE_PERIOD = 30 * 1000; 084 085 private WireFormat wireFormat = new OpenWireFormat(); 086 private Statements statements; 087 private JDBCAdapter adapter; 088 private final JdbcMemoryTransactionStore transactionStore = new JdbcMemoryTransactionStore(this); 089 private ScheduledFuture<?> cleanupTicket; 090 private int cleanupPeriod = 1000 * 60 * 5; 091 private boolean useExternalMessageReferences; 092 private boolean createTablesOnStartup = true; 093 private DataSource lockDataSource; 094 private int transactionIsolation; 095 private File directory; 096 private boolean changeAutoCommitAllowed = true; 097 098 protected int maxProducersToAudit=1024; 099 protected int maxAuditDepth=1000; 100 protected boolean enableAudit=false; 101 protected int auditRecoveryDepth = 1024; 102 protected ActiveMQMessageAudit audit; 103 104 protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator(); 105 protected int maxRows = DefaultJDBCAdapter.MAX_ROWS; 106 protected final HashMap<ActiveMQDestination, MessageStore> storeCache = new HashMap<>(); 107 108 { 109 setLockKeepAlivePeriod(DEFAULT_LOCK_KEEP_ALIVE_PERIOD); 110 } 111 112 public JDBCPersistenceAdapter() { 113 } 114 115 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) { 116 super(ds); 117 this.wireFormat = wireFormat; 118 } 119 120 @Override 121 public Set<ActiveMQDestination> getDestinations() { 122 TransactionContext c = null; 123 try { 124 c = getTransactionContext(); 125 return getAdapter().doGetDestinations(c); 126 } catch (IOException e) { 127 return emptyDestinationSet(); 128 } catch (SQLException e) { 129 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 130 return emptyDestinationSet(); 131 } finally { 132 if (c != null) { 133 try { 134 c.close(); 135 } catch (Throwable e) { 136 } 137 } 138 } 139 } 140 141 @SuppressWarnings("unchecked") 142 private Set<ActiveMQDestination> emptyDestinationSet() { 143 return Collections.EMPTY_SET; 144 } 145 146 protected void createMessageAudit() { 147 if (enableAudit && audit == null) { 148 audit = new ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit); 149 TransactionContext c = null; 150 151 try { 152 c = getTransactionContext(); 153 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { 154 @Override 155 public void messageId(MessageId id) { 156 audit.isDuplicate(id); 157 } 158 }); 159 } catch (Exception e) { 160 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); 161 } finally { 162 if (c != null) { 163 try { 164 c.close(); 165 } catch (Throwable e) { 166 } 167 } 168 } 169 } 170 } 171 172 public void initSequenceIdGenerator() { 173 TransactionContext c = null; 174 try { 175 c = getTransactionContext(); 176 getAdapter().doMessageIdScan(c, auditRecoveryDepth, new JDBCMessageIdScanListener() { 177 @Override 178 public void messageId(MessageId id) { 179 audit.isDuplicate(id); 180 } 181 }); 182 } catch (Exception e) { 183 LOG.error("Failed to reload store message audit for JDBC persistence adapter", e); 184 } finally { 185 if (c != null) { 186 try { 187 c.close(); 188 } catch (Throwable e) { 189 } 190 } 191 } 192 } 193 194 @Override 195 public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException { 196 MessageStore rc = storeCache.get(destination); 197 if (rc == null) { 198 MessageStore store = transactionStore.proxy(new JDBCMessageStore(this, getAdapter(), wireFormat, destination, audit)); 199 synchronized (storeCache) { 200 rc = storeCache.put(destination, store); 201 if (rc != null) { 202 storeCache.put(destination, rc); 203 } else { 204 rc = store; 205 } 206 } 207 } 208 return rc; 209 } 210 211 @Override 212 public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { 213 TopicMessageStore rc = (TopicMessageStore) storeCache.get(destination); 214 if (rc == null) { 215 TopicMessageStore store = transactionStore.proxy(new JDBCTopicMessageStore(this, getAdapter(), wireFormat, destination, audit)); 216 synchronized (storeCache) { 217 rc = (TopicMessageStore) storeCache.put(destination, store); 218 if (rc != null) { 219 storeCache.put(destination, rc); 220 } else { 221 rc = store; 222 } 223 } 224 } 225 return rc; 226 } 227 228 /** 229 * Cleanup method to remove any state associated with the given destination 230 * @param destination Destination to forget 231 */ 232 @Override 233 public void removeQueueMessageStore(ActiveMQQueue destination) { 234 if (destination.isQueue() && getBrokerService().shouldRecordVirtualDestination(destination)) { 235 try { 236 removeConsumerDestination(destination); 237 } catch (IOException ioe) { 238 LOG.error("Failed to remove consumer destination: " + destination, ioe); 239 } 240 } 241 storeCache.remove(destination); 242 } 243 244 private void removeConsumerDestination(ActiveMQQueue destination) throws IOException { 245 TransactionContext c = getTransactionContext(); 246 try { 247 String id = destination.getQualifiedName(); 248 getAdapter().doDeleteSubscription(c, destination, id, id); 249 } catch (SQLException e) { 250 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 251 throw IOExceptionSupport.create("Failed to remove consumer destination: " + destination, e); 252 } finally { 253 c.close(); 254 } 255 } 256 257 /** 258 * Cleanup method to remove any state associated with the given destination 259 * No state retained.... nothing to do 260 * 261 * @param destination Destination to forget 262 */ 263 @Override 264 public void removeTopicMessageStore(ActiveMQTopic destination) { 265 storeCache.remove(destination); 266 } 267 268 @Override 269 public TransactionStore createTransactionStore() throws IOException { 270 return this.transactionStore; 271 } 272 273 @Override 274 public long getLastMessageBrokerSequenceId() throws IOException { 275 TransactionContext c = getTransactionContext(); 276 try { 277 long seq = getAdapter().doGetLastMessageStoreSequenceId(c); 278 sequenceGenerator.setLastSequenceId(seq); 279 long brokerSeq = 0; 280 if (seq != 0) { 281 byte[] msg = getAdapter().doGetMessageById(c, seq); 282 if (msg != null) { 283 Message last = (Message)wireFormat.unmarshal(new ByteSequence(msg)); 284 brokerSeq = last.getMessageId().getBrokerSequenceId(); 285 } else { 286 LOG.warn("Broker sequence id wasn't recovered properly, possible duplicates!"); 287 } 288 } 289 return brokerSeq; 290 } catch (SQLException e) { 291 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 292 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); 293 } finally { 294 c.close(); 295 } 296 } 297 298 @Override 299 public long getLastProducerSequenceId(ProducerId id) throws IOException { 300 TransactionContext c = getTransactionContext(); 301 try { 302 return getAdapter().doGetLastProducerSequenceId(c, id); 303 } catch (SQLException e) { 304 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 305 throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e); 306 } finally { 307 c.close(); 308 } 309 } 310 311 @Override 312 public void allowIOResumption() {} 313 314 @Override 315 public void init() throws Exception { 316 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 317 318 if (isCreateTablesOnStartup()) { 319 TransactionContext transactionContext = getTransactionContext(); 320 transactionContext.getExclusiveConnection(); 321 transactionContext.begin(); 322 try { 323 try { 324 getAdapter().doCreateTables(transactionContext); 325 } catch (SQLException e) { 326 LOG.warn("Cannot create tables due to: " + e); 327 JDBCPersistenceAdapter.log("Failure Details: ", e); 328 } 329 } finally { 330 transactionContext.commit(); 331 } 332 } 333 } 334 335 @Override 336 public void doStart() throws Exception { 337 338 if( brokerService!=null ) { 339 wireFormat.setVersion(brokerService.getStoreOpenWireVersion()); 340 } 341 342 // Cleanup the db periodically. 343 if (cleanupPeriod > 0) { 344 cleanupTicket = getScheduledThreadPoolExecutor().scheduleWithFixedDelay(new Runnable() { 345 @Override 346 public void run() { 347 cleanup(); 348 } 349 }, 0, cleanupPeriod, TimeUnit.MILLISECONDS); 350 } 351 createMessageAudit(); 352 } 353 354 @Override 355 public synchronized void doStop(ServiceStopper stopper) throws Exception { 356 if (cleanupTicket != null) { 357 cleanupTicket.cancel(true); 358 cleanupTicket = null; 359 } 360 closeDataSource(getDataSource()); 361 } 362 363 public void cleanup() { 364 TransactionContext c = null; 365 try { 366 LOG.debug("Cleaning up old messages."); 367 c = getTransactionContext(); 368 c.getExclusiveConnection(); 369 getAdapter().doDeleteOldMessages(c); 370 } catch (IOException e) { 371 LOG.warn("Old message cleanup failed due to: " + e, e); 372 } catch (SQLException e) { 373 LOG.warn("Old message cleanup failed due to: " + e); 374 JDBCPersistenceAdapter.log("Failure Details: ", e); 375 } finally { 376 if (c != null) { 377 try { 378 c.close(); 379 } catch (Throwable e) { 380 } 381 } 382 LOG.debug("Cleanup done."); 383 } 384 } 385 386 @Override 387 public ScheduledThreadPoolExecutor getScheduledThreadPoolExecutor() { 388 if (clockDaemon == null) { 389 clockDaemon = new ScheduledThreadPoolExecutor(5, new ThreadFactory() { 390 @Override 391 public Thread newThread(Runnable runnable) { 392 Thread thread = new Thread(runnable, "ActiveMQ JDBC PA Scheduled Task"); 393 thread.setDaemon(true); 394 return thread; 395 } 396 }); 397 } 398 return clockDaemon; 399 } 400 401 public JDBCAdapter getAdapter() throws IOException { 402 if (adapter == null) { 403 setAdapter(createAdapter()); 404 } 405 return adapter; 406 } 407 408 /** 409 * @deprecated as of 5.7.0, replaced by {@link #getLocker()} 410 */ 411 @Deprecated 412 public Locker getDatabaseLocker() throws IOException { 413 return getLocker(); 414 } 415 416 /** 417 * Sets the database locker strategy to use to lock the database on startup 418 * @throws IOException 419 * 420 * @deprecated as of 5.7.0, replaced by {@link #setLocker(org.apache.activemq.broker.Locker)} 421 */ 422 @Deprecated 423 public void setDatabaseLocker(Locker locker) throws IOException { 424 setLocker(locker); 425 } 426 427 public DataSource getLockDataSource() throws IOException { 428 if (lockDataSource == null) { 429 lockDataSource = getDataSource(); 430 if (lockDataSource == null) { 431 throw new IllegalArgumentException( 432 "No dataSource property has been configured"); 433 } 434 } 435 return lockDataSource; 436 } 437 438 public void setLockDataSource(DataSource dataSource) { 439 this.lockDataSource = dataSource; 440 LOG.info("Using a separate dataSource for locking: " 441 + lockDataSource); 442 } 443 444 @Override 445 public BrokerService getBrokerService() { 446 return brokerService; 447 } 448 449 /** 450 * @throws IOException 451 */ 452 protected JDBCAdapter createAdapter() throws IOException { 453 454 adapter = (JDBCAdapter) loadAdapter(adapterFactoryFinder, "adapter"); 455 456 // Use the default JDBC adapter if the 457 // Database type is not recognized. 458 if (adapter == null) { 459 adapter = new DefaultJDBCAdapter(); 460 LOG.debug("Using default JDBC Adapter: " + adapter); 461 } 462 return adapter; 463 } 464 465 private Object loadAdapter(FactoryFinder finder, String kind) throws IOException { 466 Object adapter = null; 467 TransactionContext c = getTransactionContext(); 468 try { 469 try { 470 // Make the filename file system safe. 471 String dirverName = c.getConnection().getMetaData().getDriverName(); 472 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase(Locale.ENGLISH); 473 474 try { 475 adapter = finder.newInstance(dirverName); 476 LOG.info("Database " + kind + " driver override recognized for : [" + dirverName + "] - adapter: " + adapter.getClass()); 477 } catch (Throwable e) { 478 LOG.info("Database " + kind + " driver override not found for : [" + dirverName 479 + "]. Will use default implementation."); 480 } 481 } catch (SQLException e) { 482 LOG.warn("JDBC error occurred while trying to detect database type for overrides. Will use default implementations: " 483 + e.getMessage()); 484 JDBCPersistenceAdapter.log("Failure Details: ", e); 485 } 486 } finally { 487 c.close(); 488 } 489 return adapter; 490 } 491 492 public void setAdapter(JDBCAdapter adapter) { 493 this.adapter = adapter; 494 this.adapter.setStatements(getStatements()); 495 this.adapter.setMaxRows(getMaxRows()); 496 } 497 498 public WireFormat getWireFormat() { 499 return wireFormat; 500 } 501 502 public void setWireFormat(WireFormat wireFormat) { 503 this.wireFormat = wireFormat; 504 } 505 506 public TransactionContext getTransactionContext(ConnectionContext context) throws IOException { 507 if (context == null || isBrokerContext(context)) { 508 return getTransactionContext(); 509 } else { 510 TransactionContext answer = (TransactionContext)context.getLongTermStoreContext(); 511 if (answer == null) { 512 answer = getTransactionContext(); 513 context.setLongTermStoreContext(answer); 514 } 515 return answer; 516 } 517 } 518 519 private boolean isBrokerContext(ConnectionContext context) { 520 return context.getSecurityContext() != null && context.getSecurityContext().isBrokerContext(); 521 } 522 523 public TransactionContext getTransactionContext() throws IOException { 524 TransactionContext answer = new TransactionContext(this); 525 if (transactionIsolation > 0) { 526 answer.setTransactionIsolation(transactionIsolation); 527 } 528 return answer; 529 } 530 531 @Override 532 public void beginTransaction(ConnectionContext context) throws IOException { 533 TransactionContext transactionContext = getTransactionContext(context); 534 transactionContext.begin(); 535 } 536 537 @Override 538 public void commitTransaction(ConnectionContext context) throws IOException { 539 TransactionContext transactionContext = getTransactionContext(context); 540 transactionContext.commit(); 541 } 542 543 @Override 544 public void rollbackTransaction(ConnectionContext context) throws IOException { 545 TransactionContext transactionContext = getTransactionContext(context); 546 transactionContext.rollback(); 547 } 548 549 public int getCleanupPeriod() { 550 return cleanupPeriod; 551 } 552 553 /** 554 * Sets the number of milliseconds until the database is attempted to be 555 * cleaned up for durable topics 556 */ 557 public void setCleanupPeriod(int cleanupPeriod) { 558 this.cleanupPeriod = cleanupPeriod; 559 } 560 561 public boolean isChangeAutoCommitAllowed() { 562 return changeAutoCommitAllowed; 563 } 564 565 /** 566 * Whether the JDBC driver allows to set the auto commit. 567 * Some drivers does not allow changing the auto commit. The default value is true. 568 * 569 * @param changeAutoCommitAllowed true to change, false to not change. 570 */ 571 public void setChangeAutoCommitAllowed(boolean changeAutoCommitAllowed) { 572 this.changeAutoCommitAllowed = changeAutoCommitAllowed; 573 } 574 575 @Override 576 public void deleteAllMessages() throws IOException { 577 TransactionContext c = getTransactionContext(); 578 c.getExclusiveConnection(); 579 try { 580 getAdapter().doDropTables(c); 581 getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences()); 582 getAdapter().doCreateTables(c); 583 LOG.info("Persistence store purged."); 584 } catch (SQLException e) { 585 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 586 throw IOExceptionSupport.create(e); 587 } finally { 588 c.close(); 589 } 590 } 591 592 public boolean isUseExternalMessageReferences() { 593 return useExternalMessageReferences; 594 } 595 596 public void setUseExternalMessageReferences(boolean useExternalMessageReferences) { 597 this.useExternalMessageReferences = useExternalMessageReferences; 598 } 599 600 public boolean isCreateTablesOnStartup() { 601 return createTablesOnStartup; 602 } 603 604 /** 605 * Sets whether or not tables are created on startup 606 */ 607 public void setCreateTablesOnStartup(boolean createTablesOnStartup) { 608 this.createTablesOnStartup = createTablesOnStartup; 609 } 610 611 /** 612 * @deprecated use {@link #setUseLock(boolean)} instead 613 * 614 * Sets whether or not an exclusive database lock should be used to enable 615 * JDBC Master/Slave. Enabled by default. 616 */ 617 @Deprecated 618 public void setUseDatabaseLock(boolean useDatabaseLock) { 619 setUseLock(useDatabaseLock); 620 } 621 622 public static void log(String msg, SQLException e) { 623 String s = msg + e.getMessage(); 624 while (e.getNextException() != null) { 625 e = e.getNextException(); 626 s += ", due to: " + e.getMessage(); 627 } 628 LOG.warn(s, e); 629 } 630 631 public Statements getStatements() { 632 if (statements == null) { 633 statements = new Statements(); 634 } 635 return statements; 636 } 637 638 public void setStatements(Statements statements) { 639 this.statements = statements; 640 if (adapter != null) { 641 this.adapter.setStatements(getStatements()); 642 } 643 } 644 645 /** 646 * @param usageManager The UsageManager that is controlling the 647 * destination's memory usage. 648 */ 649 @Override 650 public void setUsageManager(SystemUsage usageManager) { 651 } 652 653 @Override 654 public Locker createDefaultLocker() throws IOException { 655 Locker locker = (Locker) loadAdapter(lockFactoryFinder, "lock"); 656 if (locker == null) { 657 locker = new DefaultDatabaseLocker(); 658 LOG.debug("Using default JDBC Locker: " + locker); 659 } 660 locker.configure(this); 661 return locker; 662 } 663 664 @Override 665 public void setBrokerName(String brokerName) { 666 } 667 668 @Override 669 public String toString() { 670 return "JDBCPersistenceAdapter(" + super.toString() + ")"; 671 } 672 673 @Override 674 public void setDirectory(File dir) { 675 this.directory=dir; 676 } 677 678 @Override 679 public File getDirectory(){ 680 if (this.directory==null && brokerService != null){ 681 this.directory=brokerService.getBrokerDataDirectory(); 682 } 683 return this.directory; 684 } 685 686 // interesting bit here is proof that DB is ok 687 @Override 688 public void checkpoint(boolean sync) throws IOException { 689 // by pass TransactionContext to avoid IO Exception handler 690 Connection connection = null; 691 try { 692 connection = getDataSource().getConnection(); 693 if (!connection.isValid(10)) { 694 throw new IOException("isValid(10) failed for: " + connection); 695 } 696 } catch (SQLException e) { 697 LOG.debug("Could not get JDBC connection for checkpoint: " + e); 698 throw IOExceptionSupport.create(e); 699 } finally { 700 if (connection != null) { 701 try { 702 connection.close(); 703 } catch (Throwable ignored) { 704 } 705 } 706 } 707 } 708 709 @Override 710 public long size(){ 711 return 0; 712 } 713 714 /** 715 * @deprecated use {@link Locker#setLockAcquireSleepInterval(long)} instead 716 * 717 * millisecond interval between lock acquire attempts, applied to newly created DefaultDatabaseLocker 718 * not applied if DataBaseLocker is injected. 719 * 720 */ 721 @Deprecated 722 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) throws IOException { 723 getLocker().setLockAcquireSleepInterval(lockAcquireSleepInterval); 724 } 725 726 /** 727 * set the Transaction isolation level to something other that TRANSACTION_READ_UNCOMMITTED 728 * This allowable dirty isolation level may not be achievable in clustered DB environments 729 * so a more restrictive and expensive option may be needed like TRANSACTION_REPEATABLE_READ 730 * see isolation level constants in {@link java.sql.Connection} 731 * @param transactionIsolation the isolation level to use 732 */ 733 public void setTransactionIsolation(int transactionIsolation) { 734 this.transactionIsolation = transactionIsolation; 735 } 736 737 public int getMaxProducersToAudit() { 738 return maxProducersToAudit; 739 } 740 741 public void setMaxProducersToAudit(int maxProducersToAudit) { 742 this.maxProducersToAudit = maxProducersToAudit; 743 } 744 745 public int getMaxAuditDepth() { 746 return maxAuditDepth; 747 } 748 749 public void setMaxAuditDepth(int maxAuditDepth) { 750 this.maxAuditDepth = maxAuditDepth; 751 } 752 753 public boolean isEnableAudit() { 754 return enableAudit; 755 } 756 757 public void setEnableAudit(boolean enableAudit) { 758 this.enableAudit = enableAudit; 759 } 760 761 public int getAuditRecoveryDepth() { 762 return auditRecoveryDepth; 763 } 764 765 public void setAuditRecoveryDepth(int auditRecoveryDepth) { 766 this.auditRecoveryDepth = auditRecoveryDepth; 767 } 768 769 public long getNextSequenceId() { 770 return sequenceGenerator.getNextSequenceId(); 771 } 772 773 public int getMaxRows() { 774 return maxRows; 775 } 776 777 /* 778 * the max rows return from queries, with sparse selectors this may need to be increased 779 */ 780 public void setMaxRows(int maxRows) { 781 this.maxRows = maxRows; 782 } 783 784 public void recover(JdbcMemoryTransactionStore jdbcMemoryTransactionStore) throws IOException { 785 TransactionContext c = getTransactionContext(); 786 try { 787 getAdapter().doRecoverPreparedOps(c, jdbcMemoryTransactionStore); 788 } catch (SQLException e) { 789 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 790 throw IOExceptionSupport.create("Failed to recover from: " + jdbcMemoryTransactionStore + ". Reason: " + e,e); 791 } finally { 792 c.close(); 793 } 794 } 795 796 public void commitAdd(ConnectionContext context, final MessageId messageId, final long preparedSequenceId, final long newSequence) throws IOException { 797 TransactionContext c = getTransactionContext(context); 798 try { 799 getAdapter().doCommitAddOp(c, preparedSequenceId, newSequence); 800 } catch (SQLException e) { 801 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 802 throw IOExceptionSupport.create("Failed to commit add: " + messageId + ". Reason: " + e, e); 803 } finally { 804 c.close(); 805 } 806 } 807 808 public void commitRemove(ConnectionContext context, MessageAck ack) throws IOException { 809 TransactionContext c = getTransactionContext(context); 810 try { 811 getAdapter().doRemoveMessage(c, (Long)ack.getLastMessageId().getEntryLocator(), null); 812 } catch (SQLException e) { 813 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 814 throw IOExceptionSupport.create("Failed to commit last ack: " + ack + ". Reason: " + e,e); 815 } finally { 816 c.close(); 817 } 818 } 819 820 public void commitLastAck(ConnectionContext context, long xidLastAck, long priority, ActiveMQDestination destination, String subName, String clientId) throws IOException { 821 TransactionContext c = getTransactionContext(context); 822 try { 823 getAdapter().doSetLastAck(c, destination, null, clientId, subName, xidLastAck, priority); 824 } catch (SQLException e) { 825 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 826 throw IOExceptionSupport.create("Failed to commit last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e,e); 827 } finally { 828 c.close(); 829 } 830 } 831 832 public void rollbackLastAck(ConnectionContext context, JDBCTopicMessageStore store, MessageAck ack, String subName, String clientId) throws IOException { 833 TransactionContext c = getTransactionContext(context); 834 try { 835 byte priority = (byte) store.getCachedStoreSequenceId(c, store.getDestination(), ack.getLastMessageId())[1]; 836 getAdapter().doClearLastAck(c, store.getDestination(), priority, clientId, subName); 837 } catch (SQLException e) { 838 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 839 throw IOExceptionSupport.create("Failed to rollback last ack: " + ack + " on " + store.getDestination() + " for " + subName + ":" + clientId + ". Reason: " + e,e); 840 } finally { 841 c.close(); 842 } 843 } 844 845 // after recovery there is no record of the original messageId for the ack 846 public void rollbackLastAck(ConnectionContext context, byte priority, ActiveMQDestination destination, String subName, String clientId) throws IOException { 847 TransactionContext c = getTransactionContext(context); 848 try { 849 getAdapter().doClearLastAck(c, destination, priority, clientId, subName); 850 } catch (SQLException e) { 851 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 852 throw IOExceptionSupport.create("Failed to rollback last ack with priority: " + priority + " on " + destination + " for " + subName + ":" + clientId + ". Reason: " + e, e); 853 } finally { 854 c.close(); 855 } 856 } 857 858 long[] getStoreSequenceIdForMessageId(ConnectionContext context, MessageId messageId, ActiveMQDestination destination) throws IOException { 859 long[] result = new long[]{-1, Byte.MAX_VALUE -1}; 860 TransactionContext c = getTransactionContext(context); 861 try { 862 result = adapter.getStoreSequenceId(c, destination, messageId); 863 } catch (SQLException e) { 864 JDBCPersistenceAdapter.log("JDBC Failure: ", e); 865 throw IOExceptionSupport.create("Failed to get store sequenceId for messageId: " + messageId +", on: " + destination + ". Reason: " + e, e); 866 } finally { 867 c.close(); 868 } 869 return result; 870 } 871 872 @Override 873 public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { 874 throw new UnsupportedOperationException(); 875 } 876}